diff --git a/cpp/src/arrow/dataset/file_orc.cc b/cpp/src/arrow/dataset/file_orc.cc index 3c751657a691..8a56329c894e 100644 --- a/cpp/src/arrow/dataset/file_orc.cc +++ b/cpp/src/arrow/dataset/file_orc.cc @@ -19,6 +19,7 @@ #include #include +#include #include "arrow/adapters/orc/adapter.h" #include "arrow/compute/api_scalar.h" @@ -374,6 +375,114 @@ Future> OrcFileFormat::CountRows( })); } +// +// StripeStatisticsCache +// + +struct StripeStatisticsCache { + std::vector stripe_guarantees; + std::unordered_set fields_processed; + std::vector statistics_complete; +}; + +// +// OrcFileFragment implementation +// + +OrcFileFragment::OrcFileFragment(FileSource source, std::shared_ptr format, + compute::Expression partition_expression, + std::shared_ptr physical_schema, + std::optional> stripes) + : FileFragment(std::move(source), std::move(format), std::move(partition_expression), + std::move(physical_schema)), + orc_format_(internal::checked_cast(*format_)), + stripes_(std::move(stripes)) {} + +void* OrcFileFragment::metadata() { + auto lock = physical_schema_mutex_.Lock(); + return orc_reader_; +} + +Status OrcFileFragment::EnsureCompleteMetadata(void* reader) { + auto lock = physical_schema_mutex_.Lock(); + + if (cache_status_ == OrcCacheStatus::Cached) { + return Status::OK(); + } + + if (cache_status_ == OrcCacheStatus::Loading) { + return Status::Invalid("Metadata is currently being loaded by another thread"); + } + + cache_status_ = OrcCacheStatus::Loading; + + if (reader == nullptr) { + ARROW_ASSIGN_OR_RAISE(auto orc_reader, OpenORCReader(source_)); + orc_reader_ = orc_reader.release(); + } else { + orc_reader_ = reader; + } + + cache_status_ = OrcCacheStatus::Cached; + return Status::OK(); +} + +Status OrcFileFragment::ClearCachedMetadata() { + auto lock = physical_schema_mutex_.Lock(); + orc_reader_ = nullptr; + manifest_ = nullptr; + statistics_cache_ = nullptr; + cache_status_ = OrcCacheStatus::Uncached; + return Status::OK(); +} + +Status OrcFileFragment::SetMetadata(void* reader, + std::shared_ptr manifest) { + auto lock = physical_schema_mutex_.Lock(); + if (cache_status_ == OrcCacheStatus::Cached) { + return Status::OK(); + } + orc_reader_ = reader; + manifest_ = std::move(manifest); + cache_status_ = OrcCacheStatus::Cached; + return Status::OK(); +} + +Result> OrcFileFragment::Subset( + compute::Expression predicate) { + ARROW_ASSIGN_OR_RAISE(auto stripes, FilterStripes(std::move(predicate))); + return Subset(std::move(stripes)); +} + +Result> OrcFileFragment::Subset(std::vector stripe_ids) { + auto subset_fragment = std::shared_ptr(new OrcFileFragment( + source_, format_, partition_expression_, physical_schema_, std::move(stripe_ids))); + if (cache_status_ == OrcCacheStatus::Cached) { + ARROW_RETURN_NOT_OK(subset_fragment->SetMetadata(orc_reader_, manifest_)); + } + return subset_fragment; +} + +Result OrcFileFragment::SplitByStripe(compute::Expression predicate) { + ARROW_RETURN_NOT_OK(EnsureCompleteMetadata()); + return Status::NotImplemented("SplitByStripe not yet implemented"); +} + +Result> OrcFileFragment::FilterStripes(compute::Expression predicate) { + ARROW_RETURN_NOT_OK(EnsureCompleteMetadata()); + return Status::NotImplemented("FilterStripes not yet implemented"); +} + +Result> OrcFileFragment::TestStripes( + compute::Expression predicate) { + return Status::NotImplemented("TestStripes not yet implemented"); +} + +Result> OrcFileFragment::TryCountRows( + compute::Expression predicate) { + return std::nullopt; +} + // // // // OrcFileWriter, OrcFileWriteOptions // // diff --git a/cpp/src/arrow/dataset/file_orc.h b/cpp/src/arrow/dataset/file_orc.h index 3c36dcd5d8d9..053ad307f68c 100644 --- a/cpp/src/arrow/dataset/file_orc.h +++ b/cpp/src/arrow/dataset/file_orc.h @@ -104,8 +104,86 @@ struct ARROW_DS_EXPORT OrcSchemaManifest { } }; +/// \brief Cache status for ORC fragment metadata +enum class OrcCacheStatus { + /// No metadata has been cached + Uncached, + /// Metadata is being loaded (to prevent concurrent loads) + Loading, + /// Metadata is fully cached + Cached +}; + +/// \brief Forward declaration for StripeStatisticsCache +/// Full definition in file_orc.cc to avoid exposing implementation details +struct StripeStatisticsCache; + +/// \brief Forward declarations for ORC types +/// These avoid including ORC headers in this public header +namespace orc { +class Reader; +class Statistics; +} // namespace orc + constexpr char kOrcTypeName[] = "orc"; +/// \brief A FileFragment with ORC-specific predicate pushdown capabilities. +/// +/// Extends FileFragment with support for stripe-level filtering based on +/// ORC statistics. Similar to ParquetFileFragment but adapted for ORC's +/// stripe-based structure (vs Parquet's row groups). +class ARROW_DS_EXPORT OrcFileFragment : public FileFragment { + public: + /// \brief Split this fragment into one fragment per stripe. + Result SplitByStripe(compute::Expression predicate); + + /// \brief Return the stripes selected by this fragment. + const std::vector& stripes() const { + if (stripes_) return *stripes_; + static std::vector empty; + return empty; + } + + /// \brief Return the ORC file metadata associated with this fragment. + void* metadata(); + + /// \brief Ensure this fragment's metadata is loaded into memory. + Status EnsureCompleteMetadata(void* reader = nullptr); + + /// \brief Clear all cached metadata and statistics. + Status ClearCachedMetadata() override; + + /// \brief Return a fragment selecting a filtered subset of this fragment's stripes. + Result> Subset(compute::Expression predicate); + Result> Subset(std::vector stripe_ids); + + private: + OrcFileFragment(FileSource source, std::shared_ptr format, + compute::Expression partition_expression, + std::shared_ptr physical_schema, + std::optional> stripes); + + Status SetMetadata(void* reader, std::shared_ptr manifest); + + Result> ReadPhysicalSchemaImpl() override { + ARROW_RETURN_NOT_OK(EnsureCompleteMetadata()); + return physical_schema_; + } + + Result> FilterStripes(compute::Expression predicate); + Result> TestStripes(compute::Expression predicate); + Result> TryCountRows(compute::Expression predicate); + + OrcFileFormat& orc_format_; + std::optional> stripes_; + OrcCacheStatus cache_status_ = OrcCacheStatus::Uncached; + void* orc_reader_ = nullptr; + std::shared_ptr manifest_; + std::unique_ptr statistics_cache_; + + friend class OrcFileFormat; +}; + /// \brief A FileFormat implementation that reads from and writes to ORC files class ARROW_DS_EXPORT OrcFileFormat : public FileFormat { public: