From f2cc497abd5407e7b008f9052eb1a794ee4e4103 Mon Sep 17 00:00:00 2001 From: daidai <2017501503@qq.com> Date: Thu, 17 Oct 2024 11:23:37 +0800 Subject: [PATCH 1/5] orc merge multi stripes io --- c++/include/orc/Reader.hh | 6 +++++- c++/src/Reader.cc | 30 ++++++++++++++++++++++++++++++ c++/src/Reader.hh | 4 +++- 3 files changed, 38 insertions(+), 2 deletions(-) diff --git a/c++/include/orc/Reader.hh b/c++/include/orc/Reader.hh index 5843d88c059..71f5545081a 100644 --- a/c++/include/orc/Reader.hh +++ b/c++/include/orc/Reader.hh @@ -40,7 +40,7 @@ namespace orc { // classes that hold data members so we can maintain binary compatibility struct ReaderOptionsPrivate; struct RowReaderOptionsPrivate; - + class InputStream; /** * Expose the reader metrics including the latency and * number of calls of the decompression/decoding/IO modules. @@ -633,6 +633,8 @@ namespace orc { */ virtual std::map getBloomFilters( uint32_t stripeIndex, const std::set& included) const = 0; + + virtual InputStream* getStream() const = 0; }; /** @@ -700,6 +702,8 @@ namespace orc { * Get number of rows in this range. */ virtual uint64_t getNumberOfRows() const = 0; + + virtual std::vector getAllStripesNeeded() const = 0; }; } // namespace orc diff --git a/c++/src/Reader.cc b/c++/src/Reader.cc index 80a5cfd4b7d..b53d8d12ae2 100644 --- a/c++/src/Reader.cc +++ b/c++/src/Reader.cc @@ -1221,6 +1221,36 @@ namespace orc { } } + std::vector RowReaderImpl::getAllStripesNeeded() const { + std::vector allStripesNeeded ; + + auto numberOfStripes = static_cast(footer->stripes_size()); + if (sargsApplier == nullptr || contents->metadata == nullptr) { + return std::vector(numberOfStripes, 1); + } + + for ( uint64_t currentStripeIndex = 0;currentStripeIndex < numberOfStripes ; currentStripeIndex ++) { + bool isStripeNeeded = true; + const auto& currentStripeStats = + contents->metadata->stripestats(static_cast(currentStripeIndex)); + + //Not need add mMetrics,so use 0. + isStripeNeeded = sargsApplier->evaluateStripeStatistics(currentStripeStats, 0); + + //Maybe we should consider bloomFilter, but loadStripeIndex() will change some current status. +// if (isStripeNeeded) { +// // read row group statistics and bloom filters of current stripe +// loadStripeIndex(); +// // select row groups to read in the current stripe +// sargsApplier->pickRowGroups(rowsInCurrentStripe, rowIndexes, bloomFilterIndex); +// isStripeNeeded = sargsApplier->hasSelectedFrom(currentRowInStripe); +// } + + allStripesNeeded.emplace_back(isStripeNeeded); + } + return allStripesNeeded; + } + bool RowReaderImpl::next(ColumnVectorBatch& data) { return nextBatch(data, nullptr) != 0; } diff --git a/c++/src/Reader.hh b/c++/src/Reader.hh index c0f891ef27c..f136536b29b 100644 --- a/c++/src/Reader.hh +++ b/c++/src/Reader.hh @@ -281,6 +281,8 @@ namespace orc { std::unique_ptr createRowBatch(uint64_t size) const override; + std::vector getAllStripesNeeded() const override; + uint64_t nextBatch(ColumnVectorBatch& data, void* arg = nullptr) override; bool next(ColumnVectorBatch& data) override; @@ -418,7 +420,7 @@ namespace orc { return contents->schema.get(); } - InputStream* getStream() const { + InputStream* getStream() const override { return contents->stream.get(); } From 77bdb4c8f3d386f22736f18ecc757c6ca02429d4 Mon Sep 17 00:00:00 2001 From: daidai <2017501503@qq.com> Date: Fri, 18 Oct 2024 01:33:33 +0800 Subject: [PATCH 2/5] orc cache --- c++/include/orc/Reader.hh | 2 ++ c++/src/Reader.cc | 13 +++++++++++-- c++/src/Reader.hh | 4 ++++ 3 files changed, 17 insertions(+), 2 deletions(-) diff --git a/c++/include/orc/Reader.hh b/c++/include/orc/Reader.hh index 71f5545081a..fa30f41025f 100644 --- a/c++/include/orc/Reader.hh +++ b/c++/include/orc/Reader.hh @@ -635,6 +635,8 @@ namespace orc { uint32_t stripeIndex, const std::set& included) const = 0; virtual InputStream* getStream() const = 0; + + virtual void setStream(std::unique_ptr) = 0; }; /** diff --git a/c++/src/Reader.cc b/c++/src/Reader.cc index b53d8d12ae2..ba563f1640f 100644 --- a/c++/src/Reader.cc +++ b/c++/src/Reader.cc @@ -1126,7 +1126,6 @@ namespace orc { << ", footerLength=" << currentStripeInfo.footerlength() << ")"; throw ParseError(msg.str()); } - currentStripeFooter = getStripeFooter(currentStripeInfo, *contents.get()); rowsInCurrentStripe = currentStripeInfo.numberofrows(); processingStripe = currentStripe; @@ -1148,13 +1147,23 @@ namespace orc { isStripeNeeded = sargsApplier->evaluateStripeStatistics(currentStripeStats, stripeRowGroupCount); } + if (!isStripeNeeded) { + // advance to next stripe when current stripe has no matching rows + currentStripe += 1; + currentRowInStripe = 0; + continue; + } + } + currentStripeFooter = getStripeFooter(currentStripeInfo, *contents.get()); + if (sargsApplier) { + bool isStripeNeeded = true; if (isStripeNeeded) { // read row group statistics and bloom filters of current stripe loadStripeIndex(); // select row groups to read in the current stripe sargsApplier->pickRowGroups(rowsInCurrentStripe, rowIndexes, bloomFilterIndex); - isStripeNeeded = sargsApplier->hasSelectedFrom(currentRowInStripe); + isStripeNeeded = sargsApplier->hasSelectedFrom(currentRowInStripe); } if (!isStripeNeeded) { // advance to next stripe when current stripe has no matching rows diff --git a/c++/src/Reader.hh b/c++/src/Reader.hh index f136536b29b..76d04081e56 100644 --- a/c++/src/Reader.hh +++ b/c++/src/Reader.hh @@ -424,6 +424,10 @@ namespace orc { return contents->stream.get(); } + void setStream(std::unique_ptr inputStreamUPtr) override{ + contents->stream = std::move(inputStreamUPtr); + } + uint64_t getMemoryUse(int stripeIx = -1) override; uint64_t getMemoryUseByFieldId(const std::list& include, int stripeIx = -1) override; From 7f8de14a7ac4c21a89d54325eb0ac2f1f8cb1419 Mon Sep 17 00:00:00 2001 From: daidai <2017501503@qq.com> Date: Fri, 18 Oct 2024 02:19:35 +0800 Subject: [PATCH 3/5] fix where in --- c++/src/Reader.cc | 13 ++----------- 1 file changed, 2 insertions(+), 11 deletions(-) diff --git a/c++/src/Reader.cc b/c++/src/Reader.cc index ba563f1640f..b53d8d12ae2 100644 --- a/c++/src/Reader.cc +++ b/c++/src/Reader.cc @@ -1126,6 +1126,7 @@ namespace orc { << ", footerLength=" << currentStripeInfo.footerlength() << ")"; throw ParseError(msg.str()); } + currentStripeFooter = getStripeFooter(currentStripeInfo, *contents.get()); rowsInCurrentStripe = currentStripeInfo.numberofrows(); processingStripe = currentStripe; @@ -1147,23 +1148,13 @@ namespace orc { isStripeNeeded = sargsApplier->evaluateStripeStatistics(currentStripeStats, stripeRowGroupCount); } - if (!isStripeNeeded) { - // advance to next stripe when current stripe has no matching rows - currentStripe += 1; - currentRowInStripe = 0; - continue; - } - } - currentStripeFooter = getStripeFooter(currentStripeInfo, *contents.get()); - if (sargsApplier) { - bool isStripeNeeded = true; if (isStripeNeeded) { // read row group statistics and bloom filters of current stripe loadStripeIndex(); // select row groups to read in the current stripe sargsApplier->pickRowGroups(rowsInCurrentStripe, rowIndexes, bloomFilterIndex); - isStripeNeeded = sargsApplier->hasSelectedFrom(currentRowInStripe); + isStripeNeeded = sargsApplier->hasSelectedFrom(currentRowInStripe); } if (!isStripeNeeded) { // advance to next stripe when current stripe has no matching rows From 72b2b1c3b60887e59653bdba660ffa8290f6574a Mon Sep 17 00:00:00 2001 From: daidai <2017501503@qq.com> Date: Fri, 18 Oct 2024 12:11:47 +0800 Subject: [PATCH 4/5] append filter --- c++/src/Reader.cc | 37 +++++++++++++++++++++++-------------- 1 file changed, 23 insertions(+), 14 deletions(-) diff --git a/c++/src/Reader.cc b/c++/src/Reader.cc index b53d8d12ae2..32b2bd65607 100644 --- a/c++/src/Reader.cc +++ b/c++/src/Reader.cc @@ -1126,29 +1126,38 @@ namespace orc { << ", footerLength=" << currentStripeInfo.footerlength() << ")"; throw ParseError(msg.str()); } - currentStripeFooter = getStripeFooter(currentStripeInfo, *contents.get()); - rowsInCurrentStripe = currentStripeInfo.numberofrows(); - processingStripe = currentStripe; - - std::unique_ptr currentStripeInformation(new StripeInformationImpl( - currentStripeInfo.offset(), currentStripeInfo.indexlength(), - currentStripeInfo.datalength(), currentStripeInfo.footerlength(), - currentStripeInfo.numberofrows(), contents->stream.get(), *contents->pool, - contents->compression, contents->blockSize, contents->readerMetrics)); - contents->stream->beforeReadStripe(std::move(currentStripeInformation), selectedColumns); if (sargsApplier) { bool isStripeNeeded = true; if (contents->metadata) { - const auto& currentStripeStats = - contents->metadata->stripestats(static_cast(currentStripe)); + const auto ¤tStripeStats = + contents->metadata->stripestats(static_cast(currentStripe)); // skip this stripe after stats fail to satisfy sargs uint64_t stripeRowGroupCount = - (rowsInCurrentStripe + footer->rowindexstride() - 1) / footer->rowindexstride(); + (rowsInCurrentStripe + footer->rowindexstride() - 1) / footer->rowindexstride(); isStripeNeeded = - sargsApplier->evaluateStripeStatistics(currentStripeStats, stripeRowGroupCount); + sargsApplier->evaluateStripeStatistics(currentStripeStats, stripeRowGroupCount); } + if (!isStripeNeeded) { + // advance to next stripe when current stripe has no matching rows + currentStripe += 1; + currentRowInStripe = 0; + continue; + } + } + currentStripeFooter = getStripeFooter(currentStripeInfo, *contents.get()); + rowsInCurrentStripe = currentStripeInfo.numberofrows(); + processingStripe = currentStripe; + + std::unique_ptr currentStripeInformation(new StripeInformationImpl( + currentStripeInfo.offset(), currentStripeInfo.indexlength(), + currentStripeInfo.datalength(), currentStripeInfo.footerlength(), + currentStripeInfo.numberofrows(), contents->stream.get(), *contents->pool, + contents->compression, contents->blockSize, contents->readerMetrics)); + contents->stream->beforeReadStripe(std::move(currentStripeInformation), selectedColumns); + if (sargsApplier) { + bool isStripeNeeded = true; if (isStripeNeeded) { // read row group statistics and bloom filters of current stripe loadStripeIndex(); From f3d498d33454fd66f492587076ff7d795f01413f Mon Sep 17 00:00:00 2001 From: daidai <2017501503@qq.com> Date: Fri, 18 Oct 2024 21:12:58 +0800 Subject: [PATCH 5/5] change --- c++/include/orc/Reader.hh | 4 +-- c++/src/Reader.cc | 64 ++++++++++++++++++++------------------- c++/src/Reader.hh | 5 +-- 3 files changed, 38 insertions(+), 35 deletions(-) diff --git a/c++/include/orc/Reader.hh b/c++/include/orc/Reader.hh index fa30f41025f..c9be47e0d8b 100644 --- a/c++/include/orc/Reader.hh +++ b/c++/include/orc/Reader.hh @@ -637,6 +637,8 @@ namespace orc { virtual InputStream* getStream() const = 0; virtual void setStream(std::unique_ptr) = 0; + + virtual std::vector getNeedReadStripes(const RowReaderOptions& opts) = 0; }; /** @@ -704,8 +706,6 @@ namespace orc { * Get number of rows in this range. */ virtual uint64_t getNumberOfRows() const = 0; - - virtual std::vector getAllStripesNeeded() const = 0; }; } // namespace orc diff --git a/c++/src/Reader.cc b/c++/src/Reader.cc index 32b2bd65607..eddeeee0b05 100644 --- a/c++/src/Reader.cc +++ b/c++/src/Reader.cc @@ -306,7 +306,8 @@ namespace orc { column_selector.updateSelected(selectedColumns, opts); // prepare SargsApplier if SearchArgument is available - if (opts.getSearchArgument() && footer->rowindexstride() > 0) { + sargsApplier = std::move(contents->sargsApplier); + if (sargsApplier == nullptr && opts.getSearchArgument() && footer->rowindexstride() > 0) { sargs = opts.getSearchArgument(); sargsApplier.reset(new SargsApplier(*contents->schema, sargs.get(), footer->rowindexstride(), getWriterVersionImpl(_contents.get()), @@ -917,6 +918,37 @@ namespace orc { return std::make_unique(contents, opts, filter, stringDictFilter); } + std::vector ReaderImpl::getNeedReadStripes(const RowReaderOptions& opts) { + if (opts.getSearchArgument() && !isMetadataLoaded) { + // load stripe statistics for PPD + readMetadata(); + } + + std::vector allStripesNeeded(numberOfStripes,1); + + if (opts.getSearchArgument() && footer->rowindexstride() > 0) { + auto sargs = opts.getSearchArgument(); + sargsApplier.reset(new SargsApplier(*contents->schema, sargs.get(), footer->rowindexstride(), + getWriterVersionImpl(contents.get()), + contents->readerMetrics)); + + if (sargsApplier == nullptr || contents->metadata == nullptr) { + return allStripesNeeded; + } + + for ( uint64_t currentStripeIndex = 0;currentStripeIndex < numberOfStripes ; currentStripeIndex ++) { + const auto& currentStripeStats = + contents->metadata->stripestats(static_cast(currentStripeIndex)); + //Not need add mMetrics,so use 0. + allStripesNeeded[currentStripeIndex] = sargsApplier->evaluateStripeStatistics(currentStripeStats, 0);; + } + contents->sargsApplier = std::move(sargsApplier); + } + return allStripesNeeded; + } + + + uint64_t maxStreamsForType(const proto::Type& type) { switch (static_cast(type.kind())) { case proto::Type_Kind_STRUCT: @@ -1230,36 +1262,6 @@ namespace orc { } } - std::vector RowReaderImpl::getAllStripesNeeded() const { - std::vector allStripesNeeded ; - - auto numberOfStripes = static_cast(footer->stripes_size()); - if (sargsApplier == nullptr || contents->metadata == nullptr) { - return std::vector(numberOfStripes, 1); - } - - for ( uint64_t currentStripeIndex = 0;currentStripeIndex < numberOfStripes ; currentStripeIndex ++) { - bool isStripeNeeded = true; - const auto& currentStripeStats = - contents->metadata->stripestats(static_cast(currentStripeIndex)); - - //Not need add mMetrics,so use 0. - isStripeNeeded = sargsApplier->evaluateStripeStatistics(currentStripeStats, 0); - - //Maybe we should consider bloomFilter, but loadStripeIndex() will change some current status. -// if (isStripeNeeded) { -// // read row group statistics and bloom filters of current stripe -// loadStripeIndex(); -// // select row groups to read in the current stripe -// sargsApplier->pickRowGroups(rowsInCurrentStripe, rowIndexes, bloomFilterIndex); -// isStripeNeeded = sargsApplier->hasSelectedFrom(currentRowInStripe); -// } - - allStripesNeeded.emplace_back(isStripeNeeded); - } - return allStripesNeeded; - } - bool RowReaderImpl::next(ColumnVectorBatch& data) { return nextBatch(data, nullptr) != 0; } diff --git a/c++/src/Reader.hh b/c++/src/Reader.hh index 76d04081e56..9505022c558 100644 --- a/c++/src/Reader.hh +++ b/c++/src/Reader.hh @@ -100,6 +100,7 @@ namespace orc { bool isDecimalAsLong; std::unique_ptr metadata; ReaderMetrics* readerMetrics; + std::unique_ptr sargsApplier; }; proto::StripeFooter getStripeFooter(const proto::StripeInformation& info, @@ -281,8 +282,6 @@ namespace orc { std::unique_ptr createRowBatch(uint64_t size) const override; - std::vector getAllStripesNeeded() const override; - uint64_t nextBatch(ColumnVectorBatch& data, void* arg = nullptr) override; bool next(ColumnVectorBatch& data) override; @@ -316,6 +315,8 @@ namespace orc { // footer proto::Footer* footer; uint64_t numberOfStripes; + std::unique_ptr sargsApplier; + std::vector getNeedReadStripes(const RowReaderOptions& opts) override; uint64_t getMemoryUse(int stripeIx, std::vector& selectedColumns); // internal methods