Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
109 changes: 109 additions & 0 deletions cpp/src/arrow/dataset/file_orc.cc
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

#include <memory>
#include <optional>
#include <unordered_set>

#include "arrow/adapters/orc/adapter.h"
#include "arrow/compute/api_scalar.h"
Expand Down Expand Up @@ -374,6 +375,114 @@ Future<std::optional<int64_t>> OrcFileFormat::CountRows(
}));
}

//
// StripeStatisticsCache
//

// Bundle of per-fragment stripe statistics state. OrcFileFragment holds one through
// its statistics_cache_ member; the code visible here only ever resets that member,
// so the field meanings below are inferred from their names and types.
struct StripeStatisticsCache {
// presumably one guarantee expression per stripe, indexed by stripe — TODO confirm
std::vector<compute::Expression> stripe_guarantees;
// presumably the names of fields whose statistics were already folded into
// stripe_guarantees — TODO confirm
std::unordered_set<std::string> fields_processed;
// presumably one flag per stripe telling whether its statistics are complete —
// TODO confirm
std::vector<bool> statistics_complete;
};

//
// OrcFileFragment implementation
//

// Construct a fragment over `source`, optionally carrying an explicit stripe selection.
//
// source, format, partition_expression and physical_schema are moved into the
// FileFragment base constructor. stripes is moved into stripes_ unchanged; an empty
// optional means no explicit selection was supplied.
OrcFileFragment::OrcFileFragment(FileSource source, std::shared_ptr<FileFormat> format,
compute::Expression partition_expression,
std::shared_ptr<Schema> physical_schema,
std::optional<std::vector<int>> stripes)
: FileFragment(std::move(source), std::move(format), std::move(partition_expression),
std::move(physical_schema)),
// Deliberately dereferences the inherited format_ member rather than the `format`
// parameter, which has already been moved from by the base initializer above.
// The cast assumes the supplied format is an OrcFileFormat.
orc_format_(internal::checked_cast<OrcFileFormat&>(*format_)),
stripes_(std::move(stripes)) {}

// Return the currently stored reader handle (orc_reader_) as an opaque pointer.
// The read happens while holding physical_schema_mutex_. The result is nullptr
// when no reader has been stored or after ClearCachedMetadata().
void* OrcFileFragment::metadata() {
auto lock = physical_schema_mutex_.Lock();
return orc_reader_;
}

// Make sure a reader handle is cached on this fragment.
//
// reader: an already-open reader to adopt, or nullptr to open one from source_.
//
// Returns OK when metadata is (or already was) cached, Invalid when the fragment is
// marked as Loading, or the error produced by OpenORCReader when opening fails.
//
// The whole function runs under physical_schema_mutex_, so no other caller of the
// locked methods can observe an intermediate state. The cache state is therefore
// committed only once a reader has actually been obtained: a failed open leaves the
// fragment Uncached, and a later call can retry instead of being rejected forever
// with "currently being loaded".
Status OrcFileFragment::EnsureCompleteMetadata(void* reader) {
  auto lock = physical_schema_mutex_.Lock();

  switch (cache_status_) {
    case OrcCacheStatus::Cached:
      return Status::OK();
    case OrcCacheStatus::Loading:
      return Status::Invalid("Metadata is currently being loaded by another thread");
    case OrcCacheStatus::Uncached:
      break;
  }

  void* resolved_reader = reader;
  if (resolved_reader == nullptr) {
    // On failure this returns early; no member has been modified at that point.
    ARROW_ASSIGN_OR_RAISE(auto orc_reader, OpenORCReader(source_));
    // NOTE(review): release() hands ownership to a raw void*. Nothing visible in
    // this file deletes it (ClearCachedMetadata only nulls the pointer), so the
    // reader appears to leak — confirm and consider an owning member type.
    resolved_reader = orc_reader.release();
  }

  orc_reader_ = resolved_reader;
  cache_status_ = OrcCacheStatus::Cached;
  return Status::OK();
}

// Drop everything this fragment has cached and return it to the Uncached state.
// Runs under physical_schema_mutex_ and always reports OK.
Status OrcFileFragment::ClearCachedMetadata() {
  auto guard = physical_schema_mutex_.Lock();

  // Mark the fragment as empty first; the members below are independent of it.
  cache_status_ = OrcCacheStatus::Uncached;

  statistics_cache_.reset();
  manifest_.reset();
  // Only forgets the handle; the pointee is not destroyed here.
  orc_reader_ = nullptr;

  return Status::OK();
}

