Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions be/src/olap/field.h
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ class Field {

// Forwards to the type info's set_to_max() for the value stored at `buf`.
inline void set_to_max(char* buf) const {
    _type_info->set_to_max(buf);
}
// Forwards to the type info's set_to_min() for the value stored at `buf`.
inline void set_to_min(char* buf) const {
    _type_info->set_to_min(buf);
}
// Obtains storage for one value of this field's type from `arena`, as
// provided by the type info's allocate_value_from_arena().
inline char* allocate_value_from_arena(Arena* arena) const {
    char* value_buf = _type_info->allocate_value_from_arena(arena);
    return value_buf;
}

inline void agg_update(RowCursorCell* dest, const RowCursorCell& src, MemPool* mem_pool = nullptr) const {
_agg_info->update(dest, src, mem_pool);
Expand Down Expand Up @@ -199,6 +200,10 @@ class Field {
_type_info->deep_copy_with_arena(dest, src, arena);
}

// Copies the value at `src` into `dest` through the type info's direct_copy().
// Unlike the arena/mem-pool deep-copy variants, no allocator is passed here.
inline void direct_copy_content(char* dest, const char* src) const { _type_info->direct_copy(dest, src); }

// Copy source content to destination in index format.
template<typename DstCellType, typename SrcCellType>
void to_index(DstCellType* dst, const SrcCellType& src) const;
Expand Down
5 changes: 4 additions & 1 deletion be/src/olap/olap_define.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,9 +50,12 @@ static const uint64_t OLAP_FIX_HEADER_MAGIC_NUMBER = 0;
// default size of the candidate set when executing be/ce
static constexpr uint32_t OLAP_COMPACTION_DEFAULT_CANDIDATE_SIZE = 10;

// the max length supported for string type
// the max length supported for varchar type
static const uint16_t OLAP_STRING_MAX_LENGTH = 65535;

// the max length supported for char type
static const uint16_t OLAP_CHAR_MAX_LENGTH = 255;

static const int32_t PREFERRED_SNAPSHOT_VERSION = 3;

// the max bytes for stored string length
Expand Down
13 changes: 10 additions & 3 deletions be/src/olap/rowset/segment_v2/binary_dict_page.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ Slice BinaryDictPageBuilder::finish() {
Slice data_slice = _data_page_builder->finish();
_buffer.append(data_slice.data, data_slice.size);
encode_fixed32_le(&_buffer[0], _encoding_type);
return Slice(_buffer.data(), _buffer.size());
return Slice(_buffer);
}

void BinaryDictPageBuilder::reset() {
Expand Down Expand Up @@ -147,7 +147,6 @@ BinaryDictPageDecoder::BinaryDictPageDecoder(Slice data, const PageDecoderOption
_data(data),
_options(options),
_data_page_decoder(nullptr),
_dict_decoder(options.dict_decoder),
_parsed(false),
_encoding_type(UNKNOWN_ENCODING) { }

Expand All @@ -161,7 +160,6 @@ Status BinaryDictPageDecoder::init() {
_encoding_type = static_cast<EncodingTypePB>(type);
_data.remove_prefix(BINARY_DICT_PAGE_HEADER_SIZE);
if (_encoding_type == DICT_ENCODING) {
DCHECK(_dict_decoder != nullptr) << "dict decoder pointer is nullptr";
_data_page_decoder.reset(new BitShufflePageDecoder<OLAP_FIELD_TYPE_INT>(_data, _options));
} else if (_encoding_type == PLAIN_ENCODING) {
DCHECK_EQ(_encoding_type, PLAIN_ENCODING);
Expand All @@ -180,12 +178,21 @@ Status BinaryDictPageDecoder::seek_to_position_in_page(size_t pos) {
return _data_page_decoder->seek_to_position_in_page(pos);
}

// Reports whether the encoding type recorded for this page is DICT_ENCODING.
// _encoding_type starts as UNKNOWN_ENCODING and is filled in by init().
bool BinaryDictPageDecoder::is_dict_encoding() const {
    const bool uses_dict = (_encoding_type == DICT_ENCODING);
    return uses_dict;
}

// Installs the dictionary page decoder used by next_batch() when the page is
// dictionary encoded. Only the raw pointer is stored in _dict_decoder.
//
// NOTE(review): the downcast is unchecked, so `dict_decoder` is assumed to
// point at a BinaryPlainPageDecoder (or be nullptr) -- confirm at call sites.
// A named static_cast is used instead of a C-style cast so that the
// conversion can never silently degrade into a reinterpret_cast.
void BinaryDictPageDecoder::set_dict_decoder(PageDecoder* dict_decoder) {
    _dict_decoder = static_cast<BinaryPlainPageDecoder*>(dict_decoder);
}

Status BinaryDictPageDecoder::next_batch(size_t* n, ColumnBlockView* dst) {
if (_encoding_type == PLAIN_ENCODING) {
return _data_page_decoder->next_batch(n, dst);
}
// dictionary encoding
DCHECK(_parsed);
DCHECK(_dict_decoder != nullptr) << "dict decoder pointer is nullptr";
if (PREDICT_FALSE(*n == 0)) {
*n = 0;
return Status::OK();
Expand Down
6 changes: 5 additions & 1 deletion be/src/olap/rowset/segment_v2/binary_dict_page.h
Original file line number Diff line number Diff line change
Expand Up @@ -116,11 +116,15 @@ class BinaryDictPageDecoder : public PageDecoder {
return _data_page_decoder->current_index();
}

bool is_dict_encoding() const;

void set_dict_decoder(PageDecoder* dict_decoder);

private:
Slice _data;
PageDecoderOptions _options;
std::unique_ptr<PageDecoder> _data_page_decoder;
BinaryPlainPageDecoder* _dict_decoder;
const BinaryPlainPageDecoder* _dict_decoder = nullptr;
bool _parsed;
EncodingTypePB _encoding_type;
faststring _code_buf;
Expand Down
23 changes: 23 additions & 0 deletions be/src/olap/rowset/segment_v2/column_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
#include "util/crc32c.h"
#include "util/rle_encoding.h" // for RleDecoder
#include "util/block_compression.h"
#include "olap/rowset/segment_v2/binary_dict_page.h" // for BinaryDictPageDecoder

namespace doris {
namespace segment_v2 {
Expand Down Expand Up @@ -168,6 +169,10 @@ void ColumnReader::get_row_ranges_by_zone_map(CondColumn* cond_column,
_calculate_row_ranges(page_indexes, row_ranges);
}

// Returns the dictionary page pointer recorded in this column's metadata.
PagePointer ColumnReader::get_dict_page_pointer() const {
    PagePointer dict_page = _meta.dict_page();
    return dict_page;
}

void ColumnReader::_get_filtered_pages(CondColumn* cond_column,
const std::vector<CondColumn*>& delete_conditions, std::vector<uint32_t>* page_indexes) {
FieldType type = _type_info->type();
Expand Down Expand Up @@ -426,6 +431,24 @@ Status FileColumnIterator::_read_page(const OrdinalPageIndexIterator& iter, Pars
RETURN_IF_ERROR(_reader->encoding_info()->create_page_decoder(data, options, &page->data_decoder));
RETURN_IF_ERROR(page->data_decoder->init());

// Lazily load the dictionary of a dict-encoded column, for three reasons:
// 1. A column that uses dictionary encoding may still have data pages that are not dict-encoded, so the dict is loaded only when necessary.
// 2. ColumnReader, which is owned by Segment and Rowset, can stay alive even when there is no query, so it should retain as little memory as possible.
// 3. Iterators of the same column won't load the dict page repeatedly, because of the page cache.
if (_reader->encoding_info()->encoding() == DICT_ENCODING) {
BinaryDictPageDecoder* binary_dict_page_decoder = (BinaryDictPageDecoder*)page->data_decoder;
if (binary_dict_page_decoder->is_dict_encoding()) {
if (_dict_decoder == nullptr) {
PagePointer pp = _reader->get_dict_page_pointer();
RETURN_IF_ERROR(_reader->read_page(pp, &_dict_page_handle));

_dict_decoder.reset(new BinaryPlainPageDecoder(_dict_page_handle.data()));
RETURN_IF_ERROR(_dict_decoder->init());
}
binary_dict_page_decoder->set_dict_decoder(_dict_decoder.get());
}
}

page->offset_in_page = 0;

return Status::OK();
Expand Down
9 changes: 9 additions & 0 deletions be/src/olap/rowset/segment_v2/column_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
#include "olap/rowset/segment_v2/ordinal_page_index.h" // for OrdinalPageIndexIterator
#include "olap/rowset/segment_v2/column_zone_map.h" // for ColumnZoneMap
#include "olap/rowset/segment_v2/row_ranges.h" // for RowRanges
#include "olap/rowset/segment_v2/page_handle.h" // for PageHandle

namespace doris {

Expand Down Expand Up @@ -85,6 +86,8 @@ class ColumnReader {
void get_row_ranges_by_zone_map(CondColumn* cond_column,
const std::vector<CondColumn*>& delete_conditions, RowRanges* row_ranges);

PagePointer get_dict_page_pointer() const;

private:
Status _init_ordinal_index();

Expand Down Expand Up @@ -189,6 +192,12 @@ class FileColumnIterator : public ColumnIterator {
// 3. When _page is null, it means that this reader can not be read.
std::unique_ptr<ParsedPage> _page;

// keep dict page decoder
std::unique_ptr<PageDecoder> _dict_decoder;

// keep the dict page handle so that the dict page is not released
PageHandle _dict_page_handle;

// page iterator used to get next page when current page is finished.
// This value will be reset when a new seek is issued
OrdinalPageIndexIterator _page_iter;
Expand Down
11 changes: 11 additions & 0 deletions be/src/olap/rowset/segment_v2/column_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,14 @@ Status ColumnWriter::write_data() {
RETURN_IF_ERROR(_write_data_page(page));
page = page->next;
}
// write column dict
if (_encoding_info->encoding() == DICT_ENCODING) {
Slice dict_page;
_page_builder->get_dictionary_page(&dict_page);
std::vector<Slice> origin_data;
origin_data.push_back(dict_page);
RETURN_IF_ERROR(_write_physical_page(&origin_data, &_dict_page_pp));
}
return Status::OK();
}

Expand Down Expand Up @@ -240,6 +248,9 @@ void ColumnWriter::write_meta(ColumnMetaPB* meta) {
if (_opts.need_zone_map) {
_zone_map_pp.to_proto(meta->mutable_zone_map_page());
}
if (_encoding_info->encoding() == DICT_ENCODING) {
_dict_page_pp.to_proto(meta->mutable_dict_page());
}
}

// write a page into file and update ordinal index
Expand Down
1 change: 1 addition & 0 deletions be/src/olap/rowset/segment_v2/column_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,7 @@ class ColumnWriter {

PagePointer _ordinal_index_pp;
PagePointer _zone_map_pp;
PagePointer _dict_page_pp;
uint64_t _written_size = 0;
};

Expand Down
14 changes: 5 additions & 9 deletions be/src/olap/rowset/segment_v2/column_zone_map.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,20 +28,20 @@ ColumnZoneMapBuilder::ColumnZoneMapBuilder(const TypeInfo* type_info) : _type_in
options.data_page_size = 0;
_page_builder.reset(new BinaryPlainPageBuilder(options));
_field.reset(FieldFactory::create_by_type(_type_info->type()));
_max_string_value = _arena.Allocate(OLAP_STRING_MAX_LENGTH);
_zone_map.min_value = _arena.Allocate(_type_info->size());
_zone_map.max_value = _arena.Allocate(_type_info->size());
_zone_map.min_value = _field->allocate_value_from_arena(&_arena);
_zone_map.max_value = _field->allocate_value_from_arena(&_arena);

_reset_zone_map();
}

Status ColumnZoneMapBuilder::add(const uint8_t *vals, size_t count) {
if (vals != nullptr) {
for (int i = 0; i < count; ++i) {
if (_field->compare(_zone_map.min_value, (char *)vals) > 0) {
_field->deep_copy_content(_zone_map.min_value, (const char *)vals, &_arena);
_field->direct_copy_content(_zone_map.min_value, (const char *)vals);
}
if (_field->compare(_zone_map.max_value, (char *)vals) < 0) {
_field->deep_copy_content(_zone_map.max_value, (const char *)vals, &_arena);
_field->direct_copy_content(_zone_map.max_value, (const char *)vals);
}
vals += _type_info->size();
if (!_zone_map.has_not_null) {
Expand Down Expand Up @@ -78,10 +78,6 @@ Status ColumnZoneMapBuilder::flush() {
}

void ColumnZoneMapBuilder::_reset_zone_map() {
// we should allocate max varchar length and set to max for min value
Slice *min_slice = (Slice *)_zone_map.min_value;
min_slice->data = _max_string_value;
min_slice->size = OLAP_STRING_MAX_LENGTH;
_field->set_to_max(_zone_map.min_value);
_field->set_to_min(_zone_map.max_value);
_zone_map.has_null = false;
Expand Down
1 change: 0 additions & 1 deletion be/src/olap/rowset/segment_v2/column_zone_map.h
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,6 @@ class ColumnZoneMapBuilder {
std::unique_ptr<Field> _field;
// memory will be managed by arena
ZoneMap _zone_map;
char* _max_string_value;
Arena _arena;
};

Expand Down
17 changes: 17 additions & 0 deletions be/src/olap/rowset/segment_v2/encoding_info.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#include "olap/olap_common.h"
#include "olap/rowset/segment_v2/bitshuffle_page.h"
#include "olap/rowset/segment_v2/rle_page.h"
#include "olap/rowset/segment_v2/binary_dict_page.h"

namespace doris {
namespace segment_v2 {
Expand Down Expand Up @@ -67,6 +68,18 @@ struct TypeEncodingTraits<type, RLE> {
}
};

// Encoding traits for DICT_ENCODING, for any field type: pages are produced
// by BinaryDictPageBuilder and read back by BinaryDictPageDecoder.
template<FieldType type>
struct TypeEncodingTraits<type, DICT_ENCODING> {
    // Stores a newly allocated BinaryDictPageBuilder in *builder and returns OK.
    // NOTE(review): the caller presumably takes ownership of the raw pointer.
    static Status create_page_builder(const PageBuilderOptions& opts, PageBuilder** builder) {
        PageBuilder* dict_builder = new BinaryDictPageBuilder(opts);
        *builder = dict_builder;
        return Status::OK();
    }

    // Stores a newly allocated BinaryDictPageDecoder over `data` in *decoder
    // and returns OK.
    // NOTE(review): the caller presumably takes ownership of the raw pointer.
    static Status create_page_decoder(const Slice& data, const PageDecoderOptions& opts, PageDecoder** decoder) {
        PageDecoder* dict_decoder = new BinaryDictPageDecoder(data, opts);
        *decoder = dict_decoder;
        return Status::OK();
    }
};

template<FieldType Type, EncodingTypePB Encoding>
struct EncodingTraits : TypeEncodingTraits<Type, Encoding> {
static const FieldType type = Type;
Expand Down Expand Up @@ -122,6 +135,10 @@ EncodingInfoResolver::EncodingInfoResolver() {
_add_map<OLAP_FIELD_TYPE_FLOAT, PLAIN_ENCODING>();
_add_map<OLAP_FIELD_TYPE_DOUBLE, BIT_SHUFFLE>();
_add_map<OLAP_FIELD_TYPE_DOUBLE, PLAIN_ENCODING>();
_add_map<OLAP_FIELD_TYPE_CHAR, DICT_ENCODING>();
_add_map<OLAP_FIELD_TYPE_CHAR, PLAIN_ENCODING>();
_add_map<OLAP_FIELD_TYPE_VARCHAR, DICT_ENCODING>();
_add_map<OLAP_FIELD_TYPE_VARCHAR, PLAIN_ENCODING>();
_add_map<OLAP_FIELD_TYPE_BOOL, RLE>();
_add_map<OLAP_FIELD_TYPE_BOOL, BIT_SHUFFLE>();
_add_map<OLAP_FIELD_TYPE_BOOL, PLAIN_ENCODING>();
Expand Down
1 change: 0 additions & 1 deletion be/src/olap/rowset/segment_v2/options.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ struct PageBuilderOptions {
};

struct PageDecoderOptions {
BinaryPlainPageDecoder* dict_decoder = nullptr;
};

} // namespace segment_v2
Expand Down
1 change: 1 addition & 0 deletions be/src/olap/types.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ TypeInfo::TypeInfo(TypeTraitsClass t)
_deep_copy(TypeTraitsClass::deep_copy),
_deep_copy_with_arena(TypeTraitsClass::deep_copy_with_arena),
_direct_copy(TypeTraitsClass::direct_copy),
_allocate_value_from_arena(TypeTraitsClass::allocate_value_from_arena),
_from_string(TypeTraitsClass::from_string),
_to_string(TypeTraitsClass::to_string),
_set_to_max(TypeTraitsClass::set_to_max),
Expand Down
23 changes: 23 additions & 0 deletions be/src/olap/types.h
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,10 @@ class TypeInfo {
_direct_copy(dest, src);
}

// Calls the type-specific allocator stored in _allocate_value_from_arena to
// obtain storage for one value from `arena`.
inline char* allocate_value_from_arena(Arena* arena) const {
    char* value = _allocate_value_from_arena(arena);
    return value;
}

// Forwards `buf` and `scan_key` to the type-specific _from_string function
// and returns the status it reports.
OLAPStatus from_string(void* buf, const std::string& scan_key) const {
    const OLAPStatus status = _from_string(buf, scan_key);
    return status;
}
Expand All @@ -85,6 +89,7 @@ class TypeInfo {
void (*_deep_copy)(void* dest, const void* src, MemPool* mem_pool);
void (*_deep_copy_with_arena)(void* dest, const void* src, Arena* arena);
void (*_direct_copy)(void* dest, const void* src);
char* (*_allocate_value_from_arena)(Arena* arena);

OLAPStatus (*_from_string)(void* buf, const std::string& scan_key);
std::string (*_to_string)(const void* src);
Expand Down Expand Up @@ -213,6 +218,10 @@ struct BaseFieldtypeTraits : public CppTypeTraits<field_type> {
return HashUtil::hash(data, sizeof(CppType), seed);
}

// Default allocator: reserves sizeof(CppType) bytes from `arena` and returns
// the pointer handed back by Arena::Allocate.
static inline char* allocate_value_from_arena(Arena* arena) {
    const auto value_size = sizeof(CppType);
    return arena->Allocate(value_size);
}

static std::string to_string(const void* src) {
std::stringstream stream;
stream << *reinterpret_cast<const CppType*>(src);
Expand Down Expand Up @@ -568,6 +577,13 @@ struct FieldTypeTraits<OLAP_FIELD_TYPE_CHAR> : public BaseFieldtypeTraits<OLAP_F
auto slice = reinterpret_cast<const Slice*>(data);
return HashUtil::hash(slice->data, slice->size, seed);
}
// Allocates a Slice header from `arena`, followed by a data buffer of
// OLAP_CHAR_MAX_LENGTH bytes, and wires the header to that buffer.
// Returns the address of the Slice header.
static char* allocate_value_from_arena(Arena* arena) {
    // Keep the allocation order: header first, then the payload buffer.
    char* const cell = arena->Allocate(sizeof(Slice));
    char* const payload = arena->Allocate(OLAP_CHAR_MAX_LENGTH);

    auto* header = reinterpret_cast<Slice*>(cell);
    header->data = payload;
    header->size = OLAP_CHAR_MAX_LENGTH;
    return cell;
}
};

template<>
Expand All @@ -594,6 +610,13 @@ struct FieldTypeTraits<OLAP_FIELD_TYPE_VARCHAR> : public FieldTypeTraits<OLAP_FI
auto slice = reinterpret_cast<Slice*>(buf);
slice->size = 0;
}
// Allocates a Slice header from `arena`, followed by a data buffer of
// OLAP_STRING_MAX_LENGTH bytes, and wires the header to that buffer.
// Returns the address of the Slice header.
static char* allocate_value_from_arena(Arena* arena) {
    // Keep the allocation order: header first, then the payload buffer.
    char* const cell = arena->Allocate(sizeof(Slice));
    char* const payload = arena->Allocate(OLAP_STRING_MAX_LENGTH);

    auto* header = reinterpret_cast<Slice*>(cell);
    header->data = payload;
    header->size = OLAP_STRING_MAX_LENGTH;
    return cell;
}
};

template<>
Expand Down
5 changes: 3 additions & 2 deletions be/test/olap/rowset/segment_v2/binary_dict_page_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,9 @@ class BinaryDictPageTest : public testing::Test {

// decode
PageDecoderOptions decoder_options;
decoder_options.dict_decoder = dict_page_decoder.get();
BinaryDictPageDecoder page_decoder(s, decoder_options);
page_decoder.set_dict_decoder(dict_page_decoder.get());

status = page_decoder.init();
ASSERT_TRUE(status.ok());
ASSERT_EQ(slices.size(), page_decoder.count());
Expand Down Expand Up @@ -154,9 +155,9 @@ class BinaryDictPageTest : public testing::Test {

// decode
PageDecoderOptions decoder_options;
decoder_options.dict_decoder = dict_page_decoder.get();
BinaryDictPageDecoder page_decoder(results[slice_index], decoder_options);
status = page_decoder.init();
page_decoder.set_dict_decoder(dict_page_decoder.get());
ASSERT_TRUE(status.ok());

//check values
Expand Down
2 changes: 1 addition & 1 deletion be/test/olap/rowset/segment_v2/binary_plain_page_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ class BinaryPlainPageTest : public testing::Test {
PageDecoderType page_decoder(s, decoder_options);
Status status = page_decoder.init();
ASSERT_TRUE(status.ok());

//test1

size_t size = 3;
Expand Down
Loading