From 8971cca55bad96c23f7f31ca2b0f8070a726d1ba Mon Sep 17 00:00:00 2001 From: Owen O'Malley Date: Fri, 24 Jul 2015 12:11:49 -0700 Subject: [PATCH] ORC-22. Fixes #3 - Allow InputStreams to set the natural read size for their underlying file system. I've set the local file system to use 128k, but hdfs and webhdfs would both have substantially larger. --- c++/include/orc/OrcFile.hh | 6 ++++++ c++/src/Compression.cc | 16 +++++++--------- c++/src/Compression.hh | 6 +++--- c++/src/OrcFile.cc | 4 ++++ c++/src/Reader.cc | 10 +++------- tools/test/TestReader.cc | 1 + 6 files changed, 24 insertions(+), 19 deletions(-) diff --git a/c++/include/orc/OrcFile.hh b/c++/include/orc/OrcFile.hh index a5371515ad..f8f13fb673 100644 --- a/c++/include/orc/OrcFile.hh +++ b/c++/include/orc/OrcFile.hh @@ -42,6 +42,12 @@ namespace orc { */ virtual uint64_t getLength() const = 0; + /** + * Get the natural size for reads. + * @return the number of bytes that should be read at once + */ + virtual uint64_t getNaturalReadSize() const = 0; + /** * Read length bytes from the file starting at offset into * the buffer starting at buf. diff --git a/c++/src/Compression.cc b/c++/src/Compression.cc index 81b2c1c8b0..a1b05dd5f1 100644 --- a/c++/src/Compression.cc +++ b/c++/src/Compression.cc @@ -69,20 +69,20 @@ namespace orc { SeekableArrayInputStream::SeekableArrayInputStream (const unsigned char* values, uint64_t size, - int64_t blkSize + uint64_t blkSize ): data(reinterpret_cast<const char*>(values)) { length = size; position = 0; - blockSize = blkSize == -1 ? length : static_cast<uint64_t>(blkSize); + blockSize = blkSize == 0 ? length : static_cast<uint64_t>(blkSize); } SeekableArrayInputStream::SeekableArrayInputStream(const char* values, uint64_t size, - int64_t blkSize + uint64_t blkSize ): data(values) { length = size; position = 0; - blockSize = blkSize == -1 ? length : static_cast<uint64_t>(blkSize); + blockSize = blkSize == 0 ? 
length : static_cast<uint64_t>(blkSize); } bool SeekableArrayInputStream::Next(const void** buffer, int*size) { @@ -135,17 +135,15 @@ namespace orc { return result.str(); } - static uint64_t computeBlock(int64_t request, uint64_t length) { - return std::min(length, - static_cast<uint64_t>(request < 0 ? - 256 * 1024 : request)); + static uint64_t computeBlock(uint64_t request, uint64_t length) { + return std::min(length, request == 0 ? 256 * 1024 : request); } SeekableFileInputStream::SeekableFileInputStream(InputStream* stream, uint64_t offset, uint64_t byteCount, MemoryPool& _pool, - int64_t _blockSize + uint64_t _blockSize ):pool(_pool), input(stream), start(offset), diff --git a/c++/src/Compression.hh b/c++/src/Compression.hh index 2c02584c60..efd374a6f5 100644 --- a/c++/src/Compression.hh +++ b/c++/src/Compression.hh @@ -70,10 +70,10 @@ namespace orc { public: SeekableArrayInputStream(const unsigned char* list, uint64_t length, - int64_t block_size = -1); + uint64_t block_size = 0); SeekableArrayInputStream(const char* list, uint64_t length, - int64_t block_size = -1); + uint64_t block_size = 0); virtual ~SeekableArrayInputStream(); virtual bool Next(const void** data, int*size) override; virtual void BackUp(int count) override; @@ -102,7 +102,7 @@ namespace orc { uint64_t offset, uint64_t byteCount, MemoryPool& pool, - int64_t blockSize = -1); + uint64_t blockSize = 0); virtual ~SeekableFileInputStream(); virtual bool Next(const void** data, int*size) override; diff --git a/c++/src/OrcFile.cc b/c++/src/OrcFile.cc index f8c22c42c0..d5d00fd0db 100644 --- a/c++/src/OrcFile.cc +++ b/c++/src/OrcFile.cc @@ -57,6 +57,10 @@ namespace orc { return totalLength; } + uint64_t getNaturalReadSize() const override { + return 128 * 1024; + } + void read(void* buf, uint64_t length, uint64_t offset) override { diff --git a/c++/src/Reader.cc b/c++/src/Reader.cc index 85f629f905..684b9e4cc9 100644 --- a/c++/src/Reader.cc +++ b/c++/src/Reader.cc @@ -1316,10 +1316,7 @@ namespace orc { (new 
SeekableFileInputStream(stream.get(), stripeFooterStart, stripeFooterLength, - memoryPool, - static_cast<int64_t> - (blockSize) - )), + memoryPool)), blockSize, memoryPool); proto::StripeFooter result; @@ -1400,9 +1397,8 @@ namespace orc { if (stream.has_kind() && stream.kind() == kind && stream.column() == static_cast<uint64_t>(columnId)) { - int64_t myBlock = static_cast<int64_t>(shouldStream ? - 1024 * 1024 : - stream.length()); + uint64_t myBlock = shouldStream ? input.getNaturalReadSize(): + stream.length(); return createDecompressor(reader.getCompression(), std::unique_ptr<SeekableInputStream> (new SeekableFileInputStream diff --git a/tools/test/TestReader.cc b/tools/test/TestReader.cc index 72aa5fd1cb..4d53a62a90 100644 --- a/tools/test/TestReader.cc +++ b/tools/test/TestReader.cc @@ -2914,6 +2914,7 @@ class MockInputStream: public InputStream { MOCK_CONST_METHOD0(getLength, uint64_t()); MOCK_CONST_METHOD0(getName, const std::string&()); MOCK_METHOD3(read, void (void*, uint64_t, uint64_t)); + MOCK_CONST_METHOD0(getNaturalReadSize, uint64_t()); }; MockInputStream::~MockInputStream() {