// Install an externally supplied reader handle and schema manifest.
//
// Has no effect when the fragment is already Cached; in every other state the
// supplied values replace the stored ones and the fragment becomes Cached.
// Runs under physical_schema_mutex_ and always reports OK.
Status OrcFileFragment::SetMetadata(void* reader,
                                    std::shared_ptr<OrcSchemaManifest> manifest) {
  auto guard = physical_schema_mutex_.Lock();

  const bool already_cached = (cache_status_ == OrcCacheStatus::Cached);
  if (!already_cached) {
    orc_reader_ = reader;
    manifest_ = std::move(manifest);
    cache_status_ = OrcCacheStatus::Cached;
  }
  return Status::OK();
}

// Build a fragment restricted to the stripes that FilterStripes selects for
// `predicate`. Any error from FilterStripes is propagated unchanged.
Result<std::shared_ptr<Fragment>> OrcFileFragment::Subset(
    compute::Expression predicate) {
  ARROW_ASSIGN_OR_RAISE(std::vector<int> selected_stripes,
                        FilterStripes(std::move(predicate)));
  return Subset(std::move(selected_stripes));
}

// Build a fragment over the same source, format, partition expression and physical
// schema as this one, restricted to `stripe_ids`. When this fragment already has
// cached metadata, the new fragment receives the same reader handle and manifest.
//
// NOTE(review): cache_status_, orc_reader_ and manifest_ are read here without
// holding physical_schema_mutex_, unlike the other methods that touch them, and the
// raw reader handle ends up shared between two fragments — confirm both are intended.
Result<std::shared_ptr<Fragment>> OrcFileFragment::Subset(std::vector<int> stripe_ids) {
  // The constructor is private, so the object is created with `new` directly.
  std::shared_ptr<OrcFileFragment> child(new OrcFileFragment(
      source_, format_, partition_expression_, physical_schema_, std::move(stripe_ids)));

  const bool have_metadata = (cache_status_ == OrcCacheStatus::Cached);
  if (have_metadata) {
    ARROW_RETURN_NOT_OK(child->SetMetadata(orc_reader_, manifest_));
  }
  return child;
}

// Placeholder: loads metadata via EnsureCompleteMetadata() (propagating its error),
// then always returns NotImplemented. `predicate` is currently unused.
Result<FragmentVector> OrcFileFragment::SplitByStripe(compute::Expression predicate) {
ARROW_RETURN_NOT_OK(EnsureCompleteMetadata());
return Status::NotImplemented("SplitByStripe not yet implemented");
}

// Placeholder: loads metadata via EnsureCompleteMetadata() (propagating its error),
// then always returns NotImplemented. `predicate` is currently unused.
// Subset(compute::Expression) depends on this, so it fails the same way for now.
Result<std::vector<int>> OrcFileFragment::FilterStripes(compute::Expression predicate) {
ARROW_RETURN_NOT_OK(EnsureCompleteMetadata());
return Status::NotImplemented("FilterStripes not yet implemented");
}

// Placeholder: always returns NotImplemented without touching fragment state.
// `predicate` is currently unused.
Result<std::vector<compute::Expression>> OrcFileFragment::TestStripes(
compute::Expression predicate) {
return Status::NotImplemented("TestStripes not yet implemented");
}

// Placeholder: always returns an empty optional, i.e. no row count is produced.
// `predicate` is currently unused.
Result<std::optional<int64_t>> OrcFileFragment::TryCountRows(
compute::Expression predicate) {
return std::nullopt;
}

// //
// // OrcFileWriter, OrcFileWriteOptions
// //
Expand Down
78 changes: 78 additions & 0 deletions cpp/src/arrow/dataset/file_orc.h
Original file line number Diff line number Diff line change
Expand Up @@ -104,8 +104,86 @@ struct ARROW_DS_EXPORT OrcSchemaManifest {
}
};

/// \brief Cache status for ORC fragment metadata
enum class OrcCacheStatus {
/// No metadata has been cached; this is the state a fragment starts in and the
/// state it returns to after its cached metadata is cleared
Uncached,
/// Metadata is being loaded (to prevent concurrent loads); requests to load
/// metadata made while in this state are rejected with an Invalid status
Loading,
/// Metadata is fully cached; further load or set requests are no-ops
Cached
};

/// \brief Forward declaration for StripeStatisticsCache
/// Full definition in file_orc.cc to avoid exposing implementation details
struct StripeStatisticsCache;

/// \brief Forward declarations for ORC types
/// These avoid including ORC headers in this public header
// NOTE(review): these names are declared relative to whatever namespace encloses
// this point of the header (not visible here). If that is not the global namespace,
// they do not refer to the ORC library's ::orc::Reader / ::orc::Statistics — confirm.
// The fragment code visible alongside passes the reader as void* and does not use
// these declarations.
namespace orc {
class Reader;
class Statistics;
} // namespace orc

