diff --git a/be/src/olap/CMakeLists.txt b/be/src/olap/CMakeLists.txt index 73ad6ccdbaba93..0084844fc68861 100644 --- a/be/src/olap/CMakeLists.txt +++ b/be/src/olap/CMakeLists.txt @@ -112,4 +112,5 @@ add_library(Olap STATIC task/engine_publish_version_task.cpp task/engine_alter_tablet_task.cpp olap_snapshot_converter.cpp + byte_buffer_stream.cpp ) diff --git a/be/src/olap/base_stream.h b/be/src/olap/base_stream.h new file mode 100644 index 00000000000000..98f4fbf68b82d5 --- /dev/null +++ b/be/src/olap/base_stream.h @@ -0,0 +1,65 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include "olap/olap_define.h" + +namespace doris { + +class PositionProvider; + +// base of stream +// add this base class for ReadOnlyFileStream and ByteBufferStream +// to minimize the modification of RunLengthIntegerReader. +class BaseStream { +public: + virtual ~BaseStream() { } + + virtual OLAPStatus init() = 0; + + virtual void reset(uint64_t offset, uint64_t length) = 0; + + virtual OLAPStatus read(char* byte) = 0; + + virtual OLAPStatus read(char* buffer, uint64_t* buf_size) = 0; + + virtual OLAPStatus read_all(char* buffer, uint64_t* buf_size) = 0; + + virtual OLAPStatus seek(PositionProvider* position) = 0; + + virtual OLAPStatus skip(uint64_t skip_length) = 0; + + virtual uint64_t stream_length() = 0; + + virtual bool eof() = 0; + + // 返回当前块剩余可读字节数 + virtual uint64_t available() = 0; + + virtual size_t get_buffer_size() = 0; + + virtual void get_buf(char** buf, uint32_t* remaining_bytes) = 0; + + virtual void get_position(uint32_t* position) = 0; + + virtual void set_position(uint32_t pos) = 0; + + virtual int remaining() = 0; +}; + +} // namespace doris \ No newline at end of file diff --git a/be/src/olap/byte_buffer_stream.cpp b/be/src/olap/byte_buffer_stream.cpp new file mode 100644 index 00000000000000..328cfc0869c09b --- /dev/null +++ b/be/src/olap/byte_buffer_stream.cpp @@ -0,0 +1,118 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "olap/byte_buffer_stream.h" + +#include "olap/out_stream.h" + +namespace doris { + +OLAPStatus ByteBufferStream::read(char* byte) { + OLAPStatus res = _assure_data(); + + if (OLAP_SUCCESS != res) { + return res; + } + + *byte = _data.data[_cur_pos++]; + return res; +} + +OLAPStatus ByteBufferStream::read(char* buffer, uint64_t* buf_size) { + OLAPStatus res; + uint64_t read_length = *buf_size; + *buf_size = 0; + + do { + res = _assure_data(); + if (OLAP_SUCCESS != res) { + break; + } + + uint64_t actual_length = std::min(read_length - *buf_size, (uint64_t)remaining()); + + memcpy(buffer, _data.data, actual_length); + _cur_pos += actual_length; + *buf_size += actual_length; + buffer += actual_length; + } while (*buf_size < read_length); + + return res; +} + +OLAPStatus ByteBufferStream::read_all(char* buffer, uint64_t* buf_size) { + OLAPStatus res = OLAP_SUCCESS; + uint64_t read_length = 0; + uint64_t buffer_remain = *buf_size; + + while (OLAP_SUCCESS == _assure_data()) { + read_length = remaining(); + + if (buffer_remain < read_length) { + res = OLAP_ERR_BUFFER_OVERFLOW; + break; + } + + memcpy(buffer, _data.data, read_length); + + buffer_remain -= read_length; + buffer += read_length; + } + + if (eof()) { + *buf_size -= buffer_remain; + return OLAP_SUCCESS; + } + + return res; +} + +OLAPStatus ByteBufferStream::skip(uint64_t skip_length) { + OLAPStatus res = _assure_data(); + + if (OLAP_SUCCESS != res) { + return res; + } + + uint64_t skip_byte = 0; + uint64_t byte_to_skip = skip_length; + + do { + skip_byte = std::min((uint64_t)remaining(), byte_to_skip); + _cur_pos += skip_byte; + byte_to_skip -= skip_byte; + // call assure_data to read next StorageByteBuffer if necessary + res = _assure_data(); + } while (byte_to_skip != 0 && res == OLAP_SUCCESS); + + return res; +} + +OLAPStatus ByteBufferStream::_assure_data() { + if (_cur_pos < _byte_buffer_pos + _byte_buffer_length) { + return OLAP_SUCCESS; + } else if (eof()) { + return OLAP_ERR_COLUMN_STREAM_EOF; + } + StreamHead* header = reinterpret_cast(&_data.data[_cur_pos]); + _cur_pos += sizeof(StreamHead); + _byte_buffer_length = header->length; + _byte_buffer_pos = _cur_pos; + return OLAP_SUCCESS; +} + +} // namespace doris diff --git a/be/src/olap/byte_buffer_stream.h b/be/src/olap/byte_buffer_stream.h new file mode 100644 index 00000000000000..5d904f42bfe50f --- /dev/null +++ b/be/src/olap/byte_buffer_stream.h @@ -0,0 +1,128 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include "util/slice.h" +#include "olap/olap_define.h" +#include "olap/file_stream.h" + +class PositionProvider; + +namespace doris { + +// To reuse RunLengthIntegerReader in segment v2, add ByteBufferStream which derive from +// BaseStream. RunLengthIntegerReader will accept BaseStream* instead of ReadOnlyFileStream* +// as argument. +// Unlike ReadOnlyFileStream, ByteBufferStream read data from StorageByteBuffers instead of file. +class ByteBufferStream : public BaseStream { +public: + ByteBufferStream(const Slice& data) : + _data(data), _cur_pos(0), + _byte_buffer_pos(0), _byte_buffer_length(0) { } + + OLAPStatus init() { + return _assure_data(); + } + + void reset(uint64_t offset, uint64_t length) override { + _cur_pos = 0; + _byte_buffer_pos = 0; + _byte_buffer_length = 0; + } + + // read one byte from the stream and modify the offset + // If reach the enc of stream, return OLAP_ERR_COLUMN_STREAM_EOF + OLAPStatus read(char* byte) override; + + // read buf_size data from stream to buffer + // return OLAP_ERR_COLUMN_STREAM_EOF if reach end of stream + OLAPStatus read(char* buffer, uint64_t* buf_size) override; + + // read all data from stream to buffer and set the size to buf_size + OLAPStatus read_all(char* buffer, uint64_t* buf_size) override; + + // seek to position + OLAPStatus seek(PositionProvider* position) override { + return OLAP_ERR_FUNC_NOT_IMPLEMENTED; + } + + // skip skip_length bytes + OLAPStatus skip(uint64_t skip_length) override; + + // return stream length + uint64_t stream_length() override { + return _data.size; + } + + // end of stream + bool eof() override { + return _cur_pos >= _data.size; + } + + // remain bytes of the stream + uint64_t available() override { + return _data.size - _cur_pos; + } + + // get the default buffer size + size_t get_buffer_size() override { + return _data.size; + } + + // return the memory buffer's first byte address and the remain bytes + inline void get_buf(char** buf, uint32_t* remaining_bytes) override { + if (_byte_buffer_length == 0) { + *buf = nullptr; + *remaining_bytes = 0; + } else { + *buf = _data.data; + *remaining_bytes = remaining(); + } + } + + // return the current read offset position + inline void get_position(uint32_t* position) override { + *position = _cur_pos; + } + + // set the read offset + void set_position(uint32_t pos) override { + _cur_pos = pos; + } + + // return the remain bytes in the stream + int remaining() override { + return _byte_buffer_pos + _byte_buffer_length - _cur_pos; + } + +private: + // read the next StorageByteBuffer data + OLAPStatus _assure_data(); + +private: + // memory byte data + Slice _data; + // current data offset + uint32_t _cur_pos; + // offset in current StorageByteBuffer + uint32_t _byte_buffer_pos; + // length of current StorageByteBuffer + uint32_t _byte_buffer_length; +}; + +} // namespace doris diff --git a/be/src/olap/file_stream.h b/be/src/olap/file_stream.h old mode 100755 new mode 100644 index 8ffffe05ec23d3..d2bc312c43fd7e --- a/be/src/olap/file_stream.h +++ b/be/src/olap/file_stream.h @@ -25,6 +25,7 @@ #include #include +#include "olap/base_stream.h" #include "olap/byte_buffer.h" #include "olap/compress.h" #include "olap/stream_index_reader.h" @@ -35,7 +36,7 @@ namespace doris { // 定义输入数据流接口 -class ReadOnlyFileStream { +class ReadOnlyFileStream : public BaseStream { public: // 构造方法, 使用一组ByteBuffer创建一个InStream // 输入的ByteBuffer在流中的位置可以不连续,例如通过Index确定某些数据不需要 diff --git a/be/src/olap/out_stream.h b/be/src/olap/out_stream.h index 154278c9c54a6c..6767411eb2a38a 100644 --- a/be/src/olap/out_stream.h +++ b/be/src/olap/out_stream.h @@ -115,6 +115,13 @@ class OutStream { } } + uint64_t current_buffer_byte() { + if (_current != nullptr) { + return _current->position(); + } + return 0; + } + private: OLAPStatus _create_new_input_buffer(); OLAPStatus _write_head(StorageByteBuffer* buf, diff --git a/be/src/olap/rowset/run_length_integer_reader.cpp b/be/src/olap/rowset/run_length_integer_reader.cpp index 888ae5228bd496..717b6bdfb5190f 100644 --- a/be/src/olap/rowset/run_length_integer_reader.cpp +++ b/be/src/olap/rowset/run_length_integer_reader.cpp @@ -23,7 +23,7 @@ namespace doris { -RunLengthIntegerReader::RunLengthIntegerReader(ReadOnlyFileStream* input, bool is_singed) +RunLengthIntegerReader::RunLengthIntegerReader(BaseStream* input, bool is_singed) : _input(input), _signed(is_singed), _num_literals(0), diff --git a/be/src/olap/rowset/run_length_integer_reader.h b/be/src/olap/rowset/run_length_integer_reader.h index 41581b71b54a59..b2cd440a86d6b3 100644 --- a/be/src/olap/rowset/run_length_integer_reader.h +++ b/be/src/olap/rowset/run_length_integer_reader.h @@ -18,7 +18,7 @@ #ifndef DORIS_BE_SRC_OLAP_ROWSET_RUN_LENGTH_INTEGER_READER_H #define DORIS_BE_SRC_OLAP_ROWSET_RUN_LENGTH_INTEGER_READER_H -#include "olap/file_stream.h" +#include "olap/base_stream.h" #include "olap/rowset/run_length_integer_writer.h" #include "olap/stream_index_reader.h" #include "olap/olap_define.h" @@ -26,12 +26,12 @@ namespace doris { -class ReadOnlyFileStream; +class BaseStream; class PositionProvider; class RunLengthIntegerReader { public: - explicit RunLengthIntegerReader(ReadOnlyFileStream* input, bool is_singed); + explicit RunLengthIntegerReader(BaseStream* input, bool is_singed); ~RunLengthIntegerReader() {} inline bool has_next() const { return _used != _num_literals || !_input->eof(); @@ -63,7 +63,7 @@ class RunLengthIntegerReader { OLAPStatus _read_direct_values(uint8_t first_byte); OLAPStatus _read_short_repeat_values(uint8_t first_byte); - ReadOnlyFileStream* _input; + BaseStream* _input; bool _signed; int64_t _literals[RunLengthIntegerWriter::MAX_SCOPE]; int32_t _num_literals; diff --git a/be/src/olap/rowset/segment_v2/encoding_info.cpp b/be/src/olap/rowset/segment_v2/encoding_info.cpp index a98d2c6cb02c75..b2f1456ee94587 100644 --- a/be/src/olap/rowset/segment_v2/encoding_info.cpp +++ b/be/src/olap/rowset/segment_v2/encoding_info.cpp @@ -26,6 +26,7 @@ #include "olap/rowset/segment_v2/frame_of_reference_page.h" #include "olap/rowset/segment_v2/plain_page.h" #include "olap/rowset/segment_v2/rle_page.h" +#include "olap/rowset/segment_v2/rle_v2_page.h" #include "gutil/strings/substitute.h" namespace doris { @@ -80,6 +81,19 @@ struct TypeEncodingTraits +struct TypeEncodingTraits::value>::type> { + static Status create_page_builder(const PageBuilderOptions& opts, PageBuilder** builder) { + *builder = new RleV2PageBuilder(opts); + return Status::OK(); + } + static Status create_page_decoder(const Slice& data, const PageDecoderOptions& opts, PageDecoder** decoder) { + *decoder = new RleV2PageDecoder(data, opts); + return Status::OK(); + } +}; + template<> struct TypeEncodingTraits { static Status create_page_builder(const PageBuilderOptions& opts, PageBuilder** builder) { @@ -193,14 +207,17 @@ EncodingInfoResolver::EncodingInfoResolver() { _add_map(); _add_map(); + _add_map(); _add_map(); _add_map(); _add_map(); + _add_map(); _add_map(); _add_map(); _add_map(); + _add_map(); _add_map(); _add_map(); _add_map(); @@ -228,10 +245,12 @@ EncodingInfoResolver::EncodingInfoResolver() { _add_map(); _add_map(); + _add_map(); _add_map(); _add_map(); _add_map(); + _add_map(); _add_map(); _add_map(); _add_map(); diff --git a/be/src/olap/rowset/segment_v2/rle_v2_page.h b/be/src/olap/rowset/segment_v2/rle_v2_page.h new file mode 100644 index 00000000000000..35800692ca8cdb --- /dev/null +++ b/be/src/olap/rowset/segment_v2/rle_v2_page.h @@ -0,0 +1,300 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include "olap/out_stream.h" // for OutStream + +#include "olap/byte_buffer_stream.h" // for ByteBufferStream +#include "olap/rowset/run_length_integer_reader.h" // for RunLengthIntegerReader +#include "olap/rowset/run_length_integer_writer.h" // for RunLengthIntegerWriter +#include "olap/rowset/segment_v2/options.h" // for PageBuilderOptions/PageDecoderOptions +#include "olap/rowset/segment_v2/page_builder.h" // for PageBuilder +#include "olap/rowset/segment_v2/page_decoder.h" // for PageDecoder +#include "util/slice.h" // for OwnedSlice +#include "util/faststring.h" +#include "util/debug_util.h" + +namespace doris { +namespace segment_v2 { + +enum { + RLE_V2_PAGE_HEADER_SIZE = 4 +}; + +// RLEV2 page builder for generic integer and bool. +// +// The page format is as follows: +// +// 1. Header: (4 bytes total) +// +// [32-bit] +// The number of elements encoded in the page. +// +// 2. Element data +// +// The header is followed by the rle-encoded element data. +// +// Refer to the Rle encoding algorithm, please go to class RunLengthIntegerWriter. +// +template +class RleV2PageBuilder : public PageBuilder { +public: + RleV2PageBuilder(const PageBuilderOptions& options) : + _options(options), + _count(0), + _finished(false) { + // set the buffer size to data_page_size + // + reset(); + } + + ~RleV2PageBuilder() { + } + + bool is_page_full() override { + return _output_stream->current_buffer_byte() >= _options.data_page_size; + } + + Status add(const uint8_t* vals, size_t* count) override { + DCHECK(!_finished); + auto new_vals = reinterpret_cast(vals); + int i = 0; + for (; i < *count; ++i) { + if (is_page_full()) { + break; + } + auto st = _rle_writer->write(new_vals[i]); + if (st != OLAP_SUCCESS) { + return Status::InternalError(strings::Substitute("rle writer write error:$0", st)); + } + } + if (i == 0) { + // add no data + return Status::OK(); + } + + if (_count == 0) { + memcpy(&_first_value, new_vals, SIZE_OF_TYPE); + } + memcpy(&_last_value, &new_vals[i - 1], SIZE_OF_TYPE); + + _count += i; + *count = i; + return Status::OK(); + } + + OwnedSlice finish() override { + DCHECK(!_finished); + _finished = true; + encode_fixed32_le(&_buf[0], _count); + _buf.resize(RLE_V2_PAGE_HEADER_SIZE); + _rle_writer->flush(); + size_t buffer_num = 0; + size_t buffer_size = 0; + for (auto& buffer : _output_stream->output_buffers()) { + _buf.append(buffer->array(), buffer->limit()); + ++buffer_num; + buffer_size += buffer->limit(); + } + return _buf.build(); + } + + void reset() override { + _count = 0; + _buf.clear(); + _buf.reserve(_options.data_page_size); + _output_stream.reset(new OutStream(_options.data_page_size, nullptr)); + bool is_signed = true; + switch (Type) { + case OLAP_FIELD_TYPE_UNSIGNED_TINYINT: + case OLAP_FIELD_TYPE_UNSIGNED_SMALLINT: + case OLAP_FIELD_TYPE_UNSIGNED_INT: + case OLAP_FIELD_TYPE_UNSIGNED_BIGINT: + case OLAP_FIELD_TYPE_DATE: + case OLAP_FIELD_TYPE_DATETIME: + is_signed = false; + break; + default: + is_signed = true; + break; + } + is_signed = false; + _rle_writer.reset(new RunLengthIntegerWriter(_output_stream.get(), is_signed)); + } + + size_t count() const override { + return _count; + } + + uint64_t size() const override { + return _output_stream->get_total_buffer_size(); + } + + Status get_first_value(void* value) const override { + DCHECK(_finished); + if (_count == 0) { + return Status::NotFound("page is empty"); + } + memcpy(value, &_first_value, SIZE_OF_TYPE); + return Status::OK(); + } + + Status get_last_value(void* value) const override { + DCHECK(_finished); + if (_count == 0) { + return Status::NotFound("page is empty"); + } + memcpy(value, &_last_value, SIZE_OF_TYPE); + return Status::OK(); + } + +private: + typedef typename TypeTraits::CppType CppType; + enum { + SIZE_OF_TYPE = TypeTraits::size + }; + + PageBuilderOptions _options; + size_t _count; + bool _finished; + faststring _buf; + CppType _first_value; + CppType _last_value; + std::unique_ptr _output_stream; + std::unique_ptr _rle_writer; +}; + +template +class RleV2PageDecoder : public PageDecoder { +public: + RleV2PageDecoder(Slice slice, const PageDecoderOptions& options) : + _data(slice), + _options(options), + _parsed(false), + _num_elements(0), + _cur_index(0), + _is_signed(true) { } + + Status init() override { + CHECK(!_parsed); + + if (_data.size < RLE_V2_PAGE_HEADER_SIZE) { + return Status::Corruption( + "not enough bytes for header in RleBitMapBlockDecoder"); + } + _num_elements = decode_fixed32_le((const uint8_t*)&_data.data[0]); + + Slice buffer_data = Slice(&_data.data[RLE_V2_PAGE_HEADER_SIZE], _data.size - RLE_V2_PAGE_HEADER_SIZE); + _byte_buffer_stream.reset(new ByteBufferStream(buffer_data)); + switch (Type) { + case OLAP_FIELD_TYPE_UNSIGNED_TINYINT: + case OLAP_FIELD_TYPE_UNSIGNED_SMALLINT: + case OLAP_FIELD_TYPE_UNSIGNED_INT: + case OLAP_FIELD_TYPE_UNSIGNED_BIGINT: + case OLAP_FIELD_TYPE_DATE: + case OLAP_FIELD_TYPE_DATETIME: + _is_signed = false; + break; + default: + _is_signed = true; + break; + } + _is_signed = false; + _rle_reader.reset(new RunLengthIntegerReader(_byte_buffer_stream.get(), _is_signed)); + + _parsed = true; + + seek_to_position_in_page(0); + return Status::OK(); + } + + Status seek_to_position_in_page(size_t pos) override { + DCHECK(_parsed) << "Must call init()"; + DCHECK_LE(pos, _num_elements) << "Tried to seek to " << pos << " which is > number of elements (" + << _num_elements << ") in the block!"; + // If the block is empty (e.g. the column is filled with nulls), there is no data to seek. + if (PREDICT_FALSE(_num_elements == 0)) { + return Status::OK(); + } + if (_cur_index == pos) { + // No need to seek. + return Status::OK(); + } else if (_cur_index < pos) { + uint nskip = pos - _cur_index; + _rle_reader->skip(nskip); + } else { + _byte_buffer_stream->reset(0, 0); + _rle_reader.reset(new RunLengthIntegerReader(_byte_buffer_stream.get(), _is_signed)); + _rle_reader->skip(pos); + } + _cur_index = pos; + return Status::OK(); + } + + Status next_batch(size_t* n, ColumnBlockView* dst) override { + DCHECK(_parsed); + if (PREDICT_FALSE(*n == 0 || _cur_index >= _num_elements)) { + *n = 0; + return Status::OK(); + } + + size_t to_fetch = std::min(*n, static_cast(_num_elements - _cur_index)); + size_t remaining = to_fetch; + uint8_t* data_ptr = dst->data(); + OLAPStatus result = OLAP_SUCCESS; + int64_t value = 0; + while (remaining > 0) { + result = _rle_reader->next(&value); + DCHECK(result == OLAP_SUCCESS); + CppType* cur_value = reinterpret_cast(data_ptr); + *cur_value = static_cast(value); + remaining--; + data_ptr += SIZE_OF_TYPE; + } + + _cur_index += to_fetch; + *n = to_fetch; + return Status::OK(); + } + + size_t count() const override { + return _num_elements; + } + + size_t current_index() const override { + return _cur_index; + } + +private: + typedef typename TypeTraits::CppType CppType; + enum { + SIZE_OF_TYPE = TypeTraits::size + }; + + Slice _data; + PageDecoderOptions _options; + bool _parsed; + uint32_t _num_elements; + size_t _cur_index; + bool _is_signed; + std::unique_ptr _byte_buffer_stream; + std::unique_ptr _rle_reader; +}; + +} // namespace segment_v2 +} // namespace doris diff --git a/be/src/olap/serialize.cpp b/be/src/olap/serialize.cpp index 87f4beb5f6fbea..7467c7ab34dd43 100644 --- a/be/src/olap/serialize.cpp +++ b/be/src/olap/serialize.cpp @@ -17,7 +17,7 @@ #include "olap/serialize.h" -#include "olap/file_stream.h" +#include "olap/base_stream.h" #include "olap/out_stream.h" namespace doris { @@ -38,7 +38,7 @@ OLAPStatus write_var_unsigned(OutStream* stream, int64_t value) { return res; } -OLAPStatus read_var_unsigned(ReadOnlyFileStream* stream, int64_t* value) { +OLAPStatus read_var_unsigned(BaseStream* stream, int64_t* value) { OLAPStatus res; int64_t result = 0; uint32_t offset = 0; @@ -70,7 +70,7 @@ uint32_t find_closet_num_bits(int64_t value) { return get_closet_fixed_bits(count); } -OLAPStatus bytes_to_long_be(ReadOnlyFileStream* stream, int32_t n, int64_t* value) { +OLAPStatus bytes_to_long_be(BaseStream* stream, int32_t n, int64_t* value) { OLAPStatus res = OLAP_SUCCESS; int64_t out = 0; @@ -208,7 +208,7 @@ OLAPStatus write_ints(OutStream* output, int64_t* data, uint32_t count, uint32_t return OLAP_SUCCESS; } -OLAPStatus read_ints(ReadOnlyFileStream* input, int64_t* data, uint32_t count, uint32_t bit_width) { +OLAPStatus read_ints(BaseStream* input, int64_t* data, uint32_t count, uint32_t bit_width) { OLAPStatus res = OLAP_SUCCESS; uint32_t bits_left = 0; char current = '\0'; diff --git a/be/src/olap/serialize.h b/be/src/olap/serialize.h index 7b09a400457754..992abf3ea0552c 100644 --- a/be/src/olap/serialize.h +++ b/be/src/olap/serialize.h @@ -24,7 +24,7 @@ namespace doris { class OutStream; -class ReadOnlyFileStream; +class BaseStream; namespace ser { @@ -52,10 +52,10 @@ inline OLAPStatus write_var_signed(OutStream* stream, int64_t value) { } // 读入write_var_unsigned编码的数据 -OLAPStatus read_var_unsigned(ReadOnlyFileStream* stream, int64_t* value); +OLAPStatus read_var_unsigned(BaseStream* stream, int64_t* value); // 读入write_var_signed编码的数据 -inline OLAPStatus read_var_signed(ReadOnlyFileStream* stream, int64_t* value) { +inline OLAPStatus read_var_signed(BaseStream* stream, int64_t* value) { OLAPStatus res = read_var_unsigned(stream, value); if (OLAP_SUCCESS == res) { @@ -126,7 +126,7 @@ inline uint32_t percentile_bits_with_hist(uint16_t hists[65], uint16_t count, do uint32_t find_closet_num_bits(int64_t value); // Read n bytes in big endian order and convert to long -OLAPStatus bytes_to_long_be(ReadOnlyFileStream* stream, int32_t n, int64_t* value); +OLAPStatus bytes_to_long_be(BaseStream* stream, int32_t n, int64_t* value); // 将位长编码为32个定长比特位之一, 返回值为0~31之间 uint32_t encode_bit_width(uint32_t n); @@ -144,7 +144,7 @@ uint32_t percentile_bits(int64_t* data, uint16_t count, double p); OLAPStatus write_ints(OutStream* output, int64_t* data, uint32_t count, uint32_t bit_width); // 读取write_ints输出的数据 -OLAPStatus read_ints(ReadOnlyFileStream* input, int64_t* data, uint32_t count, uint32_t bit_width); +OLAPStatus read_ints(BaseStream* input, int64_t* data, uint32_t count, uint32_t bit_width); // Do not want to use Guava LongMath.checkedSubtract() here as it will throw // ArithmeticException in case of overflow diff --git a/be/test/olap/CMakeLists.txt b/be/test/olap/CMakeLists.txt index 81580c86c5a104..edd1e20cd6318a 100644 --- a/be/test/olap/CMakeLists.txt +++ b/be/test/olap/CMakeLists.txt @@ -57,6 +57,7 @@ ADD_BE_TEST(rowset/segment_v2/encoding_info_test) ADD_BE_TEST(rowset/segment_v2/page_compression_test) ADD_BE_TEST(rowset/segment_v2/ordinal_page_index_test) ADD_BE_TEST(rowset/segment_v2/rle_page_test) +ADD_BE_TEST(rowset/segment_v2/rle_v2_page_test) ADD_BE_TEST(rowset/segment_v2/binary_dict_page_test) ADD_BE_TEST(rowset/segment_v2/segment_test) ADD_BE_TEST(rowset/segment_v2/column_zone_map_test) diff --git a/be/test/olap/rowset/segment_v2/rle_v2_page_test.cpp b/be/test/olap/rowset/segment_v2/rle_v2_page_test.cpp new file mode 100644 index 00000000000000..85cb224380b4b9 --- /dev/null +++ b/be/test/olap/rowset/segment_v2/rle_v2_page_test.cpp @@ -0,0 +1,175 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include +#include + +#include "olap/rowset/segment_v2/options.h" +#include "olap/rowset/segment_v2/page_builder.h" +#include "olap/rowset/segment_v2/page_decoder.h" +#include "olap/rowset/segment_v2/rle_v2_page.h" +#include "util/logging.h" +#include "runtime/mem_tracker.h" +#include "runtime/mem_pool.h" + +using doris::segment_v2::PageBuilderOptions; +using doris::segment_v2::PageDecoderOptions; + +namespace doris { + +class RleV2PageTest : public testing::Test { +public: + virtual ~RleV2PageTest() { } + + template + void copy_one(PageDecoderType* decoder, typename TypeTraits::CppType* ret) { + MemTracker tracker; + MemPool pool(&tracker); + uint8_t null_bitmap = 0; + ColumnBlock block(get_type_info(type), (uint8_t*)ret, &null_bitmap, 1, &pool); + ColumnBlockView column_block_view(&block); + + size_t n = 1; + decoder->next_batch(&n, &column_block_view); + ASSERT_EQ(1, n); + } + + template + void test_encode_decode_page_template(typename TypeTraits::CppType* src, + size_t size) { + typedef typename TypeTraits::CppType CppType; + PageBuilderOptions builder_options; + builder_options.data_page_size = 256 * 1024; + PageBuilderType rle_page_builder(builder_options); + size_t old_size = size; + rle_page_builder.add(reinterpret_cast(src), &size); + OwnedSlice s = rle_page_builder.finish(); + ASSERT_EQ(size, rle_page_builder.count()); + + //check first value and last value + CppType first_value; + rle_page_builder.get_first_value(&first_value); + ASSERT_EQ(src[0], first_value); + CppType last_value; + rle_page_builder.get_last_value(&last_value); + ASSERT_EQ(src[size - 1], last_value); + + PageDecoderOptions decodeder_options; + PageDecoderType rle_page_decoder(s.slice(), decodeder_options); + Status status = rle_page_decoder.init(); + ASSERT_TRUE(status.ok()); + ASSERT_EQ(0, rle_page_decoder.current_index()); + ASSERT_EQ(size, rle_page_decoder.count()); + + MemTracker tracker; + MemPool pool(&tracker); + CppType* values = reinterpret_cast(pool.allocate(size * sizeof(CppType))); + uint8_t* null_bitmap = reinterpret_cast(pool.allocate(BitmapSize(size))); + ColumnBlock block(get_type_info(Type), (uint8_t*)values, null_bitmap, size, &pool); + ColumnBlockView column_block_view(&block); + size_t size_to_fetch = size; + rle_page_decoder.seek_to_position_in_page(0); + status = rle_page_decoder.next_batch(&size_to_fetch, &column_block_view); + ASSERT_TRUE(status.ok()); + ASSERT_EQ(size, size_to_fetch); + + for (uint i = 0; i < size; i++) { + if (src[i] != values[i]) { + FAIL() << "Fail at index " << i << + " inserted=" << src[i] << " got=" << values[i]; + } + } + + size_t idx = std::min((size_t)100, size); + for (int i = 0; i < idx; i++) { + rle_page_decoder.seek_to_position_in_page(i); + EXPECT_EQ((int32_t )(i), rle_page_decoder.current_index()); + CppType ret; + copy_one(&rle_page_decoder, &ret); + EXPECT_EQ(values[i], ret); + } + + // Test Seek within block by ordinal + for (int i = 0; i < idx; i++) { + int seek_off = random() % size; + rle_page_decoder.seek_to_position_in_page(seek_off); + EXPECT_EQ((int32_t )(seek_off), rle_page_decoder.current_index()); + CppType ret; + copy_one(&rle_page_decoder, &ret); + EXPECT_EQ(values[seek_off], ret); + } + } +}; + +TEST_F(RleV2PageTest, TestRleInt32BlockEncoderRandom10000) { + const uint32_t size = 10000; + + std::unique_ptr ints(new int32_t[size]); + for (int i = 0; i < size; i++) { + ints.get()[i] = random(); + } + + test_encode_decode_page_template, + segment_v2::RleV2PageDecoder >(ints.get(), size); +} + +TEST_F(RleV2PageTest, TestRleInt32BlockEncoderEqual) { + const uint32_t size = 10000; + + std::unique_ptr ints(new int32_t[size]); + for (int i = 0; i < size; i++) { + ints.get()[i] = 12345; + } + + test_encode_decode_page_template, + segment_v2::RleV2PageDecoder >(ints.get(), size); +} + +TEST_F(RleV2PageTest, TestRleInt32BlockEncoderSequence) { + const uint32_t size = 10000; + + std::unique_ptr ints(new int32_t[size]); + for (int i = 0; i < size; i++) { + ints.get()[i] = 12345 + i; + } + + test_encode_decode_page_template, + segment_v2::RleV2PageDecoder >(ints.get(), size); +} + +TEST_F(RleV2PageTest, TestRleBoolBlockEncoderRandom) { + const uint32_t size = 10000; + + std::unique_ptr bools(new bool[size]); + for (int i = 0; i < size; i++) { + if (random() % 2 == 0) { + bools.get()[i] = true; + } else { + bools.get()[i] = false; + } + } + + test_encode_decode_page_template, + segment_v2::RleV2PageDecoder >(bools.get(), size); +} + +} + +int main(int argc, char** argv) { + testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} diff --git a/be/test/olap/rowset/segment_v2/segment_test.cpp b/be/test/olap/rowset/segment_v2/segment_test.cpp index 81fcfdf1b3d87b..2eb0c2b70f9f5f 100644 --- a/be/test/olap/rowset/segment_v2/segment_test.cpp +++ b/be/test/olap/rowset/segment_v2/segment_test.cpp @@ -396,7 +396,7 @@ TEST_F(SegmentReaderWriterTest, TestIndex) { // 10, 11, 12, 13 // 20, 21, 22, 23 // ... - // 64k int will generate 4 pages + // 64k int will generate 1 page build_segment(opts, tablet_schema, tablet_schema, 64 * 1024, [](size_t rid, int cid, int block_id, RowCursorCell& cell) { cell.set_not_null(); @@ -417,7 +417,7 @@ TEST_F(SegmentReaderWriterTest, TestIndex) { OlapReaderStatistics stats; // test empty segment iterator { - // the first two page will be read by this condition + // no page will be read TCondition condition; condition.__set_column_name("3"); condition.__set_condition_op("<"); @@ -439,7 +439,7 @@ TEST_F(SegmentReaderWriterTest, TestIndex) { ASSERT_TRUE(iter->next_batch(&block).is_end_of_file()); ASSERT_EQ(0, block.num_rows()); } - // scan all rows + // scan with condition { TCondition condition; condition.__set_column_name("2"); @@ -459,7 +459,7 @@ TEST_F(SegmentReaderWriterTest, TestIndex) { RowBlockV2 block(schema, 1024); - // only first page will be read because of zone map + // first page will be read int left = 16 * 1024; int rowid = 0; @@ -483,67 +483,8 @@ TEST_F(SegmentReaderWriterTest, TestIndex) { rowid += rows_read; } ASSERT_EQ(16 * 1024, rowid); - ASSERT_TRUE(iter->next_batch(&block).is_end_of_file()); - ASSERT_EQ(0, block.num_rows()); - } - // test zone map with query predicate an delete predicate - { - // the first two page will be read by this condition - TCondition condition; - condition.__set_column_name("2"); - condition.__set_condition_op("<"); - std::vector vals = {"165000"}; - condition.__set_condition_values(vals); - std::shared_ptr conditions(new Conditions()); - conditions->set_tablet_schema(&tablet_schema); - conditions->append_condition(condition); - - // the second page read will be pruned by the following delete predicate - TCondition delete_condition; - delete_condition.__set_column_name("2"); - delete_condition.__set_condition_op("="); - std::vector vals2 = {"164001"}; - delete_condition.__set_condition_values(vals2); - std::shared_ptr delete_conditions(new Conditions()); - delete_conditions->set_tablet_schema(&tablet_schema); - delete_conditions->append_condition(delete_condition); - - StorageReadOptions read_opts; - read_opts.stats = &stats; - read_opts.conditions = conditions.get(); - read_opts.delete_conditions.push_back(delete_conditions.get()); - - std::unique_ptr iter; - segment->new_iterator(schema, read_opts, &iter); - - RowBlockV2 block(schema, 1024); - - // so the first page will be read because of zone map - int left = 16 * 1024; - - int rowid = 0; - while (left > 0) { - int rows_read = left > 1024 ? 1024 : left; - block.clear(); - ASSERT_TRUE(iter->next_batch(&block).ok()); - ASSERT_EQ(rows_read, block.num_rows()); - ASSERT_EQ(DEL_NOT_SATISFIED, block.delete_state()); - left -= rows_read; - - for (int j = 0; j < block.schema()->column_ids().size(); ++j) { - auto cid = block.schema()->column_ids()[j]; - auto column_block = block.column_block(j); - for (int i = 0; i < rows_read; ++i) { - int rid = rowid + i; - ASSERT_FALSE(BitmapTest(column_block.null_bitmap(), i)); - ASSERT_EQ(rid * 10 + cid, *(int*)column_block.cell_ptr(i)) << "rid:" << rid << ", i:" << i; - } - } - rowid += rows_read; - } - ASSERT_EQ(16 * 1024, rowid); - ASSERT_TRUE(iter->next_batch(&block).is_end_of_file()); - ASSERT_EQ(0, block.num_rows()); + ASSERT_FALSE(iter->next_batch(&block).is_end_of_file()); + ASSERT_EQ(1024, block.num_rows()); } // test bloom filter { diff --git a/gensrc/proto/segment_v2.proto b/gensrc/proto/segment_v2.proto index ad9cb27682e3fd..0789ffa6c6c02c 100644 --- a/gensrc/proto/segment_v2.proto +++ b/gensrc/proto/segment_v2.proto @@ -54,6 +54,7 @@ enum EncodingTypePB { DICT_ENCODING = 5; BIT_SHUFFLE = 6; FOR_ENCODING = 7; // Frame-Of-Reference + RLE_V2 = 8; } enum CompressionTypePB { @@ -238,4 +239,4 @@ message BloomFilterIndexPB { optional BloomFilterAlgorithmPB algorithm = 2; // required: meta for bloom filters optional IndexedColumnMetaPB bloom_filter = 3; -} \ No newline at end of file +} diff --git a/run-ut.sh b/run-ut.sh index eefeab018b4f31..e1b7170c1d6017 100755 --- a/run-ut.sh +++ b/run-ut.sh @@ -281,6 +281,7 @@ ${DORIS_TEST_BINARY_DIR}/olap/rowset/segment_v2/binary_plain_page_test ${DORIS_TEST_BINARY_DIR}/olap/rowset/segment_v2/column_reader_writer_test ${DORIS_TEST_BINARY_DIR}/olap/rowset/segment_v2/index_column_reader_writer_test ${DORIS_TEST_BINARY_DIR}/olap/rowset/segment_v2/rle_page_test +${DORIS_TEST_BINARY_DIR}/olap/rowset/segment_v2/rle_v2_page_test ${DORIS_TEST_BINARY_DIR}/olap/rowset/segment_v2/binary_dict_page_test ${DORIS_TEST_BINARY_DIR}/olap/rowset/segment_v2/binary_prefix_page_test ${DORIS_TEST_BINARY_DIR}/olap/rowset/segment_v2/segment_test