From 4f3e4386f21851dbb297a4a02cf6478d2d8d6e28 Mon Sep 17 00:00:00 2001 From: "Uwe L. Korn" Date: Fri, 23 Jun 2017 14:44:58 +0200 Subject: [PATCH 1/2] PARQUET-1041: Support Arrow's NullArray --- src/parquet/arrow/arrow-reader-writer-test.cc | 21 ++++++++++++++++ src/parquet/arrow/reader.cc | 14 ++++++----- src/parquet/arrow/schema.cc | 12 ++++++--- src/parquet/arrow/writer.cc | 25 ++++++++++++++++++- src/parquet/file/metadata.cc | 1 + src/parquet/schema.cc | 3 +++ src/parquet/types.h | 3 ++- 7 files changed, 68 insertions(+), 11 deletions(-) diff --git a/src/parquet/arrow/arrow-reader-writer-test.cc b/src/parquet/arrow/arrow-reader-writer-test.cc index 97bb19b3..3beca354 100644 --- a/src/parquet/arrow/arrow-reader-writer-test.cc +++ b/src/parquet/arrow/arrow-reader-writer-test.cc @@ -802,6 +802,27 @@ TEST_F(TestStringParquetIO, EmptyStringColumnRequiredWrite) { ASSERT_TRUE(values->Equals(chunked_array->chunk(0))); } +using TestNullParquetIO = TestParquetIO<::arrow::NullType>; + +TEST_F(TestNullParquetIO, NullColumn) { + std::shared_ptr values = std::make_shared<::arrow::NullArray>(SMALL_SIZE); + std::shared_ptr table = MakeSimpleTable(values, true); + this->sink_ = std::make_shared(); + ASSERT_OK_NO_THROW(WriteTable(*table, ::arrow::default_memory_pool(), this->sink_, + values->length(), default_writer_properties())); + + std::shared_ptr
out; + std::unique_ptr reader; + this->ReaderFromSink(&reader); + this->ReadTableFromFile(std::move(reader), &out); + ASSERT_EQ(1, out->num_columns()); + ASSERT_EQ(100, out->num_rows()); + + std::shared_ptr chunked_array = out->column(0)->data(); + ASSERT_EQ(1, chunked_array->num_chunks()); + ASSERT_TRUE(values->Equals(chunked_array->chunk(0))); +} + template using ParquetCDataType = typename ParquetDataType::c_type; diff --git a/src/parquet/arrow/reader.cc b/src/parquet/arrow/reader.cc index 7c1b3810..c8bf18a1 100644 --- a/src/parquet/arrow/reader.cc +++ b/src/parquet/arrow/reader.cc @@ -495,10 +495,6 @@ Status FileReader::Impl::ReadTable( std::shared_ptr<::arrow::Schema> schema; RETURN_NOT_OK(GetSchema(indices, &schema)); - int num_fields = static_cast(schema->num_fields()); - int nthreads = std::min(num_threads_, num_fields); - std::vector> columns(num_fields); - // We only need to read schema fields which have columns indicated // in the indices vector std::vector field_indices; @@ -507,6 +503,7 @@ Status FileReader::Impl::ReadTable( return Status::Invalid("Invalid column index"); } + std::vector> columns(field_indices.size()); auto ReadColumnFunc = [&indices, &field_indices, &schema, &columns, this](int i) { std::shared_ptr array; RETURN_NOT_OK(ReadSchemaField(field_indices[i], indices, &array)); @@ -514,12 +511,13 @@ Status FileReader::Impl::ReadTable( return Status::OK(); }; + int nthreads = std::min(num_threads_, field_indices.size()); if (nthreads == 1) { - for (int i = 0; i < num_fields; i++) { + for (int i = 0; i < static_cast(field_indices.size()); i++) { RETURN_NOT_OK(ReadColumnFunc(i)); } } else { - RETURN_NOT_OK(ParallelFor(nthreads, num_fields, ReadColumnFunc)); + RETURN_NOT_OK(ParallelFor(nthreads, field_indices.size(), ReadColumnFunc)); } *table = std::make_shared
(schema, columns); @@ -1262,6 +1260,10 @@ Status PrimitiveImpl::NextBatch( } switch (field_->type()->id()) { + case ::arrow::Type::NA: + *out = std::make_shared<::arrow::NullArray>(batch_size); + return Status::OK(); + break; TYPED_BATCH_CASE(BOOL, ::arrow::BooleanType, BooleanType) TYPED_BATCH_CASE(UINT8, ::arrow::UInt8Type, Int32Type) TYPED_BATCH_CASE(INT8, ::arrow::Int8Type, Int32Type) diff --git a/src/parquet/arrow/schema.cc b/src/parquet/arrow/schema.cc index a78a23bf..2a4ddcdc 100644 --- a/src/parquet/arrow/schema.cc +++ b/src/parquet/arrow/schema.cc @@ -166,6 +166,11 @@ static Status FromInt64(const PrimitiveNode* node, TypePtr* out) { } Status FromPrimitive(const PrimitiveNode* primitive, TypePtr* out) { + if (primitive->logical_type() == LogicalType::NA) { + *out = ::arrow::null(); + return Status::OK(); + } + switch (primitive->physical_type()) { case ParquetType::BOOLEAN: *out = ::arrow::boolean(); @@ -410,9 +415,10 @@ Status FieldToNode(const std::shared_ptr& field, int length = -1; switch (field->type()->id()) { - // TODO: - // case ArrowType::NA: - // break; + case ArrowType::NA: + type = ParquetType::INT32; + logical_type = LogicalType::NA; + break; case ArrowType::BOOL: type = ParquetType::BOOLEAN; break; diff --git a/src/parquet/arrow/writer.cc b/src/parquet/arrow/writer.cc index 3344d1b6..af4f7544 100644 --- a/src/parquet/arrow/writer.cc +++ b/src/parquet/arrow/writer.cc @@ -62,6 +62,15 @@ class LevelBuilder { Status VisitInline(const Array& array); + Status Visit(const ::arrow::NullArray& array) { + array_offsets_.push_back(static_cast(array.offset())); + valid_bitmaps_.push_back(array.null_bitmap_data()); + null_counts_.push_back(array.length()); + values_type_ = array.type_id(); + values_array_ = &array; + return Status::OK(); + } + Status Visit(const ::arrow::PrimitiveArray& array) { array_offsets_.push_back(static_cast(array.offset())); valid_bitmaps_.push_back(array.null_bitmap_data()); @@ -98,7 +107,6 @@ class LevelBuilder { "Level 
generation for ArrowTypePrefix not supported yet"); \ } - NOT_IMPLEMENTED_VISIT(Null) NOT_IMPLEMENTED_VISIT(Struct) NOT_IMPLEMENTED_VISIT(Union) NOT_IMPLEMENTED_VISIT(Decimal) @@ -141,6 +149,8 @@ class LevelBuilder { reinterpret_cast(def_levels_buffer_->mutable_data()); if (array.null_count() == 0) { std::fill(def_levels_ptr, def_levels_ptr + array.length(), 1); + } else if (array.null_count() == array.length()) { + std::fill(def_levels_ptr, def_levels_ptr + array.length(), 0); } else { const uint8_t* valid_bits = array.null_bitmap_data(); INIT_BITSET(valid_bits, static_cast(array.offset())); @@ -509,6 +519,18 @@ Status FileWriter::Impl::TypedWriteBatch( return Status::OK(); } +template <> +Status FileWriter::Impl::TypedWriteBatch( + ColumnWriter* column_writer, const std::shared_ptr& array, int64_t num_levels, + const int16_t* def_levels, const int16_t* rep_levels) { + auto writer = reinterpret_cast*>(column_writer); + + PARQUET_CATCH_NOT_OK( + writer->WriteBatch(num_levels, def_levels, rep_levels, nullptr)); + PARQUET_CATCH_NOT_OK(writer->Close()); + return Status::OK(); +} + template <> Status FileWriter::Impl::TypedWriteBatch( ColumnWriter* column_writer, const std::shared_ptr& array, int64_t num_levels, @@ -639,6 +661,7 @@ Status FileWriter::Impl::WriteColumnChunk(const Array& data) { column_writer, values_array, num_levels, def_levels, rep_levels); } } + WRITE_BATCH_CASE(NA, NullType, Int32Type) WRITE_BATCH_CASE(BOOL, BooleanType, BooleanType) WRITE_BATCH_CASE(INT8, Int8Type, Int32Type) WRITE_BATCH_CASE(UINT8, UInt8Type, Int32Type) diff --git a/src/parquet/file/metadata.cc b/src/parquet/file/metadata.cc index aea7a749..b37ef4f4 100644 --- a/src/parquet/file/metadata.cc +++ b/src/parquet/file/metadata.cc @@ -76,6 +76,7 @@ SortOrder get_sort_order(LogicalType::type converted, Type::type primitive) { case LogicalType::BSON: case LogicalType::JSON: return SortOrder::UNSIGNED; + case LogicalType::NA: case LogicalType::DECIMAL: case LogicalType::LIST: case 
LogicalType::MAP: diff --git a/src/parquet/schema.cc b/src/parquet/schema.cc index 1209ad19..4efa0b2d 100644 --- a/src/parquet/schema.cc +++ b/src/parquet/schema.cc @@ -190,6 +190,9 @@ PrimitiveNode::PrimitiveNode(const std::string& name, Repetition::type repetitio throw ParquetException(ss.str()); } break; + case LogicalType::NA: + // NA can annotate any type + break; default: ss << LogicalTypeToString(logical_type); ss << " can not be applied to a primitive type"; diff --git a/src/parquet/types.h b/src/parquet/types.h index 2b9b11f8..8504f5d3 100644 --- a/src/parquet/types.h +++ b/src/parquet/types.h @@ -81,7 +81,8 @@ struct LogicalType { INT_64, JSON, BSON, - INTERVAL + INTERVAL, + NA = 25 }; }; From cdc68351760b66c6242d10a5d0c749a51f8a419e Mon Sep 17 00:00:00 2001 From: "Uwe L. Korn" Date: Fri, 23 Jun 2017 15:23:45 +0200 Subject: [PATCH 2/2] Fix int conversion --- src/parquet/arrow/reader.cc | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/src/parquet/arrow/reader.cc b/src/parquet/arrow/reader.cc index c8bf18a1..ef9ac343 100644 --- a/src/parquet/arrow/reader.cc +++ b/src/parquet/arrow/reader.cc @@ -511,13 +511,14 @@ Status FileReader::Impl::ReadTable( return Status::OK(); }; - int nthreads = std::min<int>(num_threads_, field_indices.size()); + int num_fields = static_cast<int>(field_indices.size()); + int nthreads = std::min<int>(num_threads_, num_fields); if (nthreads == 1) { - for (int i = 0; i < static_cast<int>(field_indices.size()); i++) { + for (int i = 0; i < num_fields; i++) { RETURN_NOT_OK(ReadColumnFunc(i)); } } else { - RETURN_NOT_OK(ParallelFor(nthreads, field_indices.size(), ReadColumnFunc)); + RETURN_NOT_OK(ParallelFor(nthreads, num_fields, ReadColumnFunc)); } *table = std::make_shared
<Table>(schema, columns);