/// \brief The string "orc"; presumably the type name reported by the ORC file
/// format — confirm against its users, which are not visible here.
constexpr char kOrcTypeName[] = "orc";

/// \brief A FileFragment with ORC-specific predicate pushdown capabilities.
///
/// Extends FileFragment with support for stripe-level filtering based on
/// ORC statistics. Similar to ParquetFileFragment but adapted for ORC's
/// stripe-based structure (vs Parquet's row groups).
class ARROW_DS_EXPORT OrcFileFragment : public FileFragment {
 public:
  /// \brief Split this fragment into one fragment per stripe.
  ///
  /// \param[in] predicate filter expression; presumably used to leave out stripes
  ///            that cannot satisfy it (TODO confirm once implemented)
  Result<FragmentVector> SplitByStripe(compute::Expression predicate);

  /// \brief Return the stripes selected by this fragment.
  ///
  /// When the fragment carries no explicit stripe selection, a reference to an
  /// empty vector is returned.
  const std::vector<int>& stripes() const {
    // Shared immutable fallback, so that a reference can be returned in both cases
    // without exposing mutable static state.
    static const std::vector<int> kNoSelection;
    return stripes_.has_value() ? *stripes_ : kNoSelection;
  }

  /// \brief Return the ORC file metadata associated with this fragment.
  ///
  /// The value is the stored reader handle as an opaque pointer. It is nullptr
  /// until a reader has been stored and after ClearCachedMetadata().
  void* metadata();

  /// \brief Ensure this fragment's metadata is loaded into memory.
  ///
  /// \param[in] reader optional already-open reader to adopt; when nullptr the
  ///            fragment opens one from its own file source
  Status EnsureCompleteMetadata(void* reader = nullptr);

  /// \brief Clear all cached metadata and statistics.
  Status ClearCachedMetadata() override;

  /// \brief Return a fragment selecting a filtered subset of this fragment's stripes.
  ///
  /// \param[in] predicate expression used to choose the stripes to keep
  Result<std::shared_ptr<Fragment>> Subset(compute::Expression predicate);

  /// \brief Return a fragment selecting exactly the given stripes.
  ///
  /// \param[in] stripe_ids stripe identifiers the new fragment should select
  Result<std::shared_ptr<Fragment>> Subset(std::vector<int> stripe_ids);

 private:
  /// Private: instances are created by this class itself and by OrcFileFormat
  /// (declared a friend below).
  OrcFileFragment(FileSource source, std::shared_ptr<FileFormat> format,
                  compute::Expression partition_expression,
                  std::shared_ptr<Schema> physical_schema,
                  std::optional<std::vector<int>> stripes);

  /// Store an externally provided reader handle and manifest.
  Status SetMetadata(void* reader, std::shared_ptr<OrcSchemaManifest> manifest);

  /// Loads metadata, then returns the stored physical schema.
  ///
  /// NOTE(review): the EnsureCompleteMetadata() implementation accompanying this
  /// header does not assign physical_schema_, so the value returned is whatever
  /// was supplied at construction (possibly null) — confirm.
  Result<std::shared_ptr<Schema>> ReadPhysicalSchemaImpl() override {
    ARROW_RETURN_NOT_OK(EnsureCompleteMetadata());
    return physical_schema_;
  }

  Result<std::vector<int>> FilterStripes(compute::Expression predicate);
  Result<std::vector<compute::Expression>> TestStripes(compute::Expression predicate);
  Result<std::optional<int64_t>> TryCountRows(compute::Expression predicate);

  /// The owning format, downcast once at construction.
  OrcFileFormat& orc_format_;
  /// Explicit stripe selection; empty optional when none was supplied.
  std::optional<std::vector<int>> stripes_;
  OrcCacheStatus cache_status_ = OrcCacheStatus::Uncached;
  /// Opaque reader handle.
  /// NOTE(review): ownership is not expressed by the type; confirm who deletes it.
  void* orc_reader_ = nullptr;
  std::shared_ptr<OrcSchemaManifest> manifest_;
  /// StripeStatisticsCache is only forward-declared in this header, so this class's
  /// destructor must be instantiated where that type is complete.
  std::unique_ptr<StripeStatisticsCache> statistics_cache_;

  friend class OrcFileFormat;
};

/// \brief A FileFormat implementation that reads from and writes to ORC files
class ARROW_DS_EXPORT OrcFileFormat : public FileFormat {
public:
Expand Down
Loading