Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions be/src/olap/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -112,4 +112,5 @@ add_library(Olap STATIC
task/engine_publish_version_task.cpp
task/engine_alter_tablet_task.cpp
olap_snapshot_converter.cpp
byte_buffer_stream.cpp
)
65 changes: 65 additions & 0 deletions be/src/olap/base_stream.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#pragma once

#include "olap/olap_define.h"

namespace doris {

class PositionProvider;

// base of stream
// add this base class for ReadOnlyFileStream and ByteBufferStream
// to minimize the modification of RunLengthIntegerReader.
class BaseStream {
public:
virtual ~BaseStream() { }

virtual OLAPStatus init() = 0;

virtual void reset(uint64_t offset, uint64_t length) = 0;

virtual OLAPStatus read(char* byte) = 0;

virtual OLAPStatus read(char* buffer, uint64_t* buf_size) = 0;

virtual OLAPStatus read_all(char* buffer, uint64_t* buf_size) = 0;

virtual OLAPStatus seek(PositionProvider* position) = 0;

virtual OLAPStatus skip(uint64_t skip_length) = 0;

virtual uint64_t stream_length() = 0;

virtual bool eof() = 0;

// 返回当前块剩余可读字节数
virtual uint64_t available() = 0;

virtual size_t get_buffer_size() = 0;

virtual void get_buf(char** buf, uint32_t* remaining_bytes) = 0;

virtual void get_position(uint32_t* position) = 0;

virtual void set_position(uint32_t pos) = 0;

virtual int remaining() = 0;
};

} // namespace doris
118 changes: 118 additions & 0 deletions be/src/olap/byte_buffer_stream.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#include "olap/byte_buffer_stream.h"

#include "olap/out_stream.h"

namespace doris {

OLAPStatus ByteBufferStream::read(char* byte) {
OLAPStatus res = _assure_data();

if (OLAP_SUCCESS != res) {
return res;
}

*byte = _data.data[_cur_pos++];
return res;
}

OLAPStatus ByteBufferStream::read(char* buffer, uint64_t* buf_size) {
OLAPStatus res;
uint64_t read_length = *buf_size;
*buf_size = 0;

do {
res = _assure_data();
if (OLAP_SUCCESS != res) {
break;
}

uint64_t actual_length = std::min(read_length - *buf_size, (uint64_t)remaining());

memcpy(buffer, _data.data, actual_length);
_cur_pos += actual_length;
*buf_size += actual_length;
buffer += actual_length;
} while (*buf_size < read_length);

return res;
}

OLAPStatus ByteBufferStream::read_all(char* buffer, uint64_t* buf_size) {
OLAPStatus res = OLAP_SUCCESS;
uint64_t read_length = 0;
uint64_t buffer_remain = *buf_size;

while (OLAP_SUCCESS == _assure_data()) {
read_length = remaining();

if (buffer_remain < read_length) {
res = OLAP_ERR_BUFFER_OVERFLOW;
break;
}

memcpy(buffer, _data.data, read_length);

buffer_remain -= read_length;
buffer += read_length;
}

if (eof()) {
*buf_size -= buffer_remain;
return OLAP_SUCCESS;
}

return res;
}

OLAPStatus ByteBufferStream::skip(uint64_t skip_length) {
OLAPStatus res = _assure_data();

if (OLAP_SUCCESS != res) {
return res;
}

uint64_t skip_byte = 0;
uint64_t byte_to_skip = skip_length;

do {
skip_byte = std::min((uint64_t)remaining(), byte_to_skip);
_cur_pos += skip_byte;
byte_to_skip -= skip_byte;
// call assure_data to read next StorageByteBuffer if necessary
res = _assure_data();
} while (byte_to_skip != 0 && res == OLAP_SUCCESS);

return res;
}

OLAPStatus ByteBufferStream::_assure_data() {
if (_cur_pos < _byte_buffer_pos + _byte_buffer_length) {
return OLAP_SUCCESS;
} else if (eof()) {
return OLAP_ERR_COLUMN_STREAM_EOF;
}
StreamHead* header = reinterpret_cast<StreamHead*>(&_data.data[_cur_pos]);
_cur_pos += sizeof(StreamHead);
_byte_buffer_length = header->length;
_byte_buffer_pos = _cur_pos;
return OLAP_SUCCESS;
}

} // namespace doris
128 changes: 128 additions & 0 deletions be/src/olap/byte_buffer_stream.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#pragma once

#include "util/slice.h"
#include "olap/olap_define.h"
#include "olap/file_stream.h"

class PositionProvider;

