Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 49 additions & 0 deletions c++/include/orc/Common.hh
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,46 @@ namespace orc {

std::string columnEncodingKindToString(ColumnEncodingKind kind);

class StreamId {
public:
StreamId(uint64_t columnId, StreamKind streamKind)
: _columnId(columnId), _streamKind(streamKind) {}

size_t hash() const {
size_t h1 = std::hash<uint64_t>{}(_columnId);
size_t h2 = std::hash<int>{}(static_cast<int>(_streamKind));
return h1 ^ (h2 << 1);
}

bool operator==(const StreamId& other) const {
return _columnId == other._columnId && _streamKind == other._streamKind;
}

bool operator<(const StreamId& other) const {
if (_columnId != other._columnId) {
return _columnId < other._columnId;
}
return static_cast<int>(_streamKind) < static_cast<int>(other._streamKind);
}

std::string toString() const {
std::ostringstream oss;
oss << "[columnId=" << _columnId << ", streamKind=" << static_cast<int>(_streamKind) << "]";
return oss.str();
}

uint64_t columnId() const {
return _columnId;
}
StreamKind streamKind() const {
return _streamKind;
}

private:
uint64_t _columnId;
StreamKind _streamKind;
};

class StripeInformation {
public:
virtual ~StripeInformation();
Expand Down Expand Up @@ -306,4 +346,13 @@ namespace orc {

} // namespace orc

namespace std {
template <>
struct hash<orc::StreamId> {
size_t operator()(const orc::StreamId& id) const {
return id.hash();
}
};
} // namespace std

#endif
6 changes: 4 additions & 2 deletions c++/include/orc/OrcFile.hh
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,10 @@ namespace orc {
*/
virtual const std::string& getName() const = 0;

virtual void beforeReadStripe(std::unique_ptr<StripeInformation> currentStripeInformation,
std::vector<bool> selectedColumns);
virtual void beforeReadStripe(
std::unique_ptr<StripeInformation> currentStripeInformation,
std::vector<bool> selectedColumns,
std::unordered_map<orc::StreamId, std::shared_ptr<InputStream>>& streams);
};

/**
Expand Down
58 changes: 33 additions & 25 deletions c++/src/Reader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -549,11 +549,14 @@ namespace orc {
if (selectedColumns[colId] && pbStream.has_kind() &&
(pbStream.kind() == proto::Stream_Kind_ROW_INDEX ||
pbStream.kind() == proto::Stream_Kind_BLOOM_FILTER_UTF8)) {
std::unique_ptr<SeekableInputStream> inStream = createDecompressor(
getCompression(),
std::unique_ptr<SeekableInputStream>(new SeekableFileInputStream(
contents->stream.get(), offset, pbStream.length(), *contents->pool)),
getCompressionSize(), *contents->pool, contents->readerMetrics);
auto iter = streams.find({colId, static_cast<StreamKind>(pbStream.kind())});
InputStream* inputStream =
(iter != streams.end()) ? iter->second.get() : contents->stream.get();
std::unique_ptr<SeekableInputStream> inStream =
createDecompressor(getCompression(),
std::unique_ptr<SeekableInputStream>(new SeekableFileInputStream(
inputStream, offset, pbStream.length(), *contents->pool)),
getCompressionSize(), *contents->pool, contents->readerMetrics);

if (pbStream.kind() == proto::Stream_Kind_ROW_INDEX) {
proto::RowIndex rowIndex;
Expand Down Expand Up @@ -951,7 +954,7 @@ namespace orc {
readMetadata();
}

std::vector<int> allStripesNeeded(numberOfStripes,1);
std::vector<int> allStripesNeeded(numberOfStripes, 1);

if (opts.getSearchArgument() && footer->rowindexstride() > 0) {
auto sargs = opts.getSearchArgument();
Expand All @@ -963,19 +966,20 @@ namespace orc {
return allStripesNeeded;
}

for ( uint64_t currentStripeIndex = 0;currentStripeIndex < numberOfStripes ; currentStripeIndex ++) {
for (uint64_t currentStripeIndex = 0; currentStripeIndex < numberOfStripes;
currentStripeIndex++) {
const auto& currentStripeStats =
contents->metadata->stripestats(static_cast<int>(currentStripeIndex));
//Not need add mMetrics,so use 0.
allStripesNeeded[currentStripeIndex] = sargsApplier->evaluateStripeStatistics(currentStripeStats, 0);;
contents->metadata->stripestats(static_cast<int>(currentStripeIndex));
// Not need add mMetrics,so use 0.
allStripesNeeded[currentStripeIndex] =
sargsApplier->evaluateStripeStatistics(currentStripeStats, 0);
;
}
contents->sargsApplier = std::move(sargsApplier);
}
return allStripesNeeded;
}



uint64_t maxStreamsForType(const proto::Type& type) {
switch (static_cast<int64_t>(type.kind())) {
case proto::Type_Kind_STRUCT:
Expand Down Expand Up @@ -1161,6 +1165,7 @@ namespace orc {
reader.reset(); // ColumnReaders use lots of memory; free old memory first
rowIndexes.clear();
bloomFilterIndex.clear();
streams.clear();
followRowInStripe = 0;

// evaluate file statistics if it exists
Expand Down Expand Up @@ -1189,13 +1194,13 @@ namespace orc {
if (sargsApplier) {
bool isStripeNeeded = true;
if (contents->metadata) {
const auto &currentStripeStats =
contents->metadata->stripestats(static_cast<int>(currentStripe));
const auto& currentStripeStats =
contents->metadata->stripestats(static_cast<int>(currentStripe));
// skip this stripe after stats fail to satisfy sargs
uint64_t stripeRowGroupCount =
(rowsInCurrentStripe + footer->rowindexstride() - 1) / footer->rowindexstride();
(rowsInCurrentStripe + footer->rowindexstride() - 1) / footer->rowindexstride();
isStripeNeeded =
sargsApplier->evaluateStripeStatistics(currentStripeStats, stripeRowGroupCount);
sargsApplier->evaluateStripeStatistics(currentStripeStats, stripeRowGroupCount);
}
if (!isStripeNeeded) {
// advance to next stripe when current stripe has no matching rows
Expand All @@ -1209,11 +1214,12 @@ namespace orc {
processingStripe = currentStripe;

std::unique_ptr<StripeInformation> currentStripeInformation(new StripeInformationImpl(
currentStripeInfo.offset(), currentStripeInfo.indexlength(),
currentStripeInfo.datalength(), currentStripeInfo.footerlength(),
currentStripeInfo.numberofrows(), contents->stream.get(), *contents->pool,
contents->compression, contents->blockSize, contents->readerMetrics));
contents->stream->beforeReadStripe(std::move(currentStripeInformation), selectedColumns);
currentStripeInfo.offset(), currentStripeInfo.indexlength(),
currentStripeInfo.datalength(), currentStripeInfo.footerlength(),
currentStripeInfo.numberofrows(), contents->stream.get(), *contents->pool,
contents->compression, contents->blockSize, contents->readerMetrics));
contents->stream->beforeReadStripe(std::move(currentStripeInformation), selectedColumns,
streams);

if (sargsApplier) {
bool isStripeNeeded = true;
Expand All @@ -1237,8 +1243,8 @@ namespace orc {
? getTimezoneByName(currentStripeFooter.writertimezone())
: localTimezone;
StripeStreamsImpl stripeStreams(*this, currentStripe, currentStripeInfo, currentStripeFooter,
currentStripeInfo.offset(), *contents->stream, writerTimezone,
readerTimezone);
currentStripeInfo.offset(), *contents->stream, streams,
writerTimezone, readerTimezone);
reader = buildReader(*contents->schema, stripeStreams, useTightNumericVector);

if (stringDictFilter != nullptr) {
Expand Down Expand Up @@ -1760,7 +1766,9 @@ namespace orc {
// PASS
};

void InputStream::beforeReadStripe(std::unique_ptr<StripeInformation> currentStripeInformation,
std::vector<bool> selectedColumns) {}
void InputStream::beforeReadStripe(
std::unique_ptr<StripeInformation> currentStripeInformation,
std::vector<bool> selectedColumns,
std::unordered_map<orc::StreamId, std::shared_ptr<InputStream>>& streams) {}

} // namespace orc
12 changes: 6 additions & 6 deletions c++/src/Reader.hh
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,7 @@ namespace orc {

// contents
std::shared_ptr<FileContents> contents;
std::unordered_map<orc::StreamId, std::shared_ptr<InputStream>> streams;
const bool throwOnHive11DecimalOverflow;
const int32_t forcedScaleOnHive11Decimal;

Expand Down Expand Up @@ -322,10 +323,9 @@ namespace orc {
// internal methods
void readMetadata() const;
void checkOrcVersion();
void getRowIndexStatistics(
const proto::StripeInformation& stripeInfo, uint64_t stripeIndex,
const proto::StripeFooter& currentStripeFooter,
std::vector<std::vector<proto::ColumnStatistics> >* indexStats) const;
void getRowIndexStatistics(const proto::StripeInformation& stripeInfo, uint64_t stripeIndex,
const proto::StripeFooter& currentStripeFooter,
std::vector<std::vector<proto::ColumnStatistics>>* indexStats) const;

// metadata
mutable bool isMetadataLoaded;
Expand Down Expand Up @@ -425,8 +425,8 @@ namespace orc {
return contents->stream.get();
}

void setStream(std::unique_ptr<InputStream> inputStreamUPtr) override{
contents->stream = std::move(inputStreamUPtr);
void setStream(std::unique_ptr<InputStream> inputStreamUPtr) override {
contents->stream = std::move(inputStreamUPtr);
}

uint64_t getMemoryUse(int stripeIx = -1) override;
Expand Down
17 changes: 10 additions & 7 deletions c++/src/StripeStream.cc
Original file line number Diff line number Diff line change
Expand Up @@ -25,17 +25,18 @@

namespace orc {

StripeStreamsImpl::StripeStreamsImpl(const RowReaderImpl& _reader, uint64_t _index,
const proto::StripeInformation& _stripeInfo,
const proto::StripeFooter& _footer, uint64_t _stripeStart,
InputStream& _input, const Timezone& _writerTimezone,
const Timezone& _readerTimezone)
StripeStreamsImpl::StripeStreamsImpl(
const RowReaderImpl& _reader, uint64_t _index, const proto::StripeInformation& _stripeInfo,
const proto::StripeFooter& _footer, uint64_t _stripeStart, InputStream& _input,
const std::unordered_map<orc::StreamId, std::shared_ptr<InputStream>>& _streams,
const Timezone& _writerTimezone, const Timezone& _readerTimezone)
: reader(_reader),
stripeInfo(_stripeInfo),
footer(_footer),
stripeIndex(_index),
stripeStart(_stripeStart),
input(_input),
streams(_streams),
writerTimezone(_writerTimezone),
readerTimezone(_readerTimezone) {
// PASS
Expand Down Expand Up @@ -87,8 +88,10 @@ namespace orc {
const proto::Stream& stream = footer.streams(i);
if (stream.has_kind() && stream.kind() == kind &&
stream.column() == static_cast<uint64_t>(columnId)) {
auto iter = streams.find({columnId, static_cast<StreamKind>(kind)});
InputStream* inputStream = (iter != streams.end()) ? iter->second.get() : &input;
uint64_t streamLength = stream.length();
uint64_t myBlock = shouldStream ? input.getNaturalReadSize() : streamLength;
uint64_t myBlock = shouldStream ? inputStream->getNaturalReadSize() : streamLength;
if (offset + streamLength > dataEnd) {
std::stringstream msg;
msg << "Malformed stream meta at stream index " << i << " in stripe " << stripeIndex
Expand All @@ -100,7 +103,7 @@ namespace orc {
}
return createDecompressor(reader.getCompression(),
std::make_unique<SeekableFileInputStream>(
&input, offset, stream.length(), *pool, myBlock),
inputStream, offset, stream.length(), *pool, myBlock),
reader.getCompressionSize(), *pool,
reader.getFileContents().readerMetrics);
}
Expand Down
10 changes: 6 additions & 4 deletions c++/src/StripeStream.hh
Original file line number Diff line number Diff line change
Expand Up @@ -43,14 +43,16 @@ namespace orc {
const uint64_t stripeIndex;
const uint64_t stripeStart;
InputStream& input;
const std::unordered_map<orc::StreamId, std::shared_ptr<InputStream>>& streams;
const Timezone& writerTimezone;
const Timezone& readerTimezone;

public:
StripeStreamsImpl(const RowReaderImpl& reader, uint64_t index,
const proto::StripeInformation& stripeInfo, const proto::StripeFooter& footer,
uint64_t stripeStart, InputStream& input, const Timezone& writerTimezone,
const Timezone& readerTimezone);
StripeStreamsImpl(
const RowReaderImpl& reader, uint64_t index, const proto::StripeInformation& stripeInfo,
const proto::StripeFooter& footer, uint64_t stripeStart, InputStream& input,
const std::unordered_map<orc::StreamId, std::shared_ptr<InputStream>>& streams,
const Timezone& writerTimezone, const Timezone& readerTimezone);

virtual ~StripeStreamsImpl() override;

Expand Down