From bcbb8f988e0f3eb72029870f67e0fe31c6b90098 Mon Sep 17 00:00:00 2001 From: Aliaksei Sandryhaila Date: Mon, 6 Jul 2015 10:15:29 -0700 Subject: [PATCH] Replaced Buffer with DataBuffer and converted InputStream::read() method to posix style. --- c++/include/orc/OrcFile.hh | 34 ++----- c++/src/orc/Compression.cc | 41 ++++----- c++/src/orc/Compression.hh | 11 +-- c++/src/orc/OrcFile.cc | 151 ++------------------------------ c++/src/orc/Reader.cc | 37 ++++---- c++/test/orc/TestCompression.cc | 8 +- tools-c++/test/TestReader.cc | 2 +- 7 files changed, 59 insertions(+), 225 deletions(-) diff --git a/c++/include/orc/OrcFile.hh b/c++/include/orc/OrcFile.hh index 0a4f4cad1f..a5371515ad 100644 --- a/c++/include/orc/OrcFile.hh +++ b/c++/include/orc/OrcFile.hh @@ -30,24 +30,6 @@ namespace orc { - /** - * An abstract interface for a buffer provided by the input stream. - */ - class Buffer { - public: - virtual ~Buffer(); - - /** - * Get the start of the buffer. - */ - virtual char *getStart() const = 0; - - /** - * Get the length of the buffer in bytes. - */ - virtual uint64_t getLength() const = 0; - }; - /** * An abstract interface for providing ORC readers a stream of bytes. */ @@ -62,16 +44,14 @@ namespace orc { /** * Read length bytes from the file starting at offset into - * the buffer. - * @param offset the position in the file to read from - * @param length the number of bytes to read - * @param buffer a Buffer to reuse from a previous call to read. Ownership - * of this buffer passes to the InputStream object. - * @return the buffer with the requested data. The client owns the Buffer. + * the buffer starting at buf. + * @param buf the starting position of a buffer. + * @param length the number of bytes to read. + * @param offset the position in the stream to read from. */ - virtual Buffer* read(uint64_t offset, - uint64_t length, - Buffer* buffer) = 0; + virtual void read(void* buf, + uint64_t length, + uint64_t offset) = 0; /** * Get the name of the stream for error messages. diff --git a/c++/src/orc/Compression.cc b/c++/src/orc/Compression.cc index bc1c11a2e2..06e10e0f6d 100644 --- a/c++/src/orc/Compression.cc +++ b/c++/src/orc/Compression.cc @@ -67,19 +67,6 @@ namespace orc { // PASS } - #ifdef ORC_CXX_HAS_INITIALIZER_LIST - SeekableArrayInputStream::SeekableArrayInputStream - (std::initializer_list values, - int64_t blkSize - ):ownedData(new DataBuffer(*getDefaultPool(), values.size())), - data(0) { - length = values.size(); - memcpy(ownedData->data(), values.begin(), values.size()); - position = 0; - blockSize = blkSize == -1 ? length : static_cast(blkSize); - } - #endif - SeekableArrayInputStream::SeekableArrayInputStream (const unsigned char* values, uint64_t size, @@ -102,7 +89,7 @@ namespace orc { bool SeekableArrayInputStream::Next(const void** buffer, int*size) { uint64_t currentSize = std::min(length - position, blockSize); if (currentSize > 0) { - *buffer = (data ? data : ownedData->data()) + position; + *buffer = data + position; *size = static_cast(currentSize); position += currentSize; return true; @@ -158,32 +145,36 @@ namespace orc { SeekableFileInputStream::SeekableFileInputStream(InputStream* stream, uint64_t offset, uint64_t byteCount, + MemoryPool& _pool, int64_t _blockSize - ): input(stream), - start(offset), - length(byteCount), - blockSize(computeBlock - (_blockSize, - length)) { + ):pool(_pool), + input(stream), + start(offset), + length(byteCount), + blockSize(computeBlock + (_blockSize, + length)) { + position = 0; - buffer = nullptr; + buffer.reset(new DataBuffer(pool)); pushBack = 0; } SeekableFileInputStream::~SeekableFileInputStream() { - delete buffer; + // PASS } bool SeekableFileInputStream::Next(const void** data, int*size) { uint64_t bytesRead; if (pushBack != 0) { - *data = buffer->getStart() + (buffer->getLength() - pushBack); + *data = buffer->data() + (buffer->size() - pushBack); bytesRead = pushBack; } else { bytesRead = std::min(length - position, blockSize); + buffer->resize(bytesRead); if (bytesRead > 0) { - buffer = input->read(start + position, bytesRead, buffer); - *data = static_cast(buffer->getStart()); + input->read(buffer->data(), bytesRead, start+position); + *data = static_cast(buffer->data()); } } position += bytesRead; diff --git a/c++/src/orc/Compression.hh b/c++/src/orc/Compression.hh index ac9c0949eb..222dc549eb 100644 --- a/c++/src/orc/Compression.hh +++ b/c++/src/orc/Compression.hh @@ -61,19 +61,12 @@ namespace orc { */ class SeekableArrayInputStream: public SeekableInputStream { private: - std::unique_ptr > ownedData; const char* data; uint64_t length; uint64_t position; uint64_t blockSize; public: - - #ifdef ORC_CXX_HAS_INITIALIZER_LIST - SeekableArrayInputStream(std::initializer_list list, - int64_t block_size = -1); - #endif - SeekableArrayInputStream(const unsigned char* list, uint64_t length, int64_t block_size = -1); @@ -94,11 +87,12 @@ namespace orc { */ class SeekableFileInputStream: public SeekableInputStream { private: + MemoryPool& pool; InputStream* const input; const uint64_t start; const uint64_t length; const uint64_t blockSize; - Buffer* buffer; + std::unique_ptr > buffer; uint64_t position; uint64_t pushBack; @@ -106,6 +100,7 @@ namespace orc { SeekableFileInputStream(InputStream* input, uint64_t offset, uint64_t byteCount, + MemoryPool& pool, int64_t blockSize = -1); virtual ~SeekableFileInputStream(); diff --git a/c++/src/orc/OrcFile.cc b/c++/src/orc/OrcFile.cc index 9e850de640..9ae9c56655 100644 --- a/c++/src/orc/OrcFile.cc +++ b/c++/src/orc/OrcFile.cc @@ -30,36 +30,6 @@ namespace orc { - Buffer::~Buffer() { - // PASS - } - - class HeapBuffer: public Buffer { - private: - char* start; - uint64_t length; - - public: - HeapBuffer(uint64_t size) { - start = new char[size]; - length = size; - } - - virtual ~HeapBuffer(); - - virtual char *getStart() const override { - return start; - } - - virtual uint64_t getLength() const override { - return length; - } - }; - - HeapBuffer::~HeapBuffer() { - delete[] start; - } - class FileInputStream : public InputStream { private: std::string filename ; @@ -86,24 +56,20 @@ namespace orc { return totalLength; } - Buffer* read(uint64_t offset, - uint64_t length, - Buffer* buffer) override { - if (buffer == nullptr) { - buffer = new HeapBuffer(length); - } else if (buffer->getLength() < length) { - delete buffer; - buffer = new HeapBuffer(length); + void read(void* buf, + uint64_t length, + uint64_t offset) override { + if (!buf) { + throw ParseError("Buffer is null"); } - ssize_t bytesRead = pread(file, buffer->getStart(), length, - static_cast(offset)); + ssize_t bytesRead = pread(file, buf, length, static_cast(offset)); + if (bytesRead == -1) { throw ParseError("Bad read of " + filename); } if (static_cast(bytesRead) != length) { throw ParseError("Short read of " + filename); } - return buffer; } const std::string& getName() const override { @@ -115,109 +81,6 @@ namespace orc { close(file); } - /** - * A buffer for use with an memmapped file where the Buffer doesn't own - * the memory that it references. - */ - class MmapBuffer: public Buffer { - private: - char* start; - uint64_t length; - - public: - MmapBuffer(): start(nullptr), length(0) { - // PASS - } - - virtual ~MmapBuffer(); - - void reset(char *_start, uint64_t _length) { - start = _start; - length = _length; - } - - virtual char *getStart() const override { - return start; - } - - virtual uint64_t getLength() const override { - return length; - } - }; - - MmapBuffer::~MmapBuffer() { - // PASS - } - - /** - * An InputStream implementation that uses memory mapping to read the - * local file. - */ - class MmapInputStream : public InputStream { - private: - std::string filename ; - char* start; - uint64_t totalLength; - - public: - MmapInputStream(std::string _filename); - ~MmapInputStream(); - - uint64_t getLength() const override { - return totalLength; - } - - const std::string& getName() const override { - return filename; - } - - Buffer* read(uint64_t offset, - uint64_t length, - Buffer* buffer) override; - }; - - MmapInputStream::MmapInputStream(std::string _filename) { - filename = _filename ; - int file = open(filename.c_str(), O_RDONLY); - if (file == -1) { - throw ParseError("Can't open " + filename); - } - struct stat fileStat; - if (fstat(file, &fileStat) == -1) { - throw ParseError("Can't stat " + filename); - } - totalLength = static_cast(fileStat.st_size); - start = static_cast(mmap(nullptr, totalLength, PROT_READ, - MAP_FILE|MAP_PRIVATE, - file, 0LL)); - if (start == MAP_FAILED) { - throw std::runtime_error("mmap failed " + filename + " " + - strerror(errno)); - } - close(file); - } - - MmapInputStream::~MmapInputStream() { - int64_t result = munmap(reinterpret_cast(start), totalLength); - if (result != 0) { - throw std::runtime_error("Failed to unmap " + filename + " - " + - strerror(errno)); - } - } - - Buffer* MmapInputStream::read(uint64_t offset, - uint64_t length, - Buffer* buffer) { - if (buffer == nullptr) { - buffer = new MmapBuffer(); - } - if (offset + length > totalLength) { - throw std::runtime_error("Read past end of file " + filename); - } - dynamic_cast(buffer)->reset(start + offset, length); - return buffer; - } - std::unique_ptr readLocalFile(const std::string& path) { return std::unique_ptr(new FileInputStream(path)); } diff --git a/c++/src/orc/Reader.cc b/c++/src/orc/Reader.cc index 37dd3e070e..2343e91f89 100644 --- a/c++/src/orc/Reader.cc +++ b/c++/src/orc/Reader.cc @@ -1232,7 +1232,8 @@ namespace orc { std::unique_ptr (new SeekableFileInputStream(stream.get(), metadataStart, - metadataSize)), + metadataSize, + memoryPool)), blockSize, memoryPool); metadata.reset(new proto::Metadata()); @@ -1314,6 +1315,7 @@ namespace orc { (new SeekableFileInputStream(stream.get(), stripeFooterStart, stripeFooterLength, + memoryPool, static_cast (blockSize) )), @@ -1406,6 +1408,7 @@ namespace orc { (&input, offset, stream.length(), + memoryPool, myBlock)), reader.getCompressionSize(), memoryPool); @@ -1553,13 +1556,13 @@ namespace orc { } void ensureOrcFooter(InputStream* stream, - Buffer *buffer, + DataBuffer *buffer, uint64_t postscriptLength) { const std::string MAGIC("ORC"); const uint64_t magicLength = MAGIC.length(); - const char * const bufferStart = buffer->getStart(); - const uint64_t bufferLength = buffer->getLength(); + const char * const bufferStart = buffer->data(); + const uint64_t bufferLength = buffer->size(); if (postscriptLength < magicLength || bufferLength < magicLength) { throw ParseError("Invalid ORC postscript length"); @@ -1570,10 +1573,10 @@ namespace orc { if (memcmp(magicStart, MAGIC.c_str(), magicLength) != 0) { // If there is no magic string at the end, check the beginning. // Only files written by Hive 0.11.0 don't have the tail ORC string. - Buffer *frontBuffer = stream->read(0, magicLength, nullptr); - bool foundMatch = - memcmp(frontBuffer->getStart(), MAGIC.c_str(), magicLength) == 0; - delete frontBuffer; + char *frontBuffer = new char[magicLength]; + stream->read(frontBuffer, magicLength, 0); + bool foundMatch = memcmp(frontBuffer, MAGIC.c_str(), magicLength) == 0; + delete[] frontBuffer; if (!foundMatch) { throw ParseError("Not an ORC file"); } @@ -1587,10 +1590,10 @@ namespace orc { * @param postscriptSize the length of postscript in bytes */ std::unique_ptr readPostscript(InputStream *stream, - Buffer *buffer, + DataBuffer *buffer, uint64_t postscriptSize) { - char *ptr = buffer->getStart(); - uint64_t readSize = buffer->getLength(); + char *ptr = buffer->data(); + uint64_t readSize = buffer->size(); ensureOrcFooter(stream, buffer, postscriptSize); @@ -1613,11 +1616,11 @@ namespace orc { * @param memoryPool the memory pool to use */ std::unique_ptr readFooter(InputStream* stream, - Buffer *&buffer, + DataBuffer *&buffer, uint64_t footerOffset, const proto::PostScript& ps, MemoryPool& memoryPool) { - char *footerPtr = buffer->getStart() + footerOffset; + char *footerPtr = buffer->data() + footerOffset; std::unique_ptr pbStream = createDecompressor(convertCompressionKind(ps), @@ -1662,9 +1665,10 @@ namespace orc { if (readSize < 4) { throw ParseError("File size too small"); } - Buffer *buffer = stream->read(size - readSize, readSize, nullptr); + DataBuffer *buffer = new DataBuffer(*memoryPool, readSize); + stream->read(buffer->data(), readSize, size - readSize); - uint64_t postscriptSize = buffer->getStart()[readSize - 1] & 0xff; + uint64_t postscriptSize = buffer->data()[readSize - 1] & 0xff; ps = readPostscript(stream.get(), buffer, postscriptSize); uint64_t footerSize = ps->footerlength(); uint64_t tailSize = 1 + postscriptSize + footerSize; @@ -1672,7 +1676,8 @@ namespace orc { uint64_t footerOffset; if (tailSize > readSize) { - buffer = stream->read(size - tailSize, footerSize, buffer); + buffer->resize(footerSize); + stream->read(buffer->data(), footerSize, size - tailSize); footerOffset = 0; } else { footerOffset = readSize - tailSize; diff --git a/c++/test/orc/TestCompression.cc b/c++/test/orc/TestCompression.cc index 465f741fb0..4b4f13f0cc 100644 --- a/c++/test/orc/TestCompression.cc +++ b/c++/test/orc/TestCompression.cc @@ -204,7 +204,7 @@ namespace orc { TEST_F(TestCompression, testFileBackup) { SCOPED_TRACE("testFileBackup"); std::unique_ptr file = readLocalFile(simpleFile); - SeekableFileInputStream stream(file.get(), 0, 200, 20); + SeekableFileInputStream stream(file.get(), 0, 200, *getDefaultPool(), 20); const void *ptr; int len; ASSERT_THROW(stream.BackUp(10), std::logic_error); @@ -235,7 +235,7 @@ namespace orc { TEST_F(TestCompression, testFileSkip) { SCOPED_TRACE("testFileSkip"); std::unique_ptr file = readLocalFile(simpleFile); - SeekableFileInputStream stream(file.get(), 0, 200, 20); + SeekableFileInputStream stream(file.get(), 0, 200, *getDefaultPool(), 20); const void *ptr; int len; ASSERT_EQ(true, stream.Next(&ptr, &len)); @@ -255,7 +255,7 @@ namespace orc { TEST_F(TestCompression, testFileCombo) { SCOPED_TRACE("testFileCombo"); std::unique_ptr file = readLocalFile(simpleFile); - SeekableFileInputStream stream(file.get(), 0, 200, 20); + SeekableFileInputStream stream(file.get(), 0, 200, *getDefaultPool(), 20); const void *ptr; int len; ASSERT_EQ(true, stream.Next(&ptr, &len)); @@ -275,7 +275,7 @@ namespace orc { TEST_F(TestCompression, testFileSeek) { SCOPED_TRACE("testFileSeek"); std::unique_ptr file = readLocalFile(simpleFile); - SeekableFileInputStream stream(file.get(), 0, 200, 20); + SeekableFileInputStream stream(file.get(), 0, 200, *getDefaultPool(), 20); const void *ptr; int len; EXPECT_EQ(0, stream.ByteCount()); diff --git a/tools-c++/test/TestReader.cc b/tools-c++/test/TestReader.cc index aae0f1da18..4f82d8af3f 100644 --- a/tools-c++/test/TestReader.cc +++ b/tools-c++/test/TestReader.cc @@ -2913,7 +2913,7 @@ class MockInputStream: public InputStream { ~MockInputStream(); MOCK_CONST_METHOD0(getLength, uint64_t()); MOCK_CONST_METHOD0(getName, const std::string&()); - MOCK_METHOD3(read, Buffer* (uint64_t, uint64_t, Buffer*)); + MOCK_METHOD3(read, void (void*, uint64_t, uint64_t)); }; MockInputStream::~MockInputStream() {