Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 17 additions & 17 deletions be/src/olap/byte_buffer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,23 +21,23 @@

namespace doris {

ByteBuffer::ByteBuffer() :
StorageByteBuffer::StorageByteBuffer() :
_array(NULL),
_capacity(0),
_limit(0),
_position(0),
_is_mmap(false) {}

ByteBuffer::BufDeleter::BufDeleter() :
StorageByteBuffer::BufDeleter::BufDeleter() :
_is_mmap(false),
_mmap_length(0) {}

void ByteBuffer::BufDeleter::set_mmap(size_t mmap_length) {
void StorageByteBuffer::BufDeleter::set_mmap(size_t mmap_length) {
_is_mmap = true;
_mmap_length = mmap_length;
}

void ByteBuffer::BufDeleter::operator()(char* p) {
void StorageByteBuffer::BufDeleter::operator()(char* p) {
if (NULL == p) {
return;
}
Expand All @@ -53,9 +53,9 @@ void ByteBuffer::BufDeleter::operator()(char* p) {
}

// 创建ByteBuffer与array
ByteBuffer* ByteBuffer::create(uint64_t capacity) {
StorageByteBuffer* StorageByteBuffer::create(uint64_t capacity) {
char* memory = new(std::nothrow) char[capacity];
ByteBuffer* buf = new(std::nothrow) ByteBuffer;
StorageByteBuffer* buf = new(std::nothrow) StorageByteBuffer;

if (buf != NULL && memory != NULL) {
buf->_buf = boost::shared_ptr<char>(memory, BufDeleter());
Expand All @@ -70,7 +70,7 @@ ByteBuffer* ByteBuffer::create(uint64_t capacity) {
return NULL;
}

ByteBuffer* ByteBuffer::reference_buffer(ByteBuffer* reference,
StorageByteBuffer* StorageByteBuffer::reference_buffer(StorageByteBuffer* reference,
uint64_t offset,
uint64_t length) {
if (NULL == reference || 0 == length) {
Expand All @@ -81,7 +81,7 @@ ByteBuffer* ByteBuffer::reference_buffer(ByteBuffer* reference,
return NULL;
}

ByteBuffer* buf = new(std::nothrow) ByteBuffer();
StorageByteBuffer* buf = new(std::nothrow) StorageByteBuffer();

if (NULL == buf) {
return NULL;
Expand All @@ -96,7 +96,7 @@ ByteBuffer* ByteBuffer::reference_buffer(ByteBuffer* reference,
return buf;
}

ByteBuffer* ByteBuffer::mmap(void* start, uint64_t length, int prot, int flags,
StorageByteBuffer* StorageByteBuffer::mmap(void* start, uint64_t length, int prot, int flags,
int fd, uint64_t offset) {
char* memory = (char*)::mmap(start, length, prot, flags, fd, offset);

Expand All @@ -108,11 +108,11 @@ ByteBuffer* ByteBuffer::mmap(void* start, uint64_t length, int prot, int flags,
BufDeleter deleter;
deleter.set_mmap(length);

ByteBuffer* buf = new(std::nothrow) ByteBuffer();
StorageByteBuffer* buf = new(std::nothrow) StorageByteBuffer();

if (NULL == buf) {
deleter(memory);
OLAP_LOG_WARNING("fail to allocate ByteBuffer.");
OLAP_LOG_WARNING("fail to allocate StorageByteBuffer.");
return NULL;
}

Expand All @@ -124,7 +124,7 @@ ByteBuffer* ByteBuffer::mmap(void* start, uint64_t length, int prot, int flags,
return buf;
}

ByteBuffer* ByteBuffer::mmap(FileHandler* handler, uint64_t offset, int prot, int flags) {
StorageByteBuffer* StorageByteBuffer::mmap(FileHandler* handler, uint64_t offset, int prot, int flags) {
if (NULL == handler) {
OLAP_LOG_WARNING("invalid file handler");
return NULL;
Expand All @@ -142,11 +142,11 @@ ByteBuffer* ByteBuffer::mmap(FileHandler* handler, uint64_t offset, int prot, in
BufDeleter deleter;
deleter.set_mmap(length);

ByteBuffer* buf = new(std::nothrow) ByteBuffer();
StorageByteBuffer* buf = new(std::nothrow) StorageByteBuffer();

if (NULL == buf) {
deleter(memory);
OLAP_LOG_WARNING("fail to allocate ByteBuffer.");
OLAP_LOG_WARNING("fail to allocate StorageByteBuffer.");
return NULL;
}

Expand All @@ -158,7 +158,7 @@ ByteBuffer* ByteBuffer::mmap(FileHandler* handler, uint64_t offset, int prot, in
return buf;
}

OLAPStatus ByteBuffer::put(char src) {
OLAPStatus StorageByteBuffer::put(char src) {
if (_position < _limit) {
_array[_position++] = src;
return OLAP_SUCCESS;
Expand All @@ -167,7 +167,7 @@ OLAPStatus ByteBuffer::put(char src) {
return OLAP_ERR_BUFFER_OVERFLOW;
}

OLAPStatus ByteBuffer::put(uint64_t index, char src) {
OLAPStatus StorageByteBuffer::put(uint64_t index, char src) {
if (index < _limit) {
_array[index] = src;
return OLAP_SUCCESS;
Expand All @@ -176,7 +176,7 @@ OLAPStatus ByteBuffer::put(uint64_t index, char src) {
return OLAP_ERR_BUFFER_OVERFLOW;
}

OLAPStatus ByteBuffer::put(const char* src, uint64_t src_size, uint64_t offset,
OLAPStatus StorageByteBuffer::put(const char* src, uint64_t src_size, uint64_t offset,
uint64_t length) {
//没有足够的空间可以写
if (length > remaining()) {
Expand Down
22 changes: 11 additions & 11 deletions be/src/olap/byte_buffer.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,20 +36,20 @@ namespace doris {
// limit - 最大使用限制, 这个值小于等于capacity, position始终小于limit
//
// ByteBuffer支持直接利用拷贝构造函数或者=操作符安全的进行数据的浅拷贝
class ByteBuffer {
class StorageByteBuffer {
public:
// 通过new方法创建一个容量为capacity的ByteBuffer.
// 通过new方法创建一个容量为capacity的StorageByteBuffer.
// 新buffer的position为0, limit为capacity
// 调用者获得新建的ByteBuffer的所有权,并需使用delete删除获得的ByteBuffer
// 调用者获得新建的ByteBuffer的所有权,并需使用delete删除获得的StorageByteBuffer
//
// TODO. 我认为这里create用法应该是直接返回ByteBuffer本身而不是?
// ??针,否则智能指针就无法发挥作用
// 目前内存的管理还是手动的。而且需要认为deleta。
static ByteBuffer* create(uint64_t capacity);
static StorageByteBuffer* create(uint64_t capacity);

// 通过引用另一个ByteBuffer的内存创建一个新的ByteBuffer
// 通过引用另一个ByteBuffer的内存创建一个新的StorageByteBuffer
// 新buffer的position为0, limit为length
// 调用者获得新建的ByteBuffer的所有权,并需使用delete删除获得的ByteBuffer
// 调用者获得新建的ByteBuffer的所有权,并需使用delete删除获得的StorageByteBuffer
// Inputs:
// - reference 引用的内存
// - offset 引用的Buffer在原ByteBuffer中的位置, 即&reference->array()[offset]
Expand All @@ -58,20 +58,20 @@ class ByteBuffer {
// offset + length < reference->capacity
//
// TODO. 同create
static ByteBuffer* reference_buffer(ByteBuffer* reference,
static StorageByteBuffer* reference_buffer(StorageByteBuffer* reference,
uint64_t offset,
uint64_t length);

// 通过mmap创建一个ByteBuffer, mmap成功后的内存由ByteBuffer托管
// start, length, prot, flags, fd, offset都是mmap函数的参数
// 调用者获得新建的ByteBuffer的所有权,并需使用delete删除获得的ByteBuffer
static ByteBuffer* mmap(void* start, uint64_t length, int prot, int flags,
// 调用者获得新建的ByteBuffer的所有权,并需使用delete删除获得的StorageByteBuffer
static StorageByteBuffer* mmap(void* start, uint64_t length, int prot, int flags,
int fd, uint64_t offset);

// 由于olap的文件都是用FileHandler封装的,因此稍微修?
// ??下接口,省略掉的参数可以在handler中取到
// 旧接口仍然保留,或许会用到?
static ByteBuffer* mmap(FileHandler* handler, uint64_t offset, int prot, int flags);
static StorageByteBuffer* mmap(FileHandler* handler, uint64_t offset, int prot, int flags);

inline uint64_t capacity() const {
return _capacity;
Expand Down Expand Up @@ -214,7 +214,7 @@ class ByteBuffer {
};
private:
// 不支持直接创建ByteBuffer, 而是通过create方法创建
ByteBuffer();
StorageByteBuffer();

private:
boost::shared_ptr<char> _buf; // 托管的内存
Expand Down
6 changes: 3 additions & 3 deletions be/src/olap/column_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -298,7 +298,7 @@ OLAPStatus StringColumnDictionaryReader::init(std::map<StreamName, ReadOnlyFileS
return OLAP_ERR_COLUMN_STREAM_NOT_EXIST;
}
if (dictionary_data_stream->stream_length() > 0) {
_dictionary_data_buffer = ByteBuffer::create(
_dictionary_data_buffer = StorageByteBuffer::create(
dictionary_data_stream->estimate_uncompressed_length());
size_t offset = 0;
size_t length = 0;
Expand Down Expand Up @@ -405,9 +405,9 @@ OLAPStatus StringColumnDictionaryReader::init(
size_t length_remain = 0;
size_t length_to_read = 0;
size_t read_buffer_size = 1024;
ByteBuffer* read_buffer = ByteBuffer::create(read_buffer_size);
StorageByteBuffer* read_buffer = StorageByteBuffer::create(read_buffer_size);
if (NULL == read_buffer) {
OLAP_LOG_WARNING("fail to malloc ByteBuffer");
OLAP_LOG_WARNING("fail to malloc StorageByteBuffer");
return OLAP_ERR_MALLOC_ERROR;
}

Expand Down
2 changes: 1 addition & 1 deletion be/src/olap/column_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ class StringColumnDictionaryReader {
char* _read_buffer;
//uint64_t _dictionary_size;
//uint64_t* _offset_dictionary; // 用来查找响应数据的数字对应的offset
//ByteBuffer* _dictionary_data_buffer; // 保存dict数据
//StorageByteBuffer* _dictionary_data_buffer; // 保存dict数据
std::vector<std::string> _dictionary;
RunLengthIntegerReader* _data_reader; // 用来读实际的数据(用一个integer表示)
};
Expand Down
8 changes: 4 additions & 4 deletions be/src/olap/compress.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@

namespace doris {

OLAPStatus lzo_compress(ByteBuffer* in, ByteBuffer* out, bool* smaller) {
OLAPStatus lzo_compress(StorageByteBuffer* in, StorageByteBuffer* out, bool* smaller) {
size_t out_length = 0;
OLAPStatus res = OLAP_SUCCESS;
*smaller = false;
Expand All @@ -43,7 +43,7 @@ OLAPStatus lzo_compress(ByteBuffer* in, ByteBuffer* out, bool* smaller) {
return res;
}

OLAPStatus lzo_decompress(ByteBuffer* in, ByteBuffer* out) {
OLAPStatus lzo_decompress(StorageByteBuffer* in, StorageByteBuffer* out) {
size_t out_length = 0;
OLAPStatus res = OLAP_SUCCESS;
res = olap_decompress(&(in->array()[in->position()]),
Expand All @@ -60,7 +60,7 @@ OLAPStatus lzo_decompress(ByteBuffer* in, ByteBuffer* out) {
return res;
}

OLAPStatus lz4_compress(ByteBuffer* in, ByteBuffer* out, bool* smaller) {
OLAPStatus lz4_compress(StorageByteBuffer* in, StorageByteBuffer* out, bool* smaller) {
size_t out_length = 0;
OLAPStatus res = OLAP_SUCCESS;
*smaller = false;
Expand All @@ -81,7 +81,7 @@ OLAPStatus lz4_compress(ByteBuffer* in, ByteBuffer* out, bool* smaller) {
return res;
}

OLAPStatus lz4_decompress(ByteBuffer* in, ByteBuffer* out) {
OLAPStatus lz4_decompress(StorageByteBuffer* in, StorageByteBuffer* out) {
size_t out_length = 0;
OLAPStatus res = OLAP_SUCCESS;
res = olap_decompress(&(in->array()[in->position()]),
Expand Down
14 changes: 7 additions & 7 deletions be/src/olap/compress.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@

namespace doris {

class ByteBuffer;
class StorageByteBuffer;

// 定义压缩函数,将in中剩余的内存压缩,并保存到out中剩余的空间
// Inputs:
Expand All @@ -32,7 +32,7 @@ class ByteBuffer;
// Returns:
// OLAP_ERR_BUFFER_OVERFLOW - out中的剩余空间不足
// OLAP_ERR_COMPRESS_ERROR - 压缩错误
typedef OLAPStatus(*Compressor)(ByteBuffer* in, ByteBuffer* out, bool* smaller);
typedef OLAPStatus(*Compressor)(StorageByteBuffer* in, StorageByteBuffer* out, bool* smaller);

// 定义解压缩函数,将in中剩余的内存解压缩,并保存到out中剩余的空间
// Inputs:
Expand All @@ -41,13 +41,13 @@ typedef OLAPStatus(*Compressor)(ByteBuffer* in, ByteBuffer* out, bool* smaller);
// Returns:
// OLAP_ERR_BUFFER_OVERFLOW - out中的剩余空间不足
// OLAP_ERR_DECOMPRESS_ERROR - 解压缩错误
typedef OLAPStatus(*Decompressor)(ByteBuffer* in, ByteBuffer* out);
typedef OLAPStatus(*Decompressor)(StorageByteBuffer* in, StorageByteBuffer* out);

OLAPStatus lzo_compress(ByteBuffer* in, ByteBuffer* out, bool* smaller);
OLAPStatus lzo_decompress(ByteBuffer* in, ByteBuffer* out);
OLAPStatus lzo_compress(StorageByteBuffer* in, StorageByteBuffer* out, bool* smaller);
OLAPStatus lzo_decompress(StorageByteBuffer* in, StorageByteBuffer* out);

OLAPStatus lz4_compress(ByteBuffer* in, ByteBuffer* out, bool* smaller);
OLAPStatus lz4_decompress(ByteBuffer* in, ByteBuffer* out);
OLAPStatus lz4_compress(StorageByteBuffer* in, StorageByteBuffer* out, bool* smaller);
OLAPStatus lz4_decompress(StorageByteBuffer* in, StorageByteBuffer* out);

} // namespace doris
#endif // DORIS_BE_SRC_OLAP_COLUMN_FILE_COMPRESS_H
6 changes: 3 additions & 3 deletions be/src/olap/file_stream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ namespace doris {

ReadOnlyFileStream::ReadOnlyFileStream(
FileHandler* handler,
ByteBuffer** shared_buffer,
StorageByteBuffer** shared_buffer,
Decompressor decompressor,
uint32_t compress_buffer_size,
OlapReaderStatistics* stats)
Expand All @@ -40,7 +40,7 @@ ReadOnlyFileStream::ReadOnlyFileStream(

ReadOnlyFileStream::ReadOnlyFileStream(
FileHandler* handler,
ByteBuffer** shared_buffer,
StorageByteBuffer** shared_buffer,
uint64_t offset,
uint64_t length,
Decompressor decompressor,
Expand Down Expand Up @@ -85,7 +85,7 @@ OLAPStatus ReadOnlyFileStream::_assure_data() {
}

if (header.type == StreamHead::UNCOMPRESSED) {
ByteBuffer* tmp = _compressed_helper;
StorageByteBuffer* tmp = _compressed_helper;
_compressed_helper = *_shared_buffer;
*_shared_buffer = tmp;
} else {
Expand Down
14 changes: 7 additions & 7 deletions be/src/olap/file_stream.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ class ReadOnlyFileStream {
// 构造方法, 使用一组ByteBuffer创建一个InStream
// 输入的ByteBuffer在流中的位置可以不连续,例如通过Index确定某些数据不需要
// 读取后,则不读入这部分的数据. 但InStream封装了ByteBuffer不连续这一事实,
// 从上层使用者来看,依旧是在访问一段连续的流.上层使用者应该保证不读取ByteBuffer
// 从上层使用者来看,依旧是在访问一段连续的流.上层使用者应该保证不读取StorageByteBuffer
// 之间没有数据的空洞位置.
//
// 当使用mmap的时候,这里会退化为只有一个ByteBuffer, 是否使用mmap取决于在性能
Expand All @@ -53,13 +53,13 @@ class ReadOnlyFileStream {
// Decompressor - 如果流被压缩过,则提供一个解压缩函数,否则为NULL
// compress_buffer_size - 如果使用压缩,给出压缩的块大小
ReadOnlyFileStream(FileHandler* handler,
ByteBuffer** shared_buffer,
StorageByteBuffer** shared_buffer,
Decompressor decompressor,
uint32_t compress_buffer_size,
OlapReaderStatistics* stats);

ReadOnlyFileStream(FileHandler* handler,
ByteBuffer** shared_buffer,
StorageByteBuffer** shared_buffer,
uint64_t offset,
uint64_t length,
Decompressor decompressor,
Expand All @@ -71,7 +71,7 @@ class ReadOnlyFileStream {
}

inline OLAPStatus init() {
_compressed_helper = ByteBuffer::create(_compress_buffer_size);
_compressed_helper = StorageByteBuffer::create(_compress_buffer_size);
if (NULL == _compressed_helper) {
OLAP_LOG_WARNING("fail to create compressed buffer");
return OLAP_ERR_MALLOC_ERROR;
Expand Down Expand Up @@ -234,9 +234,9 @@ class ReadOnlyFileStream {
OLAPStatus _fill_compressed(size_t length);

FileCursor _file_cursor;
ByteBuffer* _compressed_helper;
ByteBuffer* _uncompressed;
ByteBuffer** _shared_buffer;
StorageByteBuffer* _compressed_helper;
StorageByteBuffer* _uncompressed;
StorageByteBuffer** _shared_buffer;

Decompressor _decompressor;
size_t _compress_buffer_size;
Expand Down
Loading