Skip to content
This repository was archived by the owner on May 10, 2024. It is now read-only.
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 21 additions & 0 deletions src/parquet/arrow/arrow-reader-writer-test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -802,6 +802,27 @@ TEST_F(TestStringParquetIO, EmptyStringColumnRequiredWrite) {
ASSERT_TRUE(values->Equals(chunked_array->chunk(0)));
}

using TestNullParquetIO = TestParquetIO<::arrow::NullType>;

TEST_F(TestNullParquetIO, NullColumn) {
std::shared_ptr<Array> values = std::make_shared<::arrow::NullArray>(SMALL_SIZE);
std::shared_ptr<Table> table = MakeSimpleTable(values, true);
this->sink_ = std::make_shared<InMemoryOutputStream>();
ASSERT_OK_NO_THROW(WriteTable(*table, ::arrow::default_memory_pool(), this->sink_,
values->length(), default_writer_properties()));

std::shared_ptr<Table> out;
std::unique_ptr<FileReader> reader;
this->ReaderFromSink(&reader);
this->ReadTableFromFile(std::move(reader), &out);
ASSERT_EQ(1, out->num_columns());
ASSERT_EQ(100, out->num_rows());

std::shared_ptr<ChunkedArray> chunked_array = out->column(0)->data();
ASSERT_EQ(1, chunked_array->num_chunks());
ASSERT_TRUE(values->Equals(chunked_array->chunk(0)));
}

template <typename T>
using ParquetCDataType = typename ParquetDataType<T>::c_type;

