Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
66 changes: 62 additions & 4 deletions cpp/src/arrow/dataset/file_orc.cc
Original file line number Diff line number Diff line change
@@ -416,11 +416,69 @@ Status OrcFileFragment::EnsureCompleteMetadata(void* reader) {

cache_status_ = OrcCacheStatus::Loading;

// If no reader provided, open one ourselves
if (reader == nullptr) {
ARROW_ASSIGN_OR_RAISE(auto orc_reader, OpenORCReader(source_));
orc_reader_ = orc_reader.release();
} else {
orc_reader_ = reader;
lock.Unlock();
auto scan_options = std::make_shared<ScanOptions>();
ARROW_ASSIGN_OR_RAISE(auto orc_reader, OpenORCReader(source_, scan_options));
// Recursively call with the reader
return EnsureCompleteMetadata(orc_reader.get());
}

// Cast reader to ORCFileReader
auto* orc_reader = reinterpret_cast<arrow::adapters::orc::ORCFileReader*>(reader);

// Get physical schema from ORC file
ARROW_ASSIGN_OR_RAISE(auto file_schema, orc_reader->ReadSchema());

// Validate against given schema if provided
if (given_physical_schema_ && !given_physical_schema_->Equals(*file_schema)) {
return Status::Invalid("Fragment initialized with physical schema ",
*given_physical_schema_, " but ", source_.path(),
" has schema ", *file_schema);
}

physical_schema_ = file_schema;

// Initialize stripes if not already set (select all stripes)
if (!stripes_) {
ARROW_ASSIGN_OR_RAISE(int64_t num_stripes, orc_reader->NumberOfStripes());
std::vector<int> all_stripes(num_stripes);
for (int64_t i = 0; i < num_stripes; ++i) {
all_stripes[i] = static_cast<int>(i);
}
stripes_ = std::move(all_stripes);
}

// Get ORC type tree for manifest building
const void* orc_type = orc_reader->GetORCType();
if (orc_type == nullptr) {
return Status::Invalid("Could not get ORC type information from ", source_.path());
}

// Build schema manifest
auto manifest = std::make_shared<OrcSchemaManifest>();
ARROW_RETURN_NOT_OK(
OrcSchemaManifest::Make(physical_schema_, orc_type, manifest.get()));

// Set metadata and manifest
orc_reader_ = reader;
manifest_ = std::move(manifest);

// Initialize statistics cache
statistics_cache_ = std::make_unique<StripeStatisticsCache>();
statistics_cache_->stripe_guarantees.resize(stripes_->size(), compute::literal(true));
statistics_cache_->statistics_complete.resize(
manifest_->column_index_to_field.size(), false);

// Validate stripe indices
ARROW_ASSIGN_OR_RAISE(int64_t total_stripes, orc_reader->NumberOfStripes());
for (int stripe_idx : *stripes_) {
if (stripe_idx < 0 || stripe_idx >= static_cast<int>(total_stripes)) {
return Status::IndexError("OrcFileFragment references stripe ", stripe_idx,
" but ", source_.path(), " only has ", total_stripes,
" stripes");
}
}

cache_status_ = OrcCacheStatus::Cached;
Expand Down
Loading