namespace doris {

// To reuse RunLengthIntegerReader in segment v2, add ByteBufferStream which derive from
// BaseStream. RunLengthIntegerReader will accept BaseStream* instead of ReadOnlyFileStream*
// as argument.
// Unlike ReadOnlyFileStream, ByteBufferStream read data from StorageByteBuffers instead of file.
class ByteBufferStream : public BaseStream {
public:
ByteBufferStream(const Slice& data) :
_data(data), _cur_pos(0),
_byte_buffer_pos(0), _byte_buffer_length(0) { }

OLAPStatus init() {
return _assure_data();
}

void reset(uint64_t offset, uint64_t length) override {
_cur_pos = 0;
_byte_buffer_pos = 0;
_byte_buffer_length = 0;
}

// read one byte from the stream and modify the offset
// If reach the enc of stream, return OLAP_ERR_COLUMN_STREAM_EOF
OLAPStatus read(char* byte) override;

// read buf_size data from stream to buffer
// return OLAP_ERR_COLUMN_STREAM_EOF if reach end of stream
OLAPStatus read(char* buffer, uint64_t* buf_size) override;

// read all data from stream to buffer and set the size to buf_size
OLAPStatus read_all(char* buffer, uint64_t* buf_size) override;

// seek to position
OLAPStatus seek(PositionProvider* position) override {
return OLAP_ERR_FUNC_NOT_IMPLEMENTED;
}

// skip skip_length bytes
OLAPStatus skip(uint64_t skip_length) override;

// return stream length
uint64_t stream_length() override {
return _data.size;
}

// end of stream
bool eof() override {
return _cur_pos >= _data.size;
}

// remain bytes of the stream
uint64_t available() override {
return _data.size - _cur_pos;
}

// get the default buffer size
size_t get_buffer_size() override {
return _data.size;
}

// return the memory buffer's first byte address and the remain bytes
inline void get_buf(char** buf, uint32_t* remaining_bytes) override {
if (_byte_buffer_length == 0) {
*buf = nullptr;
*remaining_bytes = 0;
} else {
*buf = _data.data;
*remaining_bytes = remaining();
}
}

// return the current read offset position
inline void get_position(uint32_t* position) override {
*position = _cur_pos;
}

// set the read offset
void set_position(uint32_t pos) override {
_cur_pos = pos;
}

// return the remain bytes in the stream
int remaining() override {
return _byte_buffer_pos + _byte_buffer_length - _cur_pos;
}

private:
// read the next StorageByteBuffer data
OLAPStatus _assure_data();

private:
// memory byte data
Slice _data;
// current data offset
uint32_t _cur_pos;
// offset in current StorageByteBuffer
uint32_t _byte_buffer_pos;
// length of current StorageByteBuffer
uint32_t _byte_buffer_length;
};

} // namespace doris
3 changes: 2 additions & 1 deletion be/src/olap/file_stream.h
100755 → 100644
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
#include <streambuf>
#include <vector>

#include "olap/base_stream.h"
#include "olap/byte_buffer.h"
#include "olap/compress.h"
#include "olap/stream_index_reader.h"
Expand All @@ -35,7 +36,7 @@
namespace doris {

// 定义输入数据流接口
class ReadOnlyFileStream {
class ReadOnlyFileStream : public BaseStream {
public:
// 构造方法, 使用一组ByteBuffer创建一个InStream
// 输入的ByteBuffer在流中的位置可以不连续,例如通过Index确定某些数据不需要
Expand Down
7 changes: 7 additions & 0 deletions be/src/olap/out_stream.h
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,13 @@ class OutStream {
}
}

uint64_t current_buffer_byte() {
if (_current != nullptr) {
return _current->position();
}
return 0;
}

private:
OLAPStatus _create_new_input_buffer();
OLAPStatus _write_head(StorageByteBuffer* buf,
Expand Down
2 changes: 1 addition & 1 deletion be/src/olap/rowset/run_length_integer_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@

namespace doris {

RunLengthIntegerReader::RunLengthIntegerReader(ReadOnlyFileStream* input, bool is_singed)
RunLengthIntegerReader::RunLengthIntegerReader(BaseStream* input, bool is_singed)
: _input(input),
_signed(is_singed),
_num_literals(0),
Expand Down
8 changes: 4 additions & 4 deletions be/src/olap/rowset/run_length_integer_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,20 +18,20 @@
#ifndef DORIS_BE_SRC_OLAP_ROWSET_RUN_LENGTH_INTEGER_READER_H
#define DORIS_BE_SRC_OLAP_ROWSET_RUN_LENGTH_INTEGER_READER_H

#include "olap/file_stream.h"
#include "olap/base_stream.h"
#include "olap/rowset/run_length_integer_writer.h"
#include "olap/stream_index_reader.h"
#include "olap/olap_define.h"
#include "util/runtime_profile.h"

namespace doris {

class ReadOnlyFileStream;
class BaseStream;
class PositionProvider;

class RunLengthIntegerReader {
public:
explicit RunLengthIntegerReader(ReadOnlyFileStream* input, bool is_singed);
explicit RunLengthIntegerReader(BaseStream* input, bool is_singed);
~RunLengthIntegerReader() {}
inline bool has_next() const {
return _used != _num_literals || !_input->eof();
Expand Down Expand Up @@ -63,7 +63,7 @@ class RunLengthIntegerReader {
OLAPStatus _read_direct_values(uint8_t first_byte);
OLAPStatus _read_short_repeat_values(uint8_t first_byte);

ReadOnlyFileStream* _input;
BaseStream* _input;
bool _signed;
int64_t _literals[RunLengthIntegerWriter::MAX_SCOPE];
int32_t _num_literals;
Expand Down
Loading