Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 7 additions & 27 deletions c++/include/orc/OrcFile.hh
Original file line number Diff line number Diff line change
Expand Up @@ -30,24 +30,6 @@

namespace orc {

/**
* An abstract interface for a buffer provided by the input stream.
*/
class Buffer {
public:
virtual ~Buffer();

/**
* Get the start of the buffer.
*/
virtual char *getStart() const = 0;

/**
* Get the length of the buffer in bytes.
*/
virtual uint64_t getLength() const = 0;
};

/**
* An abstract interface for providing ORC readers a stream of bytes.
*/
Expand All @@ -62,16 +44,14 @@ namespace orc {

/**
* Read length bytes from the file starting at offset into
* the buffer.
* @param offset the position in the file to read from
* @param length the number of bytes to read
* @param buffer a Buffer to reuse from a previous call to read. Ownership
* of this buffer passes to the InputStream object.
* @return the buffer with the requested data. The client owns the Buffer.
* the buffer starting at buf.
* @param buf the starting position of a buffer.
* @param length the number of bytes to read.
* @param offset the position in the stream to read from.
*/
virtual Buffer* read(uint64_t offset,
uint64_t length,
Buffer* buffer) = 0;
virtual void read(void* buf,
uint64_t length,
uint64_t offset) = 0;

/**
* Get the name of the stream for error messages.
Expand Down
41 changes: 16 additions & 25 deletions c++/src/orc/Compression.cc
Original file line number Diff line number Diff line change
Expand Up @@ -67,19 +67,6 @@ namespace orc {
// PASS
}

#ifdef ORC_CXX_HAS_INITIALIZER_LIST
SeekableArrayInputStream::SeekableArrayInputStream
(std::initializer_list<unsigned char> values,
int64_t blkSize
):ownedData(new DataBuffer<char>(*getDefaultPool(), values.size())),
data(0) {
length = values.size();
memcpy(ownedData->data(), values.begin(), values.size());
position = 0;
blockSize = blkSize == -1 ? length : static_cast<uint64_t>(blkSize);
}
#endif

SeekableArrayInputStream::SeekableArrayInputStream
(const unsigned char* values,
uint64_t size,
Expand All @@ -102,7 +89,7 @@ namespace orc {
bool SeekableArrayInputStream::Next(const void** buffer, int*size) {
uint64_t currentSize = std::min(length - position, blockSize);
if (currentSize > 0) {
*buffer = (data ? data : ownedData->data()) + position;
*buffer = data + position;
*size = static_cast<int>(currentSize);
position += currentSize;
return true;
Expand Down Expand Up @@ -158,32 +145,36 @@ namespace orc {
SeekableFileInputStream::SeekableFileInputStream(InputStream* stream,
uint64_t offset,
uint64_t byteCount,
MemoryPool& _pool,
int64_t _blockSize
): input(stream),
start(offset),
length(byteCount),
blockSize(computeBlock
(_blockSize,
length)) {
):pool(_pool),
input(stream),
start(offset),
length(byteCount),
blockSize(computeBlock
(_blockSize,
length)) {

position = 0;
buffer = nullptr;
buffer.reset(new DataBuffer<char>(pool));
pushBack = 0;
}

SeekableFileInputStream::~SeekableFileInputStream() {
delete buffer;
// PASS
}

bool SeekableFileInputStream::Next(const void** data, int*size) {
uint64_t bytesRead;
if (pushBack != 0) {
*data = buffer->getStart() + (buffer->getLength() - pushBack);
*data = buffer->data() + (buffer->size() - pushBack);
bytesRead = pushBack;
} else {
bytesRead = std::min(length - position, blockSize);
buffer->resize(bytesRead);
if (bytesRead > 0) {
buffer = input->read(start + position, bytesRead, buffer);
*data = static_cast<void*>(buffer->getStart());
input->read(buffer->data(), bytesRead, start+position);
*data = static_cast<void*>(buffer->data());
}
}
position += bytesRead;
Expand Down
11 changes: 3 additions & 8 deletions c++/src/orc/Compression.hh
Original file line number Diff line number Diff line change
Expand Up @@ -61,19 +61,12 @@ namespace orc {
*/
class SeekableArrayInputStream: public SeekableInputStream {
private:
std::unique_ptr<DataBuffer<char> > ownedData;
const char* data;
uint64_t length;
uint64_t position;
uint64_t blockSize;

public:

#ifdef ORC_CXX_HAS_INITIALIZER_LIST
SeekableArrayInputStream(std::initializer_list<unsigned char> list,
int64_t block_size = -1);
#endif

SeekableArrayInputStream(const unsigned char* list,
uint64_t length,
int64_t block_size = -1);
Expand All @@ -94,18 +87,20 @@ namespace orc {
*/
class SeekableFileInputStream: public SeekableInputStream {
private:
MemoryPool& pool;
InputStream* const input;
const uint64_t start;
const uint64_t length;
const uint64_t blockSize;
Buffer* buffer;
std::unique_ptr<DataBuffer<char> > buffer;
uint64_t position;
uint64_t pushBack;

public:
SeekableFileInputStream(InputStream* input,
uint64_t offset,
uint64_t byteCount,
MemoryPool& pool,
int64_t blockSize = -1);
virtual ~SeekableFileInputStream();

Expand Down
151 changes: 7 additions & 144 deletions c++/src/orc/OrcFile.cc
Original file line number Diff line number Diff line change
Expand Up @@ -30,36 +30,6 @@

namespace orc {

Buffer::~Buffer() {
// PASS
}

class HeapBuffer: public Buffer {
private:
char* start;
uint64_t length;

public:
HeapBuffer(uint64_t size) {
start = new char[size];
length = size;
}

virtual ~HeapBuffer();

virtual char *getStart() const override {
return start;
}

virtual uint64_t getLength() const override {
return length;
}
};

HeapBuffer::~HeapBuffer() {
delete[] start;
}

class FileInputStream : public InputStream {
private:
std::string filename ;
Expand All @@ -86,24 +56,20 @@ namespace orc {
return totalLength;
}

Buffer* read(uint64_t offset,
uint64_t length,
Buffer* buffer) override {
if (buffer == nullptr) {
buffer = new HeapBuffer(length);
} else if (buffer->getLength() < length) {
delete buffer;
buffer = new HeapBuffer(length);
void read(void* buf,
uint64_t length,
uint64_t offset) override {
if (!buf) {
throw ParseError("Buffer is null");
}
ssize_t bytesRead = pread(file, buffer->getStart(), length,
static_cast<off_t>(offset));
ssize_t bytesRead = pread(file, buf, length, static_cast<off_t>(offset));

if (bytesRead == -1) {
throw ParseError("Bad read of " + filename);
}
if (static_cast<uint64_t>(bytesRead) != length) {
throw ParseError("Short read of " + filename);
}
return buffer;
}

const std::string& getName() const override {
Expand All @@ -115,109 +81,6 @@ namespace orc {
close(file);
}

/**
* A buffer for use with an memmapped file where the Buffer doesn't own
* the memory that it references.
*/
class MmapBuffer: public Buffer {
private:
char* start;
uint64_t length;

public:
MmapBuffer(): start(nullptr), length(0) {
// PASS
}

virtual ~MmapBuffer();

void reset(char *_start, uint64_t _length) {
start = _start;
length = _length;
}

virtual char *getStart() const override {
return start;
}

virtual uint64_t getLength() const override {
return length;
}
};

MmapBuffer::~MmapBuffer() {
// PASS
}

/**
* An InputStream implementation that uses memory mapping to read the
* local file.
*/
class MmapInputStream : public InputStream {
private:
std::string filename ;
char* start;
uint64_t totalLength;

public:
MmapInputStream(std::string _filename);
~MmapInputStream();

uint64_t getLength() const override {
return totalLength;
}

const std::string& getName() const override {
return filename;
}

Buffer* read(uint64_t offset,
uint64_t length,
Buffer* buffer) override;
};

MmapInputStream::MmapInputStream(std::string _filename) {
filename = _filename ;
int file = open(filename.c_str(), O_RDONLY);
if (file == -1) {
throw ParseError("Can't open " + filename);
}
struct stat fileStat;
if (fstat(file, &fileStat) == -1) {
throw ParseError("Can't stat " + filename);
}
totalLength = static_cast<uint64_t>(fileStat.st_size);
start = static_cast<char*>(mmap(nullptr, totalLength, PROT_READ,
MAP_FILE|MAP_PRIVATE,
file, 0LL));
if (start == MAP_FAILED) {
throw std::runtime_error("mmap failed " + filename + " " +
strerror(errno));
}
close(file);
}

MmapInputStream::~MmapInputStream() {
int64_t result = munmap(reinterpret_cast<void*>(start), totalLength);
if (result != 0) {
throw std::runtime_error("Failed to unmap " + filename + " - " +
strerror(errno));
}
}

Buffer* MmapInputStream::read(uint64_t offset,
uint64_t length,
Buffer* buffer) {
if (buffer == nullptr) {
buffer = new MmapBuffer();
}
if (offset + length > totalLength) {
throw std::runtime_error("Read past end of file " + filename);
}
dynamic_cast<MmapBuffer*>(buffer)->reset(start + offset, length);
return buffer;
}

std::unique_ptr<InputStream> readLocalFile(const std::string& path) {
return std::unique_ptr<InputStream>(new FileInputStream(path));
}
Expand Down
Loading