diff --git a/cpp/src/arrow/CMakeLists.txt b/cpp/src/arrow/CMakeLists.txt index eee63b11ca1c..8a6a6a07b9ac 100644 --- a/cpp/src/arrow/CMakeLists.txt +++ b/cpp/src/arrow/CMakeLists.txt @@ -1016,7 +1016,7 @@ endif() if(ARROW_ORC) arrow_add_object_library(ARROW_ORC adapters/orc/adapter.cc adapters/orc/options.cc - adapters/orc/util.cc) + adapters/orc/statistics.cc adapters/orc/util.cc) foreach(ARROW_ORC_TARGET ${ARROW_ORC_TARGETS}) target_link_libraries(${ARROW_ORC_TARGET} PRIVATE orc::orc) if(ARROW_ORC_VERSION VERSION_LESS "2.0.0") diff --git a/cpp/src/arrow/adapters/orc/CMakeLists.txt b/cpp/src/arrow/adapters/orc/CMakeLists.txt index bae63210b29e..9b778f3e41ef 100644 --- a/cpp/src/arrow/adapters/orc/CMakeLists.txt +++ b/cpp/src/arrow/adapters/orc/CMakeLists.txt @@ -20,7 +20,7 @@ # # Headers: top level -install(FILES adapter.h options.h +install(FILES adapter.h options.h statistics.h DESTINATION "${CMAKE_INSTALL_INCLUDEDIR}/arrow/adapters/orc") # pkg-config support diff --git a/cpp/src/arrow/adapters/orc/adapter.cc b/cpp/src/arrow/adapters/orc/adapter.cc index 51cca497485c..6cf5ef27b1aa 100644 --- a/cpp/src/arrow/adapters/orc/adapter.cc +++ b/cpp/src/arrow/adapters/orc/adapter.cc @@ -39,10 +39,11 @@ #include "arrow/type.h" #include "arrow/util/bit_util.h" #include "arrow/util/checked_cast.h" -#include "arrow/util/decimal.h" #include "arrow/util/key_value_metadata.h" #include "arrow/util/macros.h" #include "orc/Exceptions.hh" +#include "orc/Statistics.hh" +#include "orc/Type.hh" // alias to not interfere with nested orc namespace namespace liborc = orc; @@ -409,6 +410,44 @@ class ORCFileReader::Impl { return ReadBatch(opts, schema, stripes_[static_cast(stripe)].num_rows); } + Result> ReadStripes( + const std::vector& stripe_indices) { + if (stripe_indices.empty()) { + ARROW_ASSIGN_OR_RAISE(auto schema, ReadSchema()); + return Table::MakeEmpty(schema); + } + + std::vector> batches; + batches.reserve(stripe_indices.size()); + for (int64_t stripe_index : stripe_indices) { 
+ ARROW_ASSIGN_OR_RAISE(auto batch, ReadStripe(stripe_index)); + batches.push_back(std::move(batch)); + } + ARROW_ASSIGN_OR_RAISE(auto schema, ReadSchema()); + return Table::FromRecordBatches(schema, std::move(batches)); + } + + Result> ReadStripes(const std::vector& stripe_indices, + const std::vector& include_indices) { + if (stripe_indices.empty()) { + liborc::RowReaderOptions opts = DefaultRowReaderOptions(); + RETURN_NOT_OK(SelectIndices(&opts, include_indices)); + ARROW_ASSIGN_OR_RAISE(auto schema, ReadSchema(opts)); + return Table::MakeEmpty(schema); + } + + std::vector> batches; + batches.reserve(stripe_indices.size()); + for (int64_t stripe_index : stripe_indices) { + ARROW_ASSIGN_OR_RAISE(auto batch, ReadStripe(stripe_index, include_indices)); + batches.push_back(std::move(batch)); + } + liborc::RowReaderOptions opts = DefaultRowReaderOptions(); + RETURN_NOT_OK(SelectIndices(&opts, include_indices)); + ARROW_ASSIGN_OR_RAISE(auto schema, ReadSchema(opts)); + return Table::FromRecordBatches(schema, std::move(batches)); + } + Status SelectStripe(liborc::RowReaderOptions* opts, int64_t stripe) { ARROW_RETURN_IF(stripe < 0 || stripe >= NumberOfStripes(), Status::Invalid("Out of bounds stripe: ", stripe)); @@ -548,6 +587,65 @@ class ORCFileReader::Impl { return NextStripeReader(batch_size, empty_vec); } + Result GetColumnStatistics(int column_index) { + ORC_BEGIN_CATCH_NOT_OK + auto file_stats = + std::shared_ptr(reader_->getStatistics().release()); + if (column_index < 0 || + static_cast(column_index) >= file_stats->getNumberOfColumns()) { + return Status::Invalid("Column index ", column_index, " out of range [0, ", + file_stats->getNumberOfColumns(), ")"); + } + const liborc::ColumnStatistics* col_stats = + file_stats->getColumnStatistics(static_cast(column_index)); + return Statistics(std::move(file_stats), col_stats); + ORC_END_CATCH_NOT_OK + } + + Result GetStripeColumnStatistics(int64_t stripe_index, int column_index) { + ORC_BEGIN_CATCH_NOT_OK + if 
(stripe_index < 0 || stripe_index >= static_cast(stripes_.size())) { + return Status::Invalid("Stripe index ", stripe_index, " out of range"); + } + auto stripe_stats = std::shared_ptr( + reader_->getStripeStatistics(static_cast(stripe_index)).release()); + if (column_index < 0 || + static_cast(column_index) >= stripe_stats->getNumberOfColumns()) { + return Status::Invalid("Column index ", column_index, " out of range [0, ", + stripe_stats->getNumberOfColumns(), ")"); + } + const liborc::ColumnStatistics* col_stats = + stripe_stats->getColumnStatistics(static_cast(column_index)); + return Statistics(std::move(stripe_stats), col_stats); + ORC_END_CATCH_NOT_OK + } + + Result> GetStripeStatistics( + int64_t stripe_index, const std::vector& column_indices) { + ORC_BEGIN_CATCH_NOT_OK + if (stripe_index < 0 || stripe_index >= static_cast(stripes_.size())) { + return Status::Invalid("Stripe index ", stripe_index, " out of range"); + } + auto stripe_stats = std::shared_ptr( + reader_->getStripeStatistics(static_cast(stripe_index)).release()); + std::vector results; + results.reserve(column_indices.size()); + for (int col_idx : column_indices) { + if (col_idx < 0 || + static_cast(col_idx) >= stripe_stats->getNumberOfColumns()) { + return Status::Invalid("Column index ", col_idx, " out of range [0, ", + stripe_stats->getNumberOfColumns(), ")"); + } + const liborc::ColumnStatistics* col_stats = + stripe_stats->getColumnStatistics(static_cast(col_idx)); + results.emplace_back(stripe_stats, col_stats); + } + return results; + ORC_END_CATCH_NOT_OK + } + + const liborc::Type& GetORCType() { return reader_->getType(); } + private: MemoryPool* pool_; std::unique_ptr reader_; @@ -613,6 +711,17 @@ Result> ORCFileReader::ReadStripe( return impl_->ReadStripe(stripe, include_names); } +Result> ORCFileReader::ReadStripes( + const std::vector& stripe_indices) { + return impl_->ReadStripes(stripe_indices); +} + +Result> ORCFileReader::ReadStripes( + const std::vector& stripe_indices, + 
const std::vector& include_indices) { + return impl_->ReadStripes(stripe_indices, include_indices); +} + Status ORCFileReader::Seek(int64_t row_number) { return impl_->Seek(row_number); } Result> ORCFileReader::NextStripeReader( @@ -678,6 +787,22 @@ std::string ORCFileReader::GetSerializedFileTail() { return impl_->GetSerializedFileTail(); } +Result ORCFileReader::GetColumnStatistics(int column_index) { + return impl_->GetColumnStatistics(column_index); +} + +Result ORCFileReader::GetStripeColumnStatistics(int64_t stripe_index, + int column_index) { + return impl_->GetStripeColumnStatistics(stripe_index, column_index); +} + +Result> ORCFileReader::GetStripeStatistics( + int64_t stripe_index, const std::vector& column_indices) { + return impl_->GetStripeStatistics(stripe_index, column_indices); +} + +const ::orc::Type& ORCFileReader::GetORCType() { return impl_->GetORCType(); } + namespace { class ArrowOutputStream : public liborc::OutputStream { diff --git a/cpp/src/arrow/adapters/orc/adapter.h b/cpp/src/arrow/adapters/orc/adapter.h index 4ffff81f355f..b967a6ec3b09 100644 --- a/cpp/src/arrow/adapters/orc/adapter.h +++ b/cpp/src/arrow/adapters/orc/adapter.h @@ -22,15 +22,19 @@ #include #include "arrow/adapters/orc/options.h" +#include "arrow/adapters/orc/statistics.h" #include "arrow/io/interfaces.h" #include "arrow/memory_pool.h" #include "arrow/record_batch.h" #include "arrow/status.h" -#include "arrow/type.h" #include "arrow/type_fwd.h" #include "arrow/util/macros.h" #include "arrow/util/visibility.h" +namespace orc { +class Type; +} // namespace orc + namespace arrow { namespace adapters { namespace orc { @@ -129,6 +133,29 @@ class ARROW_EXPORT ORCFileReader { Result> ReadStripe( int64_t stripe, const std::vector& include_names); + /// \brief Read multiple selected stripes as a Table + /// + /// Reads only the specified stripes and concatenates them into a single table. + /// This is useful for stripe-selective reading based on predicate pushdown. 
+ /// If stripe_indices is empty, returns an empty table with the file's schema. + /// + /// \param[in] stripe_indices the indices of stripes to read + /// \return the returned Table containing data from the selected stripes + Result> ReadStripes( + const std::vector& stripe_indices); + + /// \brief Read multiple selected stripes with column selection as a Table + /// + /// Reads only the specified stripes and selected columns, concatenating them + /// into a single table. + /// If stripe_indices is empty, returns an empty table with the selected schema. + /// + /// \param[in] stripe_indices the indices of stripes to read + /// \param[in] include_indices the selected field indices to read + /// \return the returned Table containing data from the selected stripes and columns + Result> ReadStripes(const std::vector& stripe_indices, + const std::vector& include_indices); + /// \brief Seek to designated row. Invoke NextStripeReader() after seek /// will return stripe reader starting from designated row. /// @@ -267,6 +294,40 @@ class ARROW_EXPORT ORCFileReader { /// \return A KeyValueMetadata object containing the ORC metadata Result> ReadMetadata(); + /// \brief Get file-level statistics for a column. + /// + /// \param[in] column_index the ORC column id (0 is the root struct; fields start at 1) + /// \return the column statistics + Result GetColumnStatistics(int column_index); + + /// \brief Get stripe-level statistics for a column. + /// + /// \param[in] stripe_index the stripe index (0-based) + /// \param[in] column_index the ORC column id (0 is the root struct; fields start at 1) + /// \return the column statistics for the specified stripe + Result GetStripeColumnStatistics(int64_t stripe_index, int column_index); + + /// \brief Get stripe-level statistics for multiple columns at once. + /// + /// More efficient than calling GetStripeColumnStatistics() in a loop + /// because it parses the stripe's statistics object only once. 
+ /// + /// \param[in] stripe_index the stripe index (0-based) + /// \param[in] column_indices ORC column ids (0 is the root struct) to fetch stats for + /// \return vector of column statistics, one per requested column index + Result> GetStripeStatistics(int64_t stripe_index, + const std::vector& column_indices); + + /// \brief Get the ORC type tree for column ID mapping. + /// + /// This is needed for building schema manifests that map Arrow schema fields + /// to ORC physical column indices. The ORC type tree uses depth-first pre-order + /// numbering where column 0 is the root struct, column 1 is the first top-level + /// field, etc. + /// + /// \return reference to the root ORC Type (STRUCT), owned by the reader. + const ::orc::Type& GetORCType(); + private: class Impl; std::unique_ptr impl_; diff --git a/cpp/src/arrow/adapters/orc/adapter_test.cc b/cpp/src/arrow/adapters/orc/adapter_test.cc index 714e61b22b11..ab7d265714bc 100644 --- a/cpp/src/arrow/adapters/orc/adapter_test.cc +++ b/cpp/src/arrow/adapters/orc/adapter_test.cc @@ -18,6 +18,7 @@ #include #include +#include #include #include @@ -36,6 +37,7 @@ #include "arrow/testing/matchers.h" #include "arrow/testing/random.h" #include "arrow/type.h" +#include "arrow/util/decimal.h" #include "arrow/util/io_util.h" #include "arrow/util/key_value_metadata.h" @@ -1179,4 +1181,925 @@ TEST_F(TestORCWriterMultipleWrite, MultipleWritesIntFieldRecordBatch) { AssertBatchWriteReadEqual(input_batches, expected_output_table, kDefaultSmallMemStreamSize * 100); } + +TEST(TestAdapterRead, GetColumnStatisticsInteger) { + MemoryOutputStream mem_stream(kDefaultMemStreamSize); + std::unique_ptr type( + liborc::Type::buildTypeFromString("struct")); + + constexpr uint64_t stripe_size = 1024; + constexpr uint64_t row_count = 1000; + + auto writer = CreateWriter(stripe_size, *type, &mem_stream); + auto batch = writer->createRowBatch(row_count); + auto struct_batch = internal::checked_cast(batch.get()); + auto int_batch = 
internal::checked_cast(struct_batch->fields[0]); + auto bigint_batch = + internal::checked_cast(struct_batch->fields[1]); + + for (uint64_t i = 0; i < row_count; ++i) { + int_batch->data[i] = static_cast(i); + bigint_batch->data[i] = static_cast(i + 1000); + } + struct_batch->numElements = row_count; + int_batch->numElements = row_count; + bigint_batch->numElements = row_count; + writer->add(*batch); + writer->close(); + + std::shared_ptr in_stream(new io::BufferReader( + std::make_shared(reinterpret_cast(mem_stream.getData()), + static_cast(mem_stream.getLength())))); + + ASSERT_OK_AND_ASSIGN(auto reader, + adapters::orc::ORCFileReader::Open(in_stream, default_memory_pool())); + + ASSERT_OK_AND_ASSIGN(auto col1_stats, reader->GetColumnStatistics(1)); + ASSERT_OK_AND_ASSIGN(auto col1_scalars, adapters::orc::OrcStatisticsAsScalars(col1_stats)); + EXPECT_EQ(col1_scalars.num_values, row_count); + EXPECT_TRUE(col1_scalars.has_min_max); + ASSERT_NE(col1_scalars.min, nullptr); + ASSERT_NE(col1_scalars.max, nullptr); + EXPECT_EQ(checked_pointer_cast(col1_scalars.min)->value, 0); + EXPECT_EQ(checked_pointer_cast(col1_scalars.max)->value, 999); + + ASSERT_OK_AND_ASSIGN(auto col2_stats, reader->GetColumnStatistics(2)); + ASSERT_OK_AND_ASSIGN(auto col2_scalars, adapters::orc::OrcStatisticsAsScalars(col2_stats)); + EXPECT_EQ(col2_scalars.num_values, row_count); + EXPECT_TRUE(col2_scalars.has_min_max); + ASSERT_NE(col2_scalars.min, nullptr); + ASSERT_NE(col2_scalars.max, nullptr); + EXPECT_EQ(checked_pointer_cast(col2_scalars.min)->value, 1000); + EXPECT_EQ(checked_pointer_cast(col2_scalars.max)->value, 1999); +} + +TEST(TestAdapterRead, GetStripeColumnStatistics) { + MemoryOutputStream mem_stream(kDefaultMemStreamSize); + std::unique_ptr type( + liborc::Type::buildTypeFromString("struct")); + + constexpr uint64_t stripe_size = 1024; + constexpr uint64_t row_count = 500; + + auto writer = CreateWriter(stripe_size, *type, &mem_stream); + auto batch = 
writer->createRowBatch(row_count); + auto struct_batch = internal::checked_cast(batch.get()); + auto int_batch = + internal::checked_cast(struct_batch->fields[0]); + + for (uint64_t i = 0; i < row_count; ++i) { + int_batch->data[i] = static_cast(i + 100); + } + struct_batch->numElements = row_count; + int_batch->numElements = row_count; + writer->add(*batch); + writer->close(); + + std::shared_ptr in_stream(new io::BufferReader( + std::make_shared(reinterpret_cast(mem_stream.getData()), + static_cast(mem_stream.getLength())))); + + ASSERT_OK_AND_ASSIGN(auto reader, + adapters::orc::ORCFileReader::Open(in_stream, default_memory_pool())); + + ASSERT_OK_AND_ASSIGN(auto stripe0_stats, reader->GetStripeColumnStatistics(0, 1)); + ASSERT_OK_AND_ASSIGN(auto stripe0_scalars, + adapters::orc::OrcStatisticsAsScalars(stripe0_stats)); + EXPECT_TRUE(stripe0_scalars.has_min_max); + EXPECT_EQ(checked_pointer_cast(stripe0_scalars.min)->value, 100); + EXPECT_EQ(checked_pointer_cast(stripe0_scalars.max)->value, 599); +} + +TEST(TestAdapterRead, GetColumnStatisticsString) { + MemoryOutputStream mem_stream(kDefaultMemStreamSize); + std::unique_ptr type( + liborc::Type::buildTypeFromString("struct")); + + constexpr uint64_t stripe_size = 1024; + constexpr uint64_t row_count = 5; + + auto writer = CreateWriter(stripe_size, *type, &mem_stream); + auto batch = writer->createRowBatch(row_count); + auto struct_batch = internal::checked_cast(batch.get()); + auto str_batch = + internal::checked_cast(struct_batch->fields[0]); + + std::vector strings = {"apple", "banana", "cherry", "date", "elderberry"}; + std::string data_buffer; + for (const auto& s : strings) { + data_buffer += s; + } + + size_t offset = 0; + for (size_t i = 0; i < strings.size(); ++i) { + str_batch->data[i] = const_cast(&data_buffer[offset]); + str_batch->length[i] = static_cast(strings[i].size()); + offset += strings[i].size(); + } + struct_batch->numElements = row_count; + str_batch->numElements = row_count; + 
writer->add(*batch); + writer->close(); + + std::shared_ptr in_stream(new io::BufferReader( + std::make_shared(reinterpret_cast(mem_stream.getData()), + static_cast(mem_stream.getLength())))); + + ASSERT_OK_AND_ASSIGN(auto reader, + adapters::orc::ORCFileReader::Open(in_stream, default_memory_pool())); + + ASSERT_OK_AND_ASSIGN(auto col_stats, reader->GetColumnStatistics(1)); + ASSERT_OK_AND_ASSIGN(auto col_scalars, adapters::orc::OrcStatisticsAsScalars(col_stats)); + EXPECT_EQ(col_scalars.num_values, row_count); + EXPECT_TRUE(col_scalars.has_min_max); + ASSERT_NE(col_scalars.min, nullptr); + ASSERT_NE(col_scalars.max, nullptr); + EXPECT_EQ(checked_pointer_cast(col_scalars.min)->ToString(), "apple"); + EXPECT_EQ(checked_pointer_cast(col_scalars.max)->ToString(), "elderberry"); +} + +TEST(TestAdapterRead, GetColumnStatisticsOutOfRange) { + MemoryOutputStream mem_stream(kDefaultMemStreamSize); + std::unique_ptr type( + liborc::Type::buildTypeFromString("struct")); + + constexpr uint64_t stripe_size = 1024; + constexpr uint64_t row_count = 10; + + auto writer = CreateWriter(stripe_size, *type, &mem_stream); + auto batch = writer->createRowBatch(row_count); + auto struct_batch = internal::checked_cast(batch.get()); + auto int_batch = + internal::checked_cast(struct_batch->fields[0]); + + for (uint64_t i = 0; i < row_count; ++i) { + int_batch->data[i] = static_cast(i); + } + struct_batch->numElements = row_count; + int_batch->numElements = row_count; + writer->add(*batch); + writer->close(); + + std::shared_ptr in_stream(new io::BufferReader( + std::make_shared(reinterpret_cast(mem_stream.getData()), + static_cast(mem_stream.getLength())))); + + ASSERT_OK_AND_ASSIGN(auto reader, + adapters::orc::ORCFileReader::Open(in_stream, default_memory_pool())); + + EXPECT_THAT(reader->GetColumnStatistics(999), + Raises(StatusCode::Invalid, testing::HasSubstr("out of range"))); + + EXPECT_THAT(reader->GetStripeColumnStatistics(999, 1), + Raises(StatusCode::Invalid, 
testing::HasSubstr("out of range"))); +} + +TEST(TestAdapterRead, GetColumnStatisticsWithNulls) { + MemoryOutputStream mem_stream(kDefaultMemStreamSize); + std::unique_ptr type( + liborc::Type::buildTypeFromString("struct")); + + constexpr uint64_t stripe_size = 1024; + constexpr uint64_t row_count = 10; + + auto writer = CreateWriter(stripe_size, *type, &mem_stream); + auto batch = writer->createRowBatch(row_count); + auto struct_batch = internal::checked_cast(batch.get()); + auto int_batch = + internal::checked_cast(struct_batch->fields[0]); + + for (uint64_t i = 0; i < row_count; ++i) { + int_batch->data[i] = static_cast(i); + int_batch->notNull[i] = (i % 2 == 0) ? 1 : 0; + } + int_batch->hasNulls = true; + struct_batch->numElements = row_count; + int_batch->numElements = row_count; + writer->add(*batch); + writer->close(); + + std::shared_ptr in_stream(new io::BufferReader( + std::make_shared(reinterpret_cast(mem_stream.getData()), + static_cast(mem_stream.getLength())))); + + ASSERT_OK_AND_ASSIGN(auto reader, + adapters::orc::ORCFileReader::Open(in_stream, default_memory_pool())); + + ASSERT_OK_AND_ASSIGN(auto col_stats, reader->GetColumnStatistics(1)); + ASSERT_OK_AND_ASSIGN(auto col_scalars, adapters::orc::OrcStatisticsAsScalars(col_stats)); + EXPECT_TRUE(col_stats.has_null()); + EXPECT_TRUE(col_scalars.has_min_max); +} + +TEST(TestAdapterRead, GetColumnStatisticsBoolean) { + MemoryOutputStream mem_stream(kDefaultMemStreamSize); + std::unique_ptr type( + liborc::Type::buildTypeFromString("struct")); + + constexpr uint64_t stripe_size = 1024; + constexpr uint64_t row_count = 100; + + auto writer = CreateWriter(stripe_size, *type, &mem_stream); + auto batch = writer->createRowBatch(row_count); + auto struct_batch = internal::checked_cast(batch.get()); + auto bool_batch = + internal::checked_cast(struct_batch->fields[0]); + + // Write 60 true values and 40 false values + for (uint64_t i = 0; i < row_count; ++i) { + bool_batch->data[i] = (i < 60) ? 
1 : 0; + } + struct_batch->numElements = row_count; + bool_batch->numElements = row_count; + writer->add(*batch); + writer->close(); + + std::shared_ptr in_stream(new io::BufferReader( + std::make_shared(reinterpret_cast(mem_stream.getData()), + static_cast(mem_stream.getLength())))); + + ASSERT_OK_AND_ASSIGN(auto reader, + adapters::orc::ORCFileReader::Open(in_stream, default_memory_pool())); + + ASSERT_OK_AND_ASSIGN(auto col_stats, reader->GetColumnStatistics(1)); + ASSERT_OK_AND_ASSIGN(auto col_scalars, adapters::orc::OrcStatisticsAsScalars(col_stats)); + EXPECT_EQ(col_scalars.num_values, row_count); + EXPECT_FALSE(col_scalars.has_min_max); // Boolean types don't have min/max + EXPECT_EQ(col_scalars.min, nullptr); + EXPECT_EQ(col_scalars.max, nullptr); +} + +TEST(TestAdapterRead, GetColumnStatisticsDate) { + MemoryOutputStream mem_stream(kDefaultMemStreamSize); + std::unique_ptr type( + liborc::Type::buildTypeFromString("struct")); + + constexpr uint64_t stripe_size = 1024; + constexpr uint64_t row_count = 10; + + auto writer = CreateWriter(stripe_size, *type, &mem_stream); + auto batch = writer->createRowBatch(row_count); + auto struct_batch = internal::checked_cast(batch.get()); + auto date_batch = + internal::checked_cast(struct_batch->fields[0]); + + // Write dates: 0 (1970-01-01) through 9 (1970-01-10) + for (uint64_t i = 0; i < row_count; ++i) { + date_batch->data[i] = static_cast(i); + } + struct_batch->numElements = row_count; + date_batch->numElements = row_count; + writer->add(*batch); + writer->close(); + + std::shared_ptr in_stream(new io::BufferReader( + std::make_shared(reinterpret_cast(mem_stream.getData()), + static_cast(mem_stream.getLength())))); + + ASSERT_OK_AND_ASSIGN(auto reader, + adapters::orc::ORCFileReader::Open(in_stream, default_memory_pool())); + + ASSERT_OK_AND_ASSIGN(auto col_stats, reader->GetColumnStatistics(1)); + ASSERT_OK_AND_ASSIGN(auto col_scalars, adapters::orc::OrcStatisticsAsScalars(col_stats)); + 
EXPECT_EQ(col_scalars.num_values, row_count); + EXPECT_TRUE(col_scalars.has_min_max); + ASSERT_NE(col_scalars.min, nullptr); + ASSERT_NE(col_scalars.max, nullptr); + EXPECT_EQ(checked_pointer_cast(col_scalars.min)->value, 0); + EXPECT_EQ(checked_pointer_cast(col_scalars.max)->value, 9); +} + +TEST(TestAdapterRead, GetColumnStatisticsTimestamp) { + MemoryOutputStream mem_stream(kDefaultMemStreamSize); + std::unique_ptr type( + liborc::Type::buildTypeFromString("struct")); + + constexpr uint64_t stripe_size = 1024; + constexpr uint64_t row_count = 10; + + auto writer = CreateWriter(stripe_size, *type, &mem_stream); + auto batch = writer->createRowBatch(row_count); + auto struct_batch = internal::checked_cast(batch.get()); + auto ts_batch = + internal::checked_cast(struct_batch->fields[0]); + + // Write timestamps with both seconds and nanoseconds + for (uint64_t i = 0; i < row_count; ++i) { + ts_batch->data[i] = static_cast(i * 1000); // seconds + ts_batch->nanoseconds[i] = static_cast(i * 100); // nanoseconds + } + struct_batch->numElements = row_count; + ts_batch->numElements = row_count; + writer->add(*batch); + writer->close(); + + std::shared_ptr in_stream(new io::BufferReader( + std::make_shared(reinterpret_cast(mem_stream.getData()), + static_cast(mem_stream.getLength())))); + + ASSERT_OK_AND_ASSIGN(auto reader, + adapters::orc::ORCFileReader::Open(in_stream, default_memory_pool())); + + ASSERT_OK_AND_ASSIGN(auto col_stats, reader->GetColumnStatistics(1)); + ASSERT_OK_AND_ASSIGN(auto col_scalars, adapters::orc::OrcStatisticsAsScalars(col_stats)); + EXPECT_EQ(col_scalars.num_values, row_count); + EXPECT_TRUE(col_scalars.has_min_max); + ASSERT_NE(col_scalars.min, nullptr); + ASSERT_NE(col_scalars.max, nullptr); + + // Verify the timestamps are TimestampScalar with correct nanosecond values. + // Written: data[i] = i*1000 seconds, nanoseconds[i] = i*100 sub-second nanos. + // Min (i=0): 0s + 0ns = 0 ns total. 
+ // Max (i=9): 9000s + 900ns = 9000 * 1,000,000,000 + 900 = 9,000,000,000,900 ns. + // Statistics store millis + sub-ms-nanos (last 6 digits of ns). + // Conversion: millis * 1,000,000 + sub_ms_nanos = total nanoseconds. + auto min_ts = checked_pointer_cast(col_scalars.min); + auto max_ts = checked_pointer_cast(col_scalars.max); + EXPECT_TRUE(min_ts->is_valid); + EXPECT_TRUE(max_ts->is_valid); + EXPECT_EQ(min_ts->value, 0); + // 9000 seconds + 900 nanoseconds + constexpr int64_t expected_max_ns = 9000LL * 1000000000LL + 900LL; + EXPECT_EQ(max_ts->value, expected_max_ns); +} + +TEST(TestAdapterRead, ReadStripesMultiple) { + // Create a test file and read all stripes using ReadStripes + MemoryOutputStream mem_stream(kDefaultMemStreamSize); + std::unique_ptr type( + liborc::Type::buildTypeFromString("struct")); + + constexpr uint64_t stripe_size = 1024; + constexpr uint64_t row_count = 100; + + auto writer = CreateWriter(stripe_size, *type, &mem_stream); + auto batch = writer->createRowBatch(row_count); + auto struct_batch = internal::checked_cast(batch.get()); + auto int_batch = + internal::checked_cast(struct_batch->fields[0]); + auto bigint_batch = + internal::checked_cast(struct_batch->fields[1]); + + for (uint64_t i = 0; i < row_count; ++i) { + int_batch->data[i] = static_cast(i); + bigint_batch->data[i] = static_cast(i + 1000); + } + struct_batch->numElements = row_count; + int_batch->numElements = row_count; + bigint_batch->numElements = row_count; + writer->add(*batch); + writer->close(); + + std::shared_ptr in_stream(new io::BufferReader( + std::make_shared(reinterpret_cast(mem_stream.getData()), + static_cast(mem_stream.getLength())))); + + ASSERT_OK_AND_ASSIGN(auto reader, + adapters::orc::ORCFileReader::Open(in_stream, default_memory_pool())); + + int64_t num_stripes = reader->NumberOfStripes(); + ASSERT_GT(num_stripes, 0); + + // Build vector of all stripe indices + std::vector stripe_indices; + for (int64_t i = 0; i < num_stripes; ++i) { + 
stripe_indices.push_back(i); + } + + // Read all stripes using ReadStripes + ASSERT_OK_AND_ASSIGN(auto table, reader->ReadStripes(stripe_indices)); + + // Should have all rows and correct schema + EXPECT_EQ(table->num_rows(), row_count); + EXPECT_EQ(table->num_columns(), 2); + EXPECT_EQ(table->schema()->field(0)->name(), "col1"); + EXPECT_EQ(table->schema()->field(1)->name(), "col2"); +} + +TEST(TestAdapterRead, ReadStripesWithColumnSelection) { + // Create a test file with multiple columns + MemoryOutputStream mem_stream(kDefaultMemStreamSize); + std::unique_ptr type(liborc::Type::buildTypeFromString( + "struct")); + + constexpr uint64_t stripe_size = 1024; + constexpr uint64_t row_count = 100; + + auto writer = CreateWriter(stripe_size, *type, &mem_stream); + auto batch = writer->createRowBatch(row_count); + auto struct_batch = internal::checked_cast(batch.get()); + auto int_batch = + internal::checked_cast(struct_batch->fields[0]); + auto bigint_batch = + internal::checked_cast(struct_batch->fields[1]); + auto double_batch = + internal::checked_cast(struct_batch->fields[2]); + + for (uint64_t i = 0; i < row_count; ++i) { + int_batch->data[i] = static_cast(i); + bigint_batch->data[i] = static_cast(i + 100); + double_batch->data[i] = static_cast(i) + 0.5; + } + struct_batch->numElements = row_count; + int_batch->numElements = row_count; + bigint_batch->numElements = row_count; + double_batch->numElements = row_count; + writer->add(*batch); + writer->close(); + + std::shared_ptr in_stream(new io::BufferReader( + std::make_shared(reinterpret_cast(mem_stream.getData()), + static_cast(mem_stream.getLength())))); + + ASSERT_OK_AND_ASSIGN(auto reader, + adapters::orc::ORCFileReader::Open(in_stream, default_memory_pool())); + + int64_t num_stripes = reader->NumberOfStripes(); + ASSERT_GT(num_stripes, 0); + + // Build vector of all stripe indices + std::vector stripe_indices; + for (int64_t i = 0; i < num_stripes; ++i) { + stripe_indices.push_back(i); + } + + // Read all 
stripes but only columns 0 and 2 (col1 and col3) + std::vector include_indices = {0, 2}; + ASSERT_OK_AND_ASSIGN(auto table, reader->ReadStripes(stripe_indices, include_indices)); + + // Should have all rows and 2 columns + EXPECT_EQ(table->num_rows(), row_count); + EXPECT_EQ(table->num_columns(), 2); + + // Verify we got the right columns (col2 was skipped) + EXPECT_EQ(table->schema()->field(0)->name(), "col1"); + EXPECT_EQ(table->schema()->field(1)->name(), "col3"); +} + +TEST(TestAdapterRead, ReadStripesOutOfRange) { + MemoryOutputStream mem_stream(kDefaultMemStreamSize); + std::unique_ptr type( + liborc::Type::buildTypeFromString("struct")); + + constexpr uint64_t stripe_size = 512; + constexpr uint64_t row_count = 100; + + auto writer = CreateWriter(stripe_size, *type, &mem_stream); + auto batch = writer->createRowBatch(row_count); + auto struct_batch = internal::checked_cast(batch.get()); + auto int_batch = + internal::checked_cast(struct_batch->fields[0]); + + for (uint64_t i = 0; i < row_count; ++i) { + int_batch->data[i] = static_cast(i); + } + struct_batch->numElements = row_count; + int_batch->numElements = row_count; + writer->add(*batch); + writer->close(); + + std::shared_ptr in_stream(new io::BufferReader( + std::make_shared(reinterpret_cast(mem_stream.getData()), + static_cast(mem_stream.getLength())))); + + ASSERT_OK_AND_ASSIGN(auto reader, + adapters::orc::ORCFileReader::Open(in_stream, default_memory_pool())); + + int64_t num_stripes = reader->NumberOfStripes(); + ASSERT_GT(num_stripes, 0); + + // Try to read a stripe index that's out of range + std::vector stripe_indices = {0, num_stripes}; // num_stripes is out of range + EXPECT_THAT(reader->ReadStripes(stripe_indices), + Raises(StatusCode::Invalid, testing::HasSubstr("Out of bounds stripe"))); +} + +TEST(TestAdapterRead, ReadStripesEmptyVector) { + MemoryOutputStream mem_stream(kDefaultMemStreamSize); + std::unique_ptr type( + liborc::Type::buildTypeFromString("struct")); + + constexpr uint64_t 
stripe_size = 512; + constexpr uint64_t row_count = 100; + + auto writer = CreateWriter(stripe_size, *type, &mem_stream); + auto batch = writer->createRowBatch(row_count); + auto struct_batch = internal::checked_cast(batch.get()); + auto int_batch = + internal::checked_cast(struct_batch->fields[0]); + + for (uint64_t i = 0; i < row_count; ++i) { + int_batch->data[i] = static_cast(i); + } + struct_batch->numElements = row_count; + int_batch->numElements = row_count; + writer->add(*batch); + writer->close(); + + std::shared_ptr in_stream(new io::BufferReader( + std::make_shared(reinterpret_cast(mem_stream.getData()), + static_cast(mem_stream.getLength())))); + + ASSERT_OK_AND_ASSIGN(auto reader, + adapters::orc::ORCFileReader::Open(in_stream, default_memory_pool())); + + // Empty stripe indices should return an empty table with the file's schema + std::vector stripe_indices; + ASSERT_OK_AND_ASSIGN(auto table, reader->ReadStripes(stripe_indices)); + EXPECT_EQ(table->num_rows(), 0); + EXPECT_EQ(table->num_columns(), 1); + EXPECT_EQ(table->schema()->field(0)->name(), "col1"); +} + +TEST(TestAdapterRead, GetORCType) { + MemoryOutputStream mem_stream(kDefaultMemStreamSize); + std::unique_ptr type(liborc::Type::buildTypeFromString( + "struct")); + + constexpr uint64_t stripe_size = 1024; + constexpr uint64_t row_count = 10; + + auto writer = CreateWriter(stripe_size, *type, &mem_stream); + auto batch = writer->createRowBatch(row_count); + auto struct_batch = internal::checked_cast(batch.get()); + + struct_batch->numElements = row_count; + for (size_t i = 0; i < struct_batch->fields.size(); ++i) { + struct_batch->fields[i]->numElements = row_count; + } + writer->add(*batch); + writer->close(); + + std::shared_ptr in_stream(new io::BufferReader( + std::make_shared(reinterpret_cast(mem_stream.getData()), + static_cast(mem_stream.getLength())))); + + ASSERT_OK_AND_ASSIGN(auto reader, + adapters::orc::ORCFileReader::Open(in_stream, default_memory_pool())); + + // Get the ORC 
type — returns a reference, no cast needed + const liborc::Type& orc_type = reader->GetORCType(); + + // Root should be STRUCT + EXPECT_EQ(orc_type.getKind(), liborc::STRUCT); + + // Should have 3 subtypes (col1, col2, col3) + EXPECT_EQ(orc_type.getSubtypeCount(), 3); + + // Verify field names + EXPECT_EQ(orc_type.getFieldName(0), "col1"); + EXPECT_EQ(orc_type.getFieldName(1), "col2"); + EXPECT_EQ(orc_type.getFieldName(2), "col3"); + + // Verify field types + EXPECT_EQ(orc_type.getSubtype(0)->getKind(), liborc::INT); + EXPECT_EQ(orc_type.getSubtype(1)->getKind(), liborc::LONG); + EXPECT_EQ(orc_type.getSubtype(2)->getKind(), liborc::STRING); +} + +TEST(TestAdapterRead, GetColumnStatisticsDoubleNaN) { + MemoryOutputStream mem_stream(kDefaultMemStreamSize); + std::unique_ptr type( + liborc::Type::buildTypeFromString("struct")); + + constexpr uint64_t stripe_size = 1024; + constexpr uint64_t row_count = 10; + + auto writer = CreateWriter(stripe_size, *type, &mem_stream); + auto batch = writer->createRowBatch(row_count); + auto struct_batch = internal::checked_cast(batch.get()); + auto double_batch = + internal::checked_cast(struct_batch->fields[0]); + + // Write some normal values and some NaN values + for (uint64_t i = 0; i < row_count; ++i) { + if (i % 3 == 0) { + double_batch->data[i] = std::numeric_limits::quiet_NaN(); + } else { + double_batch->data[i] = static_cast(i); + } + } + struct_batch->numElements = row_count; + double_batch->numElements = row_count; + writer->add(*batch); + writer->close(); + + std::shared_ptr in_stream(new io::BufferReader( + std::make_shared(reinterpret_cast(mem_stream.getData()), + static_cast(mem_stream.getLength())))); + + ASSERT_OK_AND_ASSIGN(auto reader, + adapters::orc::ORCFileReader::Open(in_stream, default_memory_pool())); + + ASSERT_OK_AND_ASSIGN(auto col_stats, reader->GetColumnStatistics(1)); + ASSERT_OK_AND_ASSIGN(auto col_scalars, adapters::orc::OrcStatisticsAsScalars(col_stats)); + // When NaN values are present, ORC may 
report NaN as min or max. + // Our guard should detect this and set has_min_max = false. + if (col_scalars.has_min_max) { + // If ORC writer filtered NaN from statistics, min/max should be valid (non-NaN) + auto min_val = checked_pointer_cast(col_scalars.min); + auto max_val = checked_pointer_cast(col_scalars.max); + EXPECT_FALSE(std::isnan(min_val->value)); + EXPECT_FALSE(std::isnan(max_val->value)); + } + // Either has_min_max is false (NaN guard triggered), or min/max are non-NaN. + // Both outcomes are correct. +} + +TEST(TestAdapterRead, GetColumnStatisticsNegativeIndex) { + MemoryOutputStream mem_stream(kDefaultMemStreamSize); + std::unique_ptr type( + liborc::Type::buildTypeFromString("struct")); + + constexpr uint64_t stripe_size = 1024; + constexpr uint64_t row_count = 10; + + auto writer = CreateWriter(stripe_size, *type, &mem_stream); + auto batch = writer->createRowBatch(row_count); + auto struct_batch = internal::checked_cast(batch.get()); + auto int_batch = + internal::checked_cast(struct_batch->fields[0]); + + for (uint64_t i = 0; i < row_count; ++i) { + int_batch->data[i] = static_cast(i); + } + struct_batch->numElements = row_count; + int_batch->numElements = row_count; + writer->add(*batch); + writer->close(); + + std::shared_ptr in_stream(new io::BufferReader( + std::make_shared(reinterpret_cast(mem_stream.getData()), + static_cast(mem_stream.getLength())))); + + ASSERT_OK_AND_ASSIGN(auto reader, + adapters::orc::ORCFileReader::Open(in_stream, default_memory_pool())); + + // Negative column index should return Invalid + EXPECT_THAT(reader->GetColumnStatistics(-1), + Raises(StatusCode::Invalid, testing::HasSubstr("out of range"))); + + // Negative column index in stripe stats should also return Invalid + EXPECT_THAT(reader->GetStripeColumnStatistics(0, -1), + Raises(StatusCode::Invalid, testing::HasSubstr("out of range"))); +} + +TEST(TestAdapterRead, GetStripeStatisticsBulk) { + MemoryOutputStream mem_stream(kDefaultMemStreamSize); + 
std::unique_ptr type(liborc::Type::buildTypeFromString( + "struct")); + + constexpr uint64_t stripe_size = 1024; + constexpr uint64_t row_count = 100; + + auto writer = CreateWriter(stripe_size, *type, &mem_stream); + auto batch = writer->createRowBatch(row_count); + auto struct_batch = internal::checked_cast(batch.get()); + auto int_batch = + internal::checked_cast(struct_batch->fields[0]); + auto bigint_batch = + internal::checked_cast(struct_batch->fields[1]); + auto double_batch = + internal::checked_cast(struct_batch->fields[2]); + + for (uint64_t i = 0; i < row_count; ++i) { + int_batch->data[i] = static_cast(i); + bigint_batch->data[i] = static_cast(i + 1000); + double_batch->data[i] = static_cast(i) + 0.5; + } + struct_batch->numElements = row_count; + int_batch->numElements = row_count; + bigint_batch->numElements = row_count; + double_batch->numElements = row_count; + writer->add(*batch); + writer->close(); + + std::shared_ptr in_stream(new io::BufferReader( + std::make_shared(reinterpret_cast(mem_stream.getData()), + static_cast(mem_stream.getLength())))); + + ASSERT_OK_AND_ASSIGN(auto reader, + adapters::orc::ORCFileReader::Open(in_stream, default_memory_pool())); + + // ORC column indices: 0=root struct, 1=col1, 2=col2, 3=col3 + std::vector column_indices = {1, 2, 3}; + + // Get bulk stats for stripe 0 + ASSERT_OK_AND_ASSIGN(auto bulk_stats, reader->GetStripeStatistics(0, column_indices)); + ASSERT_EQ(bulk_stats.size(), 3); + + // Verify bulk results match individual GetStripeColumnStatistics calls + for (size_t i = 0; i < column_indices.size(); ++i) { + ASSERT_OK_AND_ASSIGN(auto individual_stats, + reader->GetStripeColumnStatistics(0, column_indices[i])); + ASSERT_OK_AND_ASSIGN(auto bulk_scalars, + adapters::orc::OrcStatisticsAsScalars(bulk_stats[i])); + ASSERT_OK_AND_ASSIGN(auto individual_scalars, + adapters::orc::OrcStatisticsAsScalars(individual_stats)); + EXPECT_EQ(bulk_stats[i].has_null(), individual_stats.has_null()); + 
EXPECT_EQ(bulk_scalars.num_values, individual_scalars.num_values); + EXPECT_EQ(bulk_scalars.has_min_max, individual_scalars.has_min_max); + if (bulk_scalars.has_min_max && individual_scalars.has_min_max) { + EXPECT_TRUE(bulk_scalars.min->Equals(*individual_scalars.min)); + EXPECT_TRUE(bulk_scalars.max->Equals(*individual_scalars.max)); + } + } + + // Verify out-of-range column index in bulk API + std::vector bad_indices = {1, 999}; + EXPECT_THAT(reader->GetStripeStatistics(0, bad_indices), + Raises(StatusCode::Invalid, testing::HasSubstr("out of range"))); + + // Verify out-of-range stripe index in bulk API + EXPECT_THAT(reader->GetStripeStatistics(999, column_indices), + Raises(StatusCode::Invalid, testing::HasSubstr("out of range"))); +} + +TEST(TestAdapterRead, BuildSchemaManifestNested) { + // ORC type with nested struct and list. + // ORC column IDs (depth-first pre-order): + // 0: root struct + // 1: col1 (int) + // 2: col2 (struct) + // 3: col2.a (string) + // 4: col2.b (bigint) + // 5: col3 (array/list) + // 6: col3._elem (int) + MemoryOutputStream mem_stream(kDefaultMemStreamSize); + std::unique_ptr type(liborc::Type::buildTypeFromString( + "struct,col3:array>")); + + constexpr uint64_t stripe_size = 1024; + constexpr uint64_t row_count = 1; + + auto writer = CreateWriter(stripe_size, *type, &mem_stream); + auto batch = writer->createRowBatch(row_count); + auto struct_batch = internal::checked_cast(batch.get()); + + // Set up col1 (int) + auto int_batch = + internal::checked_cast(struct_batch->fields[0]); + int_batch->data[0] = 42; + int_batch->numElements = row_count; + + // Set up col2 (struct) + auto inner_struct = + internal::checked_cast(struct_batch->fields[1]); + auto str_batch = + internal::checked_cast(inner_struct->fields[0]); + auto bigint_batch = + internal::checked_cast(inner_struct->fields[1]); + std::string str_data = "hello"; + str_batch->data[0] = const_cast(str_data.c_str()); + str_batch->length[0] = static_cast(str_data.size()); + 
str_batch->numElements = row_count; + bigint_batch->data[0] = 100; + bigint_batch->numElements = row_count; + inner_struct->numElements = row_count; + + // Set up col3 (array) - write one list with one element + auto list_batch = + internal::checked_cast(struct_batch->fields[2]); + auto list_elem_batch = + internal::checked_cast(list_batch->elements.get()); + list_batch->offsets[0] = 0; + list_batch->offsets[1] = 1; + list_elem_batch->data[0] = 7; + list_elem_batch->numElements = 1; + list_batch->numElements = row_count; + + struct_batch->numElements = row_count; + writer->add(*batch); + writer->close(); + + std::shared_ptr in_stream(new io::BufferReader( + std::make_shared(reinterpret_cast(mem_stream.getData()), + static_cast(mem_stream.getLength())))); + + ASSERT_OK_AND_ASSIGN(auto reader, + adapters::orc::ORCFileReader::Open(in_stream, default_memory_pool())); + ASSERT_OK_AND_ASSIGN(auto arrow_schema, reader->ReadSchema()); + + ASSERT_OK_AND_ASSIGN(auto manifest, reader->BuildSchemaManifest(arrow_schema)); + ASSERT_EQ(manifest->schema_fields.size(), 3); + + // col1: leaf, orc_column_id=1 + EXPECT_EQ(manifest->schema_fields[0].field->name(), "col1"); + EXPECT_EQ(manifest->schema_fields[0].orc_column_id, 1); + EXPECT_TRUE(manifest->schema_fields[0].is_leaf()); + + // col2: struct with 2 children, orc_column_id=2 + EXPECT_EQ(manifest->schema_fields[1].field->name(), "col2"); + EXPECT_EQ(manifest->schema_fields[1].orc_column_id, 2); + EXPECT_FALSE(manifest->schema_fields[1].is_leaf()); + ASSERT_EQ(manifest->schema_fields[1].children.size(), 2); + EXPECT_EQ(manifest->schema_fields[1].children[0].field->name(), "a"); + EXPECT_EQ(manifest->schema_fields[1].children[0].orc_column_id, 3); + EXPECT_TRUE(manifest->schema_fields[1].children[0].is_leaf()); + EXPECT_EQ(manifest->schema_fields[1].children[1].field->name(), "b"); + EXPECT_EQ(manifest->schema_fields[1].children[1].orc_column_id, 4); + EXPECT_TRUE(manifest->schema_fields[1].children[1].is_leaf()); + + // col3: 
list with 1 child element, orc_column_id=5 + EXPECT_EQ(manifest->schema_fields[2].field->name(), "col3"); + EXPECT_EQ(manifest->schema_fields[2].orc_column_id, 5); + EXPECT_FALSE(manifest->schema_fields[2].is_leaf()); + ASSERT_EQ(manifest->schema_fields[2].children.size(), 1); + EXPECT_EQ(manifest->schema_fields[2].children[0].orc_column_id, 6); + EXPECT_TRUE(manifest->schema_fields[2].children[0].is_leaf()); + + // Test GetField() with various paths + EXPECT_EQ(manifest->GetField({}), nullptr); // empty path + EXPECT_EQ(manifest->GetField({99}), nullptr); // out of range + + // GetField({0}) -> col1 + const auto* f0 = manifest->GetField({0}); + ASSERT_NE(f0, nullptr); + EXPECT_EQ(f0->field->name(), "col1"); + EXPECT_EQ(f0->orc_column_id, 1); + + // GetField({1}) -> col2 (struct) + const auto* f1 = manifest->GetField({1}); + ASSERT_NE(f1, nullptr); + EXPECT_EQ(f1->field->name(), "col2"); + EXPECT_EQ(f1->orc_column_id, 2); + + // GetField({1, 0}) -> col2.a + const auto* f1_0 = manifest->GetField({1, 0}); + ASSERT_NE(f1_0, nullptr); + EXPECT_EQ(f1_0->field->name(), "a"); + EXPECT_EQ(f1_0->orc_column_id, 3); + + // GetField({1, 1}) -> col2.b + const auto* f1_1 = manifest->GetField({1, 1}); + ASSERT_NE(f1_1, nullptr); + EXPECT_EQ(f1_1->field->name(), "b"); + EXPECT_EQ(f1_1->orc_column_id, 4); + + // GetField({2, 0}) -> col3 element + const auto* f2_0 = manifest->GetField({2, 0}); + ASSERT_NE(f2_0, nullptr); + EXPECT_EQ(f2_0->orc_column_id, 6); + + // Out of range nested paths + EXPECT_EQ(manifest->GetField({1, 99}), nullptr); + EXPECT_EQ(manifest->GetField({0, 0}), nullptr); // col1 is leaf, no children +} + +TEST(TestAdapterRead, GetColumnStatisticsDecimal) { + MemoryOutputStream mem_stream(kDefaultMemStreamSize); + // decimal(10,2) uses Decimal64VectorBatch internally (precision <= 18) + std::unique_ptr type( + liborc::Type::buildTypeFromString("struct")); + + constexpr uint64_t stripe_size = 1024; + constexpr uint64_t row_count = 5; + + auto writer = 
CreateWriter(stripe_size, *type, &mem_stream); + auto batch = writer->createRowBatch(row_count); + auto struct_batch = internal::checked_cast(batch.get()); + auto dec_batch = + internal::checked_cast(struct_batch->fields[0]); + + // Write unscaled values with non-zero trailing digits to avoid ORC + // normalizing away trailing zeros in statistics (e.g. 1.00 → 1 scale 0). + // Values: 101, 203, 305, 407, 509 with scale=2 + // Represent: 1.01, 2.03, 3.05, 4.07, 5.09 + dec_batch->values[0] = 101; + dec_batch->values[1] = 203; + dec_batch->values[2] = 305; + dec_batch->values[3] = 407; + dec_batch->values[4] = 509; + struct_batch->numElements = row_count; + dec_batch->numElements = row_count; + writer->add(*batch); + writer->close(); + + std::shared_ptr in_stream(new io::BufferReader( + std::make_shared(reinterpret_cast(mem_stream.getData()), + static_cast(mem_stream.getLength())))); + + ASSERT_OK_AND_ASSIGN(auto reader, + adapters::orc::ORCFileReader::Open(in_stream, default_memory_pool())); + + ASSERT_OK_AND_ASSIGN(auto col_stats, reader->GetColumnStatistics(1)); + ASSERT_OK_AND_ASSIGN(auto col_scalars, adapters::orc::OrcStatisticsAsScalars(col_stats)); + EXPECT_EQ(col_scalars.num_values, row_count); + EXPECT_FALSE(col_stats.has_null()); + EXPECT_TRUE(col_scalars.has_min_max); + ASSERT_NE(col_scalars.min, nullptr); + ASSERT_NE(col_scalars.max, nullptr); + + // Verify Decimal128Scalar values (unscaled integers with scale 2) + auto min_dec = checked_pointer_cast(col_scalars.min); + auto max_dec = checked_pointer_cast(col_scalars.max); + EXPECT_EQ(min_dec->value, Decimal128(101)); // 1.01 unscaled + EXPECT_EQ(max_dec->value, Decimal128(509)); // 5.09 unscaled + + // Verify the type has precision 38 and scale 2 (as set by ConvertColumnStatistics) + EXPECT_TRUE(min_dec->type->Equals(decimal128(38, 2))); + EXPECT_TRUE(max_dec->type->Equals(decimal128(38, 2))); +} + } // namespace arrow diff --git a/cpp/src/arrow/adapters/orc/statistics.cc 
b/cpp/src/arrow/adapters/orc/statistics.cc
new file mode 100644
index 000000000000..61748ea959af
--- /dev/null
+++ b/cpp/src/arrow/adapters/orc/statistics.cc
@@ -0,0 +1,222 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "arrow/adapters/orc/statistics.h"
+
+#include <cmath>
+#include <limits>
+
+#include "arrow/scalar.h"
+#include "arrow/type.h"
+#include "arrow/util/checked_cast.h"
+#include "arrow/util/decimal.h"
+#include "orc/Statistics.hh"
+
+namespace liborc = orc;
+
+namespace arrow {
+namespace adapters {
+namespace orc {
+namespace {
+
+// ORC timestamp statistics carry milliseconds since the epoch plus a separate
+// sub-millisecond nanosecond component. Both are folded into one int64 nanosecond
+// count below, so together they must not overflow int64_t.
+constexpr int64_t kNanosPerMilli = 1000000LL;
+constexpr int64_t kMaxMillisForNanos =
+    std::numeric_limits<int64_t>::max() / kNanosPerMilli;
+constexpr int64_t kMinMillisForNanos =
+    std::numeric_limits<int64_t>::lowest() / kNanosPerMilli;
+
+// Returns true when `millis * kNanosPerMilli + sub_milli_nanos` is representable
+// as int64_t. Range-checking the millisecond part alone is not enough: at
+// kMaxMillisForNanos the product is within 775807 of INT64_MAX, so adding the
+// nanosecond component can still overflow, which is undefined for signed types.
+bool TimestampFitsInNanos(int64_t millis, int64_t sub_milli_nanos) {
+  if (millis < kMinMillisForNanos || millis > kMaxMillisForNanos) {
+    return false;
+  }
+  // The range check above guarantees this product cannot overflow.
+  const int64_t scaled = millis * kNanosPerMilli;
+  if (sub_milli_nanos >= 0) {
+    return scaled <= std::numeric_limits<int64_t>::max() - sub_milli_nanos;
+  }
+  return scaled >= std::numeric_limits<int64_t>::lowest() - sub_milli_nanos;
+}
+
+}  // namespace
+
+// Precondition for has_null() and num_values(): valid() is true. Both dereference
+// the wrapped pointer without checking it.
+bool Statistics::has_null() const { return column_statistics_->hasNull(); }
+
+std::optional<int64_t> Statistics::null_count() const {
+  // liborc doesn't expose null_count on ColumnStatistics.
+  return std::nullopt;
+}
+
+int64_t Statistics::num_values() const {
+  return static_cast<int64_t>(column_statistics_->getNumberOfValues());
+}
+
+// Typed views of the wrapped statistics. Each returns nullptr when the wrapped
+// object is not of the requested liborc subclass, or when the wrapper is empty.
+const liborc::IntegerColumnStatistics* Statistics::integer() const {
+  return dynamic_cast<const liborc::IntegerColumnStatistics*>(column_statistics_);
+}
+
+const liborc::DoubleColumnStatistics* Statistics::floating_point() const {
+  return dynamic_cast<const liborc::DoubleColumnStatistics*>(column_statistics_);
+}
+
+const liborc::StringColumnStatistics* Statistics::string() const {
+  return dynamic_cast<const liborc::StringColumnStatistics*>(column_statistics_);
+}
+
+const liborc::DateColumnStatistics* Statistics::date() const {
+  return dynamic_cast<const liborc::DateColumnStatistics*>(column_statistics_);
+}
+
+const liborc::TimestampColumnStatistics* Statistics::timestamp() const {
+  return dynamic_cast<const liborc::TimestampColumnStatistics*>(column_statistics_);
+}
+
+const liborc::DecimalColumnStatistics* Statistics::decimal() const {
+  return dynamic_cast<const liborc::DecimalColumnStatistics*>(column_statistics_);
+}
+
+// Reports whether a usable (min, max) pair can be produced for these statistics.
+// On top of liborc's hasMinimum()/hasMaximum() flags this rejects bounds that
+// OrcStatisticsAsScalars() could not convert faithfully.
+bool Statistics::HasMinMax() const {
+  if (!valid()) {
+    return false;
+  }
+
+  if (const auto* int_stats = integer()) {
+    return int_stats->hasMinimum() && int_stats->hasMaximum();
+  }
+  if (const auto* double_stats = floating_point()) {
+    if (!double_stats->hasMinimum() || !double_stats->hasMaximum()) {
+      return false;
+    }
+    // A NaN bound is treated the same as a missing bound.
+    return !std::isnan(double_stats->getMinimum()) &&
+           !std::isnan(double_stats->getMaximum());
+  }
+  if (const auto* string_stats = string()) {
+    return string_stats->hasMinimum() && string_stats->hasMaximum();
+  }
+  if (const auto* date_stats = date()) {
+    return date_stats->hasMinimum() && date_stats->hasMaximum();
+  }
+  if (const auto* ts_stats = timestamp()) {
+    if (!ts_stats->hasMinimum() || !ts_stats->hasMaximum()) {
+      return false;
+    }
+    // Bounds that would overflow the int64 nanosecond representation are dropped.
+    return TimestampFitsInNanos(ts_stats->getMinimum(), ts_stats->getMinimumNanos()) &&
+           TimestampFitsInNanos(ts_stats->getMaximum(), ts_stats->getMaximumNanos());
+  }
+  if (const auto* decimal_stats = decimal()) {
+    if (!decimal_stats->hasMinimum() || !decimal_stats->hasMaximum()) {
+      return false;
+    }
+    // Both bounds are emitted with a single Arrow type, so their scales must agree.
+    const liborc::Decimal min_dec = decimal_stats->getMinimum();
+    const liborc::Decimal max_dec = decimal_stats->getMaximum();
+    return min_dec.scale == max_dec.scale;
+  }
+  return false;
+}
+
+// Materializes ORC column statistics as Arrow scalars.
+//
+// Returns Status::Invalid for an empty wrapper. Otherwise has_null and num_values
+// are always filled in; min/max are set only when HasMinMax() is true and stay
+// nullptr (with has_min_max == false) in every other case.
+Result<OrcColumnStatisticsAsScalars> OrcStatisticsAsScalars(
+    const Statistics& statistics) {
+  if (!statistics.valid()) {
+    return Status::Invalid("ORC statistics wrapper is not initialized");
+  }
+
+  OrcColumnStatisticsAsScalars converted;
+  converted.has_null = statistics.has_null();
+  converted.num_values = statistics.num_values();
+  converted.has_min_max = false;
+  converted.min = nullptr;
+  converted.max = nullptr;
+
+  if (!statistics.HasMinMax()) {
+    return converted;
+  }
+
+  converted.has_min_max = true;
+  if (const auto* int_stats = statistics.integer()) {
+    converted.min = std::make_shared<Int64Scalar>(int_stats->getMinimum());
+    converted.max = std::make_shared<Int64Scalar>(int_stats->getMaximum());
+    return converted;
+  }
+  if (const auto* double_stats = statistics.floating_point()) {
+    converted.min = std::make_shared<DoubleScalar>(double_stats->getMinimum());
+    converted.max = std::make_shared<DoubleScalar>(double_stats->getMaximum());
+    return converted;
+  }
+  if (const auto* string_stats = statistics.string()) {
+    converted.min = std::make_shared<StringScalar>(string_stats->getMinimum());
+    converted.max = std::make_shared<StringScalar>(string_stats->getMaximum());
+    return converted;
+  }
+  if (const auto* date_stats = statistics.date()) {
+    converted.min = std::make_shared<Date32Scalar>(date_stats->getMinimum());
+    converted.max = std::make_shared<Date32Scalar>(date_stats->getMaximum());
+    return converted;
+  }
+  if (const auto* ts_stats = statistics.timestamp()) {
+    auto ts_type = timestamp(TimeUnit::NANO);
+    // HasMinMax() has already verified that both sums fit in int64_t.
+    converted.min = std::make_shared<TimestampScalar>(
+        ts_stats->getMinimum() * kNanosPerMilli + ts_stats->getMinimumNanos(), ts_type);
+    converted.max = std::make_shared<TimestampScalar>(
+        ts_stats->getMaximum() * kNanosPerMilli + ts_stats->getMaximumNanos(), ts_type);
+    return converted;
+  }
+  if (const auto* decimal_stats = statistics.decimal()) {
+    const liborc::Decimal min_dec = decimal_stats->getMinimum();
+    const liborc::Decimal max_dec = decimal_stats->getMaximum();
+
+    Decimal128 min_d128(min_dec.value.getHighBits(), min_dec.value.getLowBits());
+    Decimal128 max_d128(max_dec.value.getHighBits(), max_dec.value.getLowBits());
+    // Precision is fixed at 38; only the scale is taken from the ORC value.
+    auto dec_type = decimal128(38, min_dec.scale);
+
+    converted.min = std::make_shared<Decimal128Scalar>(min_d128, dec_type);
+    converted.max = std::make_shared<Decimal128Scalar>(max_d128, dec_type);
+    return converted;
+  }
+
+  // Fallback for a statistics kind that HasMinMax() accepted but no branch handled.
+  converted.has_min_max = false;
+  converted.min = nullptr;
+  converted.max = nullptr;
+  return converted;
+}
+
+}  // namespace orc
+}  // namespace adapters
+}  // namespace arrow
diff --git a/cpp/src/arrow/adapters/orc/statistics.h b/cpp/src/arrow/adapters/orc/statistics.h
new file mode 100644
index 000000000000..188eb04cc1e8
--- /dev/null
+++ b/cpp/src/arrow/adapters/orc/statistics.h
@@ -0,0 +1,124 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <cstdint>
+#include <memory>
+#include <optional>
+#include <utility>
+
+#include "arrow/result.h"
+#include "arrow/scalar.h"
+#include "arrow/type_fwd.h"
+#include "arrow/util/visibility.h"
+
+namespace orc {
+class ColumnStatistics;
+class DateColumnStatistics;
+class DecimalColumnStatistics;
+class DoubleColumnStatistics;
+class IntegerColumnStatistics;
+class Statistics;
+class StringColumnStatistics;
+class TimestampColumnStatistics;
+}  // namespace orc
+
+namespace arrow {
+namespace adapters {
+namespace orc {
+
+/// \brief Scalar materialization of ORC column statistics.
+///
+/// Every member has a default, so a default-constructed instance reads as
+/// "nothing known" rather than holding indeterminate values.
+struct ARROW_EXPORT OrcColumnStatisticsAsScalars {
+  /// \brief Whether the column contains null values.
+  bool has_null = false;
+  /// \brief Number of non-null values in the column.
+  int64_t num_values = 0;
+  /// \brief Whether min/max statistics are available and valid.
+  bool has_min_max = false;
+  /// \brief Minimum value (nullptr if unavailable).
+  std::shared_ptr<Scalar> min;
+  /// \brief Maximum value (nullptr if unavailable).
+  std::shared_ptr<Scalar> max;
+};
+
+/// \brief Thin wrapper over liborc column statistics.
+///
+/// Keeps liborc type dispatch in one place and provides a stable API surface
+/// for downstream consumers.
+///
+/// A default-constructed wrapper is empty: valid() is false, HasMinMax() is
+/// false and every typed accessor returns nullptr. has_null() and num_values()
+/// dereference the wrapped pointer and must only be called when valid() is true.
+class ARROW_EXPORT Statistics {
+ public:
+  /// \brief Construct an empty wrapper.
+  Statistics() = default;
+
+  /// \brief Wrap `column_statistics` and retain `owner` for the wrapper's lifetime.
+  ///
+  /// `owner` is never read by this class; presumably it is the liborc object
+  /// that `column_statistics` points into.
+  Statistics(std::shared_ptr<const ::orc::Statistics> owner,
+             const ::orc::ColumnStatistics* column_statistics)
+      : owner_(std::move(owner)), column_statistics_(column_statistics) {}
+
+  /// \brief Wrap `column_statistics` without retaining anything; the caller must
+  /// keep the pointee alive while the wrapper is in use.
+  explicit Statistics(const ::orc::ColumnStatistics* column_statistics)
+      : column_statistics_(column_statistics) {}
+
+  /// \brief Whether a liborc statistics object is wrapped.
+  bool valid() const { return column_statistics_ != nullptr; }
+  /// \brief Whether liborc reports null values for the column.
+  bool has_null() const;
+  /// \brief Number of nulls; always std::nullopt since liborc does not expose it.
+  std::optional<int64_t> null_count() const;
+  /// \brief Number of values reported by liborc for the column.
+  int64_t num_values() const;
+  /// \brief Whether a min/max pair convertible to Arrow scalars is available.
+  bool HasMinMax() const;
+
+  /// \brief The wrapped liborc object (nullptr for an empty wrapper).
+  const ::orc::ColumnStatistics* raw() const { return column_statistics_; }
+  // Typed views: each returns nullptr unless the wrapped object is of that
+  // liborc statistics type.
+  const ::orc::IntegerColumnStatistics* integer() const;
+  const ::orc::DoubleColumnStatistics* floating_point() const;
+  const ::orc::StringColumnStatistics* string() const;
+  const ::orc::DateColumnStatistics* date() const;
+  const ::orc::TimestampColumnStatistics* timestamp() const;
+  const ::orc::DecimalColumnStatistics* decimal() const;
+
+ private:
+  std::shared_ptr<const ::orc::Statistics> owner_;
+  const ::orc::ColumnStatistics* column_statistics_ = nullptr;
+};
+
+/// \brief Convert wrapped ORC statistics to Arrow scalars.
+///
+/// \param[in] statistics the wrapper to convert
+/// \return the materialized statistics, or Status::Invalid if `statistics` is empty
+ARROW_EXPORT Result<OrcColumnStatisticsAsScalars> OrcStatisticsAsScalars(
+    const Statistics& statistics);
+
+}  // namespace orc
+}  // namespace adapters
+}  // namespace arrow