diff --git a/cpp/src/arrow/CMakeLists.txt b/cpp/src/arrow/CMakeLists.txt index eee63b11ca1c..8a6a6a07b9ac 100644 --- a/cpp/src/arrow/CMakeLists.txt +++ b/cpp/src/arrow/CMakeLists.txt @@ -1016,7 +1016,7 @@ endif() if(ARROW_ORC) arrow_add_object_library(ARROW_ORC adapters/orc/adapter.cc adapters/orc/options.cc - adapters/orc/util.cc) + adapters/orc/statistics.cc adapters/orc/util.cc) foreach(ARROW_ORC_TARGET ${ARROW_ORC_TARGETS}) target_link_libraries(${ARROW_ORC_TARGET} PRIVATE orc::orc) if(ARROW_ORC_VERSION VERSION_LESS "2.0.0") diff --git a/cpp/src/arrow/adapters/orc/CMakeLists.txt b/cpp/src/arrow/adapters/orc/CMakeLists.txt index bae63210b29e..9b778f3e41ef 100644 --- a/cpp/src/arrow/adapters/orc/CMakeLists.txt +++ b/cpp/src/arrow/adapters/orc/CMakeLists.txt @@ -20,7 +20,7 @@ # # Headers: top level -install(FILES adapter.h options.h +install(FILES adapter.h options.h statistics.h DESTINATION "${CMAKE_INSTALL_INCLUDEDIR}/arrow/adapters/orc") # pkg-config support diff --git a/cpp/src/arrow/adapters/orc/adapter.cc b/cpp/src/arrow/adapters/orc/adapter.cc index 51cca497485c..2971ed34191a 100644 --- a/cpp/src/arrow/adapters/orc/adapter.cc +++ b/cpp/src/arrow/adapters/orc/adapter.cc @@ -39,10 +39,11 @@ #include "arrow/type.h" #include "arrow/util/bit_util.h" #include "arrow/util/checked_cast.h" -#include "arrow/util/decimal.h" #include "arrow/util/key_value_metadata.h" #include "arrow/util/macros.h" #include "orc/Exceptions.hh" +#include "orc/Statistics.hh" +#include "orc/Type.hh" // alias to not interfere with nested orc namespace namespace liborc = orc; @@ -217,7 +218,7 @@ class ORCFileReader::Impl { std::unique_ptr liborc_reader; ORC_CATCH_NOT_OK(liborc_reader = createReader(std::move(io_wrapper), options)); pool_ = pool; - reader_ = std::move(liborc_reader); + reader_ = std::shared_ptr(liborc_reader.release()); current_row_ = 0; return Init(); @@ -409,6 +410,44 @@ class ORCFileReader::Impl { return ReadBatch(opts, schema, stripes_[static_cast(stripe)].num_rows); } + 
Result> ReadStripes( + const std::vector& stripe_indices) { + if (stripe_indices.empty()) { + ARROW_ASSIGN_OR_RAISE(auto schema, ReadSchema()); + return Table::MakeEmpty(schema); + } + + std::vector> batches; + batches.reserve(stripe_indices.size()); + for (int64_t stripe_index : stripe_indices) { + ARROW_ASSIGN_OR_RAISE(auto batch, ReadStripe(stripe_index)); + batches.push_back(std::move(batch)); + } + ARROW_ASSIGN_OR_RAISE(auto schema, ReadSchema()); + return Table::FromRecordBatches(schema, std::move(batches)); + } + + Result> ReadStripes(const std::vector& stripe_indices, + const std::vector& include_indices) { + if (stripe_indices.empty()) { + liborc::RowReaderOptions opts = DefaultRowReaderOptions(); + RETURN_NOT_OK(SelectIndices(&opts, include_indices)); + ARROW_ASSIGN_OR_RAISE(auto schema, ReadSchema(opts)); + return Table::MakeEmpty(schema); + } + + std::vector> batches; + batches.reserve(stripe_indices.size()); + for (int64_t stripe_index : stripe_indices) { + ARROW_ASSIGN_OR_RAISE(auto batch, ReadStripe(stripe_index, include_indices)); + batches.push_back(std::move(batch)); + } + liborc::RowReaderOptions opts = DefaultRowReaderOptions(); + RETURN_NOT_OK(SelectIndices(&opts, include_indices)); + ARROW_ASSIGN_OR_RAISE(auto schema, ReadSchema(opts)); + return Table::FromRecordBatches(schema, std::move(batches)); + } + Status SelectStripe(liborc::RowReaderOptions* opts, int64_t stripe) { ARROW_RETURN_IF(stripe < 0 || stripe >= NumberOfStripes(), Status::Invalid("Out of bounds stripe: ", stripe)); @@ -526,7 +565,7 @@ class ORCFileReader::Impl { pool_); } - Result> GetRecordBatchReader( + Result> GetRecordBatchReader( int64_t batch_size, const std::vector& include_names) { liborc::RowReaderOptions opts = DefaultRowReaderOptions(); if (!include_names.empty()) { @@ -539,7 +578,7 @@ class ORCFileReader::Impl { row_reader = reader_->createRowReader(opts); ORC_END_CATCH_NOT_OK - return std::make_shared(std::move(row_reader), schema, batch_size, + return 
std::make_unique(std::move(row_reader), schema, batch_size, pool_); } @@ -548,9 +587,21 @@ class ORCFileReader::Impl { return NextStripeReader(batch_size, empty_vec); } + std::shared_ptr GetFileMetaData() { + try { + auto file_stats = + std::shared_ptr(reader_->getStatistics().release()); + return std::make_shared(reader_, std::move(file_stats)); + } catch (...) { + return nullptr; + } + } + + const liborc::Type& GetORCType() { return reader_->getType(); } + private: MemoryPool* pool_; - std::unique_ptr reader_; + std::shared_ptr reader_; std::vector stripes_; int64_t current_row_; }; @@ -613,6 +664,17 @@ Result> ORCFileReader::ReadStripe( return impl_->ReadStripe(stripe, include_names); } +Result> ORCFileReader::ReadStripes( + const std::vector& stripe_indices) { + return impl_->ReadStripes(stripe_indices); +} + +Result> ORCFileReader::ReadStripes( + const std::vector& stripe_indices, + const std::vector& include_indices) { + return impl_->ReadStripes(stripe_indices, include_indices); +} + Status ORCFileReader::Seek(int64_t row_number) { return impl_->Seek(row_number); } Result> ORCFileReader::NextStripeReader( @@ -620,7 +682,7 @@ Result> ORCFileReader::NextStripeReader( return impl_->NextStripeReader(batch_size); } -Result> ORCFileReader::GetRecordBatchReader( +Result> ORCFileReader::GetRecordBatchReader( int64_t batch_size, const std::vector& include_names) { return impl_->GetRecordBatchReader(batch_size, include_names); } @@ -678,6 +740,12 @@ std::string ORCFileReader::GetSerializedFileTail() { return impl_->GetSerializedFileTail(); } +std::shared_ptr ORCFileReader::GetFileMetaData() { + return impl_->GetFileMetaData(); +} + +const ::orc::Type& ORCFileReader::GetORCType() { return impl_->GetORCType(); } + namespace { class ArrowOutputStream : public liborc::OutputStream { diff --git a/cpp/src/arrow/adapters/orc/adapter.h b/cpp/src/arrow/adapters/orc/adapter.h index 4ffff81f355f..79ae68924c7d 100644 --- a/cpp/src/arrow/adapters/orc/adapter.h +++ 
b/cpp/src/arrow/adapters/orc/adapter.h @@ -22,15 +22,19 @@ #include #include "arrow/adapters/orc/options.h" +#include "arrow/adapters/orc/statistics.h" #include "arrow/io/interfaces.h" #include "arrow/memory_pool.h" #include "arrow/record_batch.h" #include "arrow/status.h" -#include "arrow/type.h" #include "arrow/type_fwd.h" #include "arrow/util/macros.h" #include "arrow/util/visibility.h" +namespace orc { +class Type; +} // namespace orc + namespace arrow { namespace adapters { namespace orc { @@ -129,6 +133,29 @@ class ARROW_EXPORT ORCFileReader { Result> ReadStripe( int64_t stripe, const std::vector& include_names); + /// \brief Read multiple selected stripes as a Table + /// + /// Reads only the specified stripes and concatenates them into a single table. + /// This is useful for stripe-selective reading based on predicate pushdown. + /// If stripe_indices is empty, returns an empty table with the file's schema. + /// + /// \param[in] stripe_indices the indices of stripes to read + /// \return the returned Table containing data from the selected stripes + Result> ReadStripes( + const std::vector& stripe_indices); + + /// \brief Read multiple selected stripes with column selection as a Table + /// + /// Reads only the specified stripes and selected columns, concatenating them + /// into a single table. + /// If stripe_indices is empty, returns an empty table with the selected schema. + /// + /// \param[in] stripe_indices the indices of stripes to read + /// \param[in] include_indices the selected field indices to read + /// \return the returned Table containing data from the selected stripes and columns + Result> ReadStripes(const std::vector& stripe_indices, + const std::vector& include_indices); + /// \brief Seek to designated row. Invoke NextStripeReader() after seek /// will return stripe reader starting from designated row. 
/// @@ -171,7 +198,7 @@ class ARROW_EXPORT ORCFileReader { /// \param[in] include_names the selected field names to read, if not empty /// (otherwise all fields are read) /// \return the record batch iterator - Result> GetRecordBatchReader( + Result> GetRecordBatchReader( int64_t batch_size, const std::vector& include_names); /// \brief The number of stripes in the file @@ -267,6 +294,14 @@ class ARROW_EXPORT ORCFileReader { /// \return A KeyValueMetadata object containing the ORC metadata Result> ReadMetadata(); + /// \brief Get file-level metadata view. + std::shared_ptr GetFileMetaData(); + + /// \brief Get the ORC type tree for column ID mapping. + /// + /// \return reference to the root ORC Type (STRUCT), owned by the reader. + const ::orc::Type& GetORCType(); + private: class Impl; std::unique_ptr impl_; diff --git a/cpp/src/arrow/adapters/orc/adapter_test.cc b/cpp/src/arrow/adapters/orc/adapter_test.cc index 714e61b22b11..e3ca1e35a221 100644 --- a/cpp/src/arrow/adapters/orc/adapter_test.cc +++ b/cpp/src/arrow/adapters/orc/adapter_test.cc @@ -18,6 +18,7 @@ #include #include +#include #include #include @@ -36,6 +37,7 @@ #include "arrow/testing/matchers.h" #include "arrow/testing/random.h" #include "arrow/type.h" +#include "arrow/util/decimal.h" #include "arrow/util/io_util.h" #include "arrow/util/key_value_metadata.h" @@ -336,6 +338,163 @@ std::unique_ptr CreateWriter(uint64_t stripe_size, return liborc::createWriter(type, stream, options); } +Result GetFileColumnStatistics( + adapters::orc::ORCFileReader* reader, int column_index) { + auto file_meta = reader->GetFileMetaData(); + if (file_meta == nullptr) { + return Status::Invalid("Failed to read ORC file metadata"); + } + if (column_index < 0 || column_index >= file_meta->num_columns()) { + return Status::Invalid("Column index ", column_index, " out of range"); + } + auto column_meta = file_meta->Column(column_index); + if (column_meta == nullptr) { + return Status::Invalid("Failed to read ORC file column 
metadata"); + } + auto column_stats = column_meta->statistics(); + if (column_stats == nullptr) { + return Status::Invalid("Failed to read ORC file column statistics"); + } + return *column_stats; +} + +Result GetStripeColumnStatistics( + adapters::orc::ORCFileReader* reader, int64_t stripe_index, int column_index) { + auto file_meta = reader->GetFileMetaData(); + if (file_meta == nullptr) { + return Status::Invalid("Failed to read ORC file metadata"); + } + if (stripe_index < 0 || stripe_index >= file_meta->num_stripes()) { + return Status::Invalid("Stripe index ", stripe_index, " out of range"); + } + auto stripe_meta = file_meta->Stripe(static_cast(stripe_index)); + if (stripe_meta == nullptr) { + return Status::Invalid("Failed to read ORC stripe metadata"); + } + if (column_index < 0 || column_index >= stripe_meta->num_columns()) { + return Status::Invalid("Column index ", column_index, " out of range"); + } + auto column_meta = stripe_meta->Column(column_index); + if (column_meta == nullptr) { + return Status::Invalid("Failed to read ORC stripe column metadata"); + } + auto column_stats = column_meta->statistics(); + if (column_stats == nullptr) { + return Status::Invalid("Failed to read ORC stripe column statistics"); + } + return *column_stats; +} + +Result> GetStripeColumnStatisticsBulk( + adapters::orc::ORCFileReader* reader, int64_t stripe_index, + const std::vector& column_indices) { + auto file_meta = reader->GetFileMetaData(); + if (file_meta == nullptr) { + return Status::Invalid("Failed to read ORC file metadata"); + } + if (stripe_index < 0 || stripe_index >= file_meta->num_stripes()) { + return Status::Invalid("Stripe index ", stripe_index, " out of range"); + } + auto stripe_meta = file_meta->Stripe(static_cast(stripe_index)); + if (stripe_meta == nullptr) { + return Status::Invalid("Failed to read ORC stripe metadata"); + } + std::vector stats; + stats.reserve(column_indices.size()); + for (int column_index : column_indices) { + if (column_index 
< 0 || column_index >= stripe_meta->num_columns()) { + return Status::Invalid("Column index ", column_index, " out of range"); + } + auto column_meta = stripe_meta->Column(column_index); + if (column_meta == nullptr) { + return Status::Invalid("Failed to read ORC stripe column metadata"); + } + auto column_stats = column_meta->statistics(); + if (column_stats == nullptr) { + return Status::Invalid("Failed to read ORC stripe column statistics"); + } + stats.push_back(std::move(*column_stats)); + } + return stats; +} + +struct ScalarStats { + bool has_null; + int64_t num_values; + bool has_min_max; + std::shared_ptr min; + std::shared_ptr max; +}; + +Result ToScalarStats(const adapters::orc::Statistics& statistics) { + ScalarStats converted; + converted.has_null = statistics.HasNullCount(); + converted.num_values = statistics.num_values(); + RETURN_NOT_OK(adapters::orc::StatisticsAsScalars(statistics, &converted.min, &converted.max)); + converted.has_min_max = converted.min != nullptr && converted.max != nullptr; + return converted; +} + +TEST(TestAdapterRead, FileMetaDataViewAccessors) { + MemoryOutputStream mem_stream(kDefaultMemStreamSize); + std::unique_ptr type( + liborc::Type::buildTypeFromString("struct")); + constexpr uint64_t stripe_size = 1024; + constexpr uint64_t row_count = 256; + + auto writer = CreateWriter(stripe_size, *type, &mem_stream); + auto batch = writer->createRowBatch(row_count); + auto struct_batch = internal::checked_cast(batch.get()); + auto long_batch = + internal::checked_cast(struct_batch->fields[0]); + auto str_batch = + internal::checked_cast(struct_batch->fields[1]); + + std::string data_buffer(row_count * 5, '\0'); + uint64_t offset = 0; + for (uint64_t i = 0; i < row_count; ++i) { + std::string str_data = std::to_string(i % 100); + long_batch->data[i] = static_cast(i); + str_batch->data[i] = &data_buffer[offset]; + str_batch->length[i] = static_cast(str_data.size()); + memcpy(&data_buffer[offset], str_data.c_str(), 
str_data.size()); + offset += str_data.size(); + } + struct_batch->numElements = row_count; + long_batch->numElements = row_count; + str_batch->numElements = row_count; + writer->add(*batch); + writer->close(); + + std::shared_ptr in_stream(new io::BufferReader( + std::make_shared(reinterpret_cast(mem_stream.getData()), + static_cast(mem_stream.getLength())))); + ASSERT_OK_AND_ASSIGN(auto reader, + adapters::orc::ORCFileReader::Open(in_stream, default_memory_pool())); + + auto file_meta = reader->GetFileMetaData(); + ASSERT_NE(file_meta, nullptr); + EXPECT_TRUE(file_meta->valid()); + EXPECT_EQ(file_meta->num_stripes(), reader->NumberOfStripes()); + EXPECT_EQ(file_meta->num_rows(), reader->NumberOfRows()); + auto metadata = file_meta->key_value_metadata(); + ASSERT_NE(metadata, nullptr); + EXPECT_EQ(file_meta->schema_root().getKind(), liborc::STRUCT); + + auto stripe_meta = file_meta->Stripe(0); + ASSERT_NE(stripe_meta, nullptr); + EXPECT_TRUE(stripe_meta->valid()); + EXPECT_EQ(stripe_meta->stripe_index(), 0); + EXPECT_GT(stripe_meta->num_rows(), 0); + auto column_meta = stripe_meta->Column(1); + ASSERT_NE(column_meta, nullptr); + EXPECT_TRUE(column_meta->valid()); + EXPECT_EQ(column_meta->column_index(), 1); + auto column_stats = column_meta->statistics(); + ASSERT_NE(column_stats, nullptr); + EXPECT_TRUE(column_stats->valid()); +} + TEST(TestAdapterRead, ReadIntAndStringFileMultipleStripes) { MemoryOutputStream mem_stream(kDefaultMemStreamSize); std::unique_ptr type( @@ -1179,4 +1338,794 @@ TEST_F(TestORCWriterMultipleWrite, MultipleWritesIntFieldRecordBatch) { AssertBatchWriteReadEqual(input_batches, expected_output_table, kDefaultSmallMemStreamSize * 100); } + +TEST(TestAdapterRead, GetColumnStatisticsInteger) { + MemoryOutputStream mem_stream(kDefaultMemStreamSize); + std::unique_ptr type( + liborc::Type::buildTypeFromString("struct")); + + constexpr uint64_t stripe_size = 1024; + constexpr uint64_t row_count = 1000; + + auto writer = CreateWriter(stripe_size, 
*type, &mem_stream); + auto batch = writer->createRowBatch(row_count); + auto struct_batch = internal::checked_cast(batch.get()); + auto int_batch = + internal::checked_cast(struct_batch->fields[0]); + auto bigint_batch = + internal::checked_cast(struct_batch->fields[1]); + + for (uint64_t i = 0; i < row_count; ++i) { + int_batch->data[i] = static_cast(i); + bigint_batch->data[i] = static_cast(i + 1000); + } + struct_batch->numElements = row_count; + int_batch->numElements = row_count; + bigint_batch->numElements = row_count; + writer->add(*batch); + writer->close(); + + std::shared_ptr in_stream(new io::BufferReader( + std::make_shared(reinterpret_cast(mem_stream.getData()), + static_cast(mem_stream.getLength())))); + + ASSERT_OK_AND_ASSIGN(auto reader, + adapters::orc::ORCFileReader::Open(in_stream, default_memory_pool())); + + ASSERT_OK_AND_ASSIGN(auto col1_stats, GetFileColumnStatistics(reader.get(), 1)); + ASSERT_OK_AND_ASSIGN(auto col1_scalars, ToScalarStats(col1_stats)); + EXPECT_EQ(col1_scalars.num_values, row_count); + EXPECT_TRUE(col1_scalars.has_min_max); + ASSERT_NE(col1_scalars.min, nullptr); + ASSERT_NE(col1_scalars.max, nullptr); + EXPECT_EQ(checked_pointer_cast(col1_scalars.min)->value, 0); + EXPECT_EQ(checked_pointer_cast(col1_scalars.max)->value, 999); + + ASSERT_OK_AND_ASSIGN(auto col2_stats, GetFileColumnStatistics(reader.get(), 2)); + ASSERT_OK_AND_ASSIGN(auto col2_scalars, ToScalarStats(col2_stats)); + EXPECT_EQ(col2_scalars.num_values, row_count); + EXPECT_TRUE(col2_scalars.has_min_max); + ASSERT_NE(col2_scalars.min, nullptr); + ASSERT_NE(col2_scalars.max, nullptr); + EXPECT_EQ(checked_pointer_cast(col2_scalars.min)->value, 1000); + EXPECT_EQ(checked_pointer_cast(col2_scalars.max)->value, 1999); +} + +TEST(TestAdapterRead, GetStripeColumnStatistics) { + MemoryOutputStream mem_stream(kDefaultMemStreamSize); + std::unique_ptr type( + liborc::Type::buildTypeFromString("struct")); + + constexpr uint64_t stripe_size = 1024; + constexpr uint64_t 
row_count = 500; + + auto writer = CreateWriter(stripe_size, *type, &mem_stream); + auto batch = writer->createRowBatch(row_count); + auto struct_batch = internal::checked_cast(batch.get()); + auto int_batch = + internal::checked_cast(struct_batch->fields[0]); + + for (uint64_t i = 0; i < row_count; ++i) { + int_batch->data[i] = static_cast(i + 100); + } + struct_batch->numElements = row_count; + int_batch->numElements = row_count; + writer->add(*batch); + writer->close(); + + std::shared_ptr in_stream(new io::BufferReader( + std::make_shared(reinterpret_cast(mem_stream.getData()), + static_cast(mem_stream.getLength())))); + + ASSERT_OK_AND_ASSIGN(auto reader, + adapters::orc::ORCFileReader::Open(in_stream, default_memory_pool())); + + ASSERT_OK_AND_ASSIGN(auto stripe0_stats, + GetStripeColumnStatistics(reader.get(), 0, 1)); + ASSERT_OK_AND_ASSIGN(auto stripe0_scalars, ToScalarStats(stripe0_stats)); + EXPECT_TRUE(stripe0_scalars.has_min_max); + EXPECT_EQ(checked_pointer_cast(stripe0_scalars.min)->value, 100); + EXPECT_EQ(checked_pointer_cast(stripe0_scalars.max)->value, 599); +} + +TEST(TestAdapterRead, GetColumnStatisticsString) { + MemoryOutputStream mem_stream(kDefaultMemStreamSize); + std::unique_ptr type( + liborc::Type::buildTypeFromString("struct")); + + constexpr uint64_t stripe_size = 1024; + constexpr uint64_t row_count = 5; + + auto writer = CreateWriter(stripe_size, *type, &mem_stream); + auto batch = writer->createRowBatch(row_count); + auto struct_batch = internal::checked_cast(batch.get()); + auto str_batch = + internal::checked_cast(struct_batch->fields[0]); + + std::vector strings = {"apple", "banana", "cherry", "date", "elderberry"}; + std::string data_buffer; + for (const auto& s : strings) { + data_buffer += s; + } + + size_t offset = 0; + for (size_t i = 0; i < strings.size(); ++i) { + str_batch->data[i] = const_cast(&data_buffer[offset]); + str_batch->length[i] = static_cast(strings[i].size()); + offset += strings[i].size(); + } + 
struct_batch->numElements = row_count; + str_batch->numElements = row_count; + writer->add(*batch); + writer->close(); + + std::shared_ptr in_stream(new io::BufferReader( + std::make_shared(reinterpret_cast(mem_stream.getData()), + static_cast(mem_stream.getLength())))); + + ASSERT_OK_AND_ASSIGN(auto reader, + adapters::orc::ORCFileReader::Open(in_stream, default_memory_pool())); + + ASSERT_OK_AND_ASSIGN(auto col_stats, GetFileColumnStatistics(reader.get(), 1)); + ASSERT_OK_AND_ASSIGN(auto col_scalars, ToScalarStats(col_stats)); + EXPECT_EQ(col_scalars.num_values, row_count); + EXPECT_TRUE(col_scalars.has_min_max); + ASSERT_NE(col_scalars.min, nullptr); + ASSERT_NE(col_scalars.max, nullptr); + EXPECT_EQ(checked_pointer_cast(col_scalars.min)->ToString(), "apple"); + EXPECT_EQ(checked_pointer_cast(col_scalars.max)->ToString(), "elderberry"); +} + +TEST(TestAdapterRead, GetColumnStatisticsOutOfRange) { + MemoryOutputStream mem_stream(kDefaultMemStreamSize); + std::unique_ptr type( + liborc::Type::buildTypeFromString("struct")); + + constexpr uint64_t stripe_size = 1024; + constexpr uint64_t row_count = 10; + + auto writer = CreateWriter(stripe_size, *type, &mem_stream); + auto batch = writer->createRowBatch(row_count); + auto struct_batch = internal::checked_cast(batch.get()); + auto int_batch = + internal::checked_cast(struct_batch->fields[0]); + + for (uint64_t i = 0; i < row_count; ++i) { + int_batch->data[i] = static_cast(i); + } + struct_batch->numElements = row_count; + int_batch->numElements = row_count; + writer->add(*batch); + writer->close(); + + std::shared_ptr in_stream(new io::BufferReader( + std::make_shared(reinterpret_cast(mem_stream.getData()), + static_cast(mem_stream.getLength())))); + + ASSERT_OK_AND_ASSIGN(auto reader, + adapters::orc::ORCFileReader::Open(in_stream, default_memory_pool())); + + EXPECT_THAT(GetFileColumnStatistics(reader.get(), 999), + Raises(StatusCode::Invalid, testing::HasSubstr("out of range"))); + + 
EXPECT_THAT(GetStripeColumnStatistics(reader.get(), 999, 1), + Raises(StatusCode::Invalid, testing::HasSubstr("out of range"))); +} + +TEST(TestAdapterRead, GetColumnStatisticsWithNulls) { + MemoryOutputStream mem_stream(kDefaultMemStreamSize); + std::unique_ptr type( + liborc::Type::buildTypeFromString("struct")); + + constexpr uint64_t stripe_size = 1024; + constexpr uint64_t row_count = 10; + + auto writer = CreateWriter(stripe_size, *type, &mem_stream); + auto batch = writer->createRowBatch(row_count); + auto struct_batch = internal::checked_cast(batch.get()); + auto int_batch = + internal::checked_cast(struct_batch->fields[0]); + + for (uint64_t i = 0; i < row_count; ++i) { + int_batch->data[i] = static_cast(i); + int_batch->notNull[i] = (i % 2 == 0) ? 1 : 0; + } + int_batch->hasNulls = true; + struct_batch->numElements = row_count; + int_batch->numElements = row_count; + writer->add(*batch); + writer->close(); + + std::shared_ptr in_stream(new io::BufferReader( + std::make_shared(reinterpret_cast(mem_stream.getData()), + static_cast(mem_stream.getLength())))); + + ASSERT_OK_AND_ASSIGN(auto reader, + adapters::orc::ORCFileReader::Open(in_stream, default_memory_pool())); + + ASSERT_OK_AND_ASSIGN(auto col_stats, GetFileColumnStatistics(reader.get(), 1)); + ASSERT_OK_AND_ASSIGN(auto col_scalars, ToScalarStats(col_stats)); + EXPECT_TRUE(col_stats.HasNullCount()); + EXPECT_TRUE(col_scalars.has_min_max); +} + +TEST(TestAdapterRead, GetColumnStatisticsBoolean) { + MemoryOutputStream mem_stream(kDefaultMemStreamSize); + std::unique_ptr type( + liborc::Type::buildTypeFromString("struct")); + + constexpr uint64_t stripe_size = 1024; + constexpr uint64_t row_count = 100; + + auto writer = CreateWriter(stripe_size, *type, &mem_stream); + auto batch = writer->createRowBatch(row_count); + auto struct_batch = internal::checked_cast(batch.get()); + auto bool_batch = + internal::checked_cast(struct_batch->fields[0]); + + // Write 60 true values and 40 false values + for 
(uint64_t i = 0; i < row_count; ++i) { + bool_batch->data[i] = (i < 60) ? 1 : 0; + } + struct_batch->numElements = row_count; + bool_batch->numElements = row_count; + writer->add(*batch); + writer->close(); + + std::shared_ptr in_stream(new io::BufferReader( + std::make_shared(reinterpret_cast(mem_stream.getData()), + static_cast(mem_stream.getLength())))); + + ASSERT_OK_AND_ASSIGN(auto reader, + adapters::orc::ORCFileReader::Open(in_stream, default_memory_pool())); + + ASSERT_OK_AND_ASSIGN(auto col_stats, GetFileColumnStatistics(reader.get(), 1)); + ASSERT_OK_AND_ASSIGN(auto col_scalars, ToScalarStats(col_stats)); + EXPECT_EQ(col_scalars.num_values, row_count); + EXPECT_FALSE(col_scalars.has_min_max); // Boolean types don't have min/max + EXPECT_EQ(col_scalars.min, nullptr); + EXPECT_EQ(col_scalars.max, nullptr); +} + +TEST(TestAdapterRead, GetColumnStatisticsDate) { + MemoryOutputStream mem_stream(kDefaultMemStreamSize); + std::unique_ptr type( + liborc::Type::buildTypeFromString("struct")); + + constexpr uint64_t stripe_size = 1024; + constexpr uint64_t row_count = 10; + + auto writer = CreateWriter(stripe_size, *type, &mem_stream); + auto batch = writer->createRowBatch(row_count); + auto struct_batch = internal::checked_cast(batch.get()); + auto date_batch = + internal::checked_cast(struct_batch->fields[0]); + + // Write dates: 0 (1970-01-01) through 9 (1970-01-10) + for (uint64_t i = 0; i < row_count; ++i) { + date_batch->data[i] = static_cast(i); + } + struct_batch->numElements = row_count; + date_batch->numElements = row_count; + writer->add(*batch); + writer->close(); + + std::shared_ptr in_stream(new io::BufferReader( + std::make_shared(reinterpret_cast(mem_stream.getData()), + static_cast(mem_stream.getLength())))); + + ASSERT_OK_AND_ASSIGN(auto reader, + adapters::orc::ORCFileReader::Open(in_stream, default_memory_pool())); + + ASSERT_OK_AND_ASSIGN(auto col_stats, GetFileColumnStatistics(reader.get(), 1)); + ASSERT_OK_AND_ASSIGN(auto col_scalars, 
ToScalarStats(col_stats)); + EXPECT_EQ(col_scalars.num_values, row_count); + EXPECT_TRUE(col_scalars.has_min_max); + ASSERT_NE(col_scalars.min, nullptr); + ASSERT_NE(col_scalars.max, nullptr); + EXPECT_EQ(checked_pointer_cast(col_scalars.min)->value, 0); + EXPECT_EQ(checked_pointer_cast(col_scalars.max)->value, 9); +} + +TEST(TestAdapterRead, GetColumnStatisticsTimestamp) { + MemoryOutputStream mem_stream(kDefaultMemStreamSize); + std::unique_ptr type( + liborc::Type::buildTypeFromString("struct")); + + constexpr uint64_t stripe_size = 1024; + constexpr uint64_t row_count = 10; + + auto writer = CreateWriter(stripe_size, *type, &mem_stream); + auto batch = writer->createRowBatch(row_count); + auto struct_batch = internal::checked_cast(batch.get()); + auto ts_batch = + internal::checked_cast(struct_batch->fields[0]); + + // Write timestamps with both seconds and nanoseconds + for (uint64_t i = 0; i < row_count; ++i) { + ts_batch->data[i] = static_cast(i * 1000); // seconds + ts_batch->nanoseconds[i] = static_cast(i * 100); // nanoseconds + } + struct_batch->numElements = row_count; + ts_batch->numElements = row_count; + writer->add(*batch); + writer->close(); + + std::shared_ptr in_stream(new io::BufferReader( + std::make_shared(reinterpret_cast(mem_stream.getData()), + static_cast(mem_stream.getLength())))); + + ASSERT_OK_AND_ASSIGN(auto reader, + adapters::orc::ORCFileReader::Open(in_stream, default_memory_pool())); + + ASSERT_OK_AND_ASSIGN(auto col_stats, GetFileColumnStatistics(reader.get(), 1)); + ASSERT_OK_AND_ASSIGN(auto col_scalars, ToScalarStats(col_stats)); + EXPECT_EQ(col_scalars.num_values, row_count); + EXPECT_TRUE(col_scalars.has_min_max); + ASSERT_NE(col_scalars.min, nullptr); + ASSERT_NE(col_scalars.max, nullptr); + + // Verify the timestamps are TimestampScalar with correct nanosecond values. + // Written: data[i] = i*1000 seconds, nanoseconds[i] = i*100 sub-second nanos. + // Min (i=0): 0s + 0ns = 0 ns total. 
+ // Max (i=9): 9000s + 900ns = 9000 * 1,000,000,000 + 900 = 9,000,000,000,900 ns. + // Statistics store millis + sub-ms-nanos (last 6 digits of ns). + // Conversion: millis * 1,000,000 + sub_ms_nanos = total nanoseconds. + auto min_ts = checked_pointer_cast(col_scalars.min); + auto max_ts = checked_pointer_cast(col_scalars.max); + EXPECT_TRUE(min_ts->is_valid); + EXPECT_TRUE(max_ts->is_valid); + EXPECT_EQ(min_ts->value, 0); + // 9000 seconds + 900 nanoseconds + constexpr int64_t expected_max_ns = 9000LL * 1000000000LL + 900LL; + EXPECT_EQ(max_ts->value, expected_max_ns); +} + +TEST(TestAdapterRead, ReadStripesMultiple) { + // Create a test file and read all stripes using ReadStripes + MemoryOutputStream mem_stream(kDefaultMemStreamSize); + std::unique_ptr type( + liborc::Type::buildTypeFromString("struct")); + + constexpr uint64_t stripe_size = 1024; + constexpr uint64_t row_count = 100; + + auto writer = CreateWriter(stripe_size, *type, &mem_stream); + auto batch = writer->createRowBatch(row_count); + auto struct_batch = internal::checked_cast(batch.get()); + auto int_batch = + internal::checked_cast(struct_batch->fields[0]); + auto bigint_batch = + internal::checked_cast(struct_batch->fields[1]); + + for (uint64_t i = 0; i < row_count; ++i) { + int_batch->data[i] = static_cast(i); + bigint_batch->data[i] = static_cast(i + 1000); + } + struct_batch->numElements = row_count; + int_batch->numElements = row_count; + bigint_batch->numElements = row_count; + writer->add(*batch); + writer->close(); + + std::shared_ptr in_stream(new io::BufferReader( + std::make_shared(reinterpret_cast(mem_stream.getData()), + static_cast(mem_stream.getLength())))); + + ASSERT_OK_AND_ASSIGN(auto reader, + adapters::orc::ORCFileReader::Open(in_stream, default_memory_pool())); + + int64_t num_stripes = reader->NumberOfStripes(); + ASSERT_GT(num_stripes, 0); + + // Build vector of all stripe indices + std::vector stripe_indices; + for (int64_t i = 0; i < num_stripes; ++i) { + 
stripe_indices.push_back(i); + } + + // Read all stripes using ReadStripes + ASSERT_OK_AND_ASSIGN(auto table, reader->ReadStripes(stripe_indices)); + + // Should have all rows and correct schema + EXPECT_EQ(table->num_rows(), row_count); + EXPECT_EQ(table->num_columns(), 2); + EXPECT_EQ(table->schema()->field(0)->name(), "col1"); + EXPECT_EQ(table->schema()->field(1)->name(), "col2"); +} + +TEST(TestAdapterRead, ReadStripesWithColumnSelection) { + // Create a test file with multiple columns + MemoryOutputStream mem_stream(kDefaultMemStreamSize); + std::unique_ptr type(liborc::Type::buildTypeFromString( + "struct")); + + constexpr uint64_t stripe_size = 1024; + constexpr uint64_t row_count = 100; + + auto writer = CreateWriter(stripe_size, *type, &mem_stream); + auto batch = writer->createRowBatch(row_count); + auto struct_batch = internal::checked_cast(batch.get()); + auto int_batch = + internal::checked_cast(struct_batch->fields[0]); + auto bigint_batch = + internal::checked_cast(struct_batch->fields[1]); + auto double_batch = + internal::checked_cast(struct_batch->fields[2]); + + for (uint64_t i = 0; i < row_count; ++i) { + int_batch->data[i] = static_cast(i); + bigint_batch->data[i] = static_cast(i + 100); + double_batch->data[i] = static_cast(i) + 0.5; + } + struct_batch->numElements = row_count; + int_batch->numElements = row_count; + bigint_batch->numElements = row_count; + double_batch->numElements = row_count; + writer->add(*batch); + writer->close(); + + std::shared_ptr in_stream(new io::BufferReader( + std::make_shared(reinterpret_cast(mem_stream.getData()), + static_cast(mem_stream.getLength())))); + + ASSERT_OK_AND_ASSIGN(auto reader, + adapters::orc::ORCFileReader::Open(in_stream, default_memory_pool())); + + int64_t num_stripes = reader->NumberOfStripes(); + ASSERT_GT(num_stripes, 0); + + // Build vector of all stripe indices + std::vector stripe_indices; + for (int64_t i = 0; i < num_stripes; ++i) { + stripe_indices.push_back(i); + } + + // Read all 
stripes but only columns 0 and 2 (col1 and col3)
+  std::vector include_indices = {0, 2};
+  ASSERT_OK_AND_ASSIGN(auto table, reader->ReadStripes(stripe_indices, include_indices));
+
+  // Should have all rows and 2 columns
+  EXPECT_EQ(table->num_rows(), row_count);
+  EXPECT_EQ(table->num_columns(), 2);
+
+  // Verify we got the right columns (col2 was skipped)
+  EXPECT_EQ(table->schema()->field(0)->name(), "col1");
+  EXPECT_EQ(table->schema()->field(1)->name(), "col3");
+}
+
+// Writes a 100-row file, then asks ReadStripes for {0, NumberOfStripes()}.
+// The second index is one past the last stripe, so the call is expected to
+// fail with StatusCode::Invalid and a message containing "Out of bounds stripe".
+TEST(TestAdapterRead, ReadStripesOutOfRange) {
+  MemoryOutputStream mem_stream(kDefaultMemStreamSize);
+  std::unique_ptr type(
+      liborc::Type::buildTypeFromString("struct"));
+
+  constexpr uint64_t stripe_size = 512;
+  constexpr uint64_t row_count = 100;
+
+  auto writer = CreateWriter(stripe_size, *type, &mem_stream);
+  auto batch = writer->createRowBatch(row_count);
+  auto struct_batch = internal::checked_cast(batch.get());
+  auto int_batch =
+      internal::checked_cast(struct_batch->fields[0]);
+
+  for (uint64_t i = 0; i < row_count; ++i) {
+    int_batch->data[i] = static_cast(i);
+  }
+  // Both the struct batch and its child must carry the element count.
+  struct_batch->numElements = row_count;
+  int_batch->numElements = row_count;
+  writer->add(*batch);
+  writer->close();
+
+  // Re-open the bytes that were just written through an in-memory reader.
+  std::shared_ptr in_stream(new io::BufferReader(
+      std::make_shared(reinterpret_cast(mem_stream.getData()),
+                       static_cast(mem_stream.getLength()))));
+
+  ASSERT_OK_AND_ASSIGN(auto reader,
+                       adapters::orc::ORCFileReader::Open(in_stream, default_memory_pool()));
+
+  int64_t num_stripes = reader->NumberOfStripes();
+  ASSERT_GT(num_stripes, 0);
+
+  // Try to read a stripe index that's out of range
+  std::vector stripe_indices = {0, num_stripes};  // num_stripes is out of range
+  EXPECT_THAT(reader->ReadStripes(stripe_indices),
+              Raises(StatusCode::Invalid, testing::HasSubstr("Out of bounds stripe")));
+}
+
+// Passing an empty stripe-index vector to ReadStripes is expected to succeed
+// and yield a table with zero rows that still exposes the single "col1" column.
+TEST(TestAdapterRead, ReadStripesEmptyVector) {
+  MemoryOutputStream mem_stream(kDefaultMemStreamSize);
+  std::unique_ptr type(
+      liborc::Type::buildTypeFromString("struct"));
+
+  constexpr uint64_t stripe_size = 512;
+  constexpr uint64_t row_count = 100;
+
+  auto writer = CreateWriter(stripe_size, *type, &mem_stream);
+  auto batch = writer->createRowBatch(row_count);
+  auto struct_batch = internal::checked_cast(batch.get());
+  auto int_batch =
+      internal::checked_cast(struct_batch->fields[0]);
+
+  for (uint64_t i = 0; i < row_count; ++i) {
+    int_batch->data[i] = static_cast(i);
+  }
+  struct_batch->numElements = row_count;
+  int_batch->numElements = row_count;
+  writer->add(*batch);
+  writer->close();
+
+  std::shared_ptr in_stream(new io::BufferReader(
+      std::make_shared(reinterpret_cast(mem_stream.getData()),
+                       static_cast(mem_stream.getLength()))));
+
+  ASSERT_OK_AND_ASSIGN(auto reader,
+                       adapters::orc::ORCFileReader::Open(in_stream, default_memory_pool()));
+
+  // Empty stripe indices should return an empty table with the file's schema
+  std::vector stripe_indices;
+  ASSERT_OK_AND_ASSIGN(auto table, reader->ReadStripes(stripe_indices));
+  EXPECT_EQ(table->num_rows(), 0);
+  EXPECT_EQ(table->num_columns(), 1);
+  EXPECT_EQ(table->schema()->field(0)->name(), "col1");
+}
+
+// Checks the liborc schema returned by GetORCType(): a STRUCT root with three
+// children named col1/col2/col3 of kinds INT, LONG and STRING. No cell values
+// are written; only numElements is set on the struct batch and each child.
+TEST(TestAdapterRead, GetORCType) {
+  MemoryOutputStream mem_stream(kDefaultMemStreamSize);
+  std::unique_ptr type(liborc::Type::buildTypeFromString(
+      "struct"));
+
+  constexpr uint64_t stripe_size = 1024;
+  constexpr uint64_t row_count = 10;
+
+  auto writer = CreateWriter(stripe_size, *type, &mem_stream);
+  auto batch = writer->createRowBatch(row_count);
+  auto struct_batch = internal::checked_cast(batch.get());
+
+  struct_batch->numElements = row_count;
+  for (size_t i = 0; i < struct_batch->fields.size(); ++i) {
+    struct_batch->fields[i]->numElements = row_count;
+  }
+  writer->add(*batch);
+  writer->close();
+
+  std::shared_ptr in_stream(new io::BufferReader(
+      std::make_shared(reinterpret_cast(mem_stream.getData()),
+                       static_cast(mem_stream.getLength()))));
+
+  ASSERT_OK_AND_ASSIGN(auto reader,
+                       adapters::orc::ORCFileReader::Open(in_stream, default_memory_pool()));
+
+  // Get the ORC type — returns a reference, no cast needed
+  const liborc::Type& orc_type = reader->GetORCType();
+
+  // Root should be STRUCT
+  EXPECT_EQ(orc_type.getKind(), liborc::STRUCT);
+
+  // Should have 3 subtypes (col1, col2, col3)
+  EXPECT_EQ(orc_type.getSubtypeCount(), 3);
+
+  // Verify field names
+  EXPECT_EQ(orc_type.getFieldName(0), "col1");
+  EXPECT_EQ(orc_type.getFieldName(1), "col2");
+  EXPECT_EQ(orc_type.getFieldName(2), "col3");
+
+  // Verify field types
+  EXPECT_EQ(orc_type.getSubtype(0)->getKind(), liborc::INT);
+  EXPECT_EQ(orc_type.getSubtype(1)->getKind(), liborc::LONG);
+  EXPECT_EQ(orc_type.getSubtype(2)->getKind(), liborc::STRING);
+}
+
+// Writes ten doubles where every row with i % 3 == 0 is NaN, then inspects the
+// file-level statistics of column 1. The only assertions run when has_min_max
+// is true, in which case neither bound may be NaN; has_min_max == false passes
+// without any check.
+TEST(TestAdapterRead, GetColumnStatisticsDoubleNaN) {
+  MemoryOutputStream mem_stream(kDefaultMemStreamSize);
+  std::unique_ptr type(
+      liborc::Type::buildTypeFromString("struct"));
+
+  constexpr uint64_t stripe_size = 1024;
+  constexpr uint64_t row_count = 10;
+
+  auto writer = CreateWriter(stripe_size, *type, &mem_stream);
+  auto batch = writer->createRowBatch(row_count);
+  auto struct_batch = internal::checked_cast(batch.get());
+  auto double_batch =
+      internal::checked_cast(struct_batch->fields[0]);
+
+  // Write some normal values and some NaN values
+  for (uint64_t i = 0; i < row_count; ++i) {
+    if (i % 3 == 0) {
+      double_batch->data[i] = std::numeric_limits::quiet_NaN();
+    } else {
+      double_batch->data[i] = static_cast(i);
+    }
+  }
+  struct_batch->numElements = row_count;
+  double_batch->numElements = row_count;
+  writer->add(*batch);
+  writer->close();
+
+  std::shared_ptr in_stream(new io::BufferReader(
+      std::make_shared(reinterpret_cast(mem_stream.getData()),
+                       static_cast(mem_stream.getLength()))));
+
+  ASSERT_OK_AND_ASSIGN(auto reader,
+                       adapters::orc::ORCFileReader::Open(in_stream, default_memory_pool()));
+
+  ASSERT_OK_AND_ASSIGN(auto col_stats, GetFileColumnStatistics(reader.get(), 1));
+  ASSERT_OK_AND_ASSIGN(auto col_scalars, ToScalarStats(col_stats));
+  // When NaN values are present, ORC may report NaN as min or max.
+  // Our guard should detect this and set has_min_max = false.
+  if (col_scalars.has_min_max) {
+    // If ORC writer filtered NaN from statistics, min/max should be valid (non-NaN)
+    auto min_val = checked_pointer_cast(col_scalars.min);
+    auto max_val = checked_pointer_cast(col_scalars.max);
+    EXPECT_FALSE(std::isnan(min_val->value));
+    EXPECT_FALSE(std::isnan(max_val->value));
+  }
+  // Either has_min_max is false (NaN guard triggered), or min/max are non-NaN.
+  // Both outcomes are correct.
+}
+
+// A column index of -1 must be rejected by both the file-level and the
+// stripe-level statistics lookups with StatusCode::Invalid ("out of range").
+TEST(TestAdapterRead, GetColumnStatisticsNegativeIndex) {
+  MemoryOutputStream mem_stream(kDefaultMemStreamSize);
+  std::unique_ptr type(
+      liborc::Type::buildTypeFromString("struct"));
+
+  constexpr uint64_t stripe_size = 1024;
+  constexpr uint64_t row_count = 10;
+
+  auto writer = CreateWriter(stripe_size, *type, &mem_stream);
+  auto batch = writer->createRowBatch(row_count);
+  auto struct_batch = internal::checked_cast(batch.get());
+  auto int_batch =
+      internal::checked_cast(struct_batch->fields[0]);
+
+  for (uint64_t i = 0; i < row_count; ++i) {
+    int_batch->data[i] = static_cast(i);
+  }
+  struct_batch->numElements = row_count;
+  int_batch->numElements = row_count;
+  writer->add(*batch);
+  writer->close();
+
+  std::shared_ptr in_stream(new io::BufferReader(
+      std::make_shared(reinterpret_cast(mem_stream.getData()),
+                       static_cast(mem_stream.getLength()))));
+
+  ASSERT_OK_AND_ASSIGN(auto reader,
+                       adapters::orc::ORCFileReader::Open(in_stream, default_memory_pool()));
+
+  // Negative column index should return Invalid
+  EXPECT_THAT(GetFileColumnStatistics(reader.get(), -1),
+              Raises(StatusCode::Invalid, testing::HasSubstr("out of range")));
+
+  // Negative column index in stripe stats should also return Invalid
+  EXPECT_THAT(GetStripeColumnStatistics(reader.get(), 0, -1),
+              Raises(StatusCode::Invalid, testing::HasSubstr("out of range")));
+}
+
+// Cross-checks GetStripeColumnStatisticsBulk against per-column
+// GetStripeColumnStatistics for columns {1, 2, 3} of stripe 0, then verifies
+// that an out-of-range column index (999) and an out-of-range stripe index
+// (999) are each rejected with StatusCode::Invalid.
+TEST(TestAdapterRead, GetStripeStatisticsBulk) {
+  MemoryOutputStream mem_stream(kDefaultMemStreamSize);
+  std::unique_ptr type(liborc::Type::buildTypeFromString(
+      "struct"));
+
+  constexpr uint64_t stripe_size = 1024;
+  constexpr uint64_t row_count = 100;
+
+  auto writer = CreateWriter(stripe_size, *type, &mem_stream);
+  auto batch = writer->createRowBatch(row_count);
+  auto struct_batch = internal::checked_cast(batch.get());
+  auto int_batch =
+      internal::checked_cast(struct_batch->fields[0]);
+  auto bigint_batch =
+      internal::checked_cast(struct_batch->fields[1]);
+  auto double_batch =
+      internal::checked_cast(struct_batch->fields[2]);
+
+  // Each column gets a distinct, strictly increasing sequence.
+  for (uint64_t i = 0; i < row_count; ++i) {
+    int_batch->data[i] = static_cast(i);
+    bigint_batch->data[i] = static_cast(i + 1000);
+    double_batch->data[i] = static_cast(i) + 0.5;
+  }
+  struct_batch->numElements = row_count;
+  int_batch->numElements = row_count;
+  bigint_batch->numElements = row_count;
+  double_batch->numElements = row_count;
+  writer->add(*batch);
+  writer->close();
+
+  std::shared_ptr in_stream(new io::BufferReader(
+      std::make_shared(reinterpret_cast(mem_stream.getData()),
+                       static_cast(mem_stream.getLength()))));
+
+  ASSERT_OK_AND_ASSIGN(auto reader,
+                       adapters::orc::ORCFileReader::Open(in_stream, default_memory_pool()));
+
+  // ORC column indices: 0=root struct, 1=col1, 2=col2, 3=col3
+  std::vector column_indices = {1, 2, 3};
+
+  // Get bulk stats for stripe 0
+  ASSERT_OK_AND_ASSIGN(auto bulk_stats,
+                       GetStripeColumnStatisticsBulk(reader.get(), 0, column_indices));
+  ASSERT_EQ(bulk_stats.size(), 3);
+
+  // Verify bulk results match individual GetStripeColumnStatistics calls
+  for (size_t i = 0; i < column_indices.size(); ++i) {
+    ASSERT_OK_AND_ASSIGN(
+        auto individual_stats,
+        GetStripeColumnStatistics(reader.get(), 0, column_indices[i]));
+    ASSERT_OK_AND_ASSIGN(auto bulk_scalars, ToScalarStats(bulk_stats[i]));
+    ASSERT_OK_AND_ASSIGN(auto individual_scalars, ToScalarStats(individual_stats));
+    EXPECT_EQ(bulk_stats[i].HasNullCount(), individual_stats.HasNullCount());
+    EXPECT_EQ(bulk_scalars.num_values, individual_scalars.num_values);
+    EXPECT_EQ(bulk_scalars.has_min_max, individual_scalars.has_min_max);
+    // min/max are only compared when both sides report them.
+    if (bulk_scalars.has_min_max && individual_scalars.has_min_max) {
+      EXPECT_TRUE(bulk_scalars.min->Equals(*individual_scalars.min));
+      EXPECT_TRUE(bulk_scalars.max->Equals(*individual_scalars.max));
+    }
+  }
+
+  // Verify out-of-range column index in bulk API
+  std::vector bad_indices = {1, 999};
+  EXPECT_THAT(GetStripeColumnStatisticsBulk(reader.get(), 0, bad_indices),
+              Raises(StatusCode::Invalid, testing::HasSubstr("out of range")));
+
+  // Verify out-of-range stripe index in bulk API
+  EXPECT_THAT(GetStripeColumnStatisticsBulk(reader.get(), 999, column_indices),
+              Raises(StatusCode::Invalid, testing::HasSubstr("out of range")));
+}
+
+// Writes five decimal values (unscaled 101..509) and checks the file-level
+// statistics of column 1: value count, absence of nulls, and min/max scalars
+// equal to Decimal128(101)/Decimal128(509) typed as decimal128(38, 2).
+TEST(TestAdapterRead, GetColumnStatisticsDecimal) {
+  MemoryOutputStream mem_stream(kDefaultMemStreamSize);
+  // decimal(10,2) uses Decimal64VectorBatch internally (precision <= 18)
+  std::unique_ptr type(
+      liborc::Type::buildTypeFromString("struct"));
+
+  constexpr uint64_t stripe_size = 1024;
+  constexpr uint64_t row_count = 5;
+
+  auto writer = CreateWriter(stripe_size, *type, &mem_stream);
+  auto batch = writer->createRowBatch(row_count);
+  auto struct_batch = internal::checked_cast(batch.get());
+  auto dec_batch =
+      internal::checked_cast(struct_batch->fields[0]);
+
+  // Write unscaled values with non-zero trailing digits to avoid ORC
+  // normalizing away trailing zeros in statistics (e.g. 1.00 → 1 scale 0).
+  // Values: 101, 203, 305, 407, 509 with scale=2
+  // Represent: 1.01, 2.03, 3.05, 4.07, 5.09
+  dec_batch->values[0] = 101;
+  dec_batch->values[1] = 203;
+  dec_batch->values[2] = 305;
+  dec_batch->values[3] = 407;
+  dec_batch->values[4] = 509;
+  struct_batch->numElements = row_count;
+  dec_batch->numElements = row_count;
+  writer->add(*batch);
+  writer->close();
+
+  std::shared_ptr in_stream(new io::BufferReader(
+      std::make_shared(reinterpret_cast(mem_stream.getData()),
+                       static_cast(mem_stream.getLength()))));
+
+  ASSERT_OK_AND_ASSIGN(auto reader,
+                       adapters::orc::ORCFileReader::Open(in_stream, default_memory_pool()));
+
+  ASSERT_OK_AND_ASSIGN(auto col_stats, GetFileColumnStatistics(reader.get(), 1));
+  ASSERT_OK_AND_ASSIGN(auto col_scalars, ToScalarStats(col_stats));
+  EXPECT_EQ(col_scalars.num_values, row_count);
+  EXPECT_FALSE(col_stats.HasNullCount());
+  EXPECT_TRUE(col_scalars.has_min_max);
+  // ASSERT (not EXPECT) so the casts below never see a null scalar.
+  ASSERT_NE(col_scalars.min, nullptr);
+  ASSERT_NE(col_scalars.max, nullptr);
+
+  // Verify Decimal128Scalar values (unscaled integers with scale 2)
+  auto min_dec = checked_pointer_cast(col_scalars.min);
+  auto max_dec = checked_pointer_cast(col_scalars.max);
+  EXPECT_EQ(min_dec->value, Decimal128(101));  // 1.01 unscaled
+  EXPECT_EQ(max_dec->value, Decimal128(509));  // 5.09 unscaled
+
+  // Verify the type has precision 38 and scale 2 (as set by ConvertColumnStatistics)
+  EXPECT_TRUE(min_dec->type->Equals(decimal128(38, 2)));
+  EXPECT_TRUE(max_dec->type->Equals(decimal128(38, 2)));
+}
+
 }  // namespace arrow
diff --git a/cpp/src/arrow/adapters/orc/statistics.cc b/cpp/src/arrow/adapters/orc/statistics.cc
new file mode 100644
index 000000000000..98a877778d87
--- /dev/null
+++ b/cpp/src/arrow/adapters/orc/statistics.cc
@@ -0,0 +1,269 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "arrow/adapters/orc/statistics.h" + +#include +#include + +#include "arrow/scalar.h" +#include "arrow/type.h" +#include "arrow/util/checked_cast.h" +#include "arrow/util/decimal.h" +#include "arrow/util/key_value_metadata.h" +#include "arrow/util/logging.h" +#include "orc/Reader.hh" +#include "orc/Statistics.hh" + +namespace liborc = orc; + +namespace arrow { +namespace adapters { +namespace orc { +namespace { + +constexpr int64_t kMaxMillisForNanos = std::numeric_limits::max() / 1000000LL; +constexpr int64_t kMinMillisForNanos = std::numeric_limits::lowest() / 1000000LL; + +bool MillisFitInNanos(int64_t millis) { + return millis >= kMinMillisForNanos && millis <= kMaxMillisForNanos; +} + +} // namespace + +bool Statistics::HasNullCount() const { return column_statistics_->hasNull(); } + +int FileMetaData::num_columns() const { + return static_cast(file_statistics_->getNumberOfColumns()); +} + +int FileMetaData::num_stripes() const { + return static_cast(reader_->getNumberOfStripes()); +} + +int64_t FileMetaData::num_rows() const { + return static_cast(reader_->getNumberOfRows()); +} + +std::unique_ptr FileMetaData::Stripe(int stripe_index) const { + if (!valid()) { + return nullptr; + } + if (stripe_index < 0 || stripe_index >= num_stripes()) { + return nullptr; + } + + auto stripe_stats = std::shared_ptr( + 
reader_->getStripeStatistics(static_cast(stripe_index)).release()); + auto stripe_info = reader_->getStripe(static_cast(stripe_index)); + return std::make_unique( + stripe_index, static_cast(stripe_info->getNumberOfRows()), + std::move(stripe_stats)); +} + +std::unique_ptr FileMetaData::Column(int column_index) const { + if (!valid()) { + return nullptr; + } + if (column_index < 0 || static_cast(column_index) >= + file_statistics_->getNumberOfColumns()) { + return nullptr; + } + + const liborc::ColumnStatistics* col_stats = + file_statistics_->getColumnStatistics(static_cast(column_index)); + return std::make_unique( + column_index, Statistics(file_statistics_, col_stats)); +} + +std::shared_ptr FileMetaData::key_value_metadata() const { + if (!valid()) { + return nullptr; + } + + auto metadata = std::make_shared(); + const std::list keys = reader_->getMetadataKeys(); + for (const auto& key : keys) { + metadata->Append(key, reader_->getMetadataValue(key)); + } + return std::const_pointer_cast(metadata); +} + +const ::orc::Type& FileMetaData::schema_root() const { return reader_->getType(); } + +std::shared_ptr ColumnMetaData::statistics() const { + if (!valid()) { + return nullptr; + } + return std::make_shared(statistics_); +} + +int StripeMetaData::num_columns() const { + return static_cast(stripe_statistics_->getNumberOfColumns()); +} + +std::unique_ptr StripeMetaData::Column(int column_index) const { + if (!valid()) { + return nullptr; + } + if (column_index < 0 || static_cast(column_index) >= + stripe_statistics_->getNumberOfColumns()) { + return nullptr; + } + + const liborc::ColumnStatistics* col_stats = + stripe_statistics_->getColumnStatistics(static_cast(column_index)); + return std::make_unique( + column_index, Statistics(stripe_statistics_, col_stats)); +} + +int64_t Statistics::null_count() const { + ARROW_DCHECK(HasNullCount()); + // liborc doesn't expose null_count on ColumnStatistics; callers must treat + // this as a preconditioned API guarded by 
HasNullCount(). + return 0; +} + +int64_t Statistics::num_values() const { + return static_cast(column_statistics_->getNumberOfValues()); +} + +const liborc::IntegerColumnStatistics* Statistics::integer() const { + return dynamic_cast(column_statistics_); +} + +const liborc::DoubleColumnStatistics* Statistics::floating_point() const { + return dynamic_cast(column_statistics_); +} + +const liborc::StringColumnStatistics* Statistics::string() const { + return dynamic_cast(column_statistics_); +} + +const liborc::DateColumnStatistics* Statistics::date() const { + return dynamic_cast(column_statistics_); +} + +const liborc::TimestampColumnStatistics* Statistics::timestamp() const { + return dynamic_cast(column_statistics_); +} + +const liborc::DecimalColumnStatistics* Statistics::decimal() const { + return dynamic_cast(column_statistics_); +} + +bool Statistics::HasMinMax() const { + if (!valid()) { + return false; + } + + if (const auto* int_stats = integer()) { + return int_stats->hasMinimum() && int_stats->hasMaximum(); + } + if (const auto* double_stats = floating_point()) { + if (!double_stats->hasMinimum() || !double_stats->hasMaximum()) { + return false; + } + return !std::isnan(double_stats->getMinimum()) && !std::isnan(double_stats->getMaximum()); + } + if (const auto* string_stats = string()) { + return string_stats->hasMinimum() && string_stats->hasMaximum(); + } + if (const auto* date_stats = date()) { + return date_stats->hasMinimum() && date_stats->hasMaximum(); + } + if (const auto* ts_stats = timestamp()) { + if (!ts_stats->hasMinimum() || !ts_stats->hasMaximum()) { + return false; + } + return MillisFitInNanos(ts_stats->getMinimum()) && + MillisFitInNanos(ts_stats->getMaximum()); + } + if (const auto* decimal_stats = decimal()) { + if (!decimal_stats->hasMinimum() || !decimal_stats->hasMaximum()) { + return false; + } + liborc::Decimal min_dec = decimal_stats->getMinimum(); + liborc::Decimal max_dec = decimal_stats->getMaximum(); + return min_dec.scale 
== max_dec.scale; + } + return false; +} + +Status StatisticsAsScalars(const Statistics& statistics, std::shared_ptr* min, + std::shared_ptr* max) { + if (min == nullptr || max == nullptr) { + return Status::Invalid("Output pointers for min and max cannot be null"); + } + if (!statistics.valid()) { + return Status::Invalid("ORC statistics wrapper is not initialized"); + } + + *min = nullptr; + *max = nullptr; + + if (!statistics.HasMinMax()) { + return Status::OK(); + } + + if (const auto* int_stats = statistics.integer()) { + *min = std::make_shared(int_stats->getMinimum()); + *max = std::make_shared(int_stats->getMaximum()); + return Status::OK(); + } + if (const auto* double_stats = statistics.floating_point()) { + *min = std::make_shared(double_stats->getMinimum()); + *max = std::make_shared(double_stats->getMaximum()); + return Status::OK(); + } + if (const auto* string_stats = statistics.string()) { + *min = std::make_shared(string_stats->getMinimum()); + *max = std::make_shared(string_stats->getMaximum()); + return Status::OK(); + } + if (const auto* date_stats = statistics.date()) { + *min = std::make_shared(date_stats->getMinimum()); + *max = std::make_shared(date_stats->getMaximum()); + return Status::OK(); + } + if (const auto* ts_stats = statistics.timestamp()) { + auto ts_type = timestamp(TimeUnit::NANO); + *min = std::make_shared( + ts_stats->getMinimum() * 1000000LL + ts_stats->getMinimumNanos(), ts_type); + *max = std::make_shared( + ts_stats->getMaximum() * 1000000LL + ts_stats->getMaximumNanos(), ts_type); + return Status::OK(); + } + if (const auto* decimal_stats = statistics.decimal()) { + liborc::Decimal min_dec = decimal_stats->getMinimum(); + liborc::Decimal max_dec = decimal_stats->getMaximum(); + + Decimal128 min_d128(min_dec.value.getHighBits(), min_dec.value.getLowBits()); + Decimal128 max_d128(max_dec.value.getHighBits(), max_dec.value.getLowBits()); + auto dec_type = decimal128(38, min_dec.scale); + + *min = std::make_shared(min_d128, 
dec_type); + *max = std::make_shared(max_d128, dec_type); + return Status::OK(); + } + + return Status::OK(); +} + +} // namespace orc +} // namespace adapters +} // namespace arrow diff --git a/cpp/src/arrow/adapters/orc/statistics.h b/cpp/src/arrow/adapters/orc/statistics.h new file mode 100644 index 000000000000..8481dcd4ab18 --- /dev/null +++ b/cpp/src/arrow/adapters/orc/statistics.h @@ -0,0 +1,151 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include +#include + +#include "arrow/result.h" +#include "arrow/scalar.h" +#include "arrow/status.h" +#include "arrow/type_fwd.h" +#include "arrow/util/visibility.h" + +namespace orc { +class ColumnStatistics; +class DateColumnStatistics; +class DecimalColumnStatistics; +class DoubleColumnStatistics; +class IntegerColumnStatistics; +class Reader; +class Statistics; +class StringColumnStatistics; +class TimestampColumnStatistics; +class Type; +} // namespace orc + +namespace arrow { +class KeyValueMetadata; + +namespace adapters { +namespace orc { + +class Statistics; +class ColumnMetaData; +class StripeMetaData; + +/// \brief File-level ORC metadata container. 
+class ARROW_EXPORT FileMetaData {
+ public:
+  /// Construct an empty container; valid() returns false for it.
+  FileMetaData() = default;
+  /// Store the given liborc reader and file-level statistics handles.
+  FileMetaData(std::shared_ptr reader,
+               std::shared_ptr file_statistics)
+      : reader_(std::move(reader)), file_statistics_(std::move(file_statistics)) {}
+
+  /// True when both the reader and the file statistics are non-null.
+  bool valid() const { return reader_ != nullptr && file_statistics_ != nullptr; }
+  /// Number of columns reported by the file-level statistics.
+  int num_columns() const;
+  /// Number of stripes reported by the reader.
+  int num_stripes() const;
+  /// Number of rows reported by the reader.
+  int64_t num_rows() const;
+  /// Metadata for one stripe, or nullptr when the container is not valid()
+  /// or `stripe_index` is outside [0, num_stripes()).
+  std::unique_ptr Stripe(int stripe_index) const;
+  /// File-level metadata for one column, or nullptr when the container is
+  /// not valid() or `column_index` is out of range.
+  std::unique_ptr Column(int column_index) const;
+  /// Copy of the reader's key/value metadata, or nullptr when not valid().
+  std::shared_ptr key_value_metadata() const;
+  /// Root liborc type as returned by the reader. Returns a reference, so the
+  /// reader must be non-null when this is called.
+  const ::orc::Type& schema_root() const;
+
+ private:
+  std::shared_ptr reader_;
+  std::shared_ptr file_statistics_;
+};
+
+/// \brief Stripe-level ORC metadata container.
+class ARROW_EXPORT StripeMetaData {
+ public:
+  /// Construct an empty container (stripe_index() == -1, num_rows() == 0);
+  /// valid() returns false for it.
+  StripeMetaData() = default;
+  /// Store the stripe position, its row count and its statistics handle.
+  StripeMetaData(int64_t stripe_index,
+                 int64_t num_rows,
+                 std::shared_ptr stripe_statistics)
+      : stripe_index_(stripe_index),
+        num_rows_(num_rows),
+        stripe_statistics_(std::move(stripe_statistics)) {}
+
+  /// True when a stripe statistics handle is present.
+  bool valid() const { return stripe_statistics_ != nullptr; }
+  /// Index supplied at construction; -1 for a default-constructed object.
+  int64_t stripe_index() const { return stripe_index_; }
+  /// Row count supplied at construction; 0 for a default-constructed object.
+  int64_t num_rows() const { return num_rows_; }
+  /// Number of columns reported by the stripe-level statistics.
+  int num_columns() const;
+  /// Stripe-level metadata for one column, or nullptr when the container is
+  /// not valid() or `column_index` is out of range.
+  std::unique_ptr Column(int column_index) const;
+
+ private:
+  int64_t stripe_index_ = -1;
+  int64_t num_rows_ = 0;
+  std::shared_ptr stripe_statistics_;
+};
+
+/// \brief Thin wrapper over liborc column statistics.
+///
+/// Keeps liborc type dispatch in one place and provides a stable API surface
+/// for downstream consumers.
+class ARROW_EXPORT Statistics {
+ public:
+  /// Construct an empty wrapper; valid() returns false for it.
+  Statistics() = default;
+  /// Wrap `column_statistics` and hold on to `owner` alongside it.
+  /// NOTE(review): `owner` presumably keeps the object that
+  /// `column_statistics` points into alive; this class only stores it —
+  /// confirm against the callers.
+  Statistics(std::shared_ptr owner,
+             const ::orc::ColumnStatistics* column_statistics)
+      : owner_(std::move(owner)), column_statistics_(column_statistics) {}
+  /// Wrap `column_statistics` without storing an owner.
+  /// NOTE(review): lifetime of the pointee is then presumably the caller's
+  /// responsibility — confirm.
+  explicit Statistics(const ::orc::ColumnStatistics* column_statistics)
+      : column_statistics_(column_statistics) {}
+
+  /// True when a liborc statistics object is wrapped.
+  bool valid() const { return column_statistics_ != nullptr; }
+  /// Forwards liborc's hasNull() for the wrapped statistics.
+  bool HasNullCount() const;
+  /// Always returns 0; the implementation notes that liborc exposes no null
+  /// count and debug-checks HasNullCount() first.
+  int64_t null_count() const;
+  /// Number of values reported by liborc for this column.
+  int64_t num_values() const;
+  /// True when a min/max pair is available and convertible; see the
+  /// implementation for the per-type conditions (NaN, timestamp range,
+  /// matching decimal scale).
+  bool HasMinMax() const;
+
+  /// The wrapped pointer, untyped.
+  const ::orc::ColumnStatistics* raw() const { return column_statistics_; }
+  /// Typed views of the wrapped statistics. Each is implemented as a
+  /// dynamic_cast, so it yields nullptr when the wrapped object is of a
+  /// different liborc statistics type.
+  const ::orc::IntegerColumnStatistics* integer() const;
+  const ::orc::DoubleColumnStatistics* floating_point() const;
+  const ::orc::StringColumnStatistics* string() const;
+  const ::orc::DateColumnStatistics* date() const;
+  const ::orc::TimestampColumnStatistics* timestamp() const;
+  const ::orc::DecimalColumnStatistics* decimal() const;
+
+ private:
+  // Stored but never read by this class.
+  std::shared_ptr owner_;
+  // Non-owning pointer to the wrapped liborc statistics.
+  const ::orc::ColumnStatistics* column_statistics_ = nullptr;
+};
+
+/// \brief Column-level metadata container exposing statistics.
+class ARROW_EXPORT ColumnMetaData {
+ public:
+  /// Construct an empty container (column_index() == -1); valid() returns
+  /// false for it.
+  ColumnMetaData() = default;
+  /// Store the column position together with its statistics wrapper.
+  ColumnMetaData(int column_index, Statistics statistics)
+      : column_index_(column_index), statistics_(std::move(statistics)) {}
+
+  /// True when the held statistics wrapper is valid().
+  bool valid() const { return statistics_.valid(); }
+  /// Index supplied at construction; -1 for a default-constructed object.
+  int column_index() const { return column_index_; }
+  /// A heap-allocated copy of the held statistics, or nullptr when not valid().
+  std::shared_ptr statistics() const;
+
+ private:
+  int column_index_ = -1;
+  Statistics statistics_;
+};
+
+/// \brief Convert the min/max of `statistics` into Arrow scalars.
+///
+/// Returns Invalid when `min` or `max` is a null pointer or when `statistics`
+/// is not valid(). Otherwise returns OK; `*min` and `*max` are left null when
+/// statistics.HasMinMax() is false.
+ARROW_EXPORT Status StatisticsAsScalars(const Statistics& statistics,
+                                        std::shared_ptr* min,
+                                        std::shared_ptr* max);
+
+}  // namespace orc
+}  // namespace adapters
+}  // namespace arrow
diff --git a/cpp/src/arrow/dataset/file_orc.cc b/cpp/src/arrow/dataset/file_orc.cc
index 1393df57f9d7..d66d2eb5663f 100644
--- a/cpp/src/arrow/dataset/file_orc.cc
+++ b/cpp/src/arrow/dataset/file_orc.cc
@@ -85,7 +85,7 @@ class OrcScanTask {
         included_fields.push_back(schema->field(match.indices()[0])->name());
       }
 
-      std::shared_ptr record_batch_reader;
+      std::unique_ptr record_batch_reader;
       ARROW_ASSIGN_OR_RAISE(
           record_batch_reader,
           reader->GetRecordBatchReader(scan_options.batch_size, included_fields));
@@ -99,7 +99,7 @@ class OrcScanTask {
       return batch;
     }
 
-    std::shared_ptr record_batch_reader_;
+    std::unique_ptr record_batch_reader_;
   };
 
   return Impl::Make(fragment_->source(),