From 7f8e1760758f55d865356a59027a43965a7aec8a Mon Sep 17 00:00:00 2001 From: cyongli Date: Thu, 22 Nov 2018 23:08:22 +0800 Subject: [PATCH] Change ByteBuffer to StorageByteBuffer in olap/byte_buffer.h. Class definition of ByteBuffer duplicates between olap/byte_buffer.h and util/byte_buffer.h. All of the two classes has a function names as remaining(). Some place which want to call remaining() of util/byte_buffer.h is linked to the other remaining() function of olap/byte_buffer.h --- be/src/olap/byte_buffer.cpp | 34 +++++----- be/src/olap/byte_buffer.h | 22 +++---- be/src/olap/column_reader.cpp | 6 +- be/src/olap/column_reader.h | 2 +- be/src/olap/compress.cpp | 8 +-- be/src/olap/compress.h | 14 ++--- be/src/olap/file_stream.cpp | 6 +- be/src/olap/file_stream.h | 14 ++--- be/src/olap/in_stream.cpp | 20 +++--- be/src/olap/in_stream.h | 12 ++-- be/src/olap/out_stream.cpp | 22 +++---- be/src/olap/out_stream.h | 14 ++--- be/src/olap/segment_reader.cpp | 6 +- be/src/olap/segment_reader.h | 4 +- be/test/olap/bit_field_test.cpp | 4 +- be/test/olap/byte_buffer_test.cpp | 18 +++--- be/test/olap/column_reader_test.cpp | 16 ++--- be/test/olap/run_length_byte_test.cpp | 80 ++++++++++++------------ be/test/olap/run_length_integer_test.cpp | 8 +-- 19 files changed, 155 insertions(+), 155 deletions(-) diff --git a/be/src/olap/byte_buffer.cpp b/be/src/olap/byte_buffer.cpp index 6dbc48670e4cd0..18f41ec7f586fd 100644 --- a/be/src/olap/byte_buffer.cpp +++ b/be/src/olap/byte_buffer.cpp @@ -21,23 +21,23 @@ namespace doris { -ByteBuffer::ByteBuffer() : +StorageByteBuffer::StorageByteBuffer() : _array(NULL), _capacity(0), _limit(0), _position(0), _is_mmap(false) {} -ByteBuffer::BufDeleter::BufDeleter() : +StorageByteBuffer::BufDeleter::BufDeleter() : _is_mmap(false), _mmap_length(0) {} -void ByteBuffer::BufDeleter::set_mmap(size_t mmap_length) { +void StorageByteBuffer::BufDeleter::set_mmap(size_t mmap_length) { _is_mmap = true; _mmap_length = mmap_length; } -void ByteBuffer::BufDeleter::operator()(char* p) { +void StorageByteBuffer::BufDeleter::operator()(char* p) { if (NULL == p) { return; } @@ -53,9 +53,9 @@ void ByteBuffer::BufDeleter::operator()(char* p) { } // 创建ByteBuffer与array -ByteBuffer* ByteBuffer::create(uint64_t capacity) { +StorageByteBuffer* StorageByteBuffer::create(uint64_t capacity) { char* memory = new(std::nothrow) char[capacity]; - ByteBuffer* buf = new(std::nothrow) ByteBuffer; + StorageByteBuffer* buf = new(std::nothrow) StorageByteBuffer; if (buf != NULL && memory != NULL) { buf->_buf = boost::shared_ptr(memory, BufDeleter()); @@ -70,7 +70,7 @@ ByteBuffer* ByteBuffer::create(uint64_t capacity) { return NULL; } -ByteBuffer* ByteBuffer::reference_buffer(ByteBuffer* reference, +StorageByteBuffer* StorageByteBuffer::reference_buffer(StorageByteBuffer* reference, uint64_t offset, uint64_t length) { if (NULL == reference || 0 == length) { @@ -81,7 +81,7 @@ ByteBuffer* ByteBuffer::reference_buffer(ByteBuffer* reference, return NULL; } - ByteBuffer* buf = new(std::nothrow) ByteBuffer(); + StorageByteBuffer* buf = new(std::nothrow) StorageByteBuffer(); if (NULL == buf) { return NULL; @@ -96,7 +96,7 @@ ByteBuffer* ByteBuffer::reference_buffer(ByteBuffer* reference, return buf; } -ByteBuffer* ByteBuffer::mmap(void* start, uint64_t length, int prot, int flags, +StorageByteBuffer* StorageByteBuffer::mmap(void* start, uint64_t length, int prot, int flags, int fd, uint64_t offset) { char* memory = (char*)::mmap(start, length, prot, flags, fd, offset); @@ -108,11 +108,11 @@ ByteBuffer* ByteBuffer::mmap(void* start, uint64_t length, int prot, int flags, BufDeleter deleter; deleter.set_mmap(length); - ByteBuffer* buf = new(std::nothrow) ByteBuffer(); + StorageByteBuffer* buf = new(std::nothrow) StorageByteBuffer(); if (NULL == buf) { deleter(memory); - OLAP_LOG_WARNING("fail to allocate ByteBuffer."); + OLAP_LOG_WARNING("fail to allocate StorageByteBuffer."); return NULL; } @@ -124,7 +124,7 @@ ByteBuffer* ByteBuffer::mmap(void* start, uint64_t length, int prot, int flags, return buf; } -ByteBuffer* ByteBuffer::mmap(FileHandler* handler, uint64_t offset, int prot, int flags) { +StorageByteBuffer* StorageByteBuffer::mmap(FileHandler* handler, uint64_t offset, int prot, int flags) { if (NULL == handler) { OLAP_LOG_WARNING("invalid file handler"); return NULL; @@ -142,11 +142,11 @@ ByteBuffer* ByteBuffer::mmap(FileHandler* handler, uint64_t offset, int prot, in BufDeleter deleter; deleter.set_mmap(length); - ByteBuffer* buf = new(std::nothrow) ByteBuffer(); + StorageByteBuffer* buf = new(std::nothrow) StorageByteBuffer(); if (NULL == buf) { deleter(memory); - OLAP_LOG_WARNING("fail to allocate ByteBuffer."); + OLAP_LOG_WARNING("fail to allocate StorageByteBuffer."); return NULL; } @@ -158,7 +158,7 @@ ByteBuffer* ByteBuffer::mmap(FileHandler* handler, uint64_t offset, int prot, in return buf; } -OLAPStatus ByteBuffer::put(char src) { +OLAPStatus StorageByteBuffer::put(char src) { if (_position < _limit) { _array[_position++] = src; return OLAP_SUCCESS; @@ -167,7 +167,7 @@ OLAPStatus ByteBuffer::put(char src) { return OLAP_ERR_BUFFER_OVERFLOW; } -OLAPStatus ByteBuffer::put(uint64_t index, char src) { +OLAPStatus StorageByteBuffer::put(uint64_t index, char src) { if (index < _limit) { _array[index] = src; return OLAP_SUCCESS; @@ -176,7 +176,7 @@ OLAPStatus ByteBuffer::put(uint64_t index, char src) { return OLAP_ERR_BUFFER_OVERFLOW; } -OLAPStatus ByteBuffer::put(const char* src, uint64_t src_size, uint64_t offset, +OLAPStatus StorageByteBuffer::put(const char* src, uint64_t src_size, uint64_t offset, uint64_t length) { //没有足够的空间可以写 if (length > remaining()) { diff --git a/be/src/olap/byte_buffer.h b/be/src/olap/byte_buffer.h index 85a4d812b52ed0..9232ae656dc690 100644 --- a/be/src/olap/byte_buffer.h +++ b/be/src/olap/byte_buffer.h @@ -36,20 +36,20 @@ namespace doris { // limit - 最大使用限制, 这个值小于等于capacity, position始终小于limit // // ByteBuffer支持直接利用拷贝构造函数或者=操作符安全的进行数据的浅拷贝 -class ByteBuffer { +class StorageByteBuffer { public: - // 通过new方法创建一个容量为capacity的ByteBuffer. + // 通过new方法创建一个容量为capacity的StorageByteBuffer. // 新buffer的position为0, limit为capacity - // 调用者获得新建的ByteBuffer的所有权,并需使用delete删除获得的ByteBuffer + // 调用者获得新建的ByteBuffer的所有权,并需使用delete删除获得的StorageByteBuffer // // TODO. 我认为这里create用法应该是直接返回ByteBuffer本身而不是? // ??针,否则智能指针就无法发挥作用 // 目前内存的管理还是手动的。而且需要认为deleta。 - static ByteBuffer* create(uint64_t capacity); + static StorageByteBuffer* create(uint64_t capacity); - // 通过引用另一个ByteBuffer的内存创建一个新的ByteBuffer + // 通过引用另一个ByteBuffer的内存创建一个新的StorageByteBuffer // 新buffer的position为0, limit为length - // 调用者获得新建的ByteBuffer的所有权,并需使用delete删除获得的ByteBuffer + // 调用者获得新建的ByteBuffer的所有权,并需使用delete删除获得的StorageByteBuffer // Inputs: // - reference 引用的内存 // - offset 引用的Buffer在原ByteBuffer中的位置, 即&reference->array()[offset] @@ -58,20 +58,20 @@ class ByteBuffer { // offset + length < reference->capacity // // TODO. 同create - static ByteBuffer* reference_buffer(ByteBuffer* reference, + static StorageByteBuffer* reference_buffer(StorageByteBuffer* reference, uint64_t offset, uint64_t length); // 通过mmap创建一个ByteBuffer, mmap成功后的内存由ByteBuffer托管 // start, length, prot, flags, fd, offset都是mmap函数的参数 - // 调用者获得新建的ByteBuffer的所有权,并需使用delete删除获得的ByteBuffer - static ByteBuffer* mmap(void* start, uint64_t length, int prot, int flags, + // 调用者获得新建的ByteBuffer的所有权,并需使用delete删除获得的StorageByteBuffer + static StorageByteBuffer* mmap(void* start, uint64_t length, int prot, int flags, int fd, uint64_t offset); // 由于olap的文件都是用FileHandler封装的,因此稍微修? // ??下接口,省略掉的参数可以在handler中取到 // 旧接口仍然保留,或许会用到? - static ByteBuffer* mmap(FileHandler* handler, uint64_t offset, int prot, int flags); + static StorageByteBuffer* mmap(FileHandler* handler, uint64_t offset, int prot, int flags); inline uint64_t capacity() const { return _capacity; @@ -214,7 +214,7 @@ class ByteBuffer { }; private: // 不支持直接创建ByteBuffer, 而是通过create方法创建 - ByteBuffer(); + StorageByteBuffer(); private: boost::shared_ptr _buf; // 托管的内存 diff --git a/be/src/olap/column_reader.cpp b/be/src/olap/column_reader.cpp index ff5c37d340d781..875a6235895b78 100644 --- a/be/src/olap/column_reader.cpp +++ b/be/src/olap/column_reader.cpp @@ -298,7 +298,7 @@ OLAPStatus StringColumnDictionaryReader::init(std::mapstream_length() > 0) { - _dictionary_data_buffer = ByteBuffer::create( + _dictionary_data_buffer = StorageByteBuffer::create( dictionary_data_stream->estimate_uncompressed_length()); size_t offset = 0; size_t length = 0; @@ -405,9 +405,9 @@ OLAPStatus StringColumnDictionaryReader::init( size_t length_remain = 0; size_t length_to_read = 0; size_t read_buffer_size = 1024; - ByteBuffer* read_buffer = ByteBuffer::create(read_buffer_size); + StorageByteBuffer* read_buffer = StorageByteBuffer::create(read_buffer_size); if (NULL == read_buffer) { - OLAP_LOG_WARNING("fail to malloc ByteBuffer"); + OLAP_LOG_WARNING("fail to malloc StorageByteBuffer"); return OLAP_ERR_MALLOC_ERROR; } diff --git a/be/src/olap/column_reader.h b/be/src/olap/column_reader.h index e84de2affad0da..1ffe2e62b5192a 100644 --- a/be/src/olap/column_reader.h +++ b/be/src/olap/column_reader.h @@ -163,7 +163,7 @@ class StringColumnDictionaryReader { char* _read_buffer; //uint64_t _dictionary_size; //uint64_t* _offset_dictionary; // 用来查找响应数据的数字对应的offset - //ByteBuffer* _dictionary_data_buffer; // 保存dict数据 + //StorageByteBuffer* _dictionary_data_buffer; // 保存dict数据 std::vector _dictionary; RunLengthIntegerReader* _data_reader; // 用来读实际的数据(用一个integer表示) }; diff --git a/be/src/olap/compress.cpp b/be/src/olap/compress.cpp index 11700c470e94e5..c5b6ef7f55fbfb 100644 --- a/be/src/olap/compress.cpp +++ b/be/src/olap/compress.cpp @@ -22,7 +22,7 @@ namespace doris { -OLAPStatus lzo_compress(ByteBuffer* in, ByteBuffer* out, bool* smaller) { +OLAPStatus lzo_compress(StorageByteBuffer* in, StorageByteBuffer* out, bool* smaller) { size_t out_length = 0; OLAPStatus res = OLAP_SUCCESS; *smaller = false; @@ -43,7 +43,7 @@ OLAPStatus lzo_compress(ByteBuffer* in, ByteBuffer* out, bool* smaller) { return res; } -OLAPStatus lzo_decompress(ByteBuffer* in, ByteBuffer* out) { +OLAPStatus lzo_decompress(StorageByteBuffer* in, StorageByteBuffer* out) { size_t out_length = 0; OLAPStatus res = OLAP_SUCCESS; res = olap_decompress(&(in->array()[in->position()]), @@ -60,7 +60,7 @@ OLAPStatus lzo_decompress(ByteBuffer* in, ByteBuffer* out) { return res; } -OLAPStatus lz4_compress(ByteBuffer* in, ByteBuffer* out, bool* smaller) { +OLAPStatus lz4_compress(StorageByteBuffer* in, StorageByteBuffer* out, bool* smaller) { size_t out_length = 0; OLAPStatus res = OLAP_SUCCESS; *smaller = false; @@ -81,7 +81,7 @@ OLAPStatus lz4_compress(ByteBuffer* in, ByteBuffer* out, bool* smaller) { return res; } -OLAPStatus lz4_decompress(ByteBuffer* in, ByteBuffer* out) { +OLAPStatus lz4_decompress(StorageByteBuffer* in, StorageByteBuffer* out) { size_t out_length = 0; OLAPStatus res = OLAP_SUCCESS; res = olap_decompress(&(in->array()[in->position()]), diff --git a/be/src/olap/compress.h b/be/src/olap/compress.h index cf772006d2477c..832f0e4cd1cb70 100644 --- a/be/src/olap/compress.h +++ b/be/src/olap/compress.h @@ -22,7 +22,7 @@ namespace doris { -class ByteBuffer; +class StorageByteBuffer; // 定义压缩函数,将in中剩余的内存压缩,并保存到out中剩余的空间 // Inputs: @@ -32,7 +32,7 @@ class ByteBuffer; // Returns: // OLAP_ERR_BUFFER_OVERFLOW - out中的剩余空间不足 // OLAP_ERR_COMPRESS_ERROR - 压缩错误 -typedef OLAPStatus(*Compressor)(ByteBuffer* in, ByteBuffer* out, bool* smaller); +typedef OLAPStatus(*Compressor)(StorageByteBuffer* in, StorageByteBuffer* out, bool* smaller); // 定义解压缩函数,将in中剩余的内存解压缩,并保存到out中剩余的空间 // Inputs: @@ -41,13 +41,13 @@ typedef OLAPStatus(*Compressor)(ByteBuffer* in, ByteBuffer* out, bool* smaller); // Returns: // OLAP_ERR_BUFFER_OVERFLOW - out中的剩余空间不足 // OLAP_ERR_DECOMPRESS_ERROR - 解压缩错误 -typedef OLAPStatus(*Decompressor)(ByteBuffer* in, ByteBuffer* out); +typedef OLAPStatus(*Decompressor)(StorageByteBuffer* in, StorageByteBuffer* out); -OLAPStatus lzo_compress(ByteBuffer* in, ByteBuffer* out, bool* smaller); -OLAPStatus lzo_decompress(ByteBuffer* in, ByteBuffer* out); +OLAPStatus lzo_compress(StorageByteBuffer* in, StorageByteBuffer* out, bool* smaller); +OLAPStatus lzo_decompress(StorageByteBuffer* in, StorageByteBuffer* out); -OLAPStatus lz4_compress(ByteBuffer* in, ByteBuffer* out, bool* smaller); -OLAPStatus lz4_decompress(ByteBuffer* in, ByteBuffer* out); +OLAPStatus lz4_compress(StorageByteBuffer* in, StorageByteBuffer* out, bool* smaller); +OLAPStatus lz4_decompress(StorageByteBuffer* in, StorageByteBuffer* out); } // namespace doris #endif // DORIS_BE_SRC_OLAP_COLUMN_FILE_COMPRESS_H diff --git a/be/src/olap/file_stream.cpp b/be/src/olap/file_stream.cpp index 1ab586e9426728..3af2ad7e56178f 100755 --- a/be/src/olap/file_stream.cpp +++ b/be/src/olap/file_stream.cpp @@ -24,7 +24,7 @@ namespace doris { ReadOnlyFileStream::ReadOnlyFileStream( FileHandler* handler, - ByteBuffer** shared_buffer, + StorageByteBuffer** shared_buffer, Decompressor decompressor, uint32_t compress_buffer_size, OlapReaderStatistics* stats) @@ -40,7 +40,7 @@ ReadOnlyFileStream::ReadOnlyFileStream( ReadOnlyFileStream::ReadOnlyFileStream( FileHandler* handler, - ByteBuffer** shared_buffer, + StorageByteBuffer** shared_buffer, uint64_t offset, uint64_t length, Decompressor decompressor, @@ -85,7 +85,7 @@ OLAPStatus ReadOnlyFileStream::_assure_data() { } if (header.type == StreamHead::UNCOMPRESSED) { - ByteBuffer* tmp = _compressed_helper; + StorageByteBuffer* tmp = _compressed_helper; _compressed_helper = *_shared_buffer; *_shared_buffer = tmp; } else { diff --git a/be/src/olap/file_stream.h b/be/src/olap/file_stream.h index 7e167d4098ab64..aeb1c78c1a3100 100755 --- a/be/src/olap/file_stream.h +++ b/be/src/olap/file_stream.h @@ -40,7 +40,7 @@ class ReadOnlyFileStream { // 构造方法, 使用一组ByteBuffer创建一个InStream // 输入的ByteBuffer在流中的位置可以不连续,例如通过Index确定某些数据不需要 // 读取后,则不读入这部分的数据. 但InStream封装了ByteBuffer不连续这一事实, - // 从上层使用者来看,依旧是在访问一段连续的流.上层使用者应该保证不读取ByteBuffer + // 从上层使用者来看,依旧是在访问一段连续的流.上层使用者应该保证不读取StorageByteBuffer // 之间没有数据的空洞位置. // // 当使用mmap的时候,这里会退化为只有一个ByteBuffer, 是否使用mmap取决于在性能 @@ -53,13 +53,13 @@ class ReadOnlyFileStream { // Decompressor - 如果流被压缩过,则提供一个解压缩函数,否则为NULL // compress_buffer_size - 如果使用压缩,给出压缩的块大小 ReadOnlyFileStream(FileHandler* handler, - ByteBuffer** shared_buffer, + StorageByteBuffer** shared_buffer, Decompressor decompressor, uint32_t compress_buffer_size, OlapReaderStatistics* stats); ReadOnlyFileStream(FileHandler* handler, - ByteBuffer** shared_buffer, + StorageByteBuffer** shared_buffer, uint64_t offset, uint64_t length, Decompressor decompressor, @@ -71,7 +71,7 @@ class ReadOnlyFileStream { } inline OLAPStatus init() { - _compressed_helper = ByteBuffer::create(_compress_buffer_size); + _compressed_helper = StorageByteBuffer::create(_compress_buffer_size); if (NULL == _compressed_helper) { OLAP_LOG_WARNING("fail to create compressed buffer"); return OLAP_ERR_MALLOC_ERROR; @@ -234,9 +234,9 @@ class ReadOnlyFileStream { OLAPStatus _fill_compressed(size_t length); FileCursor _file_cursor; - ByteBuffer* _compressed_helper; - ByteBuffer* _uncompressed; - ByteBuffer** _shared_buffer; + StorageByteBuffer* _compressed_helper; + StorageByteBuffer* _uncompressed; + StorageByteBuffer** _shared_buffer; Decompressor _decompressor; size_t _compress_buffer_size; diff --git a/be/src/olap/in_stream.cpp b/be/src/olap/in_stream.cpp index 92ab4a9b109135..20bf6b59a8f0c3 100644 --- a/be/src/olap/in_stream.cpp +++ b/be/src/olap/in_stream.cpp @@ -23,7 +23,7 @@ namespace doris { InStream::InStream( - std::vector* inputs, + std::vector* inputs, const std::vector& offsets, uint64_t length, Decompressor decompressor, @@ -44,14 +44,14 @@ InStream::~InStream() { SAFE_DELETE(_uncompressed); } -OLAPStatus InStream::_slice(uint64_t chunk_size, ByteBuffer** out_slice) { +OLAPStatus InStream::_slice(uint64_t chunk_size, StorageByteBuffer** out_slice) { uint64_t len = chunk_size; uint64_t old_offset = _current_offset; - ByteBuffer* slice = NULL; + StorageByteBuffer* slice = NULL; //如果buffer够读,拿出一个chunksize,并设置position if (OLAP_LIKELY(_compressed->remaining() >= len)) { - slice = ByteBuffer::reference_buffer(_compressed, + slice = StorageByteBuffer::reference_buffer(_compressed, _compressed->position(), len); @@ -71,7 +71,7 @@ OLAPStatus InStream::_slice(uint64_t chunk_size, ByteBuffer** out_slice) { } // 这里并不分配chuck_size, 而是分配一个最大值, 这样利于减少内存碎片 - slice = ByteBuffer::create(_compress_buffer_size); + slice = StorageByteBuffer::create(_compress_buffer_size); if (OLAP_UNLIKELY(NULL == slice)) { return OLAP_ERR_MALLOC_ERROR; @@ -90,7 +90,7 @@ OLAPStatus InStream::_slice(uint64_t chunk_size, ByteBuffer** out_slice) { while (len > 0 && _current_range < _inputs.size()) { SAFE_DELETE(_compressed); // 再取一部分压缩过的buffer - _compressed = ByteBuffer::reference_buffer(_inputs[_current_range], + _compressed = StorageByteBuffer::reference_buffer(_inputs[_current_range], _inputs[_current_range]->position(), _inputs[_current_range]->remaining()); @@ -162,7 +162,7 @@ OLAPStatus InStream::_assure_data() { // 向后移动整体偏移 _current_offset += sizeof(StreamHead); - ByteBuffer* slice = NULL; + StorageByteBuffer* slice = NULL; // 根据head取一块buf,这里应该要调整_current_offset res = _slice(head.length, &slice); @@ -176,7 +176,7 @@ OLAPStatus InStream::_assure_data() { if (head.type == StreamHead::UNCOMPRESSED) { _uncompressed = slice; } else { - _uncompressed = ByteBuffer::create(_compress_buffer_size); + _uncompressed = StorageByteBuffer::create(_compress_buffer_size); if (OLAP_UNLIKELY(NULL == _uncompressed)) { res = OLAP_ERR_MALLOC_ERROR; @@ -216,7 +216,7 @@ OLAPStatus InStream::_seek(uint64_t position) { if (!(_current_range == i && NULL != _compressed)) { _current_range = i; SAFE_DELETE(_compressed); - _compressed = ByteBuffer::reference_buffer(_inputs[i], 0, + _compressed = StorageByteBuffer::reference_buffer(_inputs[i], 0, _inputs[i]->remaining()); } @@ -231,7 +231,7 @@ OLAPStatus InStream::_seek(uint64_t position) { if (!_inputs.empty() && position == _offsets.back() + _inputs.back()->remaining()) { _current_range = _inputs.size() - 1; - _compressed = ByteBuffer::reference_buffer(_inputs[_current_range], 0, + _compressed = StorageByteBuffer::reference_buffer(_inputs[_current_range], 0, _inputs[_current_range]->limit()); _current_offset = position; return OLAP_SUCCESS; diff --git a/be/src/olap/in_stream.h b/be/src/olap/in_stream.h index f2f742e8890fc8..1ff90807f4b327 100644 --- a/be/src/olap/in_stream.h +++ b/be/src/olap/in_stream.h @@ -44,7 +44,7 @@ class InStream { // 读取后,则不读入这部分的数据. 但InStream封装 // 了ByteBuffer不连续这一事实, // 从上层使用者来看,依旧是在访问一段连续的流. - // 上层使用者应该保证不读取ByteBuffer + // 上层使用者应该保证不读取StorageByteBuffer // 之间没有数据的空洞位置. // // 当使用mmap的时候,这里会退化为只有一个ByteBuffer, 是 @@ -57,7 +57,7 @@ class InStream { // length - 流的总字节长度 // Decompressor - 如果流被压缩过,则提供一个解压缩函数,否则为NULL // compress_buffer_size - 如果使用压缩,给出压缩的块大小 - explicit InStream(std::vector* inputs, + explicit InStream(std::vector* inputs, const std::vector& offsets, uint64_t length, Decompressor decompressor, @@ -115,18 +115,18 @@ class InStream { } private: OLAPStatus _assure_data(); - OLAPStatus _slice(uint64_t chunk_size, ByteBuffer** out_slice); + OLAPStatus _slice(uint64_t chunk_size, StorageByteBuffer** out_slice); OLAPStatus _seek(uint64_t position); - std::vector _inputs; + std::vector _inputs; std::vector _offsets; uint64_t _length; Decompressor _decompressor; uint32_t _compress_buffer_size; uint64_t _current_offset; uint64_t _current_range; - ByteBuffer* _compressed; - ByteBuffer* _uncompressed; + StorageByteBuffer* _compressed; + StorageByteBuffer* _uncompressed; DISALLOW_COPY_AND_ASSIGN(InStream); }; diff --git a/be/src/olap/out_stream.cpp b/be/src/olap/out_stream.cpp index b581d249ca0e53..53079349e0f002 100644 --- a/be/src/olap/out_stream.cpp +++ b/be/src/olap/out_stream.cpp @@ -86,7 +86,7 @@ OutStream::~OutStream() { SAFE_DELETE(_compressed); SAFE_DELETE(_overflow); - for (std::vector::iterator it = _output_buffers.begin(); + for (std::vector::iterator it = _output_buffers.begin(); it != _output_buffers.end(); ++it) { SAFE_DELETE(*it); } @@ -94,7 +94,7 @@ OutStream::~OutStream() { OLAPStatus OutStream::_create_new_input_buffer() { SAFE_DELETE(_current); - _current = ByteBuffer::create(_buffer_size + sizeof(StreamHead)); + _current = StorageByteBuffer::create(_buffer_size + sizeof(StreamHead)); if (NULL != _current) { _current->set_position(sizeof(StreamHead)); @@ -104,7 +104,7 @@ OLAPStatus OutStream::_create_new_input_buffer() { } } -OLAPStatus OutStream::_write_head(ByteBuffer* buf, +OLAPStatus OutStream::_write_head(StorageByteBuffer* buf, uint64_t position, StreamHead::StreamType type, uint32_t length) { @@ -119,8 +119,8 @@ OLAPStatus OutStream::_write_head(ByteBuffer* buf, return OLAP_SUCCESS; } -OLAPStatus OutStream::_compress(ByteBuffer* input, ByteBuffer* output, - ByteBuffer* overflow, bool* smaller) { +OLAPStatus OutStream::_compress(StorageByteBuffer* input, StorageByteBuffer* output, + StorageByteBuffer* overflow, bool* smaller) { OLAPStatus res = OLAP_SUCCESS; res = _compressor(input, overflow, smaller); @@ -165,7 +165,7 @@ void OutStream::_output_compressed() { OLAPStatus OutStream::_make_sure_output_buffer() { if (NULL == _compressed) { - _compressed = ByteBuffer::create(_buffer_size + sizeof(StreamHead)); + _compressed = StorageByteBuffer::create(_buffer_size + sizeof(StreamHead)); if (NULL == _compressed) { return OLAP_ERR_MALLOC_ERROR; @@ -173,7 +173,7 @@ OLAPStatus OutStream::_make_sure_output_buffer() { } if (NULL == _overflow) { - _overflow = ByteBuffer::create(_buffer_size + sizeof(StreamHead)); + _overflow = StorageByteBuffer::create(_buffer_size + sizeof(StreamHead)); if (NULL == _overflow) { return OLAP_ERR_MALLOC_ERROR; @@ -315,7 +315,7 @@ void OutStream::get_position(PositionEntryWriter* index_entry) const { uint64_t OutStream::get_stream_length() const { uint64_t result = 0; - for (std::vector::const_iterator it = _output_buffers.begin(); + for (std::vector::const_iterator it = _output_buffers.begin(); it != _output_buffers.end(); ++it) { result += (*it)->limit(); } @@ -326,7 +326,7 @@ uint64_t OutStream::get_stream_length() const { uint64_t OutStream::get_total_buffer_size() const { uint64_t result = 0; - for (std::vector::const_iterator it = _output_buffers.begin(); + for (std::vector::const_iterator it = _output_buffers.begin(); it != _output_buffers.end(); ++it) { result += (*it)->capacity(); } @@ -355,7 +355,7 @@ OLAPStatus OutStream::write_to_file(FileHandler* file_handle, speed_limit_watch.reset(); - for (std::vector::const_iterator it = _output_buffers.begin(); + for (std::vector::const_iterator it = _output_buffers.begin(); it != _output_buffers.end(); ++it) { OLAP_LOG_DEBUG("write stream begin: %lu", file_handle->tell()); @@ -406,7 +406,7 @@ OLAPStatus OutStream::flush() { uint32_t OutStream::crc32(uint32_t checksum) const { uint32_t result = CRC32_INIT; - for (std::vector::const_iterator it = _output_buffers.begin(); + for (std::vector::const_iterator it = _output_buffers.begin(); it != _output_buffers.end(); ++it) { result = olap_crc32(result, (*it)->array(), (*it)->limit()); } diff --git a/be/src/olap/out_stream.h b/be/src/olap/out_stream.h index 07ff7255472faf..0dc2228ff28ce6 100644 --- a/be/src/olap/out_stream.h +++ b/be/src/olap/out_stream.h @@ -101,7 +101,7 @@ class OutStream { OLAPStatus flush(); // 计算输出数据的crc32值 uint32_t crc32(uint32_t checksum) const; - const std::vector& output_buffers() { + const std::vector& output_buffers() { return _output_buffers; } @@ -117,12 +117,12 @@ class OutStream { private: OLAPStatus _create_new_input_buffer(); - OLAPStatus _write_head(ByteBuffer* buf, + OLAPStatus _write_head(StorageByteBuffer* buf, uint64_t position, StreamHead::StreamType type, uint32_t length); OLAPStatus _spill(); - OLAPStatus _compress(ByteBuffer* input, ByteBuffer* output, ByteBuffer* overflow, + OLAPStatus _compress(StorageByteBuffer* input, StorageByteBuffer* output, StorageByteBuffer* overflow, bool* smaller); void _output_uncompress(); void _output_compressed(); @@ -130,11 +130,11 @@ class OutStream { uint32_t _buffer_size; // 压缩块大小 Compressor _compressor; // 压缩函数,如果为NULL表示不压缩 - std::vector _output_buffers;// 缓冲所有的输出 + std::vector _output_buffers;// 缓冲所有的输出 bool _is_suppressed; // 流是否被终止 - ByteBuffer* _current; // 缓存未压缩的数据 - ByteBuffer* _compressed; // 即将输出到output_buffers中的字节 - ByteBuffer* _overflow; // _output中放不下的字节 + StorageByteBuffer* _current; // 缓存未压缩的数据 + StorageByteBuffer* _compressed; // 即将输出到output_buffers中的字节 + StorageByteBuffer* _overflow; // _output中放不下的字节 uint64_t _spilled_bytes; // 已经输出到output的字节数 DISALLOW_COPY_AND_ASSIGN(OutStream); diff --git a/be/src/olap/segment_reader.cpp b/be/src/olap/segment_reader.cpp index 8357ae42af0722..89b9a8a43a5f64 100644 --- a/be/src/olap/segment_reader.cpp +++ b/be/src/olap/segment_reader.cpp @@ -151,7 +151,7 @@ OLAPStatus SegmentReader::_load_segment_file() { // 如果需要mmap,则进行映射 if (_is_using_mmap) { - _mmap_buffer = ByteBuffer::mmap(&_file_handler, 0, PROT_READ, MAP_PRIVATE); + _mmap_buffer = StorageByteBuffer::mmap(&_file_handler, 0, PROT_READ, MAP_PRIVATE); if (NULL == _mmap_buffer) { OLAP_LOG_WARNING("fail to call mmap, using default mode"); @@ -215,10 +215,10 @@ OLAPStatus SegmentReader::init(bool is_using_cache) { return res; } - _shared_buffer = ByteBuffer::create( + _shared_buffer = StorageByteBuffer::create( _header_message().stream_buffer_size() + sizeof(StreamHead)); if (_shared_buffer == NULL) { - OLAP_LOG_WARNING("fail to create shared buffer. [size=%lu]", sizeof(ByteBuffer)); + OLAP_LOG_WARNING("fail to create shared buffer. [size=%lu]", sizeof(StorageByteBuffer)); return OLAP_ERR_MALLOC_ERROR; } diff --git a/be/src/olap/segment_reader.h b/be/src/olap/segment_reader.h index 9c382c77a0d91d..f51a5e48208e34 100644 --- a/be/src/olap/segment_reader.h +++ b/be/src/olap/segment_reader.h @@ -326,7 +326,7 @@ class SegmentReader { UniqueIdEncodingMap _encodings_map; // 保存encoding std::map _bloom_filters; Decompressor _decompressor; //根据压缩格式,设置的解压器 - ByteBuffer* _mmap_buffer; + StorageByteBuffer* _mmap_buffer; /* * _include_blocks is used for saving the state of block when encountering delete conditions, @@ -352,7 +352,7 @@ class SegmentReader { std::unique_ptr _mem_pool; RuntimeState* _runtime_state; // 用于统计内存消耗等运行时信息 - ByteBuffer* _shared_buffer; + StorageByteBuffer* _shared_buffer; // Set when seek_to_block is called, valid until next seek_to_block is called. bool _without_filter = false; diff --git a/be/test/olap/bit_field_test.cpp b/be/test/olap/bit_field_test.cpp index 941f624ff3fbc4..44859bfc1c6aed 100755 --- a/be/test/olap/bit_field_test.cpp +++ b/be/test/olap/bit_field_test.cpp @@ -61,7 +61,7 @@ class TestBitField : public testing::Test { ASSERT_EQ(OLAP_SUCCESS, _helper.open_with_mode("tmp_file", O_RDONLY, S_IRUSR | S_IWUSR)); - _shared_buffer = ByteBuffer::create( + _shared_buffer = StorageByteBuffer::create( OLAP_DEFAULT_COLUMN_STREAM_BUFFER_SIZE + sizeof(StreamHead)); ASSERT_TRUE(_shared_buffer != NULL); @@ -84,7 +84,7 @@ class TestBitField : public testing::Test { OutStream* _out_stream; BitFieldWriter* _writer; FileHandler _helper; - ByteBuffer* _shared_buffer; + StorageByteBuffer* _shared_buffer; ReadOnlyFileStream* _stream; OlapReaderStatistics _stats; }; diff --git a/be/test/olap/byte_buffer_test.cpp b/be/test/olap/byte_buffer_test.cpp index 7ec4ae05545f7a..d9180bdda76c31 100755 --- a/be/test/olap/byte_buffer_test.cpp +++ b/be/test/olap/byte_buffer_test.cpp @@ -36,9 +36,9 @@ class TestByteBuffer : public testing::Test { // 测试基本的读写功能 TEST_F(TestByteBuffer, TestReadWrite) { - ByteBuffer *buf1 = NULL; + StorageByteBuffer *buf1 = NULL; - buf1 = ByteBuffer::create(100); + buf1 = StorageByteBuffer::create(100); ASSERT_TRUE(buf1 != NULL); char in[10] = {'a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j'}; @@ -106,21 +106,21 @@ TEST_F(TestByteBuffer, TestReadWrite) { // 测试ByteBuffer对内存的引用, 尤其是智能指针的引用传递 // 使用valgrind进行内存泄露检查 TEST_F(TestByteBuffer, TestRef) { - ByteBuffer *buf1 = NULL; + StorageByteBuffer *buf1 = NULL; - buf1 = ByteBuffer::create(1000); + buf1 = StorageByteBuffer::create(1000); ASSERT_TRUE(buf1 != NULL); for (int i = 0; i < 256; i++) { ASSERT_EQ(OLAP_SUCCESS, buf1->put(i)); } - ByteBuffer buf2 = *buf1; + StorageByteBuffer buf2 = *buf1; ASSERT_EQ(buf2.array(), buf1->array()); - ByteBuffer buf4(*buf1); + StorageByteBuffer buf4(*buf1); ASSERT_EQ(buf2.array(), buf1->array()); - ByteBuffer *buf3 = NULL; - buf3 = ByteBuffer::reference_buffer(buf1, 10, 90); + StorageByteBuffer *buf3 = NULL; + buf3 = StorageByteBuffer::reference_buffer(buf1, 10, 90); ASSERT_EQ(90u, buf3->capacity()); ASSERT_EQ(90u, buf3->limit()); @@ -154,7 +154,7 @@ TEST_F(TestByteBuffer, TestMmap) { res = file_handle.open(file_name, O_RDWR); ASSERT_EQ(OLAP_SUCCESS, res); - ByteBuffer * buf1 = ByteBuffer::mmap(NULL, 80, PROT_READ | PROT_WRITE, MAP_SHARED, + StorageByteBuffer * buf1 = StorageByteBuffer::mmap(NULL, 80, PROT_READ | PROT_WRITE, MAP_SHARED, file_handle.fd(), 0); // mmap完成后就可以关闭原fd file_handle.close(); diff --git a/be/test/olap/column_reader_test.cpp b/be/test/olap/column_reader_test.cpp index a1145f43078cb0..88fa0b08056df3 100644 --- a/be/test/olap/column_reader_test.cpp +++ b/be/test/olap/column_reader_test.cpp @@ -146,7 +146,7 @@ class TestColumn : public testing::Test { for (; it != _stream_factory->streams().end(); ++it) { StreamName stream_name = it->first; OutStream *out_stream = it->second; - std::vector *buffers; + std::vector *buffers; if (out_stream->is_suppressed()) { continue; @@ -180,7 +180,7 @@ class TestColumn : public testing::Test { ASSERT_EQ(OLAP_SUCCESS, helper.open_with_mode("tmp_file", O_RDONLY, S_IRUSR | S_IWUSR)); - _shared_buffer = ByteBuffer::create( + _shared_buffer = StorageByteBuffer::create( OLAP_DEFAULT_COLUMN_STREAM_BUFFER_SIZE + sizeof(StreamHead)); ASSERT_TRUE(_shared_buffer != NULL); @@ -239,17 +239,17 @@ class TestColumn : public testing::Test { std::vector _offsets; - std::vector _present_buffers; + std::vector _present_buffers; - std::vector _data_buffers; + std::vector _data_buffers; - std::vector _second_buffers; + std::vector _second_buffers; - std::vector _dictionary_buffers; + std::vector _dictionary_buffers; - std::vector _length_buffers; + std::vector _length_buffers; - ByteBuffer* _shared_buffer; + StorageByteBuffer* _shared_buffer; std::map _map_in_streams; diff --git a/be/test/olap/run_length_byte_test.cpp b/be/test/olap/run_length_byte_test.cpp index db352bd373fd7a..65db8a833b3aa9 100755 --- a/be/test/olap/run_length_byte_test.cpp +++ b/be/test/olap/run_length_byte_test.cpp @@ -46,7 +46,7 @@ TEST(TestStream, UncompressOutStream) { ASSERT_EQ(out_stream->output_buffers().size(), 1); - std::vector::const_iterator it = out_stream->output_buffers().begin(); + std::vector::const_iterator it = out_stream->output_buffers().begin(); ASSERT_EQ((*it)->position(), 0); StreamHead head; (*it)->get((char *)&head, sizeof(head)); @@ -78,10 +78,10 @@ TEST(TestStream, UncompressOutStream2) { ASSERT_EQ(out_stream->output_buffers().size(), 2); - std::vector inputs; - std::vector::const_iterator it = out_stream->output_buffers().begin(); + std::vector inputs; + std::vector::const_iterator it = out_stream->output_buffers().begin(); for (; it != out_stream->output_buffers().end(); ++it) { - ByteBuffer *tmp_byte_buffer = ByteBuffer::reference_buffer(*it, 0, (*it)->limit()); + StorageByteBuffer *tmp_byte_buffer = StorageByteBuffer::reference_buffer(*it, 0, (*it)->limit()); inputs.push_back(tmp_byte_buffer); } std::vector offsets; @@ -130,10 +130,10 @@ TEST(TestStream, UncompressOutStream3) { ASSERT_EQ(out_stream->output_buffers().size(), 2); - std::vector inputs; - std::vector::const_iterator it = out_stream->output_buffers().begin(); + std::vector inputs; + std::vector::const_iterator it = out_stream->output_buffers().begin(); for (; it != out_stream->output_buffers().end(); ++it) { - ByteBuffer *tmp_byte_buffer = ByteBuffer::reference_buffer(*it, 0, (*it)->limit()); + StorageByteBuffer *tmp_byte_buffer = StorageByteBuffer::reference_buffer(*it, 0, (*it)->limit()); inputs.push_back(tmp_byte_buffer); } @@ -176,10 +176,10 @@ TEST(TestStream, UncompressInStream) { out_stream->flush(); // read data - std::vector inputs; - std::vector::const_iterator it = out_stream->output_buffers().begin(); + std::vector inputs; + std::vector::const_iterator it = out_stream->output_buffers().begin(); ASSERT_NE(it, out_stream->output_buffers().end()); - ByteBuffer *tmp_byte_buffer = ByteBuffer::reference_buffer(*it, 0, (*it)->capacity()); + StorageByteBuffer *tmp_byte_buffer = StorageByteBuffer::reference_buffer(*it, 0, (*it)->capacity()); inputs.push_back(tmp_byte_buffer); std::vector offsets; @@ -218,7 +218,7 @@ TEST(TestStream, CompressOutStream) { //ASSERT_EQ(out_stream->output_buffers().size(), 1); - std::vector::const_iterator it = out_stream->output_buffers().begin(); + std::vector::const_iterator it = out_stream->output_buffers().begin(); StreamHead head; (*it)->get((char *)&head, sizeof(head)); @@ -242,10 +242,10 @@ TEST(TestStream, CompressOutStream2) { out_stream->write(0x5a); out_stream->flush(); - std::vector inputs; - std::vector::const_iterator it = out_stream->output_buffers().begin(); + std::vector inputs; + std::vector::const_iterator it = out_stream->output_buffers().begin(); for (; it != out_stream->output_buffers().end(); ++it) { - ByteBuffer *tmp_byte_buffer = ByteBuffer::reference_buffer(*it, 0, (*it)->limit()); + StorageByteBuffer *tmp_byte_buffer = StorageByteBuffer::reference_buffer(*it, 0, (*it)->limit()); inputs.push_back(tmp_byte_buffer); } std::vector offsets; @@ -289,10 +289,10 @@ TEST(TestStream, CompressOutStream3) { out_stream->write(write_data, sizeof(write_data)); out_stream->flush(); - std::vector inputs; - std::vector::const_iterator it = out_stream->output_buffers().begin(); + std::vector inputs; + std::vector::const_iterator it = out_stream->output_buffers().begin(); for (; it != out_stream->output_buffers().end(); ++it) { - ByteBuffer *tmp_byte_buffer = ByteBuffer::reference_buffer(*it, 0, (*it)->limit()); + StorageByteBuffer *tmp_byte_buffer = StorageByteBuffer::reference_buffer(*it, 0, (*it)->limit()); inputs.push_back(tmp_byte_buffer); } std::vector offsets; @@ -341,10 +341,10 @@ TEST(TestStream, CompressOutStream4) { } out_stream->flush(); - std::vector inputs; - std::vector::const_iterator it = out_stream->output_buffers().begin(); + std::vector inputs; + std::vector::const_iterator it = out_stream->output_buffers().begin(); for (; it != out_stream->output_buffers().end(); ++it) { - ByteBuffer *tmp_byte_buffer = ByteBuffer::reference_buffer(*it, 0, (*it)->limit()); + StorageByteBuffer *tmp_byte_buffer = StorageByteBuffer::reference_buffer(*it, 0, (*it)->limit()); inputs.push_back(tmp_byte_buffer); } std::vector offsets; @@ -397,10 +397,10 @@ TEST(TestStream, CompressMassOutStream) { //out_stream->write(100); out_stream->flush(); - std::vector inputs; - std::vector::const_iterator it = out_stream->output_buffers().begin(); + std::vector inputs; + std::vector::const_iterator it = out_stream->output_buffers().begin(); for (; it != out_stream->output_buffers().end(); ++it) { - ByteBuffer *tmp_byte_buffer = ByteBuffer::reference_buffer(*it, 0, (*it)->limit()); + StorageByteBuffer *tmp_byte_buffer = StorageByteBuffer::reference_buffer(*it, 0, (*it)->limit()); inputs.push_back(tmp_byte_buffer); } std::vector offsets; @@ -443,10 +443,10 @@ TEST(TestStream, CompressInStream) { out_stream->flush(); // read data - std::vector inputs; - std::vector::const_iterator it = out_stream->output_buffers().begin(); + std::vector inputs; + std::vector::const_iterator it = out_stream->output_buffers().begin(); ASSERT_NE(it, out_stream->output_buffers().end()); - ByteBuffer *tmp_byte_buffer = ByteBuffer::reference_buffer(*it, 0, (*it)->capacity()); + StorageByteBuffer *tmp_byte_buffer = StorageByteBuffer::reference_buffer(*it, 0, (*it)->capacity()); inputs.push_back(tmp_byte_buffer); std::vector offsets; @@ -489,10 +489,10 @@ TEST(TestStream, SeekUncompress) { out_stream->flush(); // read data - std::vector inputs; - std::vector::const_iterator it = out_stream->output_buffers().begin(); + std::vector inputs; + std::vector::const_iterator it = out_stream->output_buffers().begin(); ASSERT_NE(it, out_stream->output_buffers().end()); - ByteBuffer *tmp_byte_buffer = ByteBuffer::reference_buffer(*it, 0, (*it)->capacity()); + StorageByteBuffer *tmp_byte_buffer = StorageByteBuffer::reference_buffer(*it, 0, (*it)->capacity()); inputs.push_back(tmp_byte_buffer); std::vector offsets; @@ -542,10 +542,10 @@ TEST(TestStream, SkipUncompress) { out_stream->flush(); // read data - std::vector inputs; - std::vector::const_iterator it = out_stream->output_buffers().begin(); + std::vector inputs; + std::vector::const_iterator it = out_stream->output_buffers().begin(); ASSERT_NE(it, out_stream->output_buffers().end()); - ByteBuffer *tmp_byte_buffer = ByteBuffer::reference_buffer(*it, 0, (*it)->capacity()); + StorageByteBuffer *tmp_byte_buffer = StorageByteBuffer::reference_buffer(*it, 0, (*it)->capacity()); inputs.push_back(tmp_byte_buffer); std::vector offsets; @@ -584,10 +584,10 @@ TEST(TestStream, SeekCompress) { out_stream->flush(); // read data - std::vector inputs; - std::vector::const_iterator it = out_stream->output_buffers().begin(); + std::vector inputs; + std::vector::const_iterator it = out_stream->output_buffers().begin(); ASSERT_NE(it, out_stream->output_buffers().end()); - ByteBuffer *tmp_byte_buffer = ByteBuffer::reference_buffer(*it, 0, (*it)->capacity()); + StorageByteBuffer *tmp_byte_buffer = StorageByteBuffer::reference_buffer(*it, 0, (*it)->capacity()); inputs.push_back(tmp_byte_buffer); std::vector offsets; @@ -630,10 +630,10 @@ TEST(TestStream, SkipCompress) { out_stream->flush(); // read data - std::vector inputs; - std::vector::const_iterator it = out_stream->output_buffers().begin(); + std::vector inputs; + std::vector::const_iterator it = out_stream->output_buffers().begin(); ASSERT_NE(it, out_stream->output_buffers().end()); - ByteBuffer *tmp_byte_buffer = ByteBuffer::reference_buffer(*it, 0, (*it)->capacity()); + StorageByteBuffer *tmp_byte_buffer = StorageByteBuffer::reference_buffer(*it, 0, (*it)->capacity()); inputs.push_back(tmp_byte_buffer); std::vector offsets; @@ -688,7 +688,7 @@ class TestRunLengthByte : public testing::Test { ASSERT_EQ(OLAP_SUCCESS, helper.open_with_mode("tmp_file", O_RDONLY, S_IRUSR | S_IWUSR)); - _shared_buffer = ByteBuffer::create( + _shared_buffer = StorageByteBuffer::create( OLAP_DEFAULT_COLUMN_STREAM_BUFFER_SIZE + sizeof(StreamHead)); ASSERT_TRUE(_shared_buffer != NULL); @@ -710,7 +710,7 @@ class TestRunLengthByte : public testing::Test { OutStream* _out_stream; RunLengthByteWriter* _writer; FileHandler helper; - ByteBuffer* _shared_buffer; + StorageByteBuffer* _shared_buffer; ReadOnlyFileStream* _stream; OlapReaderStatistics _stats; }; diff --git a/be/test/olap/run_length_integer_test.cpp b/be/test/olap/run_length_integer_test.cpp index 2920e516c9060c..fafb4a035a74b6 100755 --- a/be/test/olap/run_length_integer_test.cpp +++ b/be/test/olap/run_length_integer_test.cpp @@ -62,7 +62,7 @@ class TestRunLengthUnsignInteger : public testing::Test { ASSERT_EQ(OLAP_SUCCESS, helper.open_with_mode("tmp_file", O_RDONLY, S_IRUSR | S_IWUSR)); - _shared_buffer = ByteBuffer::create( + _shared_buffer = StorageByteBuffer::create( OLAP_DEFAULT_COLUMN_STREAM_BUFFER_SIZE + sizeof(StreamHead)); ASSERT_TRUE(_shared_buffer != NULL); @@ -84,7 +84,7 @@ class TestRunLengthUnsignInteger : public testing::Test { OutStream* _out_stream; RunLengthIntegerWriter* _writer; FileHandler helper; - ByteBuffer* _shared_buffer; + StorageByteBuffer* _shared_buffer; ReadOnlyFileStream* _stream; OlapReaderStatistics _stats; }; @@ -374,7 +374,7 @@ virtual void SetUp() { ASSERT_EQ(OLAP_SUCCESS, helper.open_with_mode("tmp_file", O_RDONLY, S_IRUSR | S_IWUSR)); - _shared_buffer = ByteBuffer::create( + _shared_buffer = StorageByteBuffer::create( OLAP_DEFAULT_COLUMN_STREAM_BUFFER_SIZE + sizeof(StreamHead)); ASSERT_TRUE(_shared_buffer != NULL); @@ -396,7 +396,7 @@ virtual void SetUp() { OutStream* _out_stream; RunLengthIntegerWriter* _writer; FileHandler helper; - ByteBuffer* _shared_buffer; + StorageByteBuffer* _shared_buffer; ReadOnlyFileStream* _stream; OlapReaderStatistics _stats; };