Expand Down
11 changes: 7 additions & 4 deletions src/parquet/arrow/reader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -495,10 +495,6 @@ Status FileReader::Impl::ReadTable(
std::shared_ptr<::arrow::Schema> schema;
RETURN_NOT_OK(GetSchema(indices, &schema));

int num_fields = static_cast<int>(schema->num_fields());
int nthreads = std::min<int>(num_threads_, num_fields);
std::vector<std::shared_ptr<Column>> columns(num_fields);

// We only need to read schema fields which have columns indicated
// in the indices vector
std::vector<int> field_indices;
Expand All @@ -507,13 +503,16 @@ Status FileReader::Impl::ReadTable(
return Status::Invalid("Invalid column index");
}

std::vector<std::shared_ptr<Column>> columns(field_indices.size());
auto ReadColumnFunc = [&indices, &field_indices, &schema, &columns, this](int i) {
std::shared_ptr<Array> array;
RETURN_NOT_OK(ReadSchemaField(field_indices[i], indices, &array));
columns[i] = std::make_shared<Column>(schema->field(i), array);
return Status::OK();
};

int num_fields = static_cast<int>(field_indices.size());
int nthreads = std::min<int>(num_threads_, num_fields);
if (nthreads == 1) {
for (int i = 0; i < num_fields; i++) {
RETURN_NOT_OK(ReadColumnFunc(i));
Expand Down Expand Up @@ -1262,6 +1261,10 @@ Status PrimitiveImpl::NextBatch(
}

switch (field_->type()->id()) {
case ::arrow::Type::NA:
*out = std::make_shared<::arrow::NullArray>(batch_size);
return Status::OK();
break;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This break is unreachable, any reason to keep it?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, getting rid of compiler warnings.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

gotcha, I hadn't seen that before

TYPED_BATCH_CASE(BOOL, ::arrow::BooleanType, BooleanType)
TYPED_BATCH_CASE(UINT8, ::arrow::UInt8Type, Int32Type)
TYPED_BATCH_CASE(INT8, ::arrow::Int8Type, Int32Type)
Expand Down
12 changes: 9 additions & 3 deletions src/parquet/arrow/schema.cc
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,11 @@ static Status FromInt64(const PrimitiveNode* node, TypePtr* out) {
}

Status FromPrimitive(const PrimitiveNode* primitive, TypePtr* out) {
if (primitive->logical_type() == LogicalType::NA) {
*out = ::arrow::null();
return Status::OK();
}

switch (primitive->physical_type()) {
case ParquetType::BOOLEAN:
*out = ::arrow::boolean();
Expand Down Expand Up @@ -410,9 +415,10 @@ Status FieldToNode(const std::shared_ptr<Field>& field,
int length = -1;

switch (field->type()->id()) {
// TODO:
// case ArrowType::NA:
// break;
case ArrowType::NA:
type = ParquetType::INT32;
logical_type = LogicalType::NA;
break;
case ArrowType::BOOL:
type = ParquetType::BOOLEAN;
break;
Expand Down
25 changes: 24 additions & 1 deletion src/parquet/arrow/writer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,15 @@ class LevelBuilder {

Status VisitInline(const Array& array);

Status Visit(const ::arrow::NullArray& array) {
array_offsets_.push_back(static_cast<int32_t>(array.offset()));
valid_bitmaps_.push_back(array.null_bitmap_data());
null_counts_.push_back(array.length());
values_type_ = array.type_id();
values_array_ = &array;
return Status::OK();
}

Status Visit(const ::arrow::PrimitiveArray& array) {
array_offsets_.push_back(static_cast<int32_t>(array.offset()));
valid_bitmaps_.push_back(array.null_bitmap_data());
Expand Down Expand Up @@ -98,7 +107,6 @@ class LevelBuilder {
"Level generation for ArrowTypePrefix not supported yet"); \
}

NOT_IMPLEMENTED_VISIT(Null)
NOT_IMPLEMENTED_VISIT(Struct)
NOT_IMPLEMENTED_VISIT(Union)
NOT_IMPLEMENTED_VISIT(Decimal)
Expand Down Expand Up @@ -141,6 +149,8 @@ class LevelBuilder {
reinterpret_cast<int16_t*>(def_levels_buffer_->mutable_data());
if (array.null_count() == 0) {
std::fill(def_levels_ptr, def_levels_ptr + array.length(), 1);
} else if (array.null_count() == array.length()) {
std::fill(def_levels_ptr, def_levels_ptr + array.length(), 0);
} else {
const uint8_t* valid_bits = array.null_bitmap_data();
INIT_BITSET(valid_bits, static_cast<int>(array.offset()));
Expand Down Expand Up @@ -509,6 +519,18 @@ Status FileWriter::Impl::TypedWriteBatch<BooleanType, ::arrow::BooleanType>(
return Status::OK();
}

template <>
Status FileWriter::Impl::TypedWriteBatch<Int32Type, ::arrow::NullType>(
ColumnWriter* column_writer, const std::shared_ptr<Array>& array, int64_t num_levels,
const int16_t* def_levels, const int16_t* rep_levels) {
auto writer = reinterpret_cast<TypedColumnWriter<Int32Type>*>(column_writer);

PARQUET_CATCH_NOT_OK(
writer->WriteBatch(num_levels, def_levels, rep_levels, nullptr));
PARQUET_CATCH_NOT_OK(writer->Close());
return Status::OK();
}

template <>
Status FileWriter::Impl::TypedWriteBatch<ByteArrayType, ::arrow::BinaryType>(
ColumnWriter* column_writer, const std::shared_ptr<Array>& array, int64_t num_levels,
Expand Down Expand Up @@ -639,6 +661,7 @@ Status FileWriter::Impl::WriteColumnChunk(const Array& data) {
column_writer, values_array, num_levels, def_levels, rep_levels);
}
}
WRITE_BATCH_CASE(NA, NullType, Int32Type)
WRITE_BATCH_CASE(BOOL, BooleanType, BooleanType)
WRITE_BATCH_CASE(INT8, Int8Type, Int32Type)
WRITE_BATCH_CASE(UINT8, UInt8Type, Int32Type)
Expand Down
1 change: 1 addition & 0 deletions src/parquet/file/metadata.cc
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ SortOrder get_sort_order(LogicalType::type converted, Type::type primitive) {
case LogicalType::BSON:
case LogicalType::JSON:
return SortOrder::UNSIGNED;
case LogicalType::NA:
case LogicalType::DECIMAL:
case LogicalType::LIST:
case LogicalType::MAP:
Expand Down
3 changes: 3 additions & 0 deletions src/parquet/schema.cc
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,9 @@ PrimitiveNode::PrimitiveNode(const std::string& name, Repetition::type repetitio
throw ParquetException(ss.str());
}
break;
case LogicalType::NA:
// NA can annotate any type
break;
default:
ss << LogicalTypeToString(logical_type);
ss << " can not be applied to a primitive type";
Expand Down
3 changes: 2 additions & 1 deletion src/parquet/types.h
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,8 @@ struct LogicalType {
INT_64,
JSON,
BSON,
INTERVAL
INTERVAL,
NA = 25
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this from parquet.thrift?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes

};
};

Expand Down