diff --git a/src/parquet/arrow/arrow-reader-writer-test.cc b/src/parquet/arrow/arrow-reader-writer-test.cc
index 0bdc14d3..7b63514d 100644
--- a/src/parquet/arrow/arrow-reader-writer-test.cc
+++ b/src/parquet/arrow/arrow-reader-writer-test.cc
@@ -43,7 +43,9 @@ using arrow::PoolBuffer;
 using arrow::PrimitiveArray;
 using arrow::Status;
 using arrow::Table;
+using arrow::TimeUnit;
 
+using ArrowId = ::arrow::Type;
 using ParquetType = parquet::Type;
 using parquet::schema::GroupNode;
 using parquet::schema::NodePtr;
@@ -58,13 +60,98 @@ const int LARGE_SIZE = 10000;
 
 constexpr uint32_t kDefaultSeed = 0;
 
+LogicalType::type get_logical_type(const ::arrow::DataType& type) {
+  switch (type.id()) {
+    case ArrowId::UINT8:
+      return LogicalType::UINT_8;
+    case ArrowId::INT8:
+      return LogicalType::INT_8;
+    case ArrowId::UINT16:
+      return LogicalType::UINT_16;
+    case ArrowId::INT16:
+      return LogicalType::INT_16;
+    case ArrowId::UINT32:
+      return LogicalType::UINT_32;
+    case ArrowId::INT32:
+      return LogicalType::INT_32;
+    case ArrowId::UINT64:
+      return LogicalType::UINT_64;
+    case ArrowId::INT64:
+      return LogicalType::INT_64;
+    case ArrowId::STRING:
+      return LogicalType::UTF8;
+    case ArrowId::DATE32:
+      return LogicalType::DATE;
+    case ArrowId::DATE64:
+      return LogicalType::DATE;
+    case ArrowId::TIMESTAMP: {
+      const auto& ts_type = static_cast<const ::arrow::TimestampType&>(type);
+      switch (ts_type.unit()) {
+        case TimeUnit::MILLI:
+          return LogicalType::TIMESTAMP_MILLIS;
+        case TimeUnit::MICRO:
+          return LogicalType::TIMESTAMP_MICROS;
+        default:
+          DCHECK(false) << "Only MILLI and MICRO units supported for Arrow timestamps "
+                           "with Parquet.";
+      }
+    } break;
+    case ArrowId::TIME32:
+      return LogicalType::TIME_MILLIS;
+    case ArrowId::TIME64:
+      return LogicalType::TIME_MICROS;
+    default:
+      break;
+  }
+  return LogicalType::NONE;
+}
+
+ParquetType::type get_physical_type(const ::arrow::DataType& type) {
+  switch (type.id()) {
+    case ArrowId::BOOL:
+      return ParquetType::BOOLEAN;
+    case ArrowId::UINT8:
+    case ArrowId::INT8:
+    case ArrowId::UINT16:
+    case ArrowId::INT16:
+    case ArrowId::UINT32:
+    case ArrowId::INT32:
+      return ParquetType::INT32;
+    case ArrowId::UINT64:
+    case ArrowId::INT64:
+      return ParquetType::INT64;
+    case ArrowId::FLOAT:
+      return ParquetType::FLOAT;
+    case ArrowId::DOUBLE:
+      return ParquetType::DOUBLE;
+    case ArrowId::BINARY:
+      return ParquetType::BYTE_ARRAY;
+    case ArrowId::STRING:
+      return ParquetType::BYTE_ARRAY;
+    case ArrowId::DATE32:
+      return ParquetType::INT32;
+    case ArrowId::DATE64:
+      // Convert to date32 internally
+      return ParquetType::INT32;
+    case ArrowId::TIME32:
+      return ParquetType::INT32;
+    case ArrowId::TIME64:
+      return ParquetType::INT64;
+    case ArrowId::TIMESTAMP:
+      return ParquetType::INT64;
+    default:
+      break;
+  }
+  DCHECK(false) << "cannot reach this code";
+  return ParquetType::INT32;
+}
+
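Note: the two helpers just added let the tests derive the Parquet (physical, logical) type pair from an Arrow type instance instead of hard-coding it per type. A minimal usage sketch, assuming this test file's headers and using-declarations (illustrative only, not part of the patch):

    // time32(MILLI) should map to (INT32, TIME_MILLIS).
    auto t = ::arrow::time32(TimeUnit::MILLI);
    auto node = PrimitiveNode::Make("t", Repetition::REQUIRED,
        get_physical_type(*t),   // ParquetType::INT32
        get_logical_type(*t));   // LogicalType::TIME_MILLIS

MakeSimpleSchema further down in this patch does exactly this.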
 template <typename ArrowType>
 struct test_traits {};
 
 template <>
 struct test_traits<::arrow::BooleanType> {
   static constexpr ParquetType::type parquet_enum = ParquetType::BOOLEAN;
-  static constexpr LogicalType::type logical_enum = LogicalType::NONE;
   static uint8_t const value;
 };
 
@@ -73,7 +160,6 @@ const uint8_t test_traits<::arrow::BooleanType>::value(1);
 template <>
 struct test_traits<::arrow::UInt8Type> {
   static constexpr ParquetType::type parquet_enum = ParquetType::INT32;
-  static constexpr LogicalType::type logical_enum = LogicalType::UINT_8;
   static uint8_t const value;
 };
 
@@ -82,7 +168,6 @@ const uint8_t test_traits<::arrow::UInt8Type>::value(64);
 template <>
 struct test_traits<::arrow::Int8Type> {
   static constexpr ParquetType::type parquet_enum = ParquetType::INT32;
-  static constexpr LogicalType::type logical_enum = LogicalType::INT_8;
   static int8_t const value;
 };
 
@@ -91,7 +176,6 @@ const int8_t test_traits<::arrow::Int8Type>::value(-64);
 template <>
 struct test_traits<::arrow::UInt16Type> {
   static constexpr ParquetType::type parquet_enum = ParquetType::INT32;
-  static constexpr LogicalType::type logical_enum = LogicalType::UINT_16;
   static uint16_t const value;
 };
 
@@ -100,7 +184,6 @@ const uint16_t test_traits<::arrow::UInt16Type>::value(1024);
 template <>
 struct test_traits<::arrow::Int16Type> {
   static constexpr ParquetType::type parquet_enum = ParquetType::INT32;
-  static constexpr LogicalType::type logical_enum = LogicalType::INT_16;
   static int16_t const value;
 };
 
@@ -109,7 +192,6 @@ const int16_t test_traits<::arrow::Int16Type>::value(-1024);
 template <>
 struct test_traits<::arrow::UInt32Type> {
   static constexpr ParquetType::type parquet_enum = ParquetType::INT32;
-  static constexpr LogicalType::type logical_enum = LogicalType::UINT_32;
   static uint32_t const value;
 };
 
@@ -118,7 +200,6 @@ const uint32_t test_traits<::arrow::UInt32Type>::value(1024);
 template <>
 struct test_traits<::arrow::Int32Type> {
   static constexpr ParquetType::type parquet_enum = ParquetType::INT32;
-  static constexpr LogicalType::type logical_enum = LogicalType::NONE;
   static int32_t const value;
 };
 
@@ -127,7 +208,6 @@ const int32_t test_traits<::arrow::Int32Type>::value(-1024);
 template <>
 struct test_traits<::arrow::UInt64Type> {
   static constexpr ParquetType::type parquet_enum = ParquetType::INT64;
-  static constexpr LogicalType::type logical_enum = LogicalType::UINT_64;
   static uint64_t const value;
 };
 
@@ -136,7 +216,6 @@ const uint64_t test_traits<::arrow::UInt64Type>::value(1024);
 template <>
 struct test_traits<::arrow::Int64Type> {
   static constexpr ParquetType::type parquet_enum = ParquetType::INT64;
-  static constexpr LogicalType::type logical_enum = LogicalType::NONE;
   static int64_t const value;
 };
 
@@ -145,25 +224,22 @@ const int64_t test_traits<::arrow::Int64Type>::value(-1024);
 template <>
 struct test_traits<::arrow::TimestampType> {
   static constexpr ParquetType::type parquet_enum = ParquetType::INT64;
-  static constexpr LogicalType::type logical_enum = LogicalType::TIMESTAMP_MILLIS;
   static int64_t const value;
 };
 
 const int64_t test_traits<::arrow::TimestampType>::value(14695634030000);
 
 template <>
-struct test_traits<::arrow::Date64Type> {
+struct test_traits<::arrow::Date32Type> {
   static constexpr ParquetType::type parquet_enum = ParquetType::INT32;
-  static constexpr LogicalType::type logical_enum = LogicalType::DATE;
-  static int64_t const value;
+  static int32_t const value;
 };
 
-const int64_t test_traits<::arrow::Date64Type>::value(14688000000000);
+const int32_t test_traits<::arrow::Date32Type>::value(170000);
 
 template <>
 struct test_traits<::arrow::FloatType> {
   static constexpr ParquetType::type parquet_enum = ParquetType::FLOAT;
-  static constexpr LogicalType::type logical_enum = LogicalType::NONE;
   static float const value;
 };
 
@@ -172,7 +248,6 @@ const float test_traits<::arrow::FloatType>::value(2.1f);
 template <>
 struct test_traits<::arrow::DoubleType> {
   static constexpr ParquetType::type parquet_enum = ParquetType::DOUBLE;
-  static constexpr LogicalType::type logical_enum = LogicalType::NONE;
   static double const value;
 };
 
@@ -181,14 +256,12 @@ const double test_traits<::arrow::DoubleType>::value(4.2);
 template <>
 struct test_traits<::arrow::StringType> {
   static constexpr ParquetType::type parquet_enum = ParquetType::BYTE_ARRAY;
-  static constexpr LogicalType::type logical_enum = LogicalType::UTF8;
   static std::string const value;
 };
 
 template <>
 struct test_traits<::arrow::BinaryType> {
   static constexpr ParquetType::type parquet_enum = ParquetType::BYTE_ARRAY;
-  static constexpr LogicalType::type logical_enum = LogicalType::NONE;
   static std::string const value;
 };
 
@@ -231,19 +304,20 @@ void DoSimpleRoundtrip(const std::shared_ptr<Table>& table, int num_threads,
   }
 }
 
+static std::shared_ptr<GroupNode> MakeSimpleSchema(
+    const ::arrow::DataType& type, Repetition::type repetition) {
+  auto pnode = PrimitiveNode::Make(
+      "column1", repetition, get_physical_type(type), get_logical_type(type));
+  NodePtr node_ =
+      GroupNode::Make("schema", Repetition::REQUIRED, std::vector<NodePtr>({pnode}));
+  return std::static_pointer_cast<GroupNode>(node_);
+}
+
 template <typename TestType>
 class TestParquetIO : public ::testing::Test {
  public:
   virtual void SetUp() {}
 
-  std::shared_ptr<GroupNode> MakeSchema(Repetition::type repetition) {
-    auto pnode = PrimitiveNode::Make("column1", repetition,
-        test_traits<TestType>::parquet_enum, test_traits<TestType>::logical_enum);
-    NodePtr node_ =
-        GroupNode::Make("schema", Repetition::REQUIRED, std::vector<NodePtr>({pnode}));
-    return std::static_pointer_cast<GroupNode>(node_);
-  }
-
   std::unique_ptr<ParquetFileWriter> MakeWriter(
       const std::shared_ptr<GroupNode>& schema) {
     sink_ = std::make_shared<InMemoryOutputStream>();
@@ -348,8 +422,8 @@ class TestParquetIO : public ::testing::Test {
 
 typedef ::testing::Types<::arrow::BooleanType, ::arrow::UInt8Type, ::arrow::Int8Type,
     ::arrow::UInt16Type, ::arrow::Int16Type, ::arrow::Int32Type, ::arrow::UInt64Type,
-    ::arrow::Int64Type, ::arrow::TimestampType, ::arrow::Date64Type, ::arrow::FloatType,
-    ::arrow::DoubleType, ::arrow::StringType, ::arrow::BinaryType>
+    ::arrow::Int64Type, ::arrow::Date32Type, ::arrow::FloatType, ::arrow::DoubleType,
+    ::arrow::StringType, ::arrow::BinaryType>
     TestTypes;
 
 TYPED_TEST_CASE(TestParquetIO, TestTypes);
 
@@ -358,7 +432,8 @@ TYPED_TEST(TestParquetIO, SingleColumnRequiredWrite) {
   std::shared_ptr<Array> values;
   ASSERT_OK(NonNullArray<TypeParam>(SMALL_SIZE, &values));
 
-  std::shared_ptr<GroupNode> schema = this->MakeSchema(Repetition::REQUIRED);
+  std::shared_ptr<GroupNode> schema =
+      MakeSimpleSchema(*values->type(), Repetition::REQUIRED);
   this->WriteColumn(schema, values);
 
   this->ReadAndCheckSingleColumnFile(values.get());
@@ -390,7 +465,8 @@ TYPED_TEST(TestParquetIO, SingleColumnOptionalReadWrite) {
 
   ASSERT_OK(NullableArray<TypeParam>(SMALL_SIZE, 10, kDefaultSeed, &values));
 
-  std::shared_ptr<GroupNode> schema = this->MakeSchema(Repetition::OPTIONAL);
+  std::shared_ptr<GroupNode> schema =
+      MakeSimpleSchema(*values->type(), Repetition::OPTIONAL);
   this->WriteColumn(schema, values);
 
   this->ReadAndCheckSingleColumnFile(values.get());
@@ -399,7 +475,8 @@ TYPED_TEST(TestParquetIO, SingleColumnOptionalReadWrite) {
 TYPED_TEST(TestParquetIO, SingleColumnRequiredSliceWrite) {
   std::shared_ptr<Array> values;
   ASSERT_OK(NonNullArray<TypeParam>(2 * SMALL_SIZE, &values));
-  std::shared_ptr<GroupNode> schema = this->MakeSchema(Repetition::REQUIRED);
+  std::shared_ptr<GroupNode> schema =
+      MakeSimpleSchema(*values->type(), Repetition::REQUIRED);
 
   std::shared_ptr<Array> sliced_values = values->Slice(SMALL_SIZE / 2, SMALL_SIZE);
   this->WriteColumn(schema, sliced_values);
@@ -414,7 +491,8 @@ TYPED_TEST(TestParquetIO, SingleColumnRequiredSliceWrite) {
 TYPED_TEST(TestParquetIO, SingleColumnOptionalSliceWrite) {
   std::shared_ptr<Array> values;
   ASSERT_OK(NullableArray<TypeParam>(2 * SMALL_SIZE, SMALL_SIZE, kDefaultSeed, &values));
-  std::shared_ptr<GroupNode> schema = this->MakeSchema(Repetition::OPTIONAL);
+  std::shared_ptr<GroupNode> schema =
+      MakeSimpleSchema(*values->type(), Repetition::OPTIONAL);
 
   std::shared_ptr<Array> sliced_values = values->Slice(SMALL_SIZE / 2, SMALL_SIZE);
   this->WriteColumn(schema, sliced_values);
 
@@ -470,7 +548,8 @@ TYPED_TEST(TestParquetIO, SingleColumnRequiredChunkedWrite) {
   ASSERT_OK(NonNullArray<TypeParam>(SMALL_SIZE, &values));
   int64_t chunk_size = values->length() / 4;
 
-  std::shared_ptr<GroupNode> schema = this->MakeSchema(Repetition::REQUIRED);
+  std::shared_ptr<GroupNode> schema =
+      MakeSimpleSchema(*values->type(), Repetition::REQUIRED);
   FileWriter writer(default_memory_pool(), this->MakeWriter(schema));
   for (int i = 0; i < 4; i++) {
     ASSERT_OK_NO_THROW(writer.NewRowGroup(chunk_size));
@@ -531,7 +610,8 @@ TYPED_TEST(TestParquetIO, SingleColumnOptionalChunkedWrite) {
 
   ASSERT_OK(NullableArray<TypeParam>(SMALL_SIZE, 10, kDefaultSeed, &values));
 
-  std::shared_ptr<GroupNode> schema = this->MakeSchema(Repetition::OPTIONAL);
+  std::shared_ptr<GroupNode> schema =
+      MakeSimpleSchema(*values->type(), Repetition::OPTIONAL);
   FileWriter writer(::arrow::default_memory_pool(), this->MakeWriter(schema));
   for (int i = 0; i < 4; i++) {
     ASSERT_OK_NO_THROW(writer.NewRowGroup(chunk_size));
@@ -601,7 +681,7 @@ TEST_F(TestInt96ParquetIO, ReadIntoTimestamp) {
   writer->Close();
 
   ::arrow::TimestampBuilder builder(
-      default_memory_pool(), ::arrow::timestamp(::arrow::TimeUnit::NANO));
+      default_memory_pool(), ::arrow::timestamp(TimeUnit::NANO));
   builder.Append(val);
   std::shared_ptr<Array> values;
   ASSERT_OK(builder.Finish(&values));
@@ -715,7 +795,9 @@ class TestPrimitiveParquetIO : public TestParquetIO<TestType> {
   void MakeTestFile(
       std::vector<T>& values, int num_chunks, std::unique_ptr<FileReader>* reader) {
-    std::shared_ptr<GroupNode> schema = this->MakeSchema(Repetition::REQUIRED);
+    TestType dummy;
+
+    std::shared_ptr<GroupNode> schema = MakeSimpleSchema(dummy, Repetition::REQUIRED);
     std::unique_ptr<ParquetFileWriter> file_writer = this->MakeWriter(schema);
     size_t chunk_size = values.size() / num_chunks;
     // Convert to Parquet's expected physical type
@@ -787,6 +869,89 @@ TYPED_TEST(TestPrimitiveParquetIO, SingleColumnRequiredChunkedTableRead) {
   this->CheckSingleColumnRequiredTableRead(4);
 }
 
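Note: MakeTestFile above now needs a type instance for MakeSimpleSchema, hence the default-constructed `TestType dummy`. That works because every type in TestTypes is parameter-free; a parameterized type would need a concrete instance, for example (sketch only, assuming TimestampType's unit constructor):

    ::arrow::TimestampType ts_type(TimeUnit::MICRO);  // unit chosen explicitly
    auto schema = MakeSimpleSchema(ts_type, Repetition::REQUIRED);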
+void MakeDateTimeTypesTable(std::shared_ptr<Table>* out) {
+  using ::arrow::ArrayFromVector;
+
+  std::vector<bool> is_valid = {true, true, true, false, true, true};
+
+  // These are the only types that round-trip without modification
+  auto f0 = field("f0", ::arrow::date32());
+  auto f1 = field("f1", ::arrow::timestamp(TimeUnit::MILLI));
+  auto f2 = field("f2", ::arrow::timestamp(TimeUnit::MICRO));
+  auto f3 = field("f3", ::arrow::time32(TimeUnit::MILLI));
+  auto f4 = field("f4", ::arrow::time64(TimeUnit::MICRO));
+  std::shared_ptr<::arrow::Schema> schema(new ::arrow::Schema({f0, f1, f2, f3, f4}));
+
+  std::vector<int32_t> t32_values = {
+      1489269000, 1489270000, 1489271000, 1489272000, 1489272000, 1489273000};
+  std::vector<int64_t> t64_values = {1489269000000, 1489270000000, 1489271000000,
+      1489272000000, 1489272000000, 1489273000000};
+
+  std::shared_ptr<Array> a0, a1, a2, a3, a4;
+  ArrayFromVector<::arrow::Date32Type, int32_t>(f0->type(), is_valid, t32_values, &a0);
+  ArrayFromVector<::arrow::TimestampType, int64_t>(f1->type(), is_valid, t64_values, &a1);
+  ArrayFromVector<::arrow::TimestampType, int64_t>(f2->type(), is_valid, t64_values, &a2);
+  ArrayFromVector<::arrow::Time32Type, int32_t>(f3->type(), is_valid, t32_values, &a3);
+  ArrayFromVector<::arrow::Time64Type, int64_t>(f4->type(), is_valid, t64_values, &a4);
+
+  std::vector<std::shared_ptr<::arrow::Column>> columns = {
+      std::make_shared<Column>("f0", a0), std::make_shared<Column>("f1", a1),
+      std::make_shared<Column>("f2", a2), std::make_shared<Column>("f3", a3),
+      std::make_shared<Column>("f4", a4)};
+  *out = std::make_shared<::arrow::Table>(schema, columns);
+}
+
+TEST(TestArrowReadWrite, DateTimeTypes) {
+  std::shared_ptr<Table> table;
+  MakeDateTimeTypesTable(&table);
+
+  std::shared_ptr<Table> result;
+  DoSimpleRoundtrip(table, 1, table->num_rows(), {}, &result);
+
+  ASSERT_TRUE(table->Equals(*result));
+}
+
+TEST(TestArrowReadWrite, ConvertedDateTimeTypes) {
+  using ::arrow::ArrayFromVector;
+
+  std::vector<bool> is_valid = {true, true, true, false, true, true};
+
+  auto f0 = field("f0", ::arrow::date64());
+  auto f1 = field("f1", ::arrow::time32(TimeUnit::SECOND));
+  std::shared_ptr<::arrow::Schema> schema(new ::arrow::Schema({f0, f1}));
+
+  std::vector<int64_t> a0_values = {1489190400000, 1489276800000, 1489363200000,
+      1489449600000, 1489536000000, 1489622400000};
+  std::vector<int32_t> a1_values = {0, 1, 2, 3, 4, 5};
+
+  std::shared_ptr<Array> a0, a1, x0, x1;
+  ArrayFromVector<::arrow::Date64Type, int64_t>(f0->type(), is_valid, a0_values, &a0);
+  ArrayFromVector<::arrow::Time32Type, int32_t>(f1->type(), is_valid, a1_values, &a1);
+
+  std::vector<std::shared_ptr<::arrow::Column>> columns = {
+      std::make_shared<Column>("f0", a0), std::make_shared<Column>("f1", a1)};
+  auto table = std::make_shared<::arrow::Table>(schema, columns);
+
+  // Expected schema and values
+  auto e0 = field("f0", ::arrow::date32());
+  auto e1 = field("f1", ::arrow::time32(TimeUnit::MILLI));
+  std::shared_ptr<::arrow::Schema> ex_schema(new ::arrow::Schema({e0, e1}));
+
+  std::vector<int32_t> x0_values = {17236, 17237, 17238, 17239, 17240, 17241};
+  std::vector<int32_t> x1_values = {0, 1000, 2000, 3000, 4000, 5000};
+  ArrayFromVector<::arrow::Date32Type, int32_t>(e0->type(), is_valid, x0_values, &x0);
+  ArrayFromVector<::arrow::Time32Type, int32_t>(e1->type(), is_valid, x1_values, &x1);
+
+  std::vector<std::shared_ptr<::arrow::Column>> ex_columns = {
+      std::make_shared<Column>("f0", x0), std::make_shared<Column>("f1", x1)};
+  auto ex_table = std::make_shared<::arrow::Table>(ex_schema, ex_columns);
+
+  std::shared_ptr<Table> result;
+  DoSimpleRoundtrip(table, 1, table->num_rows(), {}, &result);
+
+  ASSERT_TRUE(result->Equals(*ex_table));
+}
+
 void MakeDoubleTable(
     int num_columns, int num_rows, int nchunks, std::shared_ptr<Table>* out) {
   std::shared_ptr<::arrow::Column> column;
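Note: the expected values in ConvertedDateTimeTypes above encode the two writer-side conversions this patch introduces: date64 milliseconds are divided by 86,400,000 (ms per day) to produce date32 days, and time32 SECOND values are multiplied by 1,000 to produce MILLI. A quick check of the first row (sketch only):

    static_assert(1489190400000LL / 86400000LL == 17236,
        "2017-03-11 00:00:00 UTC, expressed as days since the epoch");
    static_assert(1489190400000LL % 86400000LL == 0,
        "value is midnight, so the division is exact");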
diff --git a/src/parquet/arrow/arrow-schema-test.cc b/src/parquet/arrow/arrow-schema-test.cc
index 0f6b4556..20425665 100644
--- a/src/parquet/arrow/arrow-schema-test.cc
+++ b/src/parquet/arrow/arrow-schema-test.cc
@@ -26,6 +26,7 @@
 #include "arrow/test-util.h"
 
 using arrow::Field;
+using arrow::TimeUnit;
 
 using ParquetType = parquet::Type;
 using parquet::LogicalType;
@@ -45,11 +46,9 @@ const auto INT64 = ::arrow::int64();
 const auto FLOAT = ::arrow::float32();
 const auto DOUBLE = ::arrow::float64();
 const auto UTF8 = ::arrow::utf8();
-const auto TIMESTAMP_MS = ::arrow::timestamp(::arrow::TimeUnit::MILLI);
-const auto TIMESTAMP_NS = ::arrow::timestamp(::arrow::TimeUnit::NANO);
-
-// TODO: This requires parquet-cpp implementing the MICROS enum value
-// const auto TIMESTAMP_US = std::make_shared<TimestampType>(TimestampType::Unit::MICRO);
+const auto TIMESTAMP_MS = ::arrow::timestamp(TimeUnit::MILLI);
+const auto TIMESTAMP_US = ::arrow::timestamp(TimeUnit::MICRO);
+const auto TIMESTAMP_NS = ::arrow::timestamp(TimeUnit::NANO);
 const auto BINARY = ::arrow::binary();
 const auto DECIMAL_8_4 = std::make_shared<::arrow::DecimalType>(8, 4);
@@ -62,8 +61,8 @@ class TestConvertParquetSchema : public ::testing::Test {
     for (int i = 0; i < expected_schema->num_fields(); ++i) {
       auto lhs = result_schema_->field(i);
       auto rhs = expected_schema->field(i);
-      EXPECT_TRUE(lhs->Equals(rhs))
-          << i << " " << lhs->ToString() << " != " << rhs->ToString();
+      EXPECT_TRUE(lhs->Equals(rhs)) << i << " " << lhs->ToString()
+                                    << " != " << rhs->ToString();
     }
   }
@@ -105,9 +104,23 @@ TEST_F(TestConvertParquetSchema, ParquetFlatPrimitives) {
       ParquetType::INT64, LogicalType::TIMESTAMP_MILLIS));
   arrow_fields.push_back(std::make_shared<Field>("timestamp", TIMESTAMP_MS, false));
 
+  parquet_fields.push_back(PrimitiveNode::Make("timestamp[us]", Repetition::REQUIRED,
+      ParquetType::INT64, LogicalType::TIMESTAMP_MICROS));
+  arrow_fields.push_back(std::make_shared<Field>("timestamp[us]", TIMESTAMP_US, false));
+
   parquet_fields.push_back(PrimitiveNode::Make(
       "date", Repetition::REQUIRED, ParquetType::INT32, LogicalType::DATE));
-  arrow_fields.push_back(std::make_shared<Field>("date", ::arrow::date64(), false));
+  arrow_fields.push_back(std::make_shared<Field>("date", ::arrow::date32(), false));
+
+  parquet_fields.push_back(PrimitiveNode::Make(
+      "time32", Repetition::REQUIRED, ParquetType::INT32, LogicalType::TIME_MILLIS));
+  arrow_fields.push_back(std::make_shared<Field>(
+      "time32", ::arrow::time32(TimeUnit::MILLI), false));
+
+  parquet_fields.push_back(PrimitiveNode::Make(
+      "time64", Repetition::REQUIRED, ParquetType::INT64, LogicalType::TIME_MICROS));
+  arrow_fields.push_back(std::make_shared<Field>(
+      "time64", ::arrow::time64(TimeUnit::MICRO), false));
 
   parquet_fields.push_back(
       PrimitiveNode::Make("timestamp96", Repetition::REQUIRED, ParquetType::INT96));
@@ -568,7 +581,11 @@ TEST_F(TestConvertArrowSchema, ParquetFlatPrimitives) {
   parquet_fields.push_back(PrimitiveNode::Make(
       "date", Repetition::REQUIRED, ParquetType::INT32, LogicalType::DATE));
-  arrow_fields.push_back(std::make_shared<Field>("date", ::arrow::date64(), false));
+  arrow_fields.push_back(std::make_shared<Field>("date", ::arrow::date32(), false));
+
+  parquet_fields.push_back(PrimitiveNode::Make(
+      "date64", Repetition::REQUIRED, ParquetType::INT32, LogicalType::DATE));
+  arrow_fields.push_back(std::make_shared<Field>("date64", ::arrow::date64(), false));
 
   parquet_fields.push_back(PrimitiveNode::Make("timestamp", Repetition::REQUIRED,
       ParquetType::INT64, LogicalType::TIMESTAMP_MILLIS));
@@ -644,6 +661,16 @@ TEST_F(TestConvertArrowSchema, ParquetLists) {
   CheckFlatSchema(parquet_fields);
 }
 
+TEST_F(TestConvertArrowSchema, UnsupportedTypes) {
+  std::vector<std::shared_ptr<Field>> unsupported_fields = {
+      ::arrow::field("f0", ::arrow::time64(TimeUnit::NANO))
+  };
+
+  for (const auto& field : unsupported_fields) {
+    ASSERT_RAISES(NotImplemented, ConvertSchema({field}));
+  }
+}
+
 TEST_F(TestConvertArrowSchema, ParquetFlatDecimals) {
   std::vector<NodePtr> parquet_fields;
   std::vector<std::shared_ptr<Field>> arrow_fields;
@@ -655,8 +682,5 @@ TEST_F(TestConvertArrowSchema, ParquetFlatDecimals) {
   CheckFlatSchema(parquet_fields);
 }
 
-TEST(TestNodeConversion, DateAndTime) {}
-
 }  // namespace arrow
-
 }  // namespace parquet
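Note: nanosecond TIME64 is the one time unit rejected, because the Parquet format as targeted by this patch only defines TIME_MILLIS and TIME_MICROS annotations. A caller holding nanosecond times must coerce before writing; a sketch, where `ns_values` is a hypothetical vector of raw int64 nanosecond values:

    // Narrow nanoseconds to microseconds first (truncates sub-microsecond detail).
    std::vector<int64_t> us_values(ns_values.size());
    for (size_t i = 0; i < ns_values.size(); ++i) {
      us_values[i] = ns_values[i] / 1000;
    }
    // ...then build the array with ::arrow::time64(TimeUnit::MICRO).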
diff --git a/src/parquet/arrow/reader.cc b/src/parquet/arrow/reader.cc
index 38d5583a..852649ae 100644
--- a/src/parquet/arrow/reader.cc
+++ b/src/parquet/arrow/reader.cc
@@ -502,6 +502,10 @@ NONNULLABLE_BATCH_FAST_PATH(::arrow::Int32Type, Int32Type, int32_t)
 NONNULLABLE_BATCH_FAST_PATH(::arrow::Int64Type, Int64Type, int64_t)
 NONNULLABLE_BATCH_FAST_PATH(::arrow::FloatType, FloatType, float)
 NONNULLABLE_BATCH_FAST_PATH(::arrow::DoubleType, DoubleType, double)
+NONNULLABLE_BATCH_FAST_PATH(::arrow::Date32Type, Int32Type, int32_t)
+NONNULLABLE_BATCH_FAST_PATH(::arrow::TimestampType, Int64Type, int64_t)
+NONNULLABLE_BATCH_FAST_PATH(::arrow::Time32Type, Int32Type, int32_t)
+NONNULLABLE_BATCH_FAST_PATH(::arrow::Time64Type, Int64Type, int64_t)
 
 template <>
 Status ColumnReader::Impl::ReadNonNullableBatch<::arrow::TimestampType, Int96Type>(
@@ -607,6 +611,10 @@ NULLABLE_BATCH_FAST_PATH(::arrow::Int32Type, Int32Type, int32_t)
 NULLABLE_BATCH_FAST_PATH(::arrow::Int64Type, Int64Type, int64_t)
 NULLABLE_BATCH_FAST_PATH(::arrow::FloatType, FloatType, float)
 NULLABLE_BATCH_FAST_PATH(::arrow::DoubleType, DoubleType, double)
+NULLABLE_BATCH_FAST_PATH(::arrow::Date32Type, Int32Type, int32_t)
+NULLABLE_BATCH_FAST_PATH(::arrow::TimestampType, Int64Type, int64_t)
+NULLABLE_BATCH_FAST_PATH(::arrow::Time32Type, Int32Type, int32_t)
+NULLABLE_BATCH_FAST_PATH(::arrow::Time64Type, Int64Type, int64_t)
 
 template <>
 Status ColumnReader::Impl::ReadNullableBatch<::arrow::TimestampType, Int96Type>(
@@ -1036,13 +1044,14 @@ Status ColumnReader::Impl::NextBatch(int batch_size, std::shared_ptr<Array>* out
     TYPED_BATCH_CASE(INT16, ::arrow::Int16Type, Int32Type)
     TYPED_BATCH_CASE(UINT32, ::arrow::UInt32Type, Int32Type)
     TYPED_BATCH_CASE(INT32, ::arrow::Int32Type, Int32Type)
-    TYPED_BATCH_CASE(DATE64, ::arrow::Date64Type, Int32Type)
     TYPED_BATCH_CASE(UINT64, ::arrow::UInt64Type, Int64Type)
     TYPED_BATCH_CASE(INT64, ::arrow::Int64Type, Int64Type)
     TYPED_BATCH_CASE(FLOAT, ::arrow::FloatType, FloatType)
     TYPED_BATCH_CASE(DOUBLE, ::arrow::DoubleType, DoubleType)
     TYPED_BATCH_CASE(STRING, ::arrow::StringType, ByteArrayType)
     TYPED_BATCH_CASE(BINARY, ::arrow::BinaryType, ByteArrayType)
+    TYPED_BATCH_CASE(DATE32, ::arrow::Date32Type, Int32Type)
+    TYPED_BATCH_CASE(DATE64, ::arrow::Date64Type, Int32Type)
     case ::arrow::Type::TIMESTAMP: {
       ::arrow::TimestampType* timestamp_type =
           static_cast<::arrow::TimestampType*>(field_->type().get());
@@ -1050,6 +1059,9 @@ Status ColumnReader::Impl::NextBatch(int batch_size, std::shared_ptr<Array>* out
         case ::arrow::TimeUnit::MILLI:
           return TypedReadBatch<::arrow::TimestampType, Int64Type>(batch_size, out);
           break;
+        case ::arrow::TimeUnit::MICRO:
+          return TypedReadBatch<::arrow::TimestampType, Int64Type>(batch_size, out);
+          break;
        case ::arrow::TimeUnit::NANO:
          return TypedReadBatch<::arrow::TimestampType, Int96Type>(batch_size, out);
          break;
@@ -1058,6 +1070,8 @@ Status ColumnReader::Impl::NextBatch(int batch_size, std::shared_ptr<Array>* out
       }
       break;
     }
+    TYPED_BATCH_CASE(TIME32, ::arrow::Time32Type, Int32Type)
+    TYPED_BATCH_CASE(TIME64, ::arrow::Time64Type, Int64Type)
     default:
       std::stringstream ss;
       ss << "No support for reading columns of type " << field_->type()->ToString();
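Note: MILLI and MICRO timestamps both take the plain Int64 path because the stored integers need no rescaling on read; only NANO goes through the Int96 reader. For orientation, a TYPED_BATCH_CASE line expands, judging from its uses here (the macro body itself is outside these hunks, so treat this as a sketch), to roughly:

    case ::arrow::Type::TIME32:
      return TypedReadBatch<::arrow::Time32Type, Int32Type>(batch_size, out);
      break;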
diff --git a/src/parquet/arrow/schema.cc b/src/parquet/arrow/schema.cc
index 25713a70..31895cea 100644
--- a/src/parquet/arrow/schema.cc
+++ b/src/parquet/arrow/schema.cc
@@ -45,6 +45,7 @@ namespace parquet {
 namespace arrow {
 
 const auto TIMESTAMP_MS = ::arrow::timestamp(::arrow::TimeUnit::MILLI);
+const auto TIMESTAMP_US = ::arrow::timestamp(::arrow::TimeUnit::MICRO);
 const auto TIMESTAMP_NS = ::arrow::timestamp(::arrow::TimeUnit::NANO);
 
 TypePtr MakeDecimalType(const PrimitiveNode* node) {
@@ -105,18 +106,18 @@ static Status FromInt32(const PrimitiveNode* node, TypePtr* out) {
     case LogicalType::INT_16:
       *out = ::arrow::int16();
       break;
+    case LogicalType::INT_32:
+      *out = ::arrow::int32();
+      break;
     case LogicalType::UINT_32:
       *out = ::arrow::uint32();
       break;
     case LogicalType::DATE:
-      *out = ::arrow::date64();
+      *out = ::arrow::date32();
       break;
     case LogicalType::TIME_MILLIS:
       *out = ::arrow::time32(::arrow::TimeUnit::MILLI);
       break;
-    case LogicalType::TIME_MICROS:
-      *out = ::arrow::time64(::arrow::TimeUnit::MICRO);
-      break;
     case LogicalType::DECIMAL:
       *out = MakeDecimalType(node);
       break;
@@ -135,6 +136,9 @@ static Status FromInt64(const PrimitiveNode* node, TypePtr* out) {
     case LogicalType::NONE:
       *out = ::arrow::int64();
       break;
+    case LogicalType::INT_64:
+      *out = ::arrow::int64();
+      break;
     case LogicalType::UINT_64:
       *out = ::arrow::uint64();
       break;
@@ -144,6 +148,12 @@ static Status FromInt64(const PrimitiveNode* node, TypePtr* out) {
     case LogicalType::TIMESTAMP_MILLIS:
       *out = TIMESTAMP_MS;
       break;
+    case LogicalType::TIMESTAMP_MICROS:
+      *out = TIMESTAMP_US;
+      break;
+    case LogicalType::TIME_MICROS:
+      *out = ::arrow::time64(::arrow::TimeUnit::MICRO);
+      break;
     default:
       std::stringstream ss;
       ss << "Unhandled logical type " << LogicalTypeToString(node->logical_type())
@@ -455,21 +465,30 @@ Status FieldToNode(const std::shared_ptr<Field>& field,
       break;
     case ArrowType::TIMESTAMP: {
       auto timestamp_type = static_cast<::arrow::TimestampType*>(field->type().get());
-      if (timestamp_type->unit() != ::arrow::TimestampType::Unit::MILLI) {
+      auto unit = timestamp_type->unit();
+      type = ParquetType::INT64;
+      if (unit == ::arrow::TimeUnit::MILLI) {
+        logical_type = LogicalType::TIMESTAMP_MILLIS;
+      } else if (unit == ::arrow::TimeUnit::MICRO) {
+        logical_type = LogicalType::TIMESTAMP_MICROS;
+      } else {
         return Status::NotImplemented(
-            "Other timestamp units than millisecond are not yet support with parquet.");
+            "Only MILLI and MICRO units supported for Arrow timestamps with Parquet.");
       }
-      type = ParquetType::INT64;
-      logical_type = LogicalType::TIMESTAMP_MILLIS;
     } break;
     case ArrowType::TIME32:
-      type = ParquetType::INT64;
+      type = ParquetType::INT32;
       logical_type = LogicalType::TIME_MILLIS;
       break;
-    case ArrowType::TIME64:
+    case ArrowType::TIME64: {
+      auto time_type = static_cast<::arrow::Time64Type*>(field->type().get());
+      if (time_type->unit() == ::arrow::TimeUnit::NANO) {
+        return Status::NotImplemented(
+            "Nanosecond time not supported in Parquet.");
+      }
       type = ParquetType::INT64;
       logical_type = LogicalType::TIME_MICROS;
-      break;
+    } break;
     case ArrowType::STRUCT: {
       auto struct_type = std::static_pointer_cast<::arrow::StructType>(field->type());
       return StructToNode(struct_type, field->name(), field->nullable(), properties, out);
diff --git a/src/parquet/arrow/test-util.h b/src/parquet/arrow/test-util.h
index bff952bb..8bcd3149 100644
--- a/src/parquet/arrow/test-util.h
+++ b/src/parquet/arrow/test-util.h
@@ -20,6 +20,7 @@
 
 #include "arrow/api.h"
 #include "arrow/test-util.h"
+#include "arrow/type_traits.h"
 
 namespace parquet {
 namespace arrow {
diff --git a/src/parquet/arrow/writer.cc b/src/parquet/arrow/writer.cc
index 90cd1350..6ac33b11 100644
--- a/src/parquet/arrow/writer.cc
+++ b/src/parquet/arrow/writer.cc
@@ -26,6 +26,7 @@
 #include "parquet/arrow/schema.h"
 
 #include "arrow/api.h"
+#include "arrow/visitor_inline.h"
 
 using arrow::Array;
 using arrow::BinaryArray;
@@ -39,6 +40,7 @@ using arrow::PrimitiveArray;
 using arrow::ListArray;
 using arrow::Status;
 using arrow::Table;
+using arrow::TimeUnit;
 
 using parquet::ParquetFileWriter;
 using parquet::ParquetVersion;
@@ -49,44 +51,34 @@ namespace arrow {
 
 namespace BitUtil = ::arrow::BitUtil;
 
-class LevelBuilder : public ::arrow::ArrayVisitor {
+class LevelBuilder {
  public:
   explicit LevelBuilder(MemoryPool* pool)
       : def_levels_(pool, ::arrow::int16()), rep_levels_(pool, ::arrow::int16()) {
     def_levels_buffer_ = std::make_shared<PoolBuffer>(pool);
   }
 
-#define PRIMITIVE_VISIT(ArrowTypePrefix)                                \
-  Status Visit(const ::arrow::ArrowTypePrefix##Array& array) override { \
-    array_offsets_.push_back(array.offset());                           \
-    valid_bitmaps_.push_back(array.null_bitmap_data());                 \
-    null_counts_.push_back(array.null_count());                         \
-    values_type_ = array.type_id();                                     \
-    values_array_ = &array;                                             \
-    return Status::OK();                                                \
+  Status VisitInline(const Array& array);
+
+  Status Visit(const ::arrow::PrimitiveArray& array) {
+    array_offsets_.push_back(array.offset());
+    valid_bitmaps_.push_back(array.null_bitmap_data());
+    null_counts_.push_back(array.null_count());
+    values_type_ = array.type_id();
+    values_array_ = &array;
+    return Status::OK();
+  }
+
+  Status Visit(const ::arrow::BinaryArray& array) {
+    array_offsets_.push_back(array.offset());
+    valid_bitmaps_.push_back(array.null_bitmap_data());
+    null_counts_.push_back(array.null_count());
+    values_type_ = array.type_id();
+    values_array_ = &array;
+    return Status::OK();
   }
 
-  PRIMITIVE_VISIT(Boolean)
-  PRIMITIVE_VISIT(Int8)
-  PRIMITIVE_VISIT(Int16)
-  PRIMITIVE_VISIT(Int32)
-  PRIMITIVE_VISIT(Int64)
-  PRIMITIVE_VISIT(UInt8)
-  PRIMITIVE_VISIT(UInt16)
-  PRIMITIVE_VISIT(UInt32)
-  PRIMITIVE_VISIT(UInt64)
-  PRIMITIVE_VISIT(HalfFloat)
-  PRIMITIVE_VISIT(Float)
-  PRIMITIVE_VISIT(Double)
-  PRIMITIVE_VISIT(String)
-  PRIMITIVE_VISIT(Binary)
-  PRIMITIVE_VISIT(Date64)
-  PRIMITIVE_VISIT(Time32)
-  PRIMITIVE_VISIT(Time64)
-  PRIMITIVE_VISIT(Timestamp)
-  PRIMITIVE_VISIT(Interval)
-
-  Status Visit(const ListArray& array) override {
+  Status Visit(const ListArray& array) {
     array_offsets_.push_back(array.offset());
     valid_bitmaps_.push_back(array.null_bitmap_data());
     null_counts_.push_back(array.null_count());
@@ -95,20 +87,21 @@ class LevelBuilder : public ::arrow::ArrayVisitor {
     min_offset_idx_ = array.value_offset(min_offset_idx_);
     max_offset_idx_ = array.value_offset(max_offset_idx_);
 
-    return array.values()->Accept(this);
+    return VisitInline(*array.values());
   }
 
-#define NOT_IMPLEMENTED_VIST(ArrowTypePrefix)                            \
-  Status Visit(const ::arrow::ArrowTypePrefix##Array& array) override {  \
-    return Status::NotImplemented(                                       \
-        "Level generation for ArrowTypePrefix not supported yet");       \
-  };
+#define NOT_IMPLEMENTED_VISIT(ArrowTypePrefix)                           \
+  Status Visit(const ::arrow::ArrowTypePrefix##Array& array) {           \
+    return Status::NotImplemented(                                       \
+        "Level generation for ArrowTypePrefix not supported yet");       \
+  }
 
-  NOT_IMPLEMENTED_VIST(Null)
-  NOT_IMPLEMENTED_VIST(Struct)
-  NOT_IMPLEMENTED_VIST(Union)
-  NOT_IMPLEMENTED_VIST(Decimal)
-  NOT_IMPLEMENTED_VIST(Dictionary)
+  NOT_IMPLEMENTED_VISIT(Null)
+  NOT_IMPLEMENTED_VISIT(Struct)
+  NOT_IMPLEMENTED_VISIT(Union)
+  NOT_IMPLEMENTED_VISIT(Decimal)
+  NOT_IMPLEMENTED_VISIT(Dictionary)
+  NOT_IMPLEMENTED_VISIT(Interval)
 
   Status GenerateLevels(const Array& array, const std::shared_ptr<Field>& field,
       int64_t* values_offset, ::arrow::Type::type* values_type, int64_t* num_values,
@@ -117,7 +110,7 @@
     // Work downwards to extract bitmaps and offsets
     min_offset_idx_ = 0;
     max_offset_idx_ = array.length();
-    RETURN_NOT_OK(array.Accept(this));
+    RETURN_NOT_OK(VisitInline(array));
     *num_values = max_offset_idx_ - min_offset_idx_;
     *values_offset = min_offset_idx_;
     *values_type = values_type_;
@@ -247,6 +240,10 @@
   const Array* values_array_;
 };
 
+Status LevelBuilder::VisitInline(const Array& array) {
+  return VisitArrayInline(array, this);
+}
+
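Note: VisitArrayInline comes from the newly included arrow/visitor_inline.h. It replaces the virtual ArrayVisitor double dispatch with a type-id switch resolved against the visitor's overload set at compile time; conceptually it does something like the following (a sketch of the idea, not Arrow's exact implementation):

    switch (array.type_id()) {
      case ::arrow::Type::INT32:
        return visitor->Visit(static_cast<const ::arrow::Int32Array&>(array));
      // ... one case per concrete array class; classes without a matching
      // overload resolve to the NOT_IMPLEMENTED_VISIT methods above.
    }

This is why the per-type PRIMITIVE_VISIT boilerplate collapses into just two non-virtual Visit overloads taking PrimitiveArray and BinaryArray.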
 class FileWriter::Impl {
  public:
   Impl(MemoryPool* pool, std::unique_ptr<ParquetFileWriter> writer);
 
@@ -257,14 +254,15 @@ class FileWriter::Impl {
       int64_t num_levels, const int16_t* def_levels, const int16_t* rep_levels);
 
   template <typename ParquetType, typename ArrowType>
-  Status WriteNonNullableBatch(TypedColumnWriter<ParquetType>* writer, int64_t num_values,
-      int64_t num_levels, const int16_t* def_levels, const int16_t* rep_levels,
+  Status WriteNonNullableBatch(TypedColumnWriter<ParquetType>* writer,
+      const ArrowType& type, int64_t num_values, int64_t num_levels,
+      const int16_t* def_levels, const int16_t* rep_levels,
       const typename ArrowType::c_type* data_ptr);
 
   template <typename ParquetType, typename ArrowType>
-  Status WriteNullableBatch(TypedColumnWriter<ParquetType>* writer, int64_t num_values,
-      int64_t num_levels, const int16_t* def_levels, const int16_t* rep_levels,
-      const uint8_t* valid_bits, int64_t valid_bits_offset,
+  Status WriteNullableBatch(TypedColumnWriter<ParquetType>* writer, const ArrowType& type,
+      int64_t num_values, int64_t num_levels, const int16_t* def_levels,
+      const int16_t* rep_levels, const uint8_t* valid_bits, int64_t valid_bits_offset,
       const typename ArrowType::c_type* data_ptr);
 
   Status WriteColumnChunk(const Array& data);
@@ -307,13 +305,14 @@ Status FileWriter::Impl::TypedWriteBatch(ColumnWriter* column_writer,
 
   if (writer->descr()->schema_node()->is_required() || (data->null_count() == 0)) {
     // no nulls, just dump the data
-    RETURN_NOT_OK((WriteNonNullableBatch<ParquetType, ArrowType>(writer, array->length(),
-        num_levels, def_levels, rep_levels, data_ptr + data->offset())));
+    RETURN_NOT_OK((WriteNonNullableBatch<ParquetType, ArrowType>(writer,
+        static_cast<const ArrowType&>(*array->type()), array->length(), num_levels,
+        def_levels, rep_levels, data_ptr + data->offset())));
   } else {
     const uint8_t* valid_bits = data->null_bitmap_data();
-    RETURN_NOT_OK((WriteNullableBatch<ParquetType, ArrowType>(writer, data->length(),
-        num_levels, def_levels, rep_levels, valid_bits, data->offset(),
-        data_ptr + data->offset())));
+    RETURN_NOT_OK((WriteNullableBatch<ParquetType, ArrowType>(writer,
+        static_cast<const ArrowType&>(*array->type()), data->length(), num_levels,
+        def_levels, rep_levels, valid_bits, data->offset(), data_ptr + data->offset())));
   }
   PARQUET_CATCH_NOT_OK(writer->Close());
   return Status::OK();
@@ -321,8 +320,9 @@ Status FileWriter::Impl::TypedWriteBatch(ColumnWriter* column_writer,
 
 template <typename ParquetType, typename ArrowType>
 Status FileWriter::Impl::WriteNonNullableBatch(TypedColumnWriter<ParquetType>* writer,
-    int64_t num_values, int64_t num_levels, const int16_t* def_levels,
-    const int16_t* rep_levels, const typename ArrowType::c_type* data_ptr) {
+    const ArrowType& type, int64_t num_values, int64_t num_levels,
+    const int16_t* def_levels, const int16_t* rep_levels,
+    const typename ArrowType::c_type* data_ptr) {
   using ParquetCType = typename ParquetType::c_type;
   RETURN_NOT_OK(data_buffer_.Resize(num_values * sizeof(ParquetCType)));
   auto buffer_ptr = reinterpret_cast<ParquetCType*>(data_buffer_.mutable_data());
@@ -334,8 +334,9 @@ Status FileWriter::Impl::WriteNonNullableBatch(TypedColumnWriter<ParquetType>* w
 
 template <>
 Status FileWriter::Impl::WriteNonNullableBatch<Int32Type, ::arrow::Date64Type>(
-    TypedColumnWriter<Int32Type>* writer, int64_t num_values, int64_t num_levels,
-    const int16_t* def_levels, const int16_t* rep_levels, const int64_t* data_ptr) {
+    TypedColumnWriter<Int32Type>* writer, const ::arrow::Date64Type& type,
+    int64_t num_values, int64_t num_levels, const int16_t* def_levels,
+    const int16_t* rep_levels, const int64_t* data_ptr) {
   RETURN_NOT_OK(data_buffer_.Resize(num_values * sizeof(int32_t)));
   auto buffer_ptr = reinterpret_cast<int32_t*>(data_buffer_.mutable_data());
   for (int i = 0; i < num_values; i++) {
@@ -346,27 +347,49 @@ Status FileWriter::Impl::WriteNonNullableBatch<Int32Type, ::arrow::Date64Type>(
   return Status::OK();
 }
 
-#define NONNULLABLE_BATCH_FAST_PATH(ParquetType, ArrowType, CType)                    \
-  template <>                                                                          \
-  Status FileWriter::Impl::WriteNonNullableBatch<ParquetType, ArrowType>(              \
-      TypedColumnWriter<ParquetType> * writer, int64_t num_values, int64_t num_levels, \
-      const int16_t* def_levels, const int16_t* rep_levels, const CType* data_ptr) {   \
-    PARQUET_CATCH_NOT_OK(                                                              \
-        writer->WriteBatch(num_levels, def_levels, rep_levels, data_ptr));             \
-    return Status::OK();                                                               \
+template <>
+Status FileWriter::Impl::WriteNonNullableBatch<Int32Type, ::arrow::Time32Type>(
+    TypedColumnWriter<Int32Type>* writer, const ::arrow::Time32Type& type,
+    int64_t num_values, int64_t num_levels, const int16_t* def_levels,
+    const int16_t* rep_levels, const int32_t* data_ptr) {
+  RETURN_NOT_OK(data_buffer_.Resize(num_values * sizeof(int32_t)));
+  auto buffer_ptr = reinterpret_cast<int32_t*>(data_buffer_.mutable_data());
+  if (type.unit() == TimeUnit::SECOND) {
+    for (int i = 0; i < num_values; i++) {
+      buffer_ptr[i] = data_ptr[i] * 1000;
+    }
+  } else {
+    std::copy(data_ptr, data_ptr + num_values, buffer_ptr);
+  }
+  PARQUET_CATCH_NOT_OK(
+      writer->WriteBatch(num_levels, def_levels, rep_levels, buffer_ptr));
+  return Status::OK();
+}
+
+#define NONNULLABLE_BATCH_FAST_PATH(ParquetType, ArrowType, CType)          \
+  template <>                                                               \
+  Status FileWriter::Impl::WriteNonNullableBatch<ParquetType, ArrowType>(   \
+      TypedColumnWriter<ParquetType> * writer, const ArrowType& type,       \
+      int64_t num_values, int64_t num_levels, const int16_t* def_levels,    \
+      const int16_t* rep_levels, const CType* data_ptr) {                   \
+    PARQUET_CATCH_NOT_OK(                                                   \
+        writer->WriteBatch(num_levels, def_levels, rep_levels, data_ptr));  \
+    return Status::OK();                                                    \
  }
 
 NONNULLABLE_BATCH_FAST_PATH(Int32Type, ::arrow::Int32Type, int32_t)
+NONNULLABLE_BATCH_FAST_PATH(Int32Type, ::arrow::Date32Type, int32_t)
 NONNULLABLE_BATCH_FAST_PATH(Int64Type, ::arrow::Int64Type, int64_t)
 NONNULLABLE_BATCH_FAST_PATH(Int64Type, ::arrow::TimestampType, int64_t)
+NONNULLABLE_BATCH_FAST_PATH(Int64Type, ::arrow::Time64Type, int64_t)
 NONNULLABLE_BATCH_FAST_PATH(FloatType, ::arrow::FloatType, float)
 NONNULLABLE_BATCH_FAST_PATH(DoubleType, ::arrow::DoubleType, double)
 
 template <typename ParquetType, typename ArrowType>
 Status FileWriter::Impl::WriteNullableBatch(TypedColumnWriter<ParquetType>* writer,
-    int64_t num_values, int64_t num_levels, const int16_t* def_levels,
-    const int16_t* rep_levels, const uint8_t* valid_bits, int64_t valid_bits_offset,
-    const typename ArrowType::c_type* data_ptr) {
+    const ArrowType& type, int64_t num_values, int64_t num_levels,
+    const int16_t* def_levels, const int16_t* rep_levels, const uint8_t* valid_bits,
+    int64_t valid_bits_offset, const typename ArrowType::c_type* data_ptr) {
   using ParquetCType = typename ParquetType::c_type;
 
   RETURN_NOT_OK(data_buffer_.Resize(num_values * sizeof(ParquetCType)));
@@ -386,9 +409,10 @@ Status FileWriter::Impl::WriteNullableBatch(TypedColumnWriter<ParquetType>* writ
 
 template <>
 Status FileWriter::Impl::WriteNullableBatch<Int32Type, ::arrow::Date64Type>(
-    TypedColumnWriter<Int32Type>* writer, int64_t num_values, int64_t num_levels,
-    const int16_t* def_levels, const int16_t* rep_levels, const uint8_t* valid_bits,
-    int64_t valid_bits_offset, const int64_t* data_ptr) {
+    TypedColumnWriter<Int32Type>* writer, const ::arrow::Date64Type& type,
+    int64_t num_values, int64_t num_levels, const int16_t* def_levels,
+    const int16_t* rep_levels, const uint8_t* valid_bits, int64_t valid_bits_offset,
+    const int64_t* data_ptr) {
   RETURN_NOT_OK(data_buffer_.Resize(num_values * sizeof(int32_t)));
   auto buffer_ptr = reinterpret_cast<int32_t*>(data_buffer_.mutable_data());
   INIT_BITSET(valid_bits, valid_bits_offset);
@@ -405,20 +429,54 @@ Status FileWriter::Impl::WriteNullableBatch<Int32Type, ::arrow::Date64Type>(
   return Status::OK();
 }
 
+template <>
+Status FileWriter::Impl::WriteNullableBatch<Int32Type, ::arrow::Time32Type>(
+    TypedColumnWriter<Int32Type>* writer, const ::arrow::Time32Type& type,
+    int64_t num_values, int64_t num_levels, const int16_t* def_levels,
+    const int16_t* rep_levels, const uint8_t* valid_bits, int64_t valid_bits_offset,
+    const int32_t* data_ptr) {
+  RETURN_NOT_OK(data_buffer_.Resize(num_values * sizeof(int32_t)));
+  auto buffer_ptr = reinterpret_cast<int32_t*>(data_buffer_.mutable_data());
+  INIT_BITSET(valid_bits, valid_bits_offset);
+
+  if (type.unit() == TimeUnit::SECOND) {
+    for (int i = 0; i < num_values; i++) {
+      if (bitset_valid_bits & (1 << bit_offset_valid_bits)) {
+        buffer_ptr[i] = data_ptr[i] * 1000;
+      }
+      READ_NEXT_BITSET(valid_bits);
+    }
+  } else {
+    for (int i = 0; i < num_values; i++) {
+      if (bitset_valid_bits & (1 << bit_offset_valid_bits)) {
+        buffer_ptr[i] = data_ptr[i];
+      }
+      READ_NEXT_BITSET(valid_bits);
+    }
+  }
+  PARQUET_CATCH_NOT_OK(writer->WriteBatchSpaced(
+      num_levels, def_levels, rep_levels, valid_bits, valid_bits_offset, buffer_ptr));
+
+  return Status::OK();
+}
+
 #define NULLABLE_BATCH_FAST_PATH(ParquetType, ArrowType, CType)                          \
   template <>                                                                            \
   Status FileWriter::Impl::WriteNullableBatch<ParquetType, ArrowType>(                   \
-      TypedColumnWriter<ParquetType> * writer, int64_t num_values, int64_t num_levels,   \
-      const int16_t* def_levels, const int16_t* rep_levels, const uint8_t* valid_bits,   \
-      int64_t valid_bits_offset, const CType* data_ptr) {                                \
+      TypedColumnWriter<ParquetType> * writer, const ArrowType& type,                    \
+      int64_t num_values, int64_t num_levels, const int16_t* def_levels,                 \
+      const int16_t* rep_levels, const uint8_t* valid_bits, int64_t valid_bits_offset,   \
+      const CType* data_ptr) {                                                           \
     PARQUET_CATCH_NOT_OK(writer->WriteBatchSpaced(                                       \
         num_levels, def_levels, rep_levels, valid_bits, valid_bits_offset, data_ptr));   \
     return Status::OK();                                                                 \
  }
 
 NULLABLE_BATCH_FAST_PATH(Int32Type, ::arrow::Int32Type, int32_t)
+NULLABLE_BATCH_FAST_PATH(Int32Type, ::arrow::Date32Type, int32_t)
 NULLABLE_BATCH_FAST_PATH(Int64Type, ::arrow::Int64Type, int64_t)
 NULLABLE_BATCH_FAST_PATH(Int64Type, ::arrow::TimestampType, int64_t)
+NULLABLE_BATCH_FAST_PATH(Int64Type, ::arrow::Time64Type, int64_t)
 NULLABLE_BATCH_FAST_PATH(FloatType, ::arrow::FloatType, float)
 NULLABLE_BATCH_FAST_PATH(DoubleType, ::arrow::DoubleType, double)
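Note: substituting one instantiation makes the fast-path macro easier to review; NULLABLE_BATCH_FAST_PATH(Int64Type, ::arrow::Time64Type, int64_t) expands to:

    template <>
    Status FileWriter::Impl::WriteNullableBatch<Int64Type, ::arrow::Time64Type>(
        TypedColumnWriter<Int64Type>* writer, const ::arrow::Time64Type& type,
        int64_t num_values, int64_t num_levels, const int16_t* def_levels,
        const int16_t* rep_levels, const uint8_t* valid_bits,
        int64_t valid_bits_offset, const int64_t* data_ptr) {
      PARQUET_CATCH_NOT_OK(writer->WriteBatchSpaced(
          num_levels, def_levels, rep_levels, valid_bits, valid_bits_offset, data_ptr));
      return Status::OK();
    }

Time64 and Date32 can take the fast path because their integers map one-to-one onto the Parquet representation; only Date64 and second-resolution Time32 need the converting specializations above, which rescale just the valid slots before handing off to WriteBatchSpaced.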
 
@@ -553,14 +611,17 @@ Status FileWriter::Impl::WriteColumnChunk(const Array& data) {
       WRITE_BATCH_CASE(INT16, Int16Type, Int32Type)
       WRITE_BATCH_CASE(UINT16, UInt16Type, Int32Type)
       WRITE_BATCH_CASE(INT32, Int32Type, Int32Type)
-      WRITE_BATCH_CASE(DATE64, Date64Type, Int32Type)
       WRITE_BATCH_CASE(INT64, Int64Type, Int64Type)
-      WRITE_BATCH_CASE(TIMESTAMP, TimestampType, Int64Type)
       WRITE_BATCH_CASE(UINT64, UInt64Type, Int64Type)
       WRITE_BATCH_CASE(FLOAT, FloatType, FloatType)
       WRITE_BATCH_CASE(DOUBLE, DoubleType, DoubleType)
       WRITE_BATCH_CASE(BINARY, BinaryType, ByteArrayType)
       WRITE_BATCH_CASE(STRING, BinaryType, ByteArrayType)
+      WRITE_BATCH_CASE(DATE32, Date32Type, Int32Type)
+      WRITE_BATCH_CASE(DATE64, Date64Type, Int32Type)
+      WRITE_BATCH_CASE(TIMESTAMP, TimestampType, Int64Type)
+      WRITE_BATCH_CASE(TIME32, Time32Type, Int32Type)
+      WRITE_BATCH_CASE(TIME64, Time64Type, Int64Type)
     default:
       std::stringstream ss;
      ss << "Data type not supported as list value: " << values_array->type()->ToString();
diff --git a/src/parquet/file/printer.h b/src/parquet/file/printer.h
index 433f9e8a..957c313a 100644
--- a/src/parquet/file/printer.h
+++ b/src/parquet/file/printer.h
@@ -32,6 +32,7 @@ namespace parquet {
 class PARQUET_EXPORT ParquetFilePrinter {
  private:
   ParquetFileReader* fileReader;
+
  public:
   explicit ParquetFilePrinter(ParquetFileReader* reader) : fileReader(reader) {}
   ~ParquetFilePrinter() {}