105 changes: 105 additions & 0 deletions cpp/src/parquet/arrow/arrow_statistics_test.cc
@@ -17,12 +17,14 @@

#include "gtest/gtest.h"

#include "arrow/array.h"
#include "arrow/table.h"
#include "arrow/testing/gtest_util.h"

#include "parquet/api/reader.h"
#include "parquet/api/writer.h"

#include "parquet/arrow/reader.h"
#include "parquet/arrow/schema.h"
#include "parquet/arrow/writer.h"
#include "parquet/file_writer.h"
@@ -179,4 +181,107 @@ TEST(StatisticsTest, TruncateOnlyHalfMinMax) {
ASSERT_FALSE(stats->HasMinMax());
}

namespace {
::arrow::Result<std::shared_ptr<::arrow::Array>> StatisticsReadArray(
std::shared_ptr<::arrow::DataType> data_type, const std::string& json) {
auto schema = ::arrow::schema({::arrow::field("column", data_type)});
auto array = ::arrow::ArrayFromJSON(data_type, json);
auto record_batch = ::arrow::RecordBatch::Make(schema, array->length(), {array});
ARROW_ASSIGN_OR_RAISE(auto sink, ::arrow::io::BufferOutputStream::Create());
const auto arrow_writer_properties =
parquet::ArrowWriterProperties::Builder().store_schema()->build();
ARROW_ASSIGN_OR_RAISE(
auto writer,
FileWriter::Open(*schema, ::arrow::default_memory_pool(), sink,
default_writer_properties(), arrow_writer_properties));
ARROW_RETURN_NOT_OK(writer->WriteRecordBatch(*record_batch));
ARROW_RETURN_NOT_OK(writer->Close());
ARROW_ASSIGN_OR_RAISE(auto buffer, sink->Finish());

auto reader =
ParquetFileReader::Open(std::make_shared<::arrow::io::BufferReader>(buffer));
std::unique_ptr<FileReader> file_reader;
ARROW_RETURN_NOT_OK(
FileReader::Make(::arrow::default_memory_pool(), std::move(reader), &file_reader));
std::shared_ptr<::arrow::ChunkedArray> chunked_array;
ARROW_RETURN_NOT_OK(file_reader->ReadColumn(0, &chunked_array));
return chunked_array->chunk(0);
}

template <typename ArrowType, typename MinMaxType>
void TestStatisticsReadArray(std::shared_ptr<::arrow::DataType> arrow_type) {
using ArrowArrayType = typename ::arrow::TypeTraits<ArrowType>::ArrayType;
using ArrowCType = typename ArrowType::c_type;
constexpr auto min = std::numeric_limits<ArrowCType>::min();
constexpr auto max = std::numeric_limits<ArrowCType>::max();

std::string json;
json += "[";
json += std::to_string(max);
json += ", null, ";
json += std::to_string(min);
json += ", ";
json += std::to_string(max);
json += "]";
ASSERT_OK_AND_ASSIGN(auto array, StatisticsReadArray(arrow_type, json));
auto typed_array = std::static_pointer_cast<ArrowArrayType>(array);
auto statistics = typed_array->statistics();
ASSERT_NE(nullptr, statistics);
ASSERT_EQ(true, statistics->null_count.has_value());
ASSERT_EQ(1, statistics->null_count.value());
ASSERT_EQ(false, statistics->distinct_count.has_value());
ASSERT_EQ(true, statistics->min.has_value());
ASSERT_EQ(true, std::holds_alternative<MinMaxType>(*statistics->min));
ASSERT_EQ(min, std::get<MinMaxType>(*statistics->min));
ASSERT_EQ(true, statistics->is_min_exact);
ASSERT_EQ(true, statistics->max.has_value());
ASSERT_EQ(true, std::holds_alternative<MinMaxType>(*statistics->max));
ASSERT_EQ(max, std::get<MinMaxType>(*statistics->max));
ASSERT_EQ(true, statistics->is_max_exact);
}
} // namespace

TEST(TestStatisticsRead, Int8) {
TestStatisticsReadArray<::arrow::Int8Type, int64_t>(::arrow::int8());
}

TEST(TestStatisticsRead, UInt8) {
TestStatisticsReadArray<::arrow::UInt8Type, uint64_t>(::arrow::uint8());
}

TEST(TestStatisticsRead, Int16) {
TestStatisticsReadArray<::arrow::Int16Type, int64_t>(::arrow::int16());
}

TEST(TestStatisticsRead, UInt16) {
TestStatisticsReadArray<::arrow::UInt16Type, uint64_t>(::arrow::uint16());
}

TEST(TestStatisticsRead, UInt32) {
TestStatisticsReadArray<::arrow::UInt32Type, uint64_t>(::arrow::uint32());
}

TEST(TestStatisticsRead, UInt64) {
TestStatisticsReadArray<::arrow::UInt64Type, uint64_t>(::arrow::uint64());
}

TEST(TestStatisticsRead, Date32) {
TestStatisticsReadArray<::arrow::Date32Type, int64_t>(::arrow::date32());
}

TEST(TestStatisticsRead, Time32) {
TestStatisticsReadArray<::arrow::Time32Type, int64_t>(
::arrow::time32(::arrow::TimeUnit::MILLI));
}

TEST(TestStatisticsRead, Time64) {
TestStatisticsReadArray<::arrow::Time64Type, int64_t>(
::arrow::time64(::arrow::TimeUnit::MICRO));
}

TEST(TestStatisticsRead, Duration) {
TestStatisticsReadArray<::arrow::DurationType, int64_t>(
::arrow::duration(::arrow::TimeUnit::NANO));
}

} // namespace parquet::arrow
5 changes: 3 additions & 2 deletions cpp/src/parquet/arrow/reader.cc
@@ -485,8 +485,9 @@ class LeafReader : public ColumnReaderImpl {
NextRowGroup();
}
}
RETURN_NOT_OK(
TransferColumnData(record_reader_.get(), field_, descr_, ctx_->pool, &out_));
RETURN_NOT_OK(TransferColumnData(record_reader_.get(),
input_->column_chunk_metadata(), field_, descr_,
Contributor: The call to input_->column_chunk_metadata() fails here if the list of row_groups in input_ is empty, because input_ is not yet initialized properly at this point in that case. Via row_group_metadata() -> RowGroup(-1). (A sketch of this failure path follows the hunk below.)

Member Author: Good catch! Could you open a new issue for it?
ctx_.get(), &out_));
return Status::OK();
END_PARQUET_CATCH_EXCEPTIONS
}
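
A minimal sketch of the failure path reported above, for reference; it is hypothetical, not part of this diff, and ReproEmptyRowGroups is an illustrative name. It assumes the FileColumnIterator API shown in reader_internal.h further down.

// With an empty row-group selection, NextChunk() returns nullptr without
// ever updating row_group_index_, which keeps its initial value of -1.
// Asking for the column chunk metadata afterwards resolves to
// row_group_metadata() -> FileMetaData::RowGroup(-1), which is out of range.
void ReproEmptyRowGroups(ParquetFileReader* reader) {
  FileColumnIterator iterator(/*column_index=*/0, reader, /*row_groups=*/{});
  auto page_reader = iterator.NextChunk();           // nullptr: nothing to read
  auto metadata = iterator.column_chunk_metadata();  // RowGroup(-1): fails
}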
78 changes: 58 additions & 20 deletions cpp/src/parquet/arrow/reader_internal.cc
@@ -319,26 +319,59 @@ void ReconstructChunksWithoutNulls(::arrow::ArrayVector* chunks) {
}

template <typename ArrowType, typename ParquetType>
Status TransferInt(RecordReader* reader, MemoryPool* pool,
const std::shared_ptr<Field>& field, Datum* out) {
Status TransferInt(RecordReader* reader,
std::unique_ptr<::parquet::ColumnChunkMetaData> metadata,
const ReaderContext* ctx, const std::shared_ptr<Field>& field,
Datum* out) {
using ArrowCType = typename ArrowType::c_type;
using ParquetCType = typename ParquetType::c_type;
int64_t length = reader->values_written();
ARROW_ASSIGN_OR_RAISE(auto data,
::arrow::AllocateBuffer(length * sizeof(ArrowCType), pool));
::arrow::AllocateBuffer(length * sizeof(ArrowCType), ctx->pool));

auto values = reinterpret_cast<const ParquetCType*>(reader->values());
auto out_ptr = reinterpret_cast<ArrowCType*>(data->mutable_data());
std::copy(values, values + length, out_ptr);
int64_t null_count = 0;
std::vector<std::shared_ptr<Buffer>> buffers = {nullptr, std::move(data)};
if (field->nullable()) {
*out = std::make_shared<ArrayType<ArrowType>>(field->type(), length, std::move(data),
reader->ReleaseIsValid(),
reader->null_count());
} else {
*out =
std::make_shared<ArrayType<ArrowType>>(field->type(), length, std::move(data),
/*null_bitmap=*/nullptr, /*null_count=*/0);
null_count = reader->null_count();
buffers[0] = reader->ReleaseIsValid();
}
auto array_data =
::arrow::ArrayData::Make(field->type(), length, std::move(buffers), null_count);
auto array_statistics = std::make_shared<::arrow::ArrayStatistics>();
array_statistics->null_count = null_count;
Member: The null_count for some types (nested) would be a bit weird, FYI.

Member Author: Thanks for the information. Let's revisit it when we add support for arrow::ArrayStatistics of nested types.

auto statistics = metadata->statistics().get();
if (statistics) {
if (statistics->HasDistinctCount()) {
Member: Do we need a separate function for the stats conversion?

Member Author: I'll do it when I add more target types in the next pull request. I'll know what the common pattern is once I add more target types.

array_statistics->distinct_count = statistics->distinct_count();
}
if (statistics->HasMinMax()) {
auto typed_statistics =
static_cast<::parquet::TypedStatistics<ParquetType>*>(statistics);
const ArrowCType min = typed_statistics->min();
const ArrowCType max = typed_statistics->max();
if (std::is_signed<ArrowCType>::value) {
array_statistics->min = static_cast<int64_t>(min);
array_statistics->max = static_cast<int64_t>(max);
} else {
array_statistics->min = static_cast<uint64_t>(min);
array_statistics->max = static_cast<uint64_t>(max);
}
// We can assume that integer based min/max are always exact if
// they exist. Apache Parquet's "Statistics" has
// "is_min_value_exact" and "is_max_value_exact" but we can
// ignore them for integer based min/max.
//
// See also the discussion at dev@parquet.apache.org:
// https://lists.apache.org/thread/zfnmg5p51b7oylft5w5k4670wgkd4zv4
array_statistics->is_min_exact = true;
array_statistics->is_max_exact = true;
Comment on lines +369 to +370

Member: Can you add a corresponding comment here? This might be a bit tricky.

Member Author: Sure. We should document the discussion at #43595 (comment), right?

BTW, could you share the e-mail URL for #43595 (comment)? I guess not; I'll send a mail to the mailing list to make sure. I couldn't find it at https://lists.apache.org/list.html?dev@parquet.apache.org .

Ah, I forgot to add a writer check here. I should have set true only when the writer is Apache Parquet C++. I'll fix it.

Member (@mapleFU, Sep 5, 2024): Sorry, let me set up a discussion; generally, if it's from Parquet C++ it will work. I'm a bit busy this morning preparing for my tour; I'll try to work it out this noon.

Member Author: No problem. Thanks!

Member Author: Thanks!

Member Author: I've added a comment that we can always use true for integer based min/max. I didn't need if (::arrow::internal::StartsWith(ctx->reader->metadata()->created_by(), "parquet-cpp-arrow")) for this case based on your e-mail. (A hedged sketch of that optional check follows this function.)

Member: FYI, I found that string and FLBA might be truncated; other types in the public impl are not truncated if they exist.
}
}
array_data->statistics = std::move(array_statistics);
*out = std::make_shared<ArrayType<ArrowType>>(std::move(array_data));
return Status::OK();
}
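
For reference, a hedged sketch of the writer check discussed in the thread above. It is deliberately absent from this diff because integer min/max statistics are never truncated (per the mailing-list thread), but for truncatable physical types such as BYTE_ARRAY or FLBA it would look roughly like this inside the statistics conversion; ::arrow::internal::StartsWith is assumed to come from arrow/util/string.h.

// Hypothetical only: trust min/max exactness based on the producer when the
// physical type can be truncated, as discussed in the review thread.
const bool written_by_parquet_cpp = ::arrow::internal::StartsWith(
    ctx->reader->metadata()->created_by(), "parquet-cpp-arrow");
array_statistics->is_min_exact = written_by_parquet_cpp;
array_statistics->is_max_exact = written_by_parquet_cpp;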

Expand Down Expand Up @@ -728,21 +761,26 @@ Status TransferHalfFloat(RecordReader* reader, MemoryPool* pool,

} // namespace

#define TRANSFER_INT32(ENUM, ArrowType) \
case ::arrow::Type::ENUM: { \
Status s = TransferInt<ArrowType, Int32Type>(reader, pool, value_field, &result); \
RETURN_NOT_OK(s); \
#define TRANSFER_INT32(ENUM, ArrowType) \
case ::arrow::Type::ENUM: { \
Status s = TransferInt<ArrowType, Int32Type>(reader, std::move(metadata), ctx, \
value_field, &result); \
RETURN_NOT_OK(s); \
} break;

#define TRANSFER_INT64(ENUM, ArrowType) \
case ::arrow::Type::ENUM: { \
Status s = TransferInt<ArrowType, Int64Type>(reader, pool, value_field, &result); \
RETURN_NOT_OK(s); \
#define TRANSFER_INT64(ENUM, ArrowType) \
case ::arrow::Type::ENUM: { \
Status s = TransferInt<ArrowType, Int64Type>(reader, std::move(metadata), ctx, \
value_field, &result); \
RETURN_NOT_OK(s); \
} break;

Status TransferColumnData(RecordReader* reader, const std::shared_ptr<Field>& value_field,
const ColumnDescriptor* descr, MemoryPool* pool,
Status TransferColumnData(RecordReader* reader,
std::unique_ptr<::parquet::ColumnChunkMetaData> metadata,
const std::shared_ptr<Field>& value_field,
const ColumnDescriptor* descr, const ReaderContext* ctx,
std::shared_ptr<ChunkedArray>* out) {
auto pool = ctx->pool;
Datum result;
std::shared_ptr<ChunkedArray> chunked_result;
switch (value_field->type()->id()) {
28 changes: 21 additions & 7 deletions cpp/src/parquet/arrow/reader_internal.h
@@ -66,7 +66,8 @@ class FileColumnIterator {
: column_index_(column_index),
reader_(reader),
schema_(reader->metadata()->schema()),
row_groups_(row_groups.begin(), row_groups.end()) {}
row_groups_(row_groups.begin(), row_groups.end()),
row_group_index_(-1) {}

virtual ~FileColumnIterator() {}

@@ -75,7 +76,8 @@ class FileColumnIterator {
return nullptr;
}

auto row_group_reader = reader_->RowGroup(row_groups_.front());
row_group_index_ = row_groups_.front();
auto row_group_reader = reader_->RowGroup(row_group_index_);
row_groups_.pop_front();
return row_group_reader->GetColumnPageReader(column_index_);
}
@@ -86,23 +88,29 @@

std::shared_ptr<FileMetaData> metadata() const { return reader_->metadata(); }

std::unique_ptr<RowGroupMetaData> row_group_metadata() const {
return metadata()->RowGroup(row_group_index_);
}

std::unique_ptr<ColumnChunkMetaData> column_chunk_metadata() const {
return row_group_metadata()->ColumnChunk(column_index_);
}

int column_index() const { return column_index_; }

int row_group_index() const { return row_group_index_; }

protected:
int column_index_;
ParquetFileReader* reader_;
const SchemaDescriptor* schema_;
std::deque<int> row_groups_;
int row_group_index_;
};

using FileColumnIteratorFactory =
std::function<FileColumnIterator*(int, ParquetFileReader*)>;

Status TransferColumnData(::parquet::internal::RecordReader* reader,
const std::shared_ptr<::arrow::Field>& value_field,
const ColumnDescriptor* descr, ::arrow::MemoryPool* pool,
std::shared_ptr<::arrow::ChunkedArray>* out);

struct ReaderContext {
ParquetFileReader* reader;
::arrow::MemoryPool* pool;
@@ -118,5 +126,11 @@ struct ReaderContext {
}
};

Status TransferColumnData(::parquet::internal::RecordReader* reader,
std::unique_ptr<::parquet::ColumnChunkMetaData> metadata,
const std::shared_ptr<::arrow::Field>& value_field,
const ColumnDescriptor* descr, const ReaderContext* ctx,
std::shared_ptr<::arrow::ChunkedArray>* out);

} // namespace arrow
} // namespace parquet