From f4e9229d8f2edc1b2dcc197765902f97b227b63b Mon Sep 17 00:00:00 2001 From: kaka11chen Date: Wed, 25 Dec 2024 23:26:21 +0800 Subject: [PATCH] [Feature] Add input stream of stripe streams in stripe reader. --- c++/include/orc/Common.hh | 49 ++++++++++++++++++++++++++++++++ c++/include/orc/OrcFile.hh | 6 ++-- c++/src/Reader.cc | 58 ++++++++++++++++++++++---------------- c++/src/Reader.hh | 12 ++++---- c++/src/StripeStream.cc | 17 ++++++----- c++/src/StripeStream.hh | 10 ++++--- 6 files changed, 108 insertions(+), 44 deletions(-) diff --git a/c++/include/orc/Common.hh b/c++/include/orc/Common.hh index beae9dd6f31..9df0b0ed6a8 100644 --- a/c++/include/orc/Common.hh +++ b/c++/include/orc/Common.hh @@ -156,6 +156,46 @@ namespace orc { std::string columnEncodingKindToString(ColumnEncodingKind kind); + class StreamId { + public: + StreamId(uint64_t columnId, StreamKind streamKind) + : _columnId(columnId), _streamKind(streamKind) {} + + size_t hash() const { + size_t h1 = std::hash{}(_columnId); + size_t h2 = std::hash{}(static_cast(_streamKind)); + return h1 ^ (h2 << 1); + } + + bool operator==(const StreamId& other) const { + return _columnId == other._columnId && _streamKind == other._streamKind; + } + + bool operator<(const StreamId& other) const { + if (_columnId != other._columnId) { + return _columnId < other._columnId; + } + return static_cast(_streamKind) < static_cast(other._streamKind); + } + + std::string toString() const { + std::ostringstream oss; + oss << "[columnId=" << _columnId << ", streamKind=" << static_cast(_streamKind) << "]"; + return oss.str(); + } + + uint64_t columnId() const { + return _columnId; + } + StreamKind streamKind() const { + return _streamKind; + } + + private: + uint64_t _columnId; + StreamKind _streamKind; + }; + class StripeInformation { public: virtual ~StripeInformation(); @@ -306,4 +346,13 @@ namespace orc { } // namespace orc +namespace std { + template <> + struct hash { + size_t operator()(const orc::StreamId& id) const { + 
return id.hash(); + } + }; +} // namespace std + #endif diff --git a/c++/include/orc/OrcFile.hh b/c++/include/orc/OrcFile.hh index c52b66b7210..c7826e13add 100644 --- a/c++/include/orc/OrcFile.hh +++ b/c++/include/orc/OrcFile.hh @@ -63,8 +63,10 @@ namespace orc { */ virtual const std::string& getName() const = 0; - virtual void beforeReadStripe(std::unique_ptr currentStripeInformation, - std::vector selectedColumns); + virtual void beforeReadStripe( + std::unique_ptr currentStripeInformation, + std::vector selectedColumns, + std::unordered_map>& streams); }; /** diff --git a/c++/src/Reader.cc b/c++/src/Reader.cc index b8e1a914215..564c1f2aa6d 100644 --- a/c++/src/Reader.cc +++ b/c++/src/Reader.cc @@ -549,11 +549,14 @@ namespace orc { if (selectedColumns[colId] && pbStream.has_kind() && (pbStream.kind() == proto::Stream_Kind_ROW_INDEX || pbStream.kind() == proto::Stream_Kind_BLOOM_FILTER_UTF8)) { - std::unique_ptr inStream = createDecompressor( - getCompression(), - std::unique_ptr(new SeekableFileInputStream( - contents->stream.get(), offset, pbStream.length(), *contents->pool)), - getCompressionSize(), *contents->pool, contents->readerMetrics); + auto iter = streams.find({colId, static_cast(pbStream.kind())}); + InputStream* inputStream = + (iter != streams.end()) ? 
iter->second.get() : contents->stream.get(); + std::unique_ptr inStream = + createDecompressor(getCompression(), + std::unique_ptr(new SeekableFileInputStream( + inputStream, offset, pbStream.length(), *contents->pool)), + getCompressionSize(), *contents->pool, contents->readerMetrics); if (pbStream.kind() == proto::Stream_Kind_ROW_INDEX) { proto::RowIndex rowIndex; @@ -951,7 +954,7 @@ namespace orc { readMetadata(); } - std::vector allStripesNeeded(numberOfStripes,1); + std::vector allStripesNeeded(numberOfStripes, 1); if (opts.getSearchArgument() && footer->rowindexstride() > 0) { auto sargs = opts.getSearchArgument(); @@ -963,19 +966,20 @@ namespace orc { return allStripesNeeded; } - for ( uint64_t currentStripeIndex = 0;currentStripeIndex < numberOfStripes ; currentStripeIndex ++) { + for (uint64_t currentStripeIndex = 0; currentStripeIndex < numberOfStripes; + currentStripeIndex++) { const auto& currentStripeStats = - contents->metadata->stripestats(static_cast(currentStripeIndex)); - //Not need add mMetrics,so use 0. - allStripesNeeded[currentStripeIndex] = sargsApplier->evaluateStripeStatistics(currentStripeStats, 0);; + contents->metadata->stripestats(static_cast(currentStripeIndex)); + // Not need add mMetrics,so use 0. 
+ allStripesNeeded[currentStripeIndex] = + sargsApplier->evaluateStripeStatistics(currentStripeStats, 0); + ; } contents->sargsApplier = std::move(sargsApplier); } return allStripesNeeded; } - - uint64_t maxStreamsForType(const proto::Type& type) { switch (static_cast(type.kind())) { case proto::Type_Kind_STRUCT: @@ -1161,6 +1165,7 @@ namespace orc { reader.reset(); // ColumnReaders use lots of memory; free old memory first rowIndexes.clear(); bloomFilterIndex.clear(); + streams.clear(); followRowInStripe = 0; // evaluate file statistics if it exists @@ -1189,13 +1194,13 @@ namespace orc { if (sargsApplier) { bool isStripeNeeded = true; if (contents->metadata) { - const auto &currentStripeStats = - contents->metadata->stripestats(static_cast(currentStripe)); + const auto& currentStripeStats = + contents->metadata->stripestats(static_cast(currentStripe)); // skip this stripe after stats fail to satisfy sargs uint64_t stripeRowGroupCount = - (rowsInCurrentStripe + footer->rowindexstride() - 1) / footer->rowindexstride(); + (rowsInCurrentStripe + footer->rowindexstride() - 1) / footer->rowindexstride(); isStripeNeeded = - sargsApplier->evaluateStripeStatistics(currentStripeStats, stripeRowGroupCount); + sargsApplier->evaluateStripeStatistics(currentStripeStats, stripeRowGroupCount); } if (!isStripeNeeded) { // advance to next stripe when current stripe has no matching rows @@ -1209,11 +1214,12 @@ namespace orc { processingStripe = currentStripe; std::unique_ptr currentStripeInformation(new StripeInformationImpl( - currentStripeInfo.offset(), currentStripeInfo.indexlength(), - currentStripeInfo.datalength(), currentStripeInfo.footerlength(), - currentStripeInfo.numberofrows(), contents->stream.get(), *contents->pool, - contents->compression, contents->blockSize, contents->readerMetrics)); - contents->stream->beforeReadStripe(std::move(currentStripeInformation), selectedColumns); + currentStripeInfo.offset(), currentStripeInfo.indexlength(), + currentStripeInfo.datalength(), 
currentStripeInfo.footerlength(), + currentStripeInfo.numberofrows(), contents->stream.get(), *contents->pool, + contents->compression, contents->blockSize, contents->readerMetrics)); + contents->stream->beforeReadStripe(std::move(currentStripeInformation), selectedColumns, + streams); if (sargsApplier) { bool isStripeNeeded = true; @@ -1237,8 +1243,8 @@ namespace orc { ? getTimezoneByName(currentStripeFooter.writertimezone()) : localTimezone; StripeStreamsImpl stripeStreams(*this, currentStripe, currentStripeInfo, currentStripeFooter, - currentStripeInfo.offset(), *contents->stream, writerTimezone, - readerTimezone); + currentStripeInfo.offset(), *contents->stream, streams, + writerTimezone, readerTimezone); reader = buildReader(*contents->schema, stripeStreams, useTightNumericVector); if (stringDictFilter != nullptr) { @@ -1760,7 +1766,9 @@ namespace orc { // PASS }; - void InputStream::beforeReadStripe(std::unique_ptr currentStripeInformation, - std::vector selectedColumns) {} + void InputStream::beforeReadStripe( + std::unique_ptr currentStripeInformation, + std::vector selectedColumns, + std::unordered_map>& streams) {} } // namespace orc diff --git a/c++/src/Reader.hh b/c++/src/Reader.hh index 9505022c558..7ec049ad963 100644 --- a/c++/src/Reader.hh +++ b/c++/src/Reader.hh @@ -162,6 +162,7 @@ namespace orc { // contents std::shared_ptr contents; + std::unordered_map> streams; const bool throwOnHive11DecimalOverflow; const int32_t forcedScaleOnHive11Decimal; @@ -322,10 +323,9 @@ namespace orc { // internal methods void readMetadata() const; void checkOrcVersion(); - void getRowIndexStatistics( - const proto::StripeInformation& stripeInfo, uint64_t stripeIndex, - const proto::StripeFooter& currentStripeFooter, - std::vector >* indexStats) const; + void getRowIndexStatistics(const proto::StripeInformation& stripeInfo, uint64_t stripeIndex, + const proto::StripeFooter& currentStripeFooter, + std::vector>* indexStats) const; // metadata mutable bool 
isMetadataLoaded; @@ -425,8 +425,8 @@ namespace orc { return contents->stream.get(); } - void setStream(std::unique_ptr inputStreamUPtr) override{ - contents->stream = std::move(inputStreamUPtr); + void setStream(std::unique_ptr inputStreamUPtr) override { + contents->stream = std::move(inputStreamUPtr); } uint64_t getMemoryUse(int stripeIx = -1) override; diff --git a/c++/src/StripeStream.cc b/c++/src/StripeStream.cc index 1f43da4f243..8efa23efa86 100644 --- a/c++/src/StripeStream.cc +++ b/c++/src/StripeStream.cc @@ -25,17 +25,18 @@ namespace orc { - StripeStreamsImpl::StripeStreamsImpl(const RowReaderImpl& _reader, uint64_t _index, - const proto::StripeInformation& _stripeInfo, - const proto::StripeFooter& _footer, uint64_t _stripeStart, - InputStream& _input, const Timezone& _writerTimezone, - const Timezone& _readerTimezone) + StripeStreamsImpl::StripeStreamsImpl( + const RowReaderImpl& _reader, uint64_t _index, const proto::StripeInformation& _stripeInfo, + const proto::StripeFooter& _footer, uint64_t _stripeStart, InputStream& _input, + const std::unordered_map>& _streams, + const Timezone& _writerTimezone, const Timezone& _readerTimezone) : reader(_reader), stripeInfo(_stripeInfo), footer(_footer), stripeIndex(_index), stripeStart(_stripeStart), input(_input), + streams(_streams), writerTimezone(_writerTimezone), readerTimezone(_readerTimezone) { // PASS @@ -87,8 +88,10 @@ namespace orc { const proto::Stream& stream = footer.streams(i); if (stream.has_kind() && stream.kind() == kind && stream.column() == static_cast(columnId)) { + auto iter = streams.find({columnId, static_cast(kind)}); + InputStream* inputStream = (iter != streams.end()) ? iter->second.get() : &input; uint64_t streamLength = stream.length(); - uint64_t myBlock = shouldStream ? input.getNaturalReadSize() : streamLength; + uint64_t myBlock = shouldStream ? 
inputStream->getNaturalReadSize() : streamLength; if (offset + streamLength > dataEnd) { std::stringstream msg; msg << "Malformed stream meta at stream index " << i << " in stripe " << stripeIndex @@ -100,7 +103,7 @@ namespace orc { } return createDecompressor(reader.getCompression(), std::make_unique( - &input, offset, stream.length(), *pool, myBlock), + inputStream, offset, stream.length(), *pool, myBlock), reader.getCompressionSize(), *pool, reader.getFileContents().readerMetrics); } diff --git a/c++/src/StripeStream.hh b/c++/src/StripeStream.hh index 74bebda6f25..57e51ef76f0 100644 --- a/c++/src/StripeStream.hh +++ b/c++/src/StripeStream.hh @@ -43,14 +43,16 @@ namespace orc { const uint64_t stripeIndex; const uint64_t stripeStart; InputStream& input; + const std::unordered_map>& streams; const Timezone& writerTimezone; const Timezone& readerTimezone; public: - StripeStreamsImpl(const RowReaderImpl& reader, uint64_t index, - const proto::StripeInformation& stripeInfo, const proto::StripeFooter& footer, - uint64_t stripeStart, InputStream& input, const Timezone& writerTimezone, - const Timezone& readerTimezone); + StripeStreamsImpl( + const RowReaderImpl& reader, uint64_t index, const proto::StripeInformation& stripeInfo, + const proto::StripeFooter& footer, uint64_t stripeStart, InputStream& input, + const std::unordered_map>& streams, + const Timezone& writerTimezone, const Timezone& readerTimezone); virtual ~StripeStreamsImpl() override;