Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion c++/include/orc/Reader.hh
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ namespace orc {
// classes that hold data members so we can maintain binary compatibility
struct ReaderOptionsPrivate;
struct RowReaderOptionsPrivate;

class InputStream;
/**
* Expose the reader metrics including the latency and
* number of calls of the decompression/decoding/IO modules.
Expand Down Expand Up @@ -633,6 +633,12 @@ namespace orc {
*/
virtual std::map<uint32_t, BloomFilterIndex> getBloomFilters(
uint32_t stripeIndex, const std::set<uint32_t>& included) const = 0;

/**
 * Get the input stream this reader is bound to.
 * @return a raw pointer to the stream; presumably ownership stays with
 *         the reader (the implementation returns `stream.get()`)
 */
virtual InputStream* getStream() const = 0;

/**
 * Replace the input stream used by this reader.
 * The stream is passed as a std::unique_ptr by value, so ownership is
 * transferred to the reader.
 */
virtual void setStream(std::unique_ptr<InputStream>) = 0;

/**
 * Evaluate the search argument of the given options against the stripe
 * statistics and report which stripes have to be read.
 * @param opts row reader options carrying the (optional) search argument
 * @return one entry per stripe: non-zero if the stripe needs to be read,
 *         0 if its statistics show that it can be skipped
 */
virtual std::vector<int> getNeedReadStripes(const RowReaderOptions& opts) = 0;
};

/**
Expand Down
71 changes: 56 additions & 15 deletions c++/src/Reader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -306,7 +306,8 @@ namespace orc {
column_selector.updateSelected(selectedColumns, opts);

// prepare SargsApplier if SearchArgument is available
if (opts.getSearchArgument() && footer->rowindexstride() > 0) {
sargsApplier = std::move(contents->sargsApplier);
if (sargsApplier == nullptr && opts.getSearchArgument() && footer->rowindexstride() > 0) {
sargs = opts.getSearchArgument();
sargsApplier.reset(new SargsApplier(*contents->schema, sargs.get(), footer->rowindexstride(),
getWriterVersionImpl(_contents.get()),
Expand Down Expand Up @@ -917,6 +918,37 @@ namespace orc {
return std::make_unique<RowReaderImpl>(contents, opts, filter, stringDictFilter);
}

std::vector<int> ReaderImpl::getNeedReadStripes(const RowReaderOptions& opts) {
if (opts.getSearchArgument() && !isMetadataLoaded) {
// load stripe statistics for PPD
readMetadata();
}

std::vector<int> allStripesNeeded(numberOfStripes,1);

if (opts.getSearchArgument() && footer->rowindexstride() > 0) {
auto sargs = opts.getSearchArgument();
sargsApplier.reset(new SargsApplier(*contents->schema, sargs.get(), footer->rowindexstride(),
getWriterVersionImpl(contents.get()),
contents->readerMetrics));

if (sargsApplier == nullptr || contents->metadata == nullptr) {
return allStripesNeeded;
}

for ( uint64_t currentStripeIndex = 0;currentStripeIndex < numberOfStripes ; currentStripeIndex ++) {
const auto& currentStripeStats =
contents->metadata->stripestats(static_cast<int>(currentStripeIndex));
//Not need add mMetrics,so use 0.
allStripesNeeded[currentStripeIndex] = sargsApplier->evaluateStripeStatistics(currentStripeStats, 0);;
}
contents->sargsApplier = std::move(sargsApplier);
}
return allStripesNeeded;
}



uint64_t maxStreamsForType(const proto::Type& type) {
switch (static_cast<int64_t>(type.kind())) {
case proto::Type_Kind_STRUCT:
Expand Down Expand Up @@ -1126,29 +1158,38 @@ namespace orc {
<< ", footerLength=" << currentStripeInfo.footerlength() << ")";
throw ParseError(msg.str());
}
currentStripeFooter = getStripeFooter(currentStripeInfo, *contents.get());
rowsInCurrentStripe = currentStripeInfo.numberofrows();
processingStripe = currentStripe;

std::unique_ptr<StripeInformation> currentStripeInformation(new StripeInformationImpl(
currentStripeInfo.offset(), currentStripeInfo.indexlength(),
currentStripeInfo.datalength(), currentStripeInfo.footerlength(),
currentStripeInfo.numberofrows(), contents->stream.get(), *contents->pool,
contents->compression, contents->blockSize, contents->readerMetrics));
contents->stream->beforeReadStripe(std::move(currentStripeInformation), selectedColumns);

if (sargsApplier) {
bool isStripeNeeded = true;
if (contents->metadata) {
const auto& currentStripeStats =
contents->metadata->stripestats(static_cast<int>(currentStripe));
const auto &currentStripeStats =
contents->metadata->stripestats(static_cast<int>(currentStripe));
// skip this stripe after stats fail to satisfy sargs
uint64_t stripeRowGroupCount =
(rowsInCurrentStripe + footer->rowindexstride() - 1) / footer->rowindexstride();
(rowsInCurrentStripe + footer->rowindexstride() - 1) / footer->rowindexstride();
isStripeNeeded =
sargsApplier->evaluateStripeStatistics(currentStripeStats, stripeRowGroupCount);
sargsApplier->evaluateStripeStatistics(currentStripeStats, stripeRowGroupCount);
}
if (!isStripeNeeded) {
// advance to next stripe when current stripe has no matching rows
currentStripe += 1;
currentRowInStripe = 0;
continue;
}
}
currentStripeFooter = getStripeFooter(currentStripeInfo, *contents.get());
rowsInCurrentStripe = currentStripeInfo.numberofrows();
processingStripe = currentStripe;

std::unique_ptr<StripeInformation> currentStripeInformation(new StripeInformationImpl(
currentStripeInfo.offset(), currentStripeInfo.indexlength(),
currentStripeInfo.datalength(), currentStripeInfo.footerlength(),
currentStripeInfo.numberofrows(), contents->stream.get(), *contents->pool,
contents->compression, contents->blockSize, contents->readerMetrics));
contents->stream->beforeReadStripe(std::move(currentStripeInformation), selectedColumns);

if (sargsApplier) {
bool isStripeNeeded = true;
if (isStripeNeeded) {
// read row group statistics and bloom filters of current stripe
loadStripeIndex();
Expand Down
9 changes: 8 additions & 1 deletion c++/src/Reader.hh
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ namespace orc {
bool isDecimalAsLong;
std::unique_ptr<proto::Metadata> metadata;
ReaderMetrics* readerMetrics;
std::unique_ptr<SargsApplier> sargsApplier;
};

proto::StripeFooter getStripeFooter(const proto::StripeInformation& info,
Expand Down Expand Up @@ -314,6 +315,8 @@ namespace orc {
// footer
proto::Footer* footer;
uint64_t numberOfStripes;
std::unique_ptr<SargsApplier> sargsApplier;
// Returns one entry per stripe telling whether the stripe has to be read
// for the given options (non-zero) or can be skipped (0).
std::vector<int> getNeedReadStripes(const RowReaderOptions& opts) override;
uint64_t getMemoryUse(int stripeIx, std::vector<bool>& selectedColumns);

// internal methods
Expand Down Expand Up @@ -418,10 +421,14 @@ namespace orc {
return contents->schema.get();
}

InputStream* getStream() const {
// Returns the raw pointer held by contents->stream; the caller does not
// receive ownership through this accessor.
InputStream* getStream() const override {
return contents->stream.get();
}

// Moves the given stream into contents->stream, so the file contents take
// over ownership of it.
// NOTE(review): whatever contents->stream held before is presumably
// released by this assignment -- confirm no one still uses the old pointer.
void setStream(std::unique_ptr<InputStream> inputStreamUPtr) override{
contents->stream = std::move(inputStreamUPtr);
}

uint64_t getMemoryUse(int stripeIx = -1) override;

uint64_t getMemoryUseByFieldId(const std::list<uint64_t>& include, int stripeIx = -1) override;
Expand Down