From c31b3ed204ba01ad3835d963ec2eb8343fd2981e Mon Sep 17 00:00:00 2001 From: Aliaksei Sandryhaila Date: Wed, 8 Jul 2015 08:06:57 -0700 Subject: [PATCH 1/4] Added a stream block size parameter to ReaderOptions to change the size of InputStreams' buffers. --- c++/include/orc/Reader.hh | 11 ++++++++++ c++/src/Reader.cc | 44 ++++++++++++++++++++++++++------------- 2 files changed, 41 insertions(+), 14 deletions(-) diff --git a/c++/include/orc/Reader.hh b/c++/include/orc/Reader.hh index 14921b08f4..42209ac365 100644 --- a/c++/include/orc/Reader.hh +++ b/c++/include/orc/Reader.hh @@ -542,6 +542,17 @@ namespace orc { * Get the serialized file tail that the user passed in. */ std::string getSerializedFileTail() const; + + /** + * Set the block size for seekable input streams + */ + ReaderOptions& setStreamBlockSize(uint64_t blocksize); + + /** + * Get the block size for seekable input streams + */ + uint64_t getStreamBlockSize(); + uint64_t getStreamBlockSize() const; }; /** diff --git a/c++/src/Reader.cc b/c++/src/Reader.cc index 85f629f905..ac18b900ed 100644 --- a/c++/src/Reader.cc +++ b/c++/src/Reader.cc @@ -48,6 +48,7 @@ namespace orc { std::ostream* errorStream; MemoryPool* memoryPool; std::string serializedTail; + uint64_t streamBlockSize; ReaderOptionsPrivate() { includedColumns.assign(1,0); @@ -58,6 +59,7 @@ namespace orc { forcedScaleOnHive11Decimal = 6; errorStream = &std::cerr; memoryPool = getDefaultPool(); + streamBlockSize = 1024*1024; } }; @@ -176,6 +178,19 @@ namespace orc { return privateBits->serializedTail; } + ReaderOptions& ReaderOptions::setStreamBlockSize(uint64_t blocksize) { + privateBits->streamBlockSize = blocksize; + return *this; + } + + uint64_t ReaderOptions::getStreamBlockSize() { + return privateBits->streamBlockSize; + } + + uint64_t ReaderOptions::getStreamBlockSize() const { + return privateBits->streamBlockSize; + } + StripeInformation::~StripeInformation() { } @@ -1311,17 +1326,18 @@ namespace orc { info.datalength(); uint64_t stripeFooterLength = info.footerlength(); std::unique_ptr pbStream = - createDecompressor(compression, - std::unique_ptr - (new SeekableFileInputStream(stream.get(), - stripeFooterStart, - stripeFooterLength, - memoryPool, - static_cast - (blockSize) - )), - blockSize, - memoryPool); + createDecompressor( + compression, + std::unique_ptr + (new SeekableFileInputStream( + stream.get(), + stripeFooterStart, + stripeFooterLength, + memoryPool, + std::max(blockSize, options.getStreamBlockSize()) + )), + blockSize, + memoryPool); proto::StripeFooter result; if (!result.ParseFromZeroCopyStream(pbStream.get())) { throw ParseError(std::string("bad StripeFooter from ") + @@ -1400,9 +1416,9 @@ namespace orc { if (stream.has_kind() && stream.kind() == kind && stream.column() == static_cast(columnId)) { - int64_t myBlock = static_cast(shouldStream ? - 1024 * 1024 : - stream.length()); + int64_t myBlock = shouldStream ? + reader.getReaderOptions().getStreamBlockSize() : + static_cast(stream.length()); return createDecompressor(reader.getCompression(), std::unique_ptr (new SeekableFileInputStream From 7114d6e89b7f96762b07479b397298edb8412a8c Mon Sep 17 00:00:00 2001 From: Aliaksei Sandryhaila Date: Wed, 8 Jul 2015 08:14:18 -0700 Subject: [PATCH 2/4] Corrected type casting. --- c++/src/Reader.cc | 19 ++++++++++--------- 1 file changed, 10 insertions(+), 9 deletions(-) diff --git a/c++/src/Reader.cc b/c++/src/Reader.cc index ac18b900ed..3281e1b00d 100644 --- a/c++/src/Reader.cc +++ b/c++/src/Reader.cc @@ -1329,13 +1329,14 @@ namespace orc { createDecompressor( compression, std::unique_ptr - (new SeekableFileInputStream( - stream.get(), - stripeFooterStart, - stripeFooterLength, - memoryPool, - std::max(blockSize, options.getStreamBlockSize()) - )), + (new SeekableFileInputStream( + stream.get(), + stripeFooterStart, + stripeFooterLength, + memoryPool, + static_cast(std::max(blockSize, + options.getStreamBlockSize())) + )), blockSize, memoryPool); proto::StripeFooter result; @@ -1416,9 +1417,9 @@ namespace orc { if (stream.has_kind() && stream.kind() == kind && stream.column() == static_cast(columnId)) { - int64_t myBlock = shouldStream ? + int64_t myBlock = static_cast(shouldStream ? reader.getReaderOptions().getStreamBlockSize() : - static_cast(stream.length()); + stream.length()); return createDecompressor(reader.getCompression(), std::unique_ptr (new SeekableFileInputStream From d40382662e9b564354160fa8a0d833ab5c699a08 Mon Sep 17 00:00:00 2001 From: Aliaksei Sandryhaila Date: Fri, 24 Jul 2015 13:17:26 -0700 Subject: [PATCH 3/4] Revert "Corrected type casting." This reverts commit 7114d6e89b7f96762b07479b397298edb8412a8c. --- c++/src/Reader.cc | 19 +++++++++---------- 1 file changed, 9 insertions(+), 10 deletions(-) diff --git a/c++/src/Reader.cc b/c++/src/Reader.cc index 3281e1b00d..ac18b900ed 100644 --- a/c++/src/Reader.cc +++ b/c++/src/Reader.cc @@ -1329,14 +1329,13 @@ namespace orc { createDecompressor( compression, std::unique_ptr - (new SeekableFileInputStream( - stream.get(), - stripeFooterStart, - stripeFooterLength, - memoryPool, - static_cast(std::max(blockSize, - options.getStreamBlockSize())) - )), + (new SeekableFileInputStream( + stream.get(), + stripeFooterStart, + stripeFooterLength, + memoryPool, + std::max(blockSize, options.getStreamBlockSize()) + )), blockSize, memoryPool); proto::StripeFooter result; @@ -1417,9 +1416,9 @@ namespace orc { if (stream.has_kind() && stream.kind() == kind && stream.column() == static_cast(columnId)) { - int64_t myBlock = static_cast(shouldStream ? + int64_t myBlock = shouldStream ? reader.getReaderOptions().getStreamBlockSize() : - stream.length()); + static_cast(stream.length()); return createDecompressor(reader.getCompression(), std::unique_ptr (new SeekableFileInputStream From 5992cc95d63bbcab362e685e65d0748277c7582d Mon Sep 17 00:00:00 2001 From: Aliaksei Sandryhaila Date: Fri, 24 Jul 2015 13:17:47 -0700 Subject: [PATCH 4/4] Revert "Added a stream block size parameter to ReaderOptions to change the size of InputStreams' buffers." This reverts commit c31b3ed204ba01ad3835d963ec2eb8343fd2981e. --- c++/include/orc/Reader.hh | 11 ---------- c++/src/Reader.cc | 44 +++++++++++++-------------------------- 2 files changed, 14 insertions(+), 41 deletions(-) diff --git a/c++/include/orc/Reader.hh b/c++/include/orc/Reader.hh index 42209ac365..14921b08f4 100644 --- a/c++/include/orc/Reader.hh +++ b/c++/include/orc/Reader.hh @@ -542,17 +542,6 @@ namespace orc { * Get the serialized file tail that the user passed in. */ std::string getSerializedFileTail() const; - - /** - * Set the block size for seekable input streams - */ - ReaderOptions& setStreamBlockSize(uint64_t blocksize); - - /** - * Get the block size for seekable input streams - */ - uint64_t getStreamBlockSize(); - uint64_t getStreamBlockSize() const; }; /** diff --git a/c++/src/Reader.cc b/c++/src/Reader.cc index ac18b900ed..85f629f905 100644 --- a/c++/src/Reader.cc +++ b/c++/src/Reader.cc @@ -48,7 +48,6 @@ namespace orc { std::ostream* errorStream; MemoryPool* memoryPool; std::string serializedTail; - uint64_t streamBlockSize; ReaderOptionsPrivate() { includedColumns.assign(1,0); @@ -59,7 +58,6 @@ namespace orc { forcedScaleOnHive11Decimal = 6; errorStream = &std::cerr; memoryPool = getDefaultPool(); - streamBlockSize = 1024*1024; } }; @@ -178,19 +176,6 @@ namespace orc { return privateBits->serializedTail; } - ReaderOptions& ReaderOptions::setStreamBlockSize(uint64_t blocksize) { - privateBits->streamBlockSize = blocksize; - return *this; - } - - uint64_t ReaderOptions::getStreamBlockSize() { - return privateBits->streamBlockSize; - } - - uint64_t ReaderOptions::getStreamBlockSize() const { - return privateBits->streamBlockSize; - } - StripeInformation::~StripeInformation() { } @@ -1326,18 +1311,17 @@ namespace orc { info.datalength(); uint64_t stripeFooterLength = info.footerlength(); std::unique_ptr pbStream = - createDecompressor( - compression, - std::unique_ptr - (new SeekableFileInputStream( - stream.get(), - stripeFooterStart, - stripeFooterLength, - memoryPool, - std::max(blockSize, options.getStreamBlockSize()) - )), - blockSize, - memoryPool); + createDecompressor(compression, + std::unique_ptr + (new SeekableFileInputStream(stream.get(), + stripeFooterStart, + stripeFooterLength, + memoryPool, + static_cast + (blockSize) + )), + blockSize, + memoryPool); proto::StripeFooter result; if (!result.ParseFromZeroCopyStream(pbStream.get())) { throw ParseError(std::string("bad StripeFooter from ") + @@ -1416,9 +1400,9 @@ namespace orc { if (stream.has_kind() && stream.kind() == kind && stream.column() == static_cast(columnId)) { - int64_t myBlock = shouldStream ? - reader.getReaderOptions().getStreamBlockSize() : - static_cast(stream.length()); + int64_t myBlock = static_cast(shouldStream ? + 1024 * 1024 : + stream.length()); return createDecompressor(reader.getCompression(), std::unique_ptr (new SeekableFileInputStream