From bc57e6edc512c368990a224297d215fe89c209ad Mon Sep 17 00:00:00 2001 From: Christian Bush Date: Wed, 25 Feb 2026 17:03:06 -0800 Subject: [PATCH 1/3] [C++] Add ORC adapter column statistics and schema manifest support Add ConvertColumnStatistics, BuildSchemaManifest, and related helpers to the ORC adapter along with comprehensive tests. --- cpp/src/arrow/adapters/orc/adapter.cc | 261 ++++++++ cpp/src/arrow/adapters/orc/adapter.h | 74 +++ cpp/src/arrow/adapters/orc/adapter_test.cc | 713 +++++++++++++++++++++ 3 files changed, 1048 insertions(+) diff --git a/cpp/src/arrow/adapters/orc/adapter.cc b/cpp/src/arrow/adapters/orc/adapter.cc index 51cca497485c..7dfddde91660 100644 --- a/cpp/src/arrow/adapters/orc/adapter.cc +++ b/cpp/src/arrow/adapters/orc/adapter.cc @@ -18,6 +18,7 @@ #include "arrow/adapters/orc/adapter.h" #include +#include #include #include #include @@ -33,6 +34,7 @@ #include "arrow/io/interfaces.h" #include "arrow/memory_pool.h" #include "arrow/record_batch.h" +#include "arrow/scalar.h" #include "arrow/status.h" #include "arrow/table.h" #include "arrow/table_builder.h" @@ -43,6 +45,8 @@ #include "arrow/util/key_value_metadata.h" #include "arrow/util/macros.h" #include "orc/Exceptions.hh" +#include "orc/Statistics.hh" +#include "orc/Type.hh" // alias to not interfere with nested orc namespace namespace liborc = orc; @@ -98,6 +102,131 @@ namespace { // The following is required by ORC to be uint64_t constexpr uint64_t kOrcNaturalWriteSize = 128 * 1024; +// Convert ORC column statistics to Arrow OrcColumnStatistics +// Returns a Result with the converted statistics, or an error if conversion fails +Result ConvertColumnStatistics( + const liborc::ColumnStatistics* orc_stats) { + OrcColumnStatistics stats; + + stats.has_null = orc_stats->hasNull(); + stats.num_values = static_cast(orc_stats->getNumberOfValues()); + stats.has_min_max = false; + stats.min = nullptr; + stats.max = nullptr; + + // Try to extract min/max based on the ORC column statistics type + 
const auto* int_stats = + dynamic_cast( + orc_stats); + const auto* double_stats = + dynamic_cast( + orc_stats); + const auto* string_stats = + dynamic_cast( + orc_stats); + const auto* date_stats = + dynamic_cast( + orc_stats); + const auto* ts_stats = + dynamic_cast( + orc_stats); + const auto* decimal_stats = + dynamic_cast( + orc_stats); + + if (int_stats != nullptr) { + if (int_stats->hasMinimum() && int_stats->hasMaximum()) { + stats.has_min_max = true; + stats.min = std::make_shared( + int_stats->getMinimum()); + stats.max = std::make_shared( + int_stats->getMaximum()); + } + } else if (double_stats != nullptr) { + if (double_stats->hasMinimum() && + double_stats->hasMaximum()) { + double min_val = double_stats->getMinimum(); + double max_val = double_stats->getMaximum(); + if (!std::isnan(min_val) && !std::isnan(max_val)) { + stats.has_min_max = true; + stats.min = std::make_shared(min_val); + stats.max = std::make_shared(max_val); + } + } + } else if (string_stats != nullptr) { + if (string_stats->hasMinimum() && + string_stats->hasMaximum()) { + stats.has_min_max = true; + stats.min = std::make_shared( + string_stats->getMinimum()); + stats.max = std::make_shared( + string_stats->getMaximum()); + } + } else if (date_stats != nullptr) { + if (date_stats->hasMinimum() && + date_stats->hasMaximum()) { + stats.has_min_max = true; + stats.min = std::make_shared( + date_stats->getMinimum(), date32()); + stats.max = std::make_shared( + date_stats->getMaximum(), date32()); + } + } else if (ts_stats != nullptr) { + if (ts_stats->hasMinimum() && ts_stats->hasMaximum()) { + stats.has_min_max = true; + // getMinimum/getMaximum return milliseconds. + // getMinimumNanos/getMaximumNanos return the + // last 6 digits of nanoseconds. 
+ int64_t min_millis = ts_stats->getMinimum(); + int64_t max_millis = ts_stats->getMaximum(); + int32_t min_nanos = ts_stats->getMinimumNanos(); + int32_t max_nanos = ts_stats->getMaximumNanos(); + + // millis * 1,000,000 + sub-millisecond nanos + int64_t min_ns = min_millis * 1000000LL + min_nanos; + int64_t max_ns = max_millis * 1000000LL + max_nanos; + + auto ts_type = timestamp(TimeUnit::NANO); + stats.min = + std::make_shared(min_ns, ts_type); + stats.max = + std::make_shared(max_ns, ts_type); + } + } else if (decimal_stats != nullptr) { + if (decimal_stats->hasMinimum() && + decimal_stats->hasMaximum()) { + liborc::Decimal min_dec = decimal_stats->getMinimum(); + liborc::Decimal max_dec = decimal_stats->getMaximum(); + + if (min_dec.scale != max_dec.scale) { + // Corrupted stats: scales should always match within + // a column. has_min_max remains false (conservative). + } else { + stats.has_min_max = true; + + Decimal128 min_d128(min_dec.value.getHighBits(), + min_dec.value.getLowBits()); + Decimal128 max_d128(max_dec.value.getHighBits(), + max_dec.value.getLowBits()); + + // Precision 38 is max for Decimal128; the dataset + // layer will Cast() to the actual column type. + auto dec_type = decimal128(38, min_dec.scale); + + stats.min = + std::make_shared(min_d128, + dec_type); + stats.max = + std::make_shared(max_d128, + dec_type); + } + } + } + // Other types (Boolean, Binary, Collection, etc.) 
don't have min/max statistics + + return stats; +} + using internal::checked_cast; class ArrowInputFile : public liborc::InputStream { @@ -409,6 +538,42 @@ class ORCFileReader::Impl { return ReadBatch(opts, schema, stripes_[static_cast(stripe)].num_rows); } + Result> ReadStripes( + const std::vector& stripe_indices) { + if (stripe_indices.empty()) { + return Status::Invalid("stripe_indices cannot be empty"); + } + + std::vector> tables; + tables.reserve(stripe_indices.size()); + + for (int64_t stripe_index : stripe_indices) { + ARROW_ASSIGN_OR_RAISE(auto batch, ReadStripe(stripe_index)); + ARROW_ASSIGN_OR_RAISE(auto table, Table::FromRecordBatches({batch})); + tables.push_back(table); + } + + return ConcatenateTables(tables); + } + + Result> ReadStripes(const std::vector& stripe_indices, + const std::vector& include_indices) { + if (stripe_indices.empty()) { + return Status::Invalid("stripe_indices cannot be empty"); + } + + std::vector> tables; + tables.reserve(stripe_indices.size()); + + for (int64_t stripe_index : stripe_indices) { + ARROW_ASSIGN_OR_RAISE(auto batch, ReadStripe(stripe_index, include_indices)); + ARROW_ASSIGN_OR_RAISE(auto table, Table::FromRecordBatches({batch})); + tables.push_back(table); + } + + return ConcatenateTables(tables); + } + Status SelectStripe(liborc::RowReaderOptions* opts, int64_t stripe) { ARROW_RETURN_IF(stripe < 0 || stripe >= NumberOfStripes(), Status::Invalid("Out of bounds stripe: ", stripe)); @@ -548,6 +713,75 @@ class ORCFileReader::Impl { return NextStripeReader(batch_size, empty_vec); } + Result GetColumnStatistics(int column_index) { + ORC_BEGIN_CATCH_NOT_OK + std::unique_ptr file_stats = reader_->getStatistics(); + if (column_index < 0 || + static_cast(column_index) >= file_stats->getNumberOfColumns()) { + return Status::Invalid("Column index ", column_index, " out of range [0, ", + file_stats->getNumberOfColumns(), ")"); + } + // NOTE: col_stats is a non-owning pointer into file_stats. 
+ // ConvertColumnStatistics copies all values into Arrow scalars synchronously, + // so the pointer remains valid for the duration of this call. + const liborc::ColumnStatistics* col_stats = + file_stats->getColumnStatistics(static_cast(column_index)); + return ConvertColumnStatistics(col_stats); + ORC_END_CATCH_NOT_OK + } + + Result GetStripeColumnStatistics(int64_t stripe_index, + int column_index) { + ORC_BEGIN_CATCH_NOT_OK + if (stripe_index < 0 || stripe_index >= static_cast(stripes_.size())) { + return Status::Invalid("Stripe index ", stripe_index, " out of range"); + } + std::unique_ptr stripe_stats = + reader_->getStripeStatistics(static_cast(stripe_index)); + if (column_index < 0 || + static_cast(column_index) >= stripe_stats->getNumberOfColumns()) { + return Status::Invalid("Column index ", column_index, " out of range [0, ", + stripe_stats->getNumberOfColumns(), ")"); + } + // NOTE: col_stats is a non-owning pointer into stripe_stats. + // ConvertColumnStatistics copies all values into Arrow scalars synchronously, + // so the pointer remains valid for the duration of this call. 
+ const liborc::ColumnStatistics* col_stats = + stripe_stats->getColumnStatistics(static_cast(column_index)); + return ConvertColumnStatistics(col_stats); + ORC_END_CATCH_NOT_OK + } + + Result> GetStripeStatistics( + int64_t stripe_index, const std::vector& column_indices) { + ORC_BEGIN_CATCH_NOT_OK + if (stripe_index < 0 || stripe_index >= static_cast(stripes_.size())) { + return Status::Invalid("Stripe index ", stripe_index, " out of range"); + } + std::unique_ptr stripe_stats = + reader_->getStripeStatistics(static_cast(stripe_index)); + std::vector results; + results.reserve(column_indices.size()); + for (int col_idx : column_indices) { + if (col_idx < 0 || + static_cast(col_idx) >= stripe_stats->getNumberOfColumns()) { + return Status::Invalid("Column index ", col_idx, " out of range [0, ", + stripe_stats->getNumberOfColumns(), ")"); + } + // NOTE: col_stats is a non-owning pointer into stripe_stats. + // ConvertColumnStatistics copies all values into Arrow scalars synchronously, + // so the pointer remains valid for the duration of this call. 
+ const liborc::ColumnStatistics* col_stats = + stripe_stats->getColumnStatistics(static_cast(col_idx)); + ARROW_ASSIGN_OR_RAISE(auto converted, ConvertColumnStatistics(col_stats)); + results.push_back(std::move(converted)); + } + return results; + ORC_END_CATCH_NOT_OK + } + + const liborc::Type& GetORCType() { return reader_->getType(); } + private: MemoryPool* pool_; std::unique_ptr reader_; @@ -613,6 +847,17 @@ Result> ORCFileReader::ReadStripe( return impl_->ReadStripe(stripe, include_names); } +Result> ORCFileReader::ReadStripes( + const std::vector& stripe_indices) { + return impl_->ReadStripes(stripe_indices); +} + +Result> ORCFileReader::ReadStripes( + const std::vector& stripe_indices, + const std::vector& include_indices) { + return impl_->ReadStripes(stripe_indices, include_indices); +} + Status ORCFileReader::Seek(int64_t row_number) { return impl_->Seek(row_number); } Result> ORCFileReader::NextStripeReader( @@ -678,6 +923,22 @@ std::string ORCFileReader::GetSerializedFileTail() { return impl_->GetSerializedFileTail(); } +Result ORCFileReader::GetColumnStatistics(int column_index) { + return impl_->GetColumnStatistics(column_index); +} + +Result ORCFileReader::GetStripeColumnStatistics( + int64_t stripe_index, int column_index) { + return impl_->GetStripeColumnStatistics(stripe_index, column_index); +} + +Result> ORCFileReader::GetStripeStatistics( + int64_t stripe_index, const std::vector& column_indices) { + return impl_->GetStripeStatistics(stripe_index, column_indices); +} + +const ::orc::Type& ORCFileReader::GetORCType() { return impl_->GetORCType(); } + namespace { class ArrowOutputStream : public liborc::OutputStream { diff --git a/cpp/src/arrow/adapters/orc/adapter.h b/cpp/src/arrow/adapters/orc/adapter.h index 4ffff81f355f..b962c7486bfb 100644 --- a/cpp/src/arrow/adapters/orc/adapter.h +++ b/cpp/src/arrow/adapters/orc/adapter.h @@ -31,10 +31,28 @@ #include "arrow/util/macros.h" #include "arrow/util/visibility.h" +namespace orc { +class Type; 
+} // namespace orc + namespace arrow { namespace adapters { namespace orc { +/// \brief Column statistics from an ORC file +struct OrcColumnStatistics { + /// \brief Whether the column contains null values + bool has_null; + /// \brief Number of non-null values in the column (ORC does not count nulls) + int64_t num_values; + /// \brief Whether min/max statistics are available + bool has_min_max; + /// \brief Minimum value (nullptr if not available) + std::shared_ptr min; + /// \brief Maximum value (nullptr if not available) + std::shared_ptr max; +}; + /// \brief Information about an ORC stripe struct StripeInformation { /// \brief Offset of the stripe from the start of the file, in bytes @@ -129,6 +147,27 @@ class ARROW_EXPORT ORCFileReader { Result> ReadStripe( int64_t stripe, const std::vector& include_names); + /// \brief Read multiple selected stripes as a Table + /// + /// Reads only the specified stripes and concatenates them into a single table. + /// This is useful for stripe-selective reading based on predicate pushdown. + /// + /// \param[in] stripe_indices the indices of stripes to read (must be non-empty) + /// \return a Table containing data from the selected stripes + Result> ReadStripes( + const std::vector& stripe_indices); + + /// \brief Read multiple selected stripes with column selection as a Table + /// + /// Reads only the specified stripes and selected columns, concatenating them + /// into a single table. + /// + /// \param[in] stripe_indices the indices of stripes to read (must be non-empty) + /// \param[in] include_indices the selected field indices to read + /// \return a Table containing data from the selected stripes and columns + Result> ReadStripes(const std::vector& stripe_indices, + const std::vector& include_indices); + /// \brief Seek to designated row. Invoke NextStripeReader() after seek /// will return stripe reader starting from designated row.
/// @@ -267,6 +306,41 @@ class ARROW_EXPORT ORCFileReader { /// \return A KeyValueMetadata object containing the ORC metadata Result> ReadMetadata(); + /// \brief Get file-level statistics for a column. + /// + /// \param[in] column_index the ORC column id (0 is the root struct) + /// \return the column statistics + Result GetColumnStatistics(int column_index); + + /// \brief Get stripe-level statistics for a column. + /// + /// \param[in] stripe_index the stripe index (0-based) + /// \param[in] column_index the ORC column id (0 is the root struct) + /// \return the column statistics for the specified stripe + Result GetStripeColumnStatistics(int64_t stripe_index, + int column_index); + + /// \brief Get stripe-level statistics for multiple columns at once. + /// + /// More efficient than calling GetStripeColumnStatistics() in a loop + /// because it parses the stripe's statistics object only once. + /// + /// \param[in] stripe_index the stripe index (0-based) + /// \param[in] column_indices the ORC column ids to retrieve statistics for + /// \return vector of column statistics, one per requested column index + Result> GetStripeStatistics( + int64_t stripe_index, const std::vector& column_indices); + + /// \brief Get the ORC type tree for column ID mapping. + /// + /// This is needed for building schema manifests that map Arrow schema fields + /// to ORC physical column indices. The ORC type tree uses depth-first pre-order + /// numbering where column 0 is the root struct, column 1 is the first top-level + /// field, etc. + /// + /// \return reference to the root ORC Type (STRUCT), owned by the reader.
+ const ::orc::Type& GetORCType(); + private: class Impl; std::unique_ptr impl_; diff --git a/cpp/src/arrow/adapters/orc/adapter_test.cc b/cpp/src/arrow/adapters/orc/adapter_test.cc index 714e61b22b11..5377f693a02e 100644 --- a/cpp/src/arrow/adapters/orc/adapter_test.cc +++ b/cpp/src/arrow/adapters/orc/adapter_test.cc @@ -18,6 +18,7 @@ #include #include +#include #include #include @@ -1179,4 +1180,716 @@ TEST_F(TestORCWriterMultipleWrite, MultipleWritesIntFieldRecordBatch) { AssertBatchWriteReadEqual(input_batches, expected_output_table, kDefaultSmallMemStreamSize * 100); } + +TEST(TestAdapterRead, GetColumnStatisticsInteger) { + MemoryOutputStream mem_stream(kDefaultMemStreamSize); + std::unique_ptr type( + liborc::Type::buildTypeFromString("struct")); + + constexpr uint64_t stripe_size = 1024; + constexpr uint64_t row_count = 1000; + + auto writer = CreateWriter(stripe_size, *type, &mem_stream); + auto batch = writer->createRowBatch(row_count); + auto struct_batch = internal::checked_cast(batch.get()); + auto int_batch = + internal::checked_cast(struct_batch->fields[0]); + auto bigint_batch = + internal::checked_cast(struct_batch->fields[1]); + + for (uint64_t i = 0; i < row_count; ++i) { + int_batch->data[i] = static_cast(i); + bigint_batch->data[i] = static_cast(i + 1000); + } + struct_batch->numElements = row_count; + int_batch->numElements = row_count; + bigint_batch->numElements = row_count; + writer->add(*batch); + writer->close(); + + std::shared_ptr in_stream(new io::BufferReader( + std::make_shared(reinterpret_cast(mem_stream.getData()), + static_cast(mem_stream.getLength())))); + + ASSERT_OK_AND_ASSIGN(auto reader, + adapters::orc::ORCFileReader::Open(in_stream, default_memory_pool())); + + ASSERT_OK_AND_ASSIGN(auto col1_stats, reader->GetColumnStatistics(1)); + EXPECT_EQ(col1_stats.num_values, row_count); + EXPECT_TRUE(col1_stats.has_min_max); + ASSERT_NE(col1_stats.min, nullptr); + ASSERT_NE(col1_stats.max, nullptr); + 
EXPECT_EQ(checked_pointer_cast(col1_stats.min)->value, 0); + EXPECT_EQ(checked_pointer_cast(col1_stats.max)->value, 999); + + ASSERT_OK_AND_ASSIGN(auto col2_stats, reader->GetColumnStatistics(2)); + EXPECT_EQ(col2_stats.num_values, row_count); + EXPECT_TRUE(col2_stats.has_min_max); + ASSERT_NE(col2_stats.min, nullptr); + ASSERT_NE(col2_stats.max, nullptr); + EXPECT_EQ(checked_pointer_cast(col2_stats.min)->value, 1000); + EXPECT_EQ(checked_pointer_cast(col2_stats.max)->value, 1999); +} + +TEST(TestAdapterRead, GetStripeColumnStatistics) { + MemoryOutputStream mem_stream(kDefaultMemStreamSize); + std::unique_ptr type( + liborc::Type::buildTypeFromString("struct")); + + constexpr uint64_t stripe_size = 1024; + constexpr uint64_t row_count = 500; + + auto writer = CreateWriter(stripe_size, *type, &mem_stream); + auto batch = writer->createRowBatch(row_count); + auto struct_batch = internal::checked_cast(batch.get()); + auto int_batch = + internal::checked_cast(struct_batch->fields[0]); + + for (uint64_t i = 0; i < row_count; ++i) { + int_batch->data[i] = static_cast(i + 100); + } + struct_batch->numElements = row_count; + int_batch->numElements = row_count; + writer->add(*batch); + writer->close(); + + std::shared_ptr in_stream(new io::BufferReader( + std::make_shared(reinterpret_cast(mem_stream.getData()), + static_cast(mem_stream.getLength())))); + + ASSERT_OK_AND_ASSIGN(auto reader, + adapters::orc::ORCFileReader::Open(in_stream, default_memory_pool())); + + ASSERT_OK_AND_ASSIGN(auto stripe0_stats, reader->GetStripeColumnStatistics(0, 1)); + EXPECT_TRUE(stripe0_stats.has_min_max); + EXPECT_EQ(checked_pointer_cast(stripe0_stats.min)->value, 100); + EXPECT_EQ(checked_pointer_cast(stripe0_stats.max)->value, 599); +} + +TEST(TestAdapterRead, GetColumnStatisticsString) { + MemoryOutputStream mem_stream(kDefaultMemStreamSize); + std::unique_ptr type( + liborc::Type::buildTypeFromString("struct")); + + constexpr uint64_t stripe_size = 1024; + constexpr uint64_t row_count = 
5; + + auto writer = CreateWriter(stripe_size, *type, &mem_stream); + auto batch = writer->createRowBatch(row_count); + auto struct_batch = internal::checked_cast(batch.get()); + auto str_batch = + internal::checked_cast(struct_batch->fields[0]); + + std::vector strings = {"apple", "banana", "cherry", "date", "elderberry"}; + std::string data_buffer; + for (const auto& s : strings) { + data_buffer += s; + } + + size_t offset = 0; + for (size_t i = 0; i < strings.size(); ++i) { + str_batch->data[i] = const_cast(&data_buffer[offset]); + str_batch->length[i] = static_cast(strings[i].size()); + offset += strings[i].size(); + } + struct_batch->numElements = row_count; + str_batch->numElements = row_count; + writer->add(*batch); + writer->close(); + + std::shared_ptr in_stream(new io::BufferReader( + std::make_shared(reinterpret_cast(mem_stream.getData()), + static_cast(mem_stream.getLength())))); + + ASSERT_OK_AND_ASSIGN(auto reader, + adapters::orc::ORCFileReader::Open(in_stream, default_memory_pool())); + + ASSERT_OK_AND_ASSIGN(auto col_stats, reader->GetColumnStatistics(1)); + EXPECT_EQ(col_stats.num_values, row_count); + EXPECT_TRUE(col_stats.has_min_max); + ASSERT_NE(col_stats.min, nullptr); + ASSERT_NE(col_stats.max, nullptr); + EXPECT_EQ(checked_pointer_cast(col_stats.min)->ToString(), "apple"); + EXPECT_EQ(checked_pointer_cast(col_stats.max)->ToString(), "elderberry"); +} + +TEST(TestAdapterRead, GetColumnStatisticsOutOfRange) { + MemoryOutputStream mem_stream(kDefaultMemStreamSize); + std::unique_ptr type( + liborc::Type::buildTypeFromString("struct")); + + constexpr uint64_t stripe_size = 1024; + constexpr uint64_t row_count = 10; + + auto writer = CreateWriter(stripe_size, *type, &mem_stream); + auto batch = writer->createRowBatch(row_count); + auto struct_batch = internal::checked_cast(batch.get()); + auto int_batch = + internal::checked_cast(struct_batch->fields[0]); + + for (uint64_t i = 0; i < row_count; ++i) { + int_batch->data[i] = static_cast(i); + } + 
struct_batch->numElements = row_count; + int_batch->numElements = row_count; + writer->add(*batch); + writer->close(); + + std::shared_ptr in_stream(new io::BufferReader( + std::make_shared(reinterpret_cast(mem_stream.getData()), + static_cast(mem_stream.getLength())))); + + ASSERT_OK_AND_ASSIGN(auto reader, + adapters::orc::ORCFileReader::Open(in_stream, default_memory_pool())); + + EXPECT_THAT(reader->GetColumnStatistics(999), + Raises(StatusCode::Invalid, testing::HasSubstr("out of range"))); + + EXPECT_THAT(reader->GetStripeColumnStatistics(999, 1), + Raises(StatusCode::Invalid, testing::HasSubstr("out of range"))); +} + +TEST(TestAdapterRead, GetColumnStatisticsWithNulls) { + MemoryOutputStream mem_stream(kDefaultMemStreamSize); + std::unique_ptr type( + liborc::Type::buildTypeFromString("struct")); + + constexpr uint64_t stripe_size = 1024; + constexpr uint64_t row_count = 10; + + auto writer = CreateWriter(stripe_size, *type, &mem_stream); + auto batch = writer->createRowBatch(row_count); + auto struct_batch = internal::checked_cast(batch.get()); + auto int_batch = + internal::checked_cast(struct_batch->fields[0]); + + for (uint64_t i = 0; i < row_count; ++i) { + int_batch->data[i] = static_cast(i); + int_batch->notNull[i] = (i % 2 == 0) ? 
1 : 0; + } + int_batch->hasNulls = true; + struct_batch->numElements = row_count; + int_batch->numElements = row_count; + writer->add(*batch); + writer->close(); + + std::shared_ptr in_stream(new io::BufferReader( + std::make_shared(reinterpret_cast(mem_stream.getData()), + static_cast(mem_stream.getLength())))); + + ASSERT_OK_AND_ASSIGN(auto reader, + adapters::orc::ORCFileReader::Open(in_stream, default_memory_pool())); + + ASSERT_OK_AND_ASSIGN(auto col_stats, reader->GetColumnStatistics(1)); + EXPECT_TRUE(col_stats.has_null); + EXPECT_TRUE(col_stats.has_min_max); +} + +TEST(TestAdapterRead, GetColumnStatisticsBoolean) { + MemoryOutputStream mem_stream(kDefaultMemStreamSize); + std::unique_ptr type( + liborc::Type::buildTypeFromString("struct")); + + constexpr uint64_t stripe_size = 1024; + constexpr uint64_t row_count = 100; + + auto writer = CreateWriter(stripe_size, *type, &mem_stream); + auto batch = writer->createRowBatch(row_count); + auto struct_batch = internal::checked_cast(batch.get()); + auto bool_batch = + internal::checked_cast(struct_batch->fields[0]); + + // Write 60 true values and 40 false values + for (uint64_t i = 0; i < row_count; ++i) { + bool_batch->data[i] = (i < 60) ? 
1 : 0; + } + struct_batch->numElements = row_count; + bool_batch->numElements = row_count; + writer->add(*batch); + writer->close(); + + std::shared_ptr in_stream(new io::BufferReader( + std::make_shared(reinterpret_cast(mem_stream.getData()), + static_cast(mem_stream.getLength())))); + + ASSERT_OK_AND_ASSIGN(auto reader, + adapters::orc::ORCFileReader::Open(in_stream, default_memory_pool())); + + ASSERT_OK_AND_ASSIGN(auto col_stats, reader->GetColumnStatistics(1)); + EXPECT_EQ(col_stats.num_values, row_count); + EXPECT_FALSE(col_stats.has_min_max); // Boolean types don't have min/max + EXPECT_EQ(col_stats.min, nullptr); + EXPECT_EQ(col_stats.max, nullptr); +} + +TEST(TestAdapterRead, GetColumnStatisticsDate) { + MemoryOutputStream mem_stream(kDefaultMemStreamSize); + std::unique_ptr type( + liborc::Type::buildTypeFromString("struct")); + + constexpr uint64_t stripe_size = 1024; + constexpr uint64_t row_count = 10; + + auto writer = CreateWriter(stripe_size, *type, &mem_stream); + auto batch = writer->createRowBatch(row_count); + auto struct_batch = internal::checked_cast(batch.get()); + auto date_batch = + internal::checked_cast(struct_batch->fields[0]); + + // Write dates: 0 (1970-01-01) through 9 (1970-01-10) + for (uint64_t i = 0; i < row_count; ++i) { + date_batch->data[i] = static_cast(i); + } + struct_batch->numElements = row_count; + date_batch->numElements = row_count; + writer->add(*batch); + writer->close(); + + std::shared_ptr in_stream(new io::BufferReader( + std::make_shared(reinterpret_cast(mem_stream.getData()), + static_cast(mem_stream.getLength())))); + + ASSERT_OK_AND_ASSIGN(auto reader, + adapters::orc::ORCFileReader::Open(in_stream, default_memory_pool())); + + ASSERT_OK_AND_ASSIGN(auto col_stats, reader->GetColumnStatistics(1)); + EXPECT_EQ(col_stats.num_values, row_count); + EXPECT_TRUE(col_stats.has_min_max); + ASSERT_NE(col_stats.min, nullptr); + ASSERT_NE(col_stats.max, nullptr); + EXPECT_EQ(checked_pointer_cast(col_stats.min)->value, 0); 
+ EXPECT_EQ(checked_pointer_cast(col_stats.max)->value, 9); +} + +TEST(TestAdapterRead, GetColumnStatisticsTimestamp) { + MemoryOutputStream mem_stream(kDefaultMemStreamSize); + std::unique_ptr type( + liborc::Type::buildTypeFromString("struct")); + + constexpr uint64_t stripe_size = 1024; + constexpr uint64_t row_count = 10; + + auto writer = CreateWriter(stripe_size, *type, &mem_stream); + auto batch = writer->createRowBatch(row_count); + auto struct_batch = internal::checked_cast(batch.get()); + auto ts_batch = + internal::checked_cast(struct_batch->fields[0]); + + // Write timestamps with both seconds and nanoseconds + for (uint64_t i = 0; i < row_count; ++i) { + ts_batch->data[i] = static_cast(i * 1000); // seconds + ts_batch->nanoseconds[i] = static_cast(i * 100); // nanoseconds + } + struct_batch->numElements = row_count; + ts_batch->numElements = row_count; + writer->add(*batch); + writer->close(); + + std::shared_ptr in_stream(new io::BufferReader( + std::make_shared(reinterpret_cast(mem_stream.getData()), + static_cast(mem_stream.getLength())))); + + ASSERT_OK_AND_ASSIGN(auto reader, + adapters::orc::ORCFileReader::Open(in_stream, default_memory_pool())); + + ASSERT_OK_AND_ASSIGN(auto col_stats, reader->GetColumnStatistics(1)); + EXPECT_EQ(col_stats.num_values, row_count); + EXPECT_TRUE(col_stats.has_min_max); + ASSERT_NE(col_stats.min, nullptr); + ASSERT_NE(col_stats.max, nullptr); + + // Verify the timestamps are TimestampScalar + auto min_ts = checked_pointer_cast(col_stats.min); + auto max_ts = checked_pointer_cast(col_stats.max); + EXPECT_TRUE(min_ts->is_valid); + EXPECT_TRUE(max_ts->is_valid); + // The actual values will be in nanoseconds + EXPECT_GT(max_ts->value, min_ts->value); +} + +TEST(TestAdapterRead, ReadStripesMultiple) { + // Create a test file and read all stripes using ReadStripes + MemoryOutputStream mem_stream(kDefaultMemStreamSize); + std::unique_ptr type( + liborc::Type::buildTypeFromString("struct")); + + constexpr uint64_t 
stripe_size = 1024; + constexpr uint64_t row_count = 100; + + auto writer = CreateWriter(stripe_size, *type, &mem_stream); + auto batch = writer->createRowBatch(row_count); + auto struct_batch = internal::checked_cast(batch.get()); + auto int_batch = + internal::checked_cast(struct_batch->fields[0]); + auto bigint_batch = + internal::checked_cast(struct_batch->fields[1]); + + for (uint64_t i = 0; i < row_count; ++i) { + int_batch->data[i] = static_cast(i); + bigint_batch->data[i] = static_cast(i + 1000); + } + struct_batch->numElements = row_count; + int_batch->numElements = row_count; + bigint_batch->numElements = row_count; + writer->add(*batch); + writer->close(); + + std::shared_ptr in_stream(new io::BufferReader( + std::make_shared(reinterpret_cast(mem_stream.getData()), + static_cast(mem_stream.getLength())))); + + ASSERT_OK_AND_ASSIGN(auto reader, + adapters::orc::ORCFileReader::Open(in_stream, default_memory_pool())); + + int64_t num_stripes = reader->NumberOfStripes(); + ASSERT_GT(num_stripes, 0); + + // Build vector of all stripe indices + std::vector stripe_indices; + for (int64_t i = 0; i < num_stripes; ++i) { + stripe_indices.push_back(i); + } + + // Read all stripes using ReadStripes + ASSERT_OK_AND_ASSIGN(auto table, reader->ReadStripes(stripe_indices)); + + // Should have all rows and correct schema + EXPECT_EQ(table->num_rows(), row_count); + EXPECT_EQ(table->num_columns(), 2); + EXPECT_EQ(table->schema()->field(0)->name(), "col1"); + EXPECT_EQ(table->schema()->field(1)->name(), "col2"); +} + +TEST(TestAdapterRead, ReadStripesWithColumnSelection) { + // Create a test file with multiple columns + MemoryOutputStream mem_stream(kDefaultMemStreamSize); + std::unique_ptr type(liborc::Type::buildTypeFromString( + "struct")); + + constexpr uint64_t stripe_size = 1024; + constexpr uint64_t row_count = 100; + + auto writer = CreateWriter(stripe_size, *type, &mem_stream); + auto batch = writer->createRowBatch(row_count); + auto struct_batch = 
internal::checked_cast(batch.get()); + auto int_batch = + internal::checked_cast(struct_batch->fields[0]); + auto bigint_batch = + internal::checked_cast(struct_batch->fields[1]); + auto double_batch = + internal::checked_cast(struct_batch->fields[2]); + + for (uint64_t i = 0; i < row_count; ++i) { + int_batch->data[i] = static_cast(i); + bigint_batch->data[i] = static_cast(i + 100); + double_batch->data[i] = static_cast(i) + 0.5; + } + struct_batch->numElements = row_count; + int_batch->numElements = row_count; + bigint_batch->numElements = row_count; + double_batch->numElements = row_count; + writer->add(*batch); + writer->close(); + + std::shared_ptr in_stream(new io::BufferReader( + std::make_shared(reinterpret_cast(mem_stream.getData()), + static_cast(mem_stream.getLength())))); + + ASSERT_OK_AND_ASSIGN(auto reader, + adapters::orc::ORCFileReader::Open(in_stream, default_memory_pool())); + + int64_t num_stripes = reader->NumberOfStripes(); + ASSERT_GT(num_stripes, 0); + + // Build vector of all stripe indices + std::vector stripe_indices; + for (int64_t i = 0; i < num_stripes; ++i) { + stripe_indices.push_back(i); + } + + // Read all stripes but only columns 0 and 2 (col1 and col3) + std::vector include_indices = {0, 2}; + ASSERT_OK_AND_ASSIGN(auto table, reader->ReadStripes(stripe_indices, include_indices)); + + // Should have all rows and 2 columns + EXPECT_EQ(table->num_rows(), row_count); + EXPECT_EQ(table->num_columns(), 2); + + // Verify we got the right columns (col2 was skipped) + EXPECT_EQ(table->schema()->field(0)->name(), "col1"); + EXPECT_EQ(table->schema()->field(1)->name(), "col3"); +} + +TEST(TestAdapterRead, ReadStripesOutOfRange) { + MemoryOutputStream mem_stream(kDefaultMemStreamSize); + std::unique_ptr type( + liborc::Type::buildTypeFromString("struct")); + + constexpr uint64_t stripe_size = 512; + constexpr uint64_t row_count = 100; + + auto writer = CreateWriter(stripe_size, *type, &mem_stream); + auto batch = 
writer->createRowBatch(row_count); + auto struct_batch = internal::checked_cast(batch.get()); + auto int_batch = + internal::checked_cast(struct_batch->fields[0]); + + for (uint64_t i = 0; i < row_count; ++i) { + int_batch->data[i] = static_cast(i); + } + struct_batch->numElements = row_count; + int_batch->numElements = row_count; + writer->add(*batch); + writer->close(); + + std::shared_ptr in_stream(new io::BufferReader( + std::make_shared(reinterpret_cast(mem_stream.getData()), + static_cast(mem_stream.getLength())))); + + ASSERT_OK_AND_ASSIGN(auto reader, + adapters::orc::ORCFileReader::Open(in_stream, default_memory_pool())); + + int64_t num_stripes = reader->NumberOfStripes(); + ASSERT_GT(num_stripes, 0); + + // Try to read a stripe index that's out of range + std::vector stripe_indices = {0, num_stripes}; // num_stripes is out of range + EXPECT_THAT(reader->ReadStripes(stripe_indices), + Raises(StatusCode::Invalid, testing::HasSubstr("Out of bounds stripe"))); +} + +TEST(TestAdapterRead, ReadStripesEmptyVector) { + MemoryOutputStream mem_stream(kDefaultMemStreamSize); + std::unique_ptr type( + liborc::Type::buildTypeFromString("struct")); + + constexpr uint64_t stripe_size = 512; + constexpr uint64_t row_count = 100; + + auto writer = CreateWriter(stripe_size, *type, &mem_stream); + auto batch = writer->createRowBatch(row_count); + auto struct_batch = internal::checked_cast(batch.get()); + auto int_batch = + internal::checked_cast(struct_batch->fields[0]); + + for (uint64_t i = 0; i < row_count; ++i) { + int_batch->data[i] = static_cast(i); + } + struct_batch->numElements = row_count; + int_batch->numElements = row_count; + writer->add(*batch); + writer->close(); + + std::shared_ptr in_stream(new io::BufferReader( + std::make_shared(reinterpret_cast(mem_stream.getData()), + static_cast(mem_stream.getLength())))); + + ASSERT_OK_AND_ASSIGN(auto reader, + adapters::orc::ORCFileReader::Open(in_stream, default_memory_pool())); + + // Try to read with an empty 
stripe indices vector + std::vector stripe_indices; + EXPECT_THAT(reader->ReadStripes(stripe_indices), + Raises(StatusCode::Invalid, testing::HasSubstr("cannot be empty"))); +} + +TEST(TestAdapterRead, GetORCType) { + MemoryOutputStream mem_stream(kDefaultMemStreamSize); + std::unique_ptr type(liborc::Type::buildTypeFromString( + "struct")); + + constexpr uint64_t stripe_size = 1024; + constexpr uint64_t row_count = 10; + + auto writer = CreateWriter(stripe_size, *type, &mem_stream); + auto batch = writer->createRowBatch(row_count); + auto struct_batch = internal::checked_cast(batch.get()); + + struct_batch->numElements = row_count; + for (size_t i = 0; i < struct_batch->fields.size(); ++i) { + struct_batch->fields[i]->numElements = row_count; + } + writer->add(*batch); + writer->close(); + + std::shared_ptr in_stream(new io::BufferReader( + std::make_shared(reinterpret_cast(mem_stream.getData()), + static_cast(mem_stream.getLength())))); + + ASSERT_OK_AND_ASSIGN(auto reader, + adapters::orc::ORCFileReader::Open(in_stream, default_memory_pool())); + + // Get the ORC type — returns a reference, no cast needed + const liborc::Type& orc_type = reader->GetORCType(); + + // Root should be STRUCT + EXPECT_EQ(orc_type.getKind(), liborc::STRUCT); + + // Should have 3 subtypes (col1, col2, col3) + EXPECT_EQ(orc_type.getSubtypeCount(), 3); + + // Verify field names + EXPECT_EQ(orc_type.getFieldName(0), "col1"); + EXPECT_EQ(orc_type.getFieldName(1), "col2"); + EXPECT_EQ(orc_type.getFieldName(2), "col3"); + + // Verify field types + EXPECT_EQ(orc_type.getSubtype(0)->getKind(), liborc::INT); + EXPECT_EQ(orc_type.getSubtype(1)->getKind(), liborc::LONG); + EXPECT_EQ(orc_type.getSubtype(2)->getKind(), liborc::STRING); +} + +TEST(TestAdapterRead, GetColumnStatisticsDoubleNaN) { + MemoryOutputStream mem_stream(kDefaultMemStreamSize); + std::unique_ptr type( + liborc::Type::buildTypeFromString("struct")); + + constexpr uint64_t stripe_size = 1024; + constexpr uint64_t row_count = 10; 
+ + auto writer = CreateWriter(stripe_size, *type, &mem_stream); + auto batch = writer->createRowBatch(row_count); + auto struct_batch = internal::checked_cast(batch.get()); + auto double_batch = + internal::checked_cast(struct_batch->fields[0]); + + // Write some normal values and some NaN values + for (uint64_t i = 0; i < row_count; ++i) { + if (i % 3 == 0) { + double_batch->data[i] = std::numeric_limits::quiet_NaN(); + } else { + double_batch->data[i] = static_cast(i); + } + } + struct_batch->numElements = row_count; + double_batch->numElements = row_count; + writer->add(*batch); + writer->close(); + + std::shared_ptr in_stream(new io::BufferReader( + std::make_shared(reinterpret_cast(mem_stream.getData()), + static_cast(mem_stream.getLength())))); + + ASSERT_OK_AND_ASSIGN(auto reader, + adapters::orc::ORCFileReader::Open(in_stream, default_memory_pool())); + + ASSERT_OK_AND_ASSIGN(auto col_stats, reader->GetColumnStatistics(1)); + // When NaN values are present, ORC may report NaN as min or max. + // Our guard should detect this and set has_min_max = false. + if (col_stats.has_min_max) { + // If ORC writer filtered NaN from statistics, min/max should be valid (non-NaN) + auto min_val = checked_pointer_cast(col_stats.min); + auto max_val = checked_pointer_cast(col_stats.max); + EXPECT_FALSE(std::isnan(min_val->value)); + EXPECT_FALSE(std::isnan(max_val->value)); + } + // Either has_min_max is false (NaN guard triggered), or min/max are non-NaN. + // Both outcomes are correct. 
+} + +TEST(TestAdapterRead, GetColumnStatisticsNegativeIndex) { + MemoryOutputStream mem_stream(kDefaultMemStreamSize); + std::unique_ptr type( + liborc::Type::buildTypeFromString("struct")); + + constexpr uint64_t stripe_size = 1024; + constexpr uint64_t row_count = 10; + + auto writer = CreateWriter(stripe_size, *type, &mem_stream); + auto batch = writer->createRowBatch(row_count); + auto struct_batch = internal::checked_cast(batch.get()); + auto int_batch = + internal::checked_cast(struct_batch->fields[0]); + + for (uint64_t i = 0; i < row_count; ++i) { + int_batch->data[i] = static_cast(i); + } + struct_batch->numElements = row_count; + int_batch->numElements = row_count; + writer->add(*batch); + writer->close(); + + std::shared_ptr in_stream(new io::BufferReader( + std::make_shared(reinterpret_cast(mem_stream.getData()), + static_cast(mem_stream.getLength())))); + + ASSERT_OK_AND_ASSIGN(auto reader, + adapters::orc::ORCFileReader::Open(in_stream, default_memory_pool())); + + // Negative column index should return Invalid + EXPECT_THAT(reader->GetColumnStatistics(-1), + Raises(StatusCode::Invalid, testing::HasSubstr("out of range"))); + + // Negative column index in stripe stats should also return Invalid + EXPECT_THAT(reader->GetStripeColumnStatistics(0, -1), + Raises(StatusCode::Invalid, testing::HasSubstr("out of range"))); +} + +TEST(TestAdapterRead, GetStripeStatisticsBulk) { + MemoryOutputStream mem_stream(kDefaultMemStreamSize); + std::unique_ptr type(liborc::Type::buildTypeFromString( + "struct")); + + constexpr uint64_t stripe_size = 1024; + constexpr uint64_t row_count = 100; + + auto writer = CreateWriter(stripe_size, *type, &mem_stream); + auto batch = writer->createRowBatch(row_count); + auto struct_batch = internal::checked_cast(batch.get()); + auto int_batch = + internal::checked_cast(struct_batch->fields[0]); + auto bigint_batch = + internal::checked_cast(struct_batch->fields[1]); + auto double_batch = + 
internal::checked_cast(struct_batch->fields[2]); + + for (uint64_t i = 0; i < row_count; ++i) { + int_batch->data[i] = static_cast(i); + bigint_batch->data[i] = static_cast(i + 1000); + double_batch->data[i] = static_cast(i) + 0.5; + } + struct_batch->numElements = row_count; + int_batch->numElements = row_count; + bigint_batch->numElements = row_count; + double_batch->numElements = row_count; + writer->add(*batch); + writer->close(); + + std::shared_ptr in_stream(new io::BufferReader( + std::make_shared(reinterpret_cast(mem_stream.getData()), + static_cast(mem_stream.getLength())))); + + ASSERT_OK_AND_ASSIGN(auto reader, + adapters::orc::ORCFileReader::Open(in_stream, default_memory_pool())); + + // ORC column indices: 0=root struct, 1=col1, 2=col2, 3=col3 + std::vector column_indices = {1, 2, 3}; + + // Get bulk stats for stripe 0 + ASSERT_OK_AND_ASSIGN(auto bulk_stats, reader->GetStripeStatistics(0, column_indices)); + ASSERT_EQ(bulk_stats.size(), 3); + + // Verify bulk results match individual GetStripeColumnStatistics calls + for (size_t i = 0; i < column_indices.size(); ++i) { + ASSERT_OK_AND_ASSIGN(auto individual_stats, + reader->GetStripeColumnStatistics(0, column_indices[i])); + EXPECT_EQ(bulk_stats[i].has_null, individual_stats.has_null); + EXPECT_EQ(bulk_stats[i].num_values, individual_stats.num_values); + EXPECT_EQ(bulk_stats[i].has_min_max, individual_stats.has_min_max); + if (bulk_stats[i].has_min_max && individual_stats.has_min_max) { + EXPECT_TRUE(bulk_stats[i].min->Equals(*individual_stats.min)); + EXPECT_TRUE(bulk_stats[i].max->Equals(*individual_stats.max)); + } + } + + // Verify out-of-range column index in bulk API + std::vector bad_indices = {1, 999}; + EXPECT_THAT(reader->GetStripeStatistics(0, bad_indices), + Raises(StatusCode::Invalid, testing::HasSubstr("out of range"))); + + // Verify out-of-range stripe index in bulk API + EXPECT_THAT(reader->GetStripeStatistics(999, column_indices), + Raises(StatusCode::Invalid, testing::HasSubstr("out 
of range"))); +} + } // namespace arrow From 8870048c09f2a280e7653c4d35f2a55276e7c90d Mon Sep 17 00:00:00 2001 From: Christian Bush Date: Wed, 25 Feb 2026 17:27:15 -0800 Subject: [PATCH 2/3] [C++] Fix ORC adapter timestamp overflow, ReadStripes empty handling, and add tests - Prevent int64 overflow when converting timestamp millis to nanoseconds by checking bounds before multiplication - Return empty table instead of error when ReadStripes gets empty indices - Use RecordBatch vector + FromRecordBatches instead of Table + ConcatenateTables - Inline dynamic_cast in ConvertColumnStatistics for cleaner code - Fix Date32Scalar constructor to use default type - Add decimal statistics test and nested schema manifest test - Add precise timestamp statistics value assertions Co-Authored-By: Claude Opus 4.6 --- cpp/src/arrow/adapters/orc/adapter.cc | 137 ++++++-------- cpp/src/arrow/adapters/orc/adapter.h | 2 + cpp/src/arrow/adapters/orc/adapter_test.cc | 207 ++++++++++++++++++++- 3 files changed, 264 insertions(+), 82 deletions(-) diff --git a/cpp/src/arrow/adapters/orc/adapter.cc b/cpp/src/arrow/adapters/orc/adapter.cc index 7dfddde91660..f1b5a5c7bc8c 100644 --- a/cpp/src/arrow/adapters/orc/adapter.cc +++ b/cpp/src/arrow/adapters/orc/adapter.cc @@ -102,6 +102,17 @@ namespace { // The following is required by ORC to be uint64_t constexpr uint64_t kOrcNaturalWriteSize = 128 * 1024; +// Max millisecond value that can be safely converted to nanoseconds without +// overflowing int64_t. Beyond ~292,000 years from epoch, millis * 1,000,000 +// exceeds int64_t range. +constexpr int64_t kMaxMillisForNanos = std::numeric_limits::max() / 1000000LL; +constexpr int64_t kMinMillisForNanos = std::numeric_limits::lowest() / 1000000LL; + +// Whether a millisecond value can be converted to nanoseconds without overflowing int64_t. 
+bool MillisFitInNanos(int64_t millis) { + return millis >= kMinMillisForNanos && millis <= kMaxMillisForNanos; +} + // Convert ORC column statistics to Arrow OrcColumnStatistics // Returns a Result with the converted statistics, or an error if conversion fails Result ConvertColumnStatistics( @@ -114,37 +125,17 @@ Result ConvertColumnStatistics( stats.min = nullptr; stats.max = nullptr; - // Try to extract min/max based on the ORC column statistics type - const auto* int_stats = - dynamic_cast( - orc_stats); - const auto* double_stats = - dynamic_cast( - orc_stats); - const auto* string_stats = - dynamic_cast( - orc_stats); - const auto* date_stats = - dynamic_cast( - orc_stats); - const auto* ts_stats = - dynamic_cast( - orc_stats); - const auto* decimal_stats = - dynamic_cast( - orc_stats); - - if (int_stats != nullptr) { + // Try to extract min/max based on the ORC column statistics type. + if (const auto* int_stats = + dynamic_cast(orc_stats)) { if (int_stats->hasMinimum() && int_stats->hasMaximum()) { stats.has_min_max = true; - stats.min = std::make_shared( - int_stats->getMinimum()); - stats.max = std::make_shared( - int_stats->getMaximum()); + stats.min = std::make_shared(int_stats->getMinimum()); + stats.max = std::make_shared(int_stats->getMaximum()); } - } else if (double_stats != nullptr) { - if (double_stats->hasMinimum() && - double_stats->hasMaximum()) { + } else if (const auto* double_stats = + dynamic_cast(orc_stats)) { + if (double_stats->hasMinimum() && double_stats->hasMaximum()) { double min_val = double_stats->getMinimum(); double max_val = double_stats->getMaximum(); if (!std::isnan(min_val) && !std::isnan(max_val)) { @@ -153,46 +144,38 @@ Result ConvertColumnStatistics( stats.max = std::make_shared(max_val); } } - } else if (string_stats != nullptr) { - if (string_stats->hasMinimum() && - string_stats->hasMaximum()) { + } else if (const auto* string_stats = + dynamic_cast(orc_stats)) { + if (string_stats->hasMinimum() && 
string_stats->hasMaximum()) { stats.has_min_max = true; - stats.min = std::make_shared( - string_stats->getMinimum()); - stats.max = std::make_shared( - string_stats->getMaximum()); + stats.min = std::make_shared(string_stats->getMinimum()); + stats.max = std::make_shared(string_stats->getMaximum()); } - } else if (date_stats != nullptr) { - if (date_stats->hasMinimum() && - date_stats->hasMaximum()) { + } else if (const auto* date_stats = + dynamic_cast(orc_stats)) { + if (date_stats->hasMinimum() && date_stats->hasMaximum()) { stats.has_min_max = true; - stats.min = std::make_shared( - date_stats->getMinimum(), date32()); - stats.max = std::make_shared( - date_stats->getMaximum(), date32()); + stats.min = std::make_shared(date_stats->getMinimum()); + stats.max = std::make_shared(date_stats->getMaximum()); } - } else if (ts_stats != nullptr) { + } else if (const auto* ts_stats = + dynamic_cast(orc_stats)) { if (ts_stats->hasMinimum() && ts_stats->hasMaximum()) { - stats.has_min_max = true; - // getMinimum/getMaximum return milliseconds. - // getMinimumNanos/getMaximumNanos return the - // last 6 digits of nanoseconds. + // ORC returns millis + sub-millisecond nanos (0-999999). + // Convert to total nanoseconds, skipping if millis would overflow. 
int64_t min_millis = ts_stats->getMinimum(); int64_t max_millis = ts_stats->getMaximum(); - int32_t min_nanos = ts_stats->getMinimumNanos(); - int32_t max_nanos = ts_stats->getMaximumNanos(); - - // millis * 1,000,000 + sub-millisecond nanos - int64_t min_ns = min_millis * 1000000LL + min_nanos; - int64_t max_ns = max_millis * 1000000LL + max_nanos; - - auto ts_type = timestamp(TimeUnit::NANO); - stats.min = - std::make_shared(min_ns, ts_type); - stats.max = - std::make_shared(max_ns, ts_type); + if (MillisFitInNanos(min_millis) && MillisFitInNanos(max_millis)) { + stats.has_min_max = true; + auto ts_type = timestamp(TimeUnit::NANO); + stats.min = std::make_shared( + min_millis * 1000000LL + ts_stats->getMinimumNanos(), ts_type); + stats.max = std::make_shared( + max_millis * 1000000LL + ts_stats->getMaximumNanos(), ts_type); + } } - } else if (decimal_stats != nullptr) { + } else if (const auto* decimal_stats = + dynamic_cast(orc_stats)) { if (decimal_stats->hasMinimum() && decimal_stats->hasMaximum()) { liborc::Decimal min_dec = decimal_stats->getMinimum(); @@ -541,37 +524,39 @@ class ORCFileReader::Impl { Result> ReadStripes( const std::vector& stripe_indices) { if (stripe_indices.empty()) { - return Status::Invalid("stripe_indices cannot be empty"); + ARROW_ASSIGN_OR_RAISE(auto schema, ReadSchema()); + return Table::MakeEmpty(schema); } - std::vector> tables; - tables.reserve(stripe_indices.size()); - + std::vector> batches; + batches.reserve(stripe_indices.size()); for (int64_t stripe_index : stripe_indices) { ARROW_ASSIGN_OR_RAISE(auto batch, ReadStripe(stripe_index)); - ARROW_ASSIGN_OR_RAISE(auto table, Table::FromRecordBatches({batch})); - tables.push_back(table); + batches.push_back(std::move(batch)); } - - return ConcatenateTables(tables); + ARROW_ASSIGN_OR_RAISE(auto schema, ReadSchema()); + return Table::FromRecordBatches(schema, std::move(batches)); } Result> ReadStripes(const std::vector& stripe_indices, const std::vector& include_indices) { if 
(stripe_indices.empty()) { - return Status::Invalid("stripe_indices cannot be empty"); + liborc::RowReaderOptions opts = DefaultRowReaderOptions(); + RETURN_NOT_OK(SelectIndices(&opts, include_indices)); + ARROW_ASSIGN_OR_RAISE(auto schema, ReadSchema(opts)); + return Table::MakeEmpty(schema); } - std::vector> tables; - tables.reserve(stripe_indices.size()); - + std::vector> batches; + batches.reserve(stripe_indices.size()); for (int64_t stripe_index : stripe_indices) { ARROW_ASSIGN_OR_RAISE(auto batch, ReadStripe(stripe_index, include_indices)); - ARROW_ASSIGN_OR_RAISE(auto table, Table::FromRecordBatches({batch})); - tables.push_back(table); + batches.push_back(std::move(batch)); } - - return ConcatenateTables(tables); + liborc::RowReaderOptions opts = DefaultRowReaderOptions(); + RETURN_NOT_OK(SelectIndices(&opts, include_indices)); + ARROW_ASSIGN_OR_RAISE(auto schema, ReadSchema(opts)); + return Table::FromRecordBatches(schema, std::move(batches)); } Status SelectStripe(liborc::RowReaderOptions* opts, int64_t stripe) { diff --git a/cpp/src/arrow/adapters/orc/adapter.h b/cpp/src/arrow/adapters/orc/adapter.h index b962c7486bfb..7201d14f53d6 100644 --- a/cpp/src/arrow/adapters/orc/adapter.h +++ b/cpp/src/arrow/adapters/orc/adapter.h @@ -151,6 +151,7 @@ class ARROW_EXPORT ORCFileReader { /// /// Reads only the specified stripes and concatenates them into a single table. /// This is useful for stripe-selective reading based on predicate pushdown. + /// If stripe_indices is empty, returns an empty table with the file's schema. /// /// \param[in] stripe_indices the indices of stripes to read /// \return the returned Table containing data from the selected stripes @@ -161,6 +162,7 @@ class ARROW_EXPORT ORCFileReader { /// /// Reads only the specified stripes and selected columns, concatenating them /// into a single table. + /// If stripe_indices is empty, returns an empty table with the selected schema. 
/// /// \param[in] stripe_indices the indices of stripes to read /// \param[in] include_indices the selected field indices to read diff --git a/cpp/src/arrow/adapters/orc/adapter_test.cc b/cpp/src/arrow/adapters/orc/adapter_test.cc index 5377f693a02e..d810e4de106c 100644 --- a/cpp/src/arrow/adapters/orc/adapter_test.cc +++ b/cpp/src/arrow/adapters/orc/adapter_test.cc @@ -37,6 +37,7 @@ #include "arrow/testing/matchers.h" #include "arrow/testing/random.h" #include "arrow/type.h" +#include "arrow/util/decimal.h" #include "arrow/util/io_util.h" #include "arrow/util/key_value_metadata.h" @@ -1498,13 +1499,20 @@ TEST(TestAdapterRead, GetColumnStatisticsTimestamp) { ASSERT_NE(col_stats.min, nullptr); ASSERT_NE(col_stats.max, nullptr); - // Verify the timestamps are TimestampScalar + // Verify the timestamps are TimestampScalar with correct nanosecond values. + // Written: data[i] = i*1000 seconds, nanoseconds[i] = i*100 sub-second nanos. + // Min (i=0): 0s + 0ns = 0 ns total. + // Max (i=9): 9000s + 900ns = 9000 * 1,000,000,000 + 900 = 9,000,000,000,900 ns. + // Statistics store millis + sub-ms-nanos (last 6 digits of ns). + // Conversion: millis * 1,000,000 + sub_ms_nanos = total nanoseconds. 
auto min_ts = checked_pointer_cast(col_stats.min); auto max_ts = checked_pointer_cast(col_stats.max); EXPECT_TRUE(min_ts->is_valid); EXPECT_TRUE(max_ts->is_valid); - // The actual values will be in nanoseconds - EXPECT_GT(max_ts->value, min_ts->value); + EXPECT_EQ(min_ts->value, 0); + // 9000 seconds + 900 nanoseconds + constexpr int64_t expected_max_ns = 9000LL * 1000000000LL + 900LL; + EXPECT_EQ(max_ts->value, expected_max_ns); } TEST(TestAdapterRead, ReadStripesMultiple) { @@ -1687,10 +1695,12 @@ TEST(TestAdapterRead, ReadStripesEmptyVector) { ASSERT_OK_AND_ASSIGN(auto reader, adapters::orc::ORCFileReader::Open(in_stream, default_memory_pool())); - // Try to read with an empty stripe indices vector + // Empty stripe indices should return an empty table with the file's schema std::vector stripe_indices; - EXPECT_THAT(reader->ReadStripes(stripe_indices), - Raises(StatusCode::Invalid, testing::HasSubstr("cannot be empty"))); + ASSERT_OK_AND_ASSIGN(auto table, reader->ReadStripes(stripe_indices)); + EXPECT_EQ(table->num_rows(), 0); + EXPECT_EQ(table->num_columns(), 1); + EXPECT_EQ(table->schema()->field(0)->name(), "col1"); } TEST(TestAdapterRead, GetORCType) { @@ -1892,4 +1902,189 @@ TEST(TestAdapterRead, GetStripeStatisticsBulk) { Raises(StatusCode::Invalid, testing::HasSubstr("out of range"))); } +TEST(TestAdapterRead, BuildSchemaManifestNested) { + // ORC type with nested struct and list. 
+ // ORC column IDs (depth-first pre-order): + // 0: root struct + // 1: col1 (int) + // 2: col2 (struct) + // 3: col2.a (string) + // 4: col2.b (bigint) + // 5: col3 (array/list) + // 6: col3._elem (int) + MemoryOutputStream mem_stream(kDefaultMemStreamSize); + std::unique_ptr type(liborc::Type::buildTypeFromString( + "struct,col3:array>")); + + constexpr uint64_t stripe_size = 1024; + constexpr uint64_t row_count = 1; + + auto writer = CreateWriter(stripe_size, *type, &mem_stream); + auto batch = writer->createRowBatch(row_count); + auto struct_batch = internal::checked_cast(batch.get()); + + // Set up col1 (int) + auto int_batch = + internal::checked_cast(struct_batch->fields[0]); + int_batch->data[0] = 42; + int_batch->numElements = row_count; + + // Set up col2 (struct) + auto inner_struct = + internal::checked_cast(struct_batch->fields[1]); + auto str_batch = + internal::checked_cast(inner_struct->fields[0]); + auto bigint_batch = + internal::checked_cast(inner_struct->fields[1]); + std::string str_data = "hello"; + str_batch->data[0] = const_cast(str_data.c_str()); + str_batch->length[0] = static_cast(str_data.size()); + str_batch->numElements = row_count; + bigint_batch->data[0] = 100; + bigint_batch->numElements = row_count; + inner_struct->numElements = row_count; + + // Set up col3 (array) - write one list with one element + auto list_batch = + internal::checked_cast(struct_batch->fields[2]); + auto list_elem_batch = + internal::checked_cast(list_batch->elements.get()); + list_batch->offsets[0] = 0; + list_batch->offsets[1] = 1; + list_elem_batch->data[0] = 7; + list_elem_batch->numElements = 1; + list_batch->numElements = row_count; + + struct_batch->numElements = row_count; + writer->add(*batch); + writer->close(); + + std::shared_ptr in_stream(new io::BufferReader( + std::make_shared(reinterpret_cast(mem_stream.getData()), + static_cast(mem_stream.getLength())))); + + ASSERT_OK_AND_ASSIGN(auto reader, + adapters::orc::ORCFileReader::Open(in_stream, 
default_memory_pool())); + ASSERT_OK_AND_ASSIGN(auto arrow_schema, reader->ReadSchema()); + + ASSERT_OK_AND_ASSIGN(auto manifest, reader->BuildSchemaManifest(arrow_schema)); + ASSERT_EQ(manifest->schema_fields.size(), 3); + + // col1: leaf, orc_column_id=1 + EXPECT_EQ(manifest->schema_fields[0].field->name(), "col1"); + EXPECT_EQ(manifest->schema_fields[0].orc_column_id, 1); + EXPECT_TRUE(manifest->schema_fields[0].is_leaf()); + + // col2: struct with 2 children, orc_column_id=2 + EXPECT_EQ(manifest->schema_fields[1].field->name(), "col2"); + EXPECT_EQ(manifest->schema_fields[1].orc_column_id, 2); + EXPECT_FALSE(manifest->schema_fields[1].is_leaf()); + ASSERT_EQ(manifest->schema_fields[1].children.size(), 2); + EXPECT_EQ(manifest->schema_fields[1].children[0].field->name(), "a"); + EXPECT_EQ(manifest->schema_fields[1].children[0].orc_column_id, 3); + EXPECT_TRUE(manifest->schema_fields[1].children[0].is_leaf()); + EXPECT_EQ(manifest->schema_fields[1].children[1].field->name(), "b"); + EXPECT_EQ(manifest->schema_fields[1].children[1].orc_column_id, 4); + EXPECT_TRUE(manifest->schema_fields[1].children[1].is_leaf()); + + // col3: list with 1 child element, orc_column_id=5 + EXPECT_EQ(manifest->schema_fields[2].field->name(), "col3"); + EXPECT_EQ(manifest->schema_fields[2].orc_column_id, 5); + EXPECT_FALSE(manifest->schema_fields[2].is_leaf()); + ASSERT_EQ(manifest->schema_fields[2].children.size(), 1); + EXPECT_EQ(manifest->schema_fields[2].children[0].orc_column_id, 6); + EXPECT_TRUE(manifest->schema_fields[2].children[0].is_leaf()); + + // Test GetField() with various paths + EXPECT_EQ(manifest->GetField({}), nullptr); // empty path + EXPECT_EQ(manifest->GetField({99}), nullptr); // out of range + + // GetField({0}) -> col1 + const auto* f0 = manifest->GetField({0}); + ASSERT_NE(f0, nullptr); + EXPECT_EQ(f0->field->name(), "col1"); + EXPECT_EQ(f0->orc_column_id, 1); + + // GetField({1}) -> col2 (struct) + const auto* f1 = manifest->GetField({1}); + ASSERT_NE(f1, 
nullptr); + EXPECT_EQ(f1->field->name(), "col2"); + EXPECT_EQ(f1->orc_column_id, 2); + + // GetField({1, 0}) -> col2.a + const auto* f1_0 = manifest->GetField({1, 0}); + ASSERT_NE(f1_0, nullptr); + EXPECT_EQ(f1_0->field->name(), "a"); + EXPECT_EQ(f1_0->orc_column_id, 3); + + // GetField({1, 1}) -> col2.b + const auto* f1_1 = manifest->GetField({1, 1}); + ASSERT_NE(f1_1, nullptr); + EXPECT_EQ(f1_1->field->name(), "b"); + EXPECT_EQ(f1_1->orc_column_id, 4); + + // GetField({2, 0}) -> col3 element + const auto* f2_0 = manifest->GetField({2, 0}); + ASSERT_NE(f2_0, nullptr); + EXPECT_EQ(f2_0->orc_column_id, 6); + + // Out of range nested paths + EXPECT_EQ(manifest->GetField({1, 99}), nullptr); + EXPECT_EQ(manifest->GetField({0, 0}), nullptr); // col1 is leaf, no children +} + +TEST(TestAdapterRead, GetColumnStatisticsDecimal) { + MemoryOutputStream mem_stream(kDefaultMemStreamSize); + // decimal(10,2) uses Decimal64VectorBatch internally (precision <= 18) + std::unique_ptr type( + liborc::Type::buildTypeFromString("struct")); + + constexpr uint64_t stripe_size = 1024; + constexpr uint64_t row_count = 5; + + auto writer = CreateWriter(stripe_size, *type, &mem_stream); + auto batch = writer->createRowBatch(row_count); + auto struct_batch = internal::checked_cast(batch.get()); + auto dec_batch = + internal::checked_cast(struct_batch->fields[0]); + + // Write unscaled values with non-zero trailing digits to avoid ORC + // normalizing away trailing zeros in statistics (e.g. 1.00 → 1 scale 0). 
+ // Values: 101, 203, 305, 407, 509 with scale=2 + // Represent: 1.01, 2.03, 3.05, 4.07, 5.09 + dec_batch->values[0] = 101; + dec_batch->values[1] = 203; + dec_batch->values[2] = 305; + dec_batch->values[3] = 407; + dec_batch->values[4] = 509; + struct_batch->numElements = row_count; + dec_batch->numElements = row_count; + writer->add(*batch); + writer->close(); + + std::shared_ptr in_stream(new io::BufferReader( + std::make_shared(reinterpret_cast(mem_stream.getData()), + static_cast(mem_stream.getLength())))); + + ASSERT_OK_AND_ASSIGN(auto reader, + adapters::orc::ORCFileReader::Open(in_stream, default_memory_pool())); + + ASSERT_OK_AND_ASSIGN(auto col_stats, reader->GetColumnStatistics(1)); + EXPECT_EQ(col_stats.num_values, row_count); + EXPECT_FALSE(col_stats.has_null); + EXPECT_TRUE(col_stats.has_min_max); + ASSERT_NE(col_stats.min, nullptr); + ASSERT_NE(col_stats.max, nullptr); + + // Verify Decimal128Scalar values (unscaled integers with scale 2) + auto min_dec = checked_pointer_cast(col_stats.min); + auto max_dec = checked_pointer_cast(col_stats.max); + EXPECT_EQ(min_dec->value, Decimal128(101)); // 1.01 unscaled + EXPECT_EQ(max_dec->value, Decimal128(509)); // 5.09 unscaled + + // Verify the type has precision 38 and scale 2 (as set by ConvertColumnStatistics) + EXPECT_TRUE(min_dec->type->Equals(decimal128(38, 2))); + EXPECT_TRUE(max_dec->type->Equals(decimal128(38, 2))); +} + } // namespace arrow From d53718c3aeb5c381b45429958e7f7979a6b2038c Mon Sep 17 00:00:00 2001 From: Christian Bush Date: Sun, 22 Mar 2026 20:53:51 -0700 Subject: [PATCH 3/3] [C++][ORC] Introduce thin statistics wrapper for liborc Add an ORC statistics wrapper that owns liborc statistics lifetimes and centralizes type downcasting behind a stable API, then move scalar materialization into OrcStatisticsAsScalars. Update ORC reader statistics APIs and tests to consume the wrapper boundary. 
Made-with: Cursor --- cpp/src/arrow/CMakeLists.txt | 2 +- cpp/src/arrow/adapters/orc/CMakeLists.txt | 2 +- cpp/src/arrow/adapters/orc/adapter.cc | 155 ++--------------- cpp/src/arrow/adapters/orc/adapter.h | 25 +-- cpp/src/arrow/adapters/orc/adapter_test.cc | 125 ++++++++------ cpp/src/arrow/adapters/orc/statistics.cc | 185 +++++++++++++++++++++ cpp/src/arrow/adapters/orc/statistics.h | 95 +++++++++++ 7 files changed, 374 insertions(+), 215 deletions(-) create mode 100644 cpp/src/arrow/adapters/orc/statistics.cc create mode 100644 cpp/src/arrow/adapters/orc/statistics.h diff --git a/cpp/src/arrow/CMakeLists.txt b/cpp/src/arrow/CMakeLists.txt index eee63b11ca1c..8a6a6a07b9ac 100644 --- a/cpp/src/arrow/CMakeLists.txt +++ b/cpp/src/arrow/CMakeLists.txt @@ -1016,7 +1016,7 @@ endif() if(ARROW_ORC) arrow_add_object_library(ARROW_ORC adapters/orc/adapter.cc adapters/orc/options.cc - adapters/orc/util.cc) + adapters/orc/statistics.cc adapters/orc/util.cc) foreach(ARROW_ORC_TARGET ${ARROW_ORC_TARGETS}) target_link_libraries(${ARROW_ORC_TARGET} PRIVATE orc::orc) if(ARROW_ORC_VERSION VERSION_LESS "2.0.0") diff --git a/cpp/src/arrow/adapters/orc/CMakeLists.txt b/cpp/src/arrow/adapters/orc/CMakeLists.txt index bae63210b29e..9b778f3e41ef 100644 --- a/cpp/src/arrow/adapters/orc/CMakeLists.txt +++ b/cpp/src/arrow/adapters/orc/CMakeLists.txt @@ -20,7 +20,7 @@ # # Headers: top level -install(FILES adapter.h options.h +install(FILES adapter.h options.h statistics.h DESTINATION "${CMAKE_INSTALL_INCLUDEDIR}/arrow/adapters/orc") # pkg-config support diff --git a/cpp/src/arrow/adapters/orc/adapter.cc b/cpp/src/arrow/adapters/orc/adapter.cc index f1b5a5c7bc8c..6cf5ef27b1aa 100644 --- a/cpp/src/arrow/adapters/orc/adapter.cc +++ b/cpp/src/arrow/adapters/orc/adapter.cc @@ -18,7 +18,6 @@ #include "arrow/adapters/orc/adapter.h" #include -#include #include #include #include @@ -34,14 +33,12 @@ #include "arrow/io/interfaces.h" #include "arrow/memory_pool.h" #include "arrow/record_batch.h" 
-#include "arrow/scalar.h" #include "arrow/status.h" #include "arrow/table.h" #include "arrow/table_builder.h" #include "arrow/type.h" #include "arrow/util/bit_util.h" #include "arrow/util/checked_cast.h" -#include "arrow/util/decimal.h" #include "arrow/util/key_value_metadata.h" #include "arrow/util/macros.h" #include "orc/Exceptions.hh" @@ -102,114 +99,6 @@ namespace { // The following is required by ORC to be uint64_t constexpr uint64_t kOrcNaturalWriteSize = 128 * 1024; -// Max millisecond value that can be safely converted to nanoseconds without -// overflowing int64_t. Beyond ~292,000 years from epoch, millis * 1,000,000 -// exceeds int64_t range. -constexpr int64_t kMaxMillisForNanos = std::numeric_limits::max() / 1000000LL; -constexpr int64_t kMinMillisForNanos = std::numeric_limits::lowest() / 1000000LL; - -// Whether a millisecond value can be converted to nanoseconds without overflowing int64_t. -bool MillisFitInNanos(int64_t millis) { - return millis >= kMinMillisForNanos && millis <= kMaxMillisForNanos; -} - -// Convert ORC column statistics to Arrow OrcColumnStatistics -// Returns a Result with the converted statistics, or an error if conversion fails -Result ConvertColumnStatistics( - const liborc::ColumnStatistics* orc_stats) { - OrcColumnStatistics stats; - - stats.has_null = orc_stats->hasNull(); - stats.num_values = static_cast(orc_stats->getNumberOfValues()); - stats.has_min_max = false; - stats.min = nullptr; - stats.max = nullptr; - - // Try to extract min/max based on the ORC column statistics type. 
- if (const auto* int_stats = - dynamic_cast(orc_stats)) { - if (int_stats->hasMinimum() && int_stats->hasMaximum()) { - stats.has_min_max = true; - stats.min = std::make_shared(int_stats->getMinimum()); - stats.max = std::make_shared(int_stats->getMaximum()); - } - } else if (const auto* double_stats = - dynamic_cast(orc_stats)) { - if (double_stats->hasMinimum() && double_stats->hasMaximum()) { - double min_val = double_stats->getMinimum(); - double max_val = double_stats->getMaximum(); - if (!std::isnan(min_val) && !std::isnan(max_val)) { - stats.has_min_max = true; - stats.min = std::make_shared(min_val); - stats.max = std::make_shared(max_val); - } - } - } else if (const auto* string_stats = - dynamic_cast(orc_stats)) { - if (string_stats->hasMinimum() && string_stats->hasMaximum()) { - stats.has_min_max = true; - stats.min = std::make_shared(string_stats->getMinimum()); - stats.max = std::make_shared(string_stats->getMaximum()); - } - } else if (const auto* date_stats = - dynamic_cast(orc_stats)) { - if (date_stats->hasMinimum() && date_stats->hasMaximum()) { - stats.has_min_max = true; - stats.min = std::make_shared(date_stats->getMinimum()); - stats.max = std::make_shared(date_stats->getMaximum()); - } - } else if (const auto* ts_stats = - dynamic_cast(orc_stats)) { - if (ts_stats->hasMinimum() && ts_stats->hasMaximum()) { - // ORC returns millis + sub-millisecond nanos (0-999999). - // Convert to total nanoseconds, skipping if millis would overflow. 
- int64_t min_millis = ts_stats->getMinimum(); - int64_t max_millis = ts_stats->getMaximum(); - if (MillisFitInNanos(min_millis) && MillisFitInNanos(max_millis)) { - stats.has_min_max = true; - auto ts_type = timestamp(TimeUnit::NANO); - stats.min = std::make_shared( - min_millis * 1000000LL + ts_stats->getMinimumNanos(), ts_type); - stats.max = std::make_shared( - max_millis * 1000000LL + ts_stats->getMaximumNanos(), ts_type); - } - } - } else if (const auto* decimal_stats = - dynamic_cast(orc_stats)) { - if (decimal_stats->hasMinimum() && - decimal_stats->hasMaximum()) { - liborc::Decimal min_dec = decimal_stats->getMinimum(); - liborc::Decimal max_dec = decimal_stats->getMaximum(); - - if (min_dec.scale != max_dec.scale) { - // Corrupted stats: scales should always match within - // a column. has_min_max remains false (conservative). - } else { - stats.has_min_max = true; - - Decimal128 min_d128(min_dec.value.getHighBits(), - min_dec.value.getLowBits()); - Decimal128 max_d128(max_dec.value.getHighBits(), - max_dec.value.getLowBits()); - - // Precision 38 is max for Decimal128; the dataset - // layer will Cast() to the actual column type. - auto dec_type = decimal128(38, min_dec.scale); - - stats.min = - std::make_shared(min_d128, - dec_type); - stats.max = - std::make_shared(max_d128, - dec_type); - } - } - } - // Other types (Boolean, Binary, Collection, etc.) 
don't have min/max statistics - - return stats; -} - using internal::checked_cast; class ArrowInputFile : public liborc::InputStream { @@ -698,54 +587,48 @@ class ORCFileReader::Impl { return NextStripeReader(batch_size, empty_vec); } - Result GetColumnStatistics(int column_index) { + Result GetColumnStatistics(int column_index) { ORC_BEGIN_CATCH_NOT_OK - std::unique_ptr file_stats = reader_->getStatistics(); + auto file_stats = + std::shared_ptr(reader_->getStatistics().release()); if (column_index < 0 || static_cast(column_index) >= file_stats->getNumberOfColumns()) { return Status::Invalid("Column index ", column_index, " out of range [0, ", file_stats->getNumberOfColumns(), ")"); } - // NOTE: col_stats is a non-owning pointer into file_stats. - // ConvertColumnStatistics copies all values into Arrow scalars synchronously, - // so the pointer remains valid for the duration of this call. const liborc::ColumnStatistics* col_stats = file_stats->getColumnStatistics(static_cast(column_index)); - return ConvertColumnStatistics(col_stats); + return Statistics(std::move(file_stats), col_stats); ORC_END_CATCH_NOT_OK } - Result GetStripeColumnStatistics(int64_t stripe_index, - int column_index) { + Result GetStripeColumnStatistics(int64_t stripe_index, int column_index) { ORC_BEGIN_CATCH_NOT_OK if (stripe_index < 0 || stripe_index >= static_cast(stripes_.size())) { return Status::Invalid("Stripe index ", stripe_index, " out of range"); } - std::unique_ptr stripe_stats = - reader_->getStripeStatistics(static_cast(stripe_index)); + auto stripe_stats = std::shared_ptr( + reader_->getStripeStatistics(static_cast(stripe_index)).release()); if (column_index < 0 || static_cast(column_index) >= stripe_stats->getNumberOfColumns()) { return Status::Invalid("Column index ", column_index, " out of range [0, ", stripe_stats->getNumberOfColumns(), ")"); } - // NOTE: col_stats is a non-owning pointer into stripe_stats. 
- // ConvertColumnStatistics copies all values into Arrow scalars synchronously, - // so the pointer remains valid for the duration of this call. const liborc::ColumnStatistics* col_stats = stripe_stats->getColumnStatistics(static_cast(column_index)); - return ConvertColumnStatistics(col_stats); + return Statistics(std::move(stripe_stats), col_stats); ORC_END_CATCH_NOT_OK } - Result> GetStripeStatistics( + Result> GetStripeStatistics( int64_t stripe_index, const std::vector& column_indices) { ORC_BEGIN_CATCH_NOT_OK if (stripe_index < 0 || stripe_index >= static_cast(stripes_.size())) { return Status::Invalid("Stripe index ", stripe_index, " out of range"); } - std::unique_ptr stripe_stats = - reader_->getStripeStatistics(static_cast(stripe_index)); - std::vector results; + auto stripe_stats = std::shared_ptr( + reader_->getStripeStatistics(static_cast(stripe_index)).release()); + std::vector results; results.reserve(column_indices.size()); for (int col_idx : column_indices) { if (col_idx < 0 || @@ -753,13 +636,9 @@ class ORCFileReader::Impl { return Status::Invalid("Column index ", col_idx, " out of range [0, ", stripe_stats->getNumberOfColumns(), ")"); } - // NOTE: col_stats is a non-owning pointer into stripe_stats. - // ConvertColumnStatistics copies all values into Arrow scalars synchronously, - // so the pointer remains valid for the duration of this call. 
const liborc::ColumnStatistics* col_stats = stripe_stats->getColumnStatistics(static_cast(col_idx)); - ARROW_ASSIGN_OR_RAISE(auto converted, ConvertColumnStatistics(col_stats)); - results.push_back(std::move(converted)); + results.emplace_back(stripe_stats, col_stats); } return results; ORC_END_CATCH_NOT_OK @@ -908,16 +787,16 @@ std::string ORCFileReader::GetSerializedFileTail() { return impl_->GetSerializedFileTail(); } -Result ORCFileReader::GetColumnStatistics(int column_index) { +Result ORCFileReader::GetColumnStatistics(int column_index) { return impl_->GetColumnStatistics(column_index); } -Result ORCFileReader::GetStripeColumnStatistics( - int64_t stripe_index, int column_index) { +Result ORCFileReader::GetStripeColumnStatistics(int64_t stripe_index, + int column_index) { return impl_->GetStripeColumnStatistics(stripe_index, column_index); } -Result> ORCFileReader::GetStripeStatistics( +Result> ORCFileReader::GetStripeStatistics( int64_t stripe_index, const std::vector& column_indices) { return impl_->GetStripeStatistics(stripe_index, column_indices); } diff --git a/cpp/src/arrow/adapters/orc/adapter.h b/cpp/src/arrow/adapters/orc/adapter.h index 7201d14f53d6..b967a6ec3b09 100644 --- a/cpp/src/arrow/adapters/orc/adapter.h +++ b/cpp/src/arrow/adapters/orc/adapter.h @@ -22,11 +22,11 @@ #include #include "arrow/adapters/orc/options.h" +#include "arrow/adapters/orc/statistics.h" #include "arrow/io/interfaces.h" #include "arrow/memory_pool.h" #include "arrow/record_batch.h" #include "arrow/status.h" -#include "arrow/type.h" #include "arrow/type_fwd.h" #include "arrow/util/macros.h" #include "arrow/util/visibility.h" @@ -39,20 +39,6 @@ namespace arrow { namespace adapters { namespace orc { -/// \brief Column statistics from an ORC file -struct OrcColumnStatistics { - /// \brief Whether the column contains null values - bool has_null; - /// \brief Total number of values in the column - int64_t num_values; - /// \brief Whether min/max statistics are available - bool 
has_min_max; - /// \brief Minimum value (nullptr if not available) - std::shared_ptr min; - /// \brief Maximum value (nullptr if not available) - std::shared_ptr max; -}; - /// \brief Information about an ORC stripe struct StripeInformation { /// \brief Offset of the stripe from the start of the file, in bytes @@ -312,15 +298,14 @@ class ARROW_EXPORT ORCFileReader { /// /// \param[in] column_index the column index (0-based) /// \return the column statistics - Result GetColumnStatistics(int column_index); + Result GetColumnStatistics(int column_index); /// \brief Get stripe-level statistics for a column. /// /// \param[in] stripe_index the stripe index (0-based) /// \param[in] column_index the column index (0-based) /// \return the column statistics for the specified stripe - Result GetStripeColumnStatistics(int64_t stripe_index, - int column_index); + Result GetStripeColumnStatistics(int64_t stripe_index, int column_index); /// \brief Get stripe-level statistics for multiple columns at once. /// @@ -330,8 +315,8 @@ class ARROW_EXPORT ORCFileReader { /// \param[in] stripe_index the stripe index (0-based) /// \param[in] column_indices the column indices to retrieve statistics for /// \return vector of column statistics, one per requested column index - Result> GetStripeStatistics( - int64_t stripe_index, const std::vector& column_indices); + Result> GetStripeStatistics(int64_t stripe_index, + const std::vector& column_indices); /// \brief Get the ORC type tree for column ID mapping. 
/// diff --git a/cpp/src/arrow/adapters/orc/adapter_test.cc b/cpp/src/arrow/adapters/orc/adapter_test.cc index d810e4de106c..ab7d265714bc 100644 --- a/cpp/src/arrow/adapters/orc/adapter_test.cc +++ b/cpp/src/arrow/adapters/orc/adapter_test.cc @@ -1216,20 +1216,22 @@ TEST(TestAdapterRead, GetColumnStatisticsInteger) { adapters::orc::ORCFileReader::Open(in_stream, default_memory_pool())); ASSERT_OK_AND_ASSIGN(auto col1_stats, reader->GetColumnStatistics(1)); - EXPECT_EQ(col1_stats.num_values, row_count); - EXPECT_TRUE(col1_stats.has_min_max); - ASSERT_NE(col1_stats.min, nullptr); - ASSERT_NE(col1_stats.max, nullptr); - EXPECT_EQ(checked_pointer_cast(col1_stats.min)->value, 0); - EXPECT_EQ(checked_pointer_cast(col1_stats.max)->value, 999); + ASSERT_OK_AND_ASSIGN(auto col1_scalars, adapters::orc::OrcStatisticsAsScalars(col1_stats)); + EXPECT_EQ(col1_scalars.num_values, row_count); + EXPECT_TRUE(col1_scalars.has_min_max); + ASSERT_NE(col1_scalars.min, nullptr); + ASSERT_NE(col1_scalars.max, nullptr); + EXPECT_EQ(checked_pointer_cast(col1_scalars.min)->value, 0); + EXPECT_EQ(checked_pointer_cast(col1_scalars.max)->value, 999); ASSERT_OK_AND_ASSIGN(auto col2_stats, reader->GetColumnStatistics(2)); - EXPECT_EQ(col2_stats.num_values, row_count); - EXPECT_TRUE(col2_stats.has_min_max); - ASSERT_NE(col2_stats.min, nullptr); - ASSERT_NE(col2_stats.max, nullptr); - EXPECT_EQ(checked_pointer_cast(col2_stats.min)->value, 1000); - EXPECT_EQ(checked_pointer_cast(col2_stats.max)->value, 1999); + ASSERT_OK_AND_ASSIGN(auto col2_scalars, adapters::orc::OrcStatisticsAsScalars(col2_stats)); + EXPECT_EQ(col2_scalars.num_values, row_count); + EXPECT_TRUE(col2_scalars.has_min_max); + ASSERT_NE(col2_scalars.min, nullptr); + ASSERT_NE(col2_scalars.max, nullptr); + EXPECT_EQ(checked_pointer_cast(col2_scalars.min)->value, 1000); + EXPECT_EQ(checked_pointer_cast(col2_scalars.max)->value, 1999); } TEST(TestAdapterRead, GetStripeColumnStatistics) { @@ -1262,9 +1264,11 @@ TEST(TestAdapterRead, 
GetStripeColumnStatistics) { adapters::orc::ORCFileReader::Open(in_stream, default_memory_pool())); ASSERT_OK_AND_ASSIGN(auto stripe0_stats, reader->GetStripeColumnStatistics(0, 1)); - EXPECT_TRUE(stripe0_stats.has_min_max); - EXPECT_EQ(checked_pointer_cast(stripe0_stats.min)->value, 100); - EXPECT_EQ(checked_pointer_cast(stripe0_stats.max)->value, 599); + ASSERT_OK_AND_ASSIGN(auto stripe0_scalars, + adapters::orc::OrcStatisticsAsScalars(stripe0_stats)); + EXPECT_TRUE(stripe0_scalars.has_min_max); + EXPECT_EQ(checked_pointer_cast(stripe0_scalars.min)->value, 100); + EXPECT_EQ(checked_pointer_cast(stripe0_scalars.max)->value, 599); } TEST(TestAdapterRead, GetColumnStatisticsString) { @@ -1306,12 +1310,13 @@ TEST(TestAdapterRead, GetColumnStatisticsString) { adapters::orc::ORCFileReader::Open(in_stream, default_memory_pool())); ASSERT_OK_AND_ASSIGN(auto col_stats, reader->GetColumnStatistics(1)); - EXPECT_EQ(col_stats.num_values, row_count); - EXPECT_TRUE(col_stats.has_min_max); - ASSERT_NE(col_stats.min, nullptr); - ASSERT_NE(col_stats.max, nullptr); - EXPECT_EQ(checked_pointer_cast(col_stats.min)->ToString(), "apple"); - EXPECT_EQ(checked_pointer_cast(col_stats.max)->ToString(), "elderberry"); + ASSERT_OK_AND_ASSIGN(auto col_scalars, adapters::orc::OrcStatisticsAsScalars(col_stats)); + EXPECT_EQ(col_scalars.num_values, row_count); + EXPECT_TRUE(col_scalars.has_min_max); + ASSERT_NE(col_scalars.min, nullptr); + ASSERT_NE(col_scalars.max, nullptr); + EXPECT_EQ(checked_pointer_cast(col_scalars.min)->ToString(), "apple"); + EXPECT_EQ(checked_pointer_cast(col_scalars.max)->ToString(), "elderberry"); } TEST(TestAdapterRead, GetColumnStatisticsOutOfRange) { @@ -1382,8 +1387,9 @@ TEST(TestAdapterRead, GetColumnStatisticsWithNulls) { adapters::orc::ORCFileReader::Open(in_stream, default_memory_pool())); ASSERT_OK_AND_ASSIGN(auto col_stats, reader->GetColumnStatistics(1)); - EXPECT_TRUE(col_stats.has_null); - EXPECT_TRUE(col_stats.has_min_max); + ASSERT_OK_AND_ASSIGN(auto 
col_scalars, adapters::orc::OrcStatisticsAsScalars(col_stats)); + EXPECT_TRUE(col_stats.has_null()); + EXPECT_TRUE(col_scalars.has_min_max); } TEST(TestAdapterRead, GetColumnStatisticsBoolean) { @@ -1417,10 +1423,11 @@ TEST(TestAdapterRead, GetColumnStatisticsBoolean) { adapters::orc::ORCFileReader::Open(in_stream, default_memory_pool())); ASSERT_OK_AND_ASSIGN(auto col_stats, reader->GetColumnStatistics(1)); - EXPECT_EQ(col_stats.num_values, row_count); - EXPECT_FALSE(col_stats.has_min_max); // Boolean types don't have min/max - EXPECT_EQ(col_stats.min, nullptr); - EXPECT_EQ(col_stats.max, nullptr); + ASSERT_OK_AND_ASSIGN(auto col_scalars, adapters::orc::OrcStatisticsAsScalars(col_stats)); + EXPECT_EQ(col_scalars.num_values, row_count); + EXPECT_FALSE(col_scalars.has_min_max); // Boolean types don't have min/max + EXPECT_EQ(col_scalars.min, nullptr); + EXPECT_EQ(col_scalars.max, nullptr); } TEST(TestAdapterRead, GetColumnStatisticsDate) { @@ -1454,12 +1461,13 @@ TEST(TestAdapterRead, GetColumnStatisticsDate) { adapters::orc::ORCFileReader::Open(in_stream, default_memory_pool())); ASSERT_OK_AND_ASSIGN(auto col_stats, reader->GetColumnStatistics(1)); - EXPECT_EQ(col_stats.num_values, row_count); - EXPECT_TRUE(col_stats.has_min_max); - ASSERT_NE(col_stats.min, nullptr); - ASSERT_NE(col_stats.max, nullptr); - EXPECT_EQ(checked_pointer_cast(col_stats.min)->value, 0); - EXPECT_EQ(checked_pointer_cast(col_stats.max)->value, 9); + ASSERT_OK_AND_ASSIGN(auto col_scalars, adapters::orc::OrcStatisticsAsScalars(col_stats)); + EXPECT_EQ(col_scalars.num_values, row_count); + EXPECT_TRUE(col_scalars.has_min_max); + ASSERT_NE(col_scalars.min, nullptr); + ASSERT_NE(col_scalars.max, nullptr); + EXPECT_EQ(checked_pointer_cast(col_scalars.min)->value, 0); + EXPECT_EQ(checked_pointer_cast(col_scalars.max)->value, 9); } TEST(TestAdapterRead, GetColumnStatisticsTimestamp) { @@ -1494,10 +1502,11 @@ TEST(TestAdapterRead, GetColumnStatisticsTimestamp) { 
adapters::orc::ORCFileReader::Open(in_stream, default_memory_pool())); ASSERT_OK_AND_ASSIGN(auto col_stats, reader->GetColumnStatistics(1)); - EXPECT_EQ(col_stats.num_values, row_count); - EXPECT_TRUE(col_stats.has_min_max); - ASSERT_NE(col_stats.min, nullptr); - ASSERT_NE(col_stats.max, nullptr); + ASSERT_OK_AND_ASSIGN(auto col_scalars, adapters::orc::OrcStatisticsAsScalars(col_stats)); + EXPECT_EQ(col_scalars.num_values, row_count); + EXPECT_TRUE(col_scalars.has_min_max); + ASSERT_NE(col_scalars.min, nullptr); + ASSERT_NE(col_scalars.max, nullptr); // Verify the timestamps are TimestampScalar with correct nanosecond values. // Written: data[i] = i*1000 seconds, nanoseconds[i] = i*100 sub-second nanos. @@ -1505,8 +1514,8 @@ TEST(TestAdapterRead, GetColumnStatisticsTimestamp) { // Max (i=9): 9000s + 900ns = 9000 * 1,000,000,000 + 900 = 9,000,000,000,900 ns. // Statistics store millis + sub-ms-nanos (last 6 digits of ns). // Conversion: millis * 1,000,000 + sub_ms_nanos = total nanoseconds. - auto min_ts = checked_pointer_cast(col_stats.min); - auto max_ts = checked_pointer_cast(col_stats.max); + auto min_ts = checked_pointer_cast(col_scalars.min); + auto max_ts = checked_pointer_cast(col_scalars.max); EXPECT_TRUE(min_ts->is_valid); EXPECT_TRUE(max_ts->is_valid); EXPECT_EQ(min_ts->value, 0); @@ -1784,12 +1793,13 @@ TEST(TestAdapterRead, GetColumnStatisticsDoubleNaN) { adapters::orc::ORCFileReader::Open(in_stream, default_memory_pool())); ASSERT_OK_AND_ASSIGN(auto col_stats, reader->GetColumnStatistics(1)); + ASSERT_OK_AND_ASSIGN(auto col_scalars, adapters::orc::OrcStatisticsAsScalars(col_stats)); // When NaN values are present, ORC may report NaN as min or max. // Our guard should detect this and set has_min_max = false. 
- if (col_stats.has_min_max) { + if (col_scalars.has_min_max) { // If ORC writer filtered NaN from statistics, min/max should be valid (non-NaN) - auto min_val = checked_pointer_cast(col_stats.min); - auto max_val = checked_pointer_cast(col_stats.max); + auto min_val = checked_pointer_cast(col_scalars.min); + auto max_val = checked_pointer_cast(col_scalars.max); EXPECT_FALSE(std::isnan(min_val->value)); EXPECT_FALSE(std::isnan(max_val->value)); } @@ -1883,12 +1893,16 @@ TEST(TestAdapterRead, GetStripeStatisticsBulk) { for (size_t i = 0; i < column_indices.size(); ++i) { ASSERT_OK_AND_ASSIGN(auto individual_stats, reader->GetStripeColumnStatistics(0, column_indices[i])); - EXPECT_EQ(bulk_stats[i].has_null, individual_stats.has_null); - EXPECT_EQ(bulk_stats[i].num_values, individual_stats.num_values); - EXPECT_EQ(bulk_stats[i].has_min_max, individual_stats.has_min_max); - if (bulk_stats[i].has_min_max && individual_stats.has_min_max) { - EXPECT_TRUE(bulk_stats[i].min->Equals(*individual_stats.min)); - EXPECT_TRUE(bulk_stats[i].max->Equals(*individual_stats.max)); + ASSERT_OK_AND_ASSIGN(auto bulk_scalars, + adapters::orc::OrcStatisticsAsScalars(bulk_stats[i])); + ASSERT_OK_AND_ASSIGN(auto individual_scalars, + adapters::orc::OrcStatisticsAsScalars(individual_stats)); + EXPECT_EQ(bulk_stats[i].has_null(), individual_stats.has_null()); + EXPECT_EQ(bulk_scalars.num_values, individual_scalars.num_values); + EXPECT_EQ(bulk_scalars.has_min_max, individual_scalars.has_min_max); + if (bulk_scalars.has_min_max && individual_scalars.has_min_max) { + EXPECT_TRUE(bulk_scalars.min->Equals(*individual_scalars.min)); + EXPECT_TRUE(bulk_scalars.max->Equals(*individual_scalars.max)); } } @@ -2070,15 +2084,16 @@ TEST(TestAdapterRead, GetColumnStatisticsDecimal) { adapters::orc::ORCFileReader::Open(in_stream, default_memory_pool())); ASSERT_OK_AND_ASSIGN(auto col_stats, reader->GetColumnStatistics(1)); - EXPECT_EQ(col_stats.num_values, row_count); - EXPECT_FALSE(col_stats.has_null); - 
EXPECT_TRUE(col_stats.has_min_max); - ASSERT_NE(col_stats.min, nullptr); - ASSERT_NE(col_stats.max, nullptr); + ASSERT_OK_AND_ASSIGN(auto col_scalars, adapters::orc::OrcStatisticsAsScalars(col_stats)); + EXPECT_EQ(col_scalars.num_values, row_count); + EXPECT_FALSE(col_stats.has_null()); + EXPECT_TRUE(col_scalars.has_min_max); + ASSERT_NE(col_scalars.min, nullptr); + ASSERT_NE(col_scalars.max, nullptr); // Verify Decimal128Scalar values (unscaled integers with scale 2) - auto min_dec = checked_pointer_cast(col_stats.min); - auto max_dec = checked_pointer_cast(col_stats.max); + auto min_dec = checked_pointer_cast(col_scalars.min); + auto max_dec = checked_pointer_cast(col_scalars.max); EXPECT_EQ(min_dec->value, Decimal128(101)); // 1.01 unscaled EXPECT_EQ(max_dec->value, Decimal128(509)); // 5.09 unscaled diff --git a/cpp/src/arrow/adapters/orc/statistics.cc b/cpp/src/arrow/adapters/orc/statistics.cc new file mode 100644 index 000000000000..61748ea959af --- /dev/null +++ b/cpp/src/arrow/adapters/orc/statistics.cc @@ -0,0 +1,185 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+
+#include "arrow/adapters/orc/statistics.h"
+
+#include <cmath>
+#include <cstdint>
+#include <limits>
+#include <memory>
+#include <optional>
+
+#include "arrow/scalar.h"
+#include "arrow/type.h"
+#include "arrow/util/checked_cast.h"
+#include "arrow/util/decimal.h"
+#include "orc/Statistics.hh"
+
+namespace liborc = orc;
+
+namespace arrow {
+namespace adapters {
+namespace orc {
+namespace {
+
+// ORC timestamp statistics are reported as whole milliseconds plus a
+// sub-millisecond nanosecond remainder; Arrow scalars use total nanoseconds.
+constexpr int64_t kNanosPerMilli = 1000000LL;
+constexpr int64_t kMaxMillisForNanos =
+    std::numeric_limits<int64_t>::max() / kNanosPerMilli;
+constexpr int64_t kMinMillisForNanos =
+    std::numeric_limits<int64_t>::lowest() / kNanosPerMilli;
+
+// Returns true when `millis * kNanosPerMilli` is representable as int64_t.
+bool MillisFitInNanos(int64_t millis) {
+  return millis >= kMinMillisForNanos && millis <= kMaxMillisForNanos;
+}
+
+// Returns true when `millis * kNanosPerMilli + sub_milli_nanos` is representable
+// as int64_t.
+//
+// Checking the millisecond part alone is not sufficient: at
+// millis == kMaxMillisForNanos the product leaves less than one millisecond of
+// headroom, so adding the sub-millisecond remainder could overflow (undefined
+// behaviour for signed integers).
+bool TimestampFitsInNanos(int64_t millis, int64_t sub_milli_nanos) {
+  if (!MillisFitInNanos(millis)) {
+    return false;
+  }
+  const int64_t base = millis * kNanosPerMilli;  // cannot overflow: range checked above
+  if (sub_milli_nanos >= 0) {
+    return base <= std::numeric_limits<int64_t>::max() - sub_milli_nanos;
+  }
+  return base >= std::numeric_limits<int64_t>::lowest() - sub_milli_nanos;
+}
+
+}  // namespace
+
+// Precondition: valid(). The wrapped pointer is dereferenced unconditionally.
+bool Statistics::has_null() const { return column_statistics_->hasNull(); }
+
+std::optional<int64_t> Statistics::null_count() const {
+  // liborc doesn't expose null_count on ColumnStatistics.
+  return std::nullopt;
+}
+
+// Precondition: valid(). The wrapped pointer is dereferenced unconditionally.
+int64_t Statistics::num_values() const {
+  return static_cast<int64_t>(column_statistics_->getNumberOfValues());
+}
+
+// The typed accessors below return nullptr when the wrapped statistics object
+// is not of the requested liborc subtype (or when the wrapper is empty).
+
+const liborc::IntegerColumnStatistics* Statistics::integer() const {
+  return dynamic_cast<const liborc::IntegerColumnStatistics*>(column_statistics_);
+}
+
+const liborc::DoubleColumnStatistics* Statistics::floating_point() const {
+  return dynamic_cast<const liborc::DoubleColumnStatistics*>(column_statistics_);
+}
+
+const liborc::StringColumnStatistics* Statistics::string() const {
+  return dynamic_cast<const liborc::StringColumnStatistics*>(column_statistics_);
+}
+
+const liborc::DateColumnStatistics* Statistics::date() const {
+  return dynamic_cast<const liborc::DateColumnStatistics*>(column_statistics_);
+}
+
+const liborc::TimestampColumnStatistics* Statistics::timestamp() const {
+  return dynamic_cast<const liborc::TimestampColumnStatistics*>(column_statistics_);
+}
+
+const liborc::DecimalColumnStatistics* Statistics::decimal() const {
+  return dynamic_cast<const liborc::DecimalColumnStatistics*>(column_statistics_);
+}
+
+// Returns true only when both bounds are present AND can be converted to Arrow
+// scalars without loss: NaN doubles, timestamps that overflow int64 nanoseconds
+// and decimals with mismatched scales are all reported as "no min/max".
+bool Statistics::HasMinMax() const {
+  if (!valid()) {
+    return false;
+  }
+
+  if (const auto* int_stats = integer()) {
+    return int_stats->hasMinimum() && int_stats->hasMaximum();
+  }
+  if (const auto* double_stats = floating_point()) {
+    if (!double_stats->hasMinimum() || !double_stats->hasMaximum()) {
+      return false;
+    }
+    // A NaN bound cannot be used for range comparisons.
+    return !std::isnan(double_stats->getMinimum()) &&
+           !std::isnan(double_stats->getMaximum());
+  }
+  if (const auto* string_stats = string()) {
+    return string_stats->hasMinimum() && string_stats->hasMaximum();
+  }
+  if (const auto* date_stats = date()) {
+    return date_stats->hasMinimum() && date_stats->hasMaximum();
+  }
+  if (const auto* ts_stats = timestamp()) {
+    if (!ts_stats->hasMinimum() || !ts_stats->hasMaximum()) {
+      return false;
+    }
+    // Validate the full millis + nanos sum, not just the millisecond part.
+    return TimestampFitsInNanos(ts_stats->getMinimum(), ts_stats->getMinimumNanos()) &&
+           TimestampFitsInNanos(ts_stats->getMaximum(), ts_stats->getMaximumNanos());
+  }
+  if (const auto* decimal_stats = decimal()) {
+    if (!decimal_stats->hasMinimum() || !decimal_stats->hasMaximum()) {
+      return false;
+    }
+    // Scales should always match within a column; a mismatch means the
+    // statistics are corrupted, so stay conservative.
+    liborc::Decimal min_dec = decimal_stats->getMinimum();
+    liborc::Decimal max_dec = decimal_stats->getMaximum();
+    return min_dec.scale == max_dec.scale;
+  }
+  // Other types (Boolean, Binary, Collection, etc.) don't have min/max statistics.
+  return false;
+}
+
+// Materializes the wrapped liborc statistics as Arrow scalars.
+//
+// Returns Status::Invalid for an empty wrapper. When HasMinMax() is false the
+// result carries has_min_max == false and null min/max.
+//
+// NOTE(review): the concrete scalar types below were reconstructed from usage
+// (the template arguments are not visible in this patch) -- confirm against the
+// original change.
+Result<OrcColumnStatisticsAsScalars> OrcStatisticsAsScalars(
+    const Statistics& statistics) {
+  if (!statistics.valid()) {
+    return Status::Invalid("ORC statistics wrapper is not initialized");
+  }
+
+  OrcColumnStatisticsAsScalars converted;
+  converted.has_null = statistics.has_null();
+  converted.num_values = statistics.num_values();
+  converted.has_min_max = false;
+  converted.min = nullptr;
+  converted.max = nullptr;
+
+  if (!statistics.HasMinMax()) {
+    return converted;
+  }
+
+  converted.has_min_max = true;
+  if (const auto* int_stats = statistics.integer()) {
+    converted.min = std::make_shared<Int64Scalar>(int_stats->getMinimum());
+    converted.max = std::make_shared<Int64Scalar>(int_stats->getMaximum());
+    return converted;
+  }
+  if (const auto* double_stats = statistics.floating_point()) {
+    converted.min = std::make_shared<DoubleScalar>(double_stats->getMinimum());
+    converted.max = std::make_shared<DoubleScalar>(double_stats->getMaximum());
+    return converted;
+  }
+  if (const auto* string_stats = statistics.string()) {
+    converted.min = std::make_shared<StringScalar>(string_stats->getMinimum());
+    converted.max = std::make_shared<StringScalar>(string_stats->getMaximum());
+    return converted;
+  }
+  if (const auto* date_stats = statistics.date()) {
+    converted.min = std::make_shared<Date32Scalar>(date_stats->getMinimum());
+    converted.max = std::make_shared<Date32Scalar>(date_stats->getMaximum());
+    return converted;
+  }
+  if (const auto* ts_stats = statistics.timestamp()) {
+    // HasMinMax() has already verified that both sums fit in int64 nanoseconds.
+    auto ts_type = timestamp(TimeUnit::NANO);
+    converted.min = std::make_shared<TimestampScalar>(
+        ts_stats->getMinimum() * kNanosPerMilli + ts_stats->getMinimumNanos(), ts_type);
+    converted.max = std::make_shared<TimestampScalar>(
+        ts_stats->getMaximum() * kNanosPerMilli + ts_stats->getMaximumNanos(), ts_type);
+    return converted;
+  }
+  if (const auto* decimal_stats = statistics.decimal()) {
+    liborc::Decimal min_dec = decimal_stats->getMinimum();
+    liborc::Decimal max_dec = decimal_stats->getMaximum();
+
+    Decimal128 min_d128(min_dec.value.getHighBits(), min_dec.value.getLowBits());
+    Decimal128 max_d128(max_dec.value.getHighBits(), max_dec.value.getLowBits());
+    // Precision 38 is the maximum for Decimal128; only the scale is taken from
+    // the statistics (HasMinMax() guarantees min and max share it).
+    auto dec_type = decimal128(38, min_dec.scale);
+
+    converted.min = std::make_shared<Decimal128Scalar>(min_d128, dec_type);
+    converted.max = std::make_shared<Decimal128Scalar>(max_d128, dec_type);
+    return converted;
+  }
+
+  // Defensive fallback: HasMinMax() returned true for a type not handled above.
+  converted.has_min_max = false;
+  converted.min = nullptr;
+  converted.max = nullptr;
+  return converted;
+}
+
+}  // namespace orc
+}  // namespace adapters
+}  // namespace arrow
diff --git a/cpp/src/arrow/adapters/orc/statistics.h b/cpp/src/arrow/adapters/orc/statistics.h
new file mode 100644
index 000000000000..188eb04cc1e8
--- /dev/null
+++ b/cpp/src/arrow/adapters/orc/statistics.h
@@ -0,0 +1,95 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.
You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <cstdint>
+#include <memory>
+#include <optional>
+#include <utility>
+
+#include "arrow/result.h"
+#include "arrow/scalar.h"
+#include "arrow/type_fwd.h"
+#include "arrow/util/visibility.h"
+
+// Forward declarations so that this header does not pull in liborc headers.
+namespace orc {
+class ColumnStatistics;
+class DateColumnStatistics;
+class DecimalColumnStatistics;
+class DoubleColumnStatistics;
+class IntegerColumnStatistics;
+class Statistics;
+class StringColumnStatistics;
+class TimestampColumnStatistics;
+}  // namespace orc
+
+namespace arrow {
+namespace adapters {
+namespace orc {
+
+/// \brief Scalar materialization of ORC column statistics.
+///
+/// All members have defaults, so a default-constructed instance describes
+/// "no statistics available" rather than holding indeterminate values.
+struct ARROW_EXPORT OrcColumnStatisticsAsScalars {
+  /// \brief Whether the column contains null values.
+  bool has_null = false;
+  /// \brief Number of non-null values in the column.
+  int64_t num_values = 0;
+  /// \brief Whether min/max statistics are available and valid.
+  bool has_min_max = false;
+  /// \brief Minimum value (nullptr if unavailable).
+  std::shared_ptr<Scalar> min;
+  /// \brief Maximum value (nullptr if unavailable).
+  std::shared_ptr<Scalar> max;
+};
+
+/// \brief Thin wrapper over liborc column statistics.
+///
+/// Keeps liborc type dispatch in one place and provides a stable API surface
+/// for downstream consumers.
+class ARROW_EXPORT Statistics {
+ public:
+  /// \brief Construct an empty wrapper; valid() returns false.
+  Statistics() = default;
+
+  /// \brief Wrap a column statistics object and keep its container alive.
+  ///
+  /// \param[in] owner the liborc statistics container that `column_statistics`
+  ///   points into; held for the lifetime of this wrapper (and its copies)
+  /// \param[in] column_statistics non-owning pointer to one column's statistics
+  Statistics(std::shared_ptr<const ::orc::Statistics> owner,
+             const ::orc::ColumnStatistics* column_statistics)
+      : owner_(std::move(owner)), column_statistics_(column_statistics) {}
+
+  /// \brief Wrap a column statistics object without taking ownership.
+  ///
+  /// The caller must keep the pointed-to object alive for as long as this
+  /// wrapper (or any copy of it) is used.
+  explicit Statistics(const ::orc::ColumnStatistics* column_statistics)
+      : column_statistics_(column_statistics) {}
+
+  /// \brief Whether this wrapper refers to a statistics object.
+  bool valid() const { return column_statistics_ != nullptr; }
+  /// \brief Whether the column contains null values. Requires valid().
+  bool has_null() const;
+  /// \brief Number of nulls, if known (liborc does not expose it).
+  std::optional<int64_t> null_count() const;
+  /// \brief Number of values reported by liborc. Requires valid().
+  int64_t num_values() const;
+  /// \brief Whether both min and max are present and convertible to Arrow scalars.
+  bool HasMinMax() const;
+
+  /// \brief The wrapped liborc object (nullptr for an empty wrapper).
+  const ::orc::ColumnStatistics* raw() const { return column_statistics_; }
+
+  /// \name Typed views
+  /// Each accessor returns nullptr when the wrapped object is not of the
+  /// requested liborc statistics type.
+  /// @{
+  const ::orc::IntegerColumnStatistics* integer() const;
+  const ::orc::DoubleColumnStatistics* floating_point() const;
+  const ::orc::StringColumnStatistics* string() const;
+  const ::orc::DateColumnStatistics* date() const;
+  const ::orc::TimestampColumnStatistics* timestamp() const;
+  const ::orc::DecimalColumnStatistics* decimal() const;
+  /// @}
+
+ private:
+  // Keeps the liborc container alive; may be null for non-owning wrappers.
+  std::shared_ptr<const ::orc::Statistics> owner_;
+  // Non-owning; points into `owner_` when that is set.
+  const ::orc::ColumnStatistics* column_statistics_ = nullptr;
+};
+
+/// \brief Convert wrapped ORC statistics into Arrow scalars.
+///
+/// \param[in] statistics the statistics wrapper to convert
+/// \return the materialized statistics, or Status::Invalid if `statistics`
+///   is not valid()
+ARROW_EXPORT Result<OrcColumnStatisticsAsScalars> OrcStatisticsAsScalars(
+    const Statistics& statistics);
+
+}  // namespace orc
+}  // namespace adapters
+}  // namespace arrow