Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion be/src/olap/memtable.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -505,7 +505,7 @@ bool MemTable::need_agg() const {
return false;
}

Status MemTable::to_block(std::unique_ptr<vectorized::Block>* res) {
Status MemTable::_to_block(std::unique_ptr<vectorized::Block>* res) {
size_t same_keys_num = _sort();
if (_keys_type == KeysType::DUP_KEYS || same_keys_num == 0) {
if (_keys_type == KeysType::DUP_KEYS && _tablet_schema->num_key_columns() == 0) {
Expand All @@ -529,4 +529,9 @@ Status MemTable::to_block(std::unique_ptr<vectorized::Block>* res) {
return Status::OK();
}

Status MemTable::to_block(std::unique_ptr<vectorized::Block>* res) {
RETURN_IF_ERROR_OR_CATCH_EXCEPTION(_to_block(res));
return Status::OK();
}

} // namespace doris
3 changes: 3 additions & 0 deletions be/src/olap/memtable.h
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,9 @@ class MemTable {
void _aggregate_two_row_in_block(vectorized::MutableBlock& mutable_block, RowInBlock* new_row,
RowInBlock* row_in_skiplist);

// Used to wrapped by to_block to do exception handle logic
Status _to_block(std::unique_ptr<vectorized::Block>* res);

private:
int64_t _tablet_id;
bool _enable_unique_key_mow = false;
Expand Down
2 changes: 1 addition & 1 deletion be/src/olap/rowset/segment_creator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ Status SegmentFlusher::flush_single_block(const vectorized::Block* block, int32_
return Status::OK();
}

Status SegmentFlusher::_parse_variant_columns(vectorized::Block& block) {
Status SegmentFlusher::_internal_parse_variant_columns(vectorized::Block& block) {
size_t num_rows = block.rows();
if (num_rows == 0) {
return Status::OK();
Expand Down
6 changes: 5 additions & 1 deletion be/src/olap/rowset/segment_creator.h
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,11 @@ class SegmentFlusher {
bool need_buffering();

private:
Status _parse_variant_columns(vectorized::Block& block);
// This method will catch exception when allocate memory failed
Status _parse_variant_columns(vectorized::Block& block) {
RETURN_IF_CATCH_EXCEPTION({ return _internal_parse_variant_columns(block); });
}
Status _internal_parse_variant_columns(vectorized::Block& block);
Status _add_rows(std::unique_ptr<segment_v2::SegmentWriter>& segment_writer,
const vectorized::Block* block, size_t row_offset, size_t row_num);
Status _add_rows(std::unique_ptr<segment_v2::VerticalSegmentWriter>& segment_writer,
Expand Down
14 changes: 8 additions & 6 deletions be/src/olap/rowset/segment_v2/binary_dict_page.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -142,19 +142,22 @@ Status BinaryDictPageBuilder::add(const uint8_t* vals, size_t* count) {
}
}

OwnedSlice BinaryDictPageBuilder::finish() {
Status BinaryDictPageBuilder::finish(OwnedSlice* slice) {
if (VLOG_DEBUG_IS_ON && _encoding_type == DICT_ENCODING) {
VLOG_DEBUG << "dict page size:" << _dict_builder->size();
}

DCHECK(!_finished);
_finished = true;

OwnedSlice data_slice = _data_page_builder->finish();
OwnedSlice data_slice;
RETURN_IF_ERROR(_data_page_builder->finish(&data_slice));
// TODO(gaodayue) separate page header and content to avoid this copy
_buffer.append(data_slice.slice().data, data_slice.slice().size);
RETURN_IF_CATCH_EXCEPTION(
{ _buffer.append(data_slice.slice().data, data_slice.slice().size); });
encode_fixed32_le(&_buffer[0], _encoding_type);
return _buffer.build();
*slice = _buffer.build();
return Status::OK();
}

Status BinaryDictPageBuilder::reset() {
Expand Down Expand Up @@ -183,8 +186,7 @@ uint64_t BinaryDictPageBuilder::size() const {
}

Status BinaryDictPageBuilder::get_dictionary_page(OwnedSlice* dictionary_page) {
*dictionary_page = _dict_builder->finish();
return Status::OK();
return _dict_builder->finish(dictionary_page);
}

Status BinaryDictPageBuilder::get_first_value(void* value) const {
Expand Down
2 changes: 1 addition & 1 deletion be/src/olap/rowset/segment_v2/binary_dict_page.h
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ class BinaryDictPageBuilder : public PageBuilderHelper<BinaryDictPageBuilder> {

Status add(const uint8_t* vals, size_t* count) override;

OwnedSlice finish() override;
Status finish(OwnedSlice* slice) override;

Status reset() override;

Expand Down
25 changes: 14 additions & 11 deletions be/src/olap/rowset/segment_v2/binary_plain_page.h
Original file line number Diff line number Diff line change
Expand Up @@ -93,19 +93,22 @@ class BinaryPlainPageBuilder : public PageBuilderHelper<BinaryPlainPageBuilder<T
return Status::OK();
}

OwnedSlice finish() override {
Status finish(OwnedSlice* slice) override {
DCHECK(!_finished);
_finished = true;
// Set up trailer
for (uint32_t _offset : _offsets) {
put_fixed32_le(&_buffer, _offset);
}
put_fixed32_le(&_buffer, _offsets.size());
if (_offsets.size() > 0) {
_copy_value_at(0, &_first_value);
_copy_value_at(_offsets.size() - 1, &_last_value);
}
return _buffer.build();
RETURN_IF_CATCH_EXCEPTION({
// Set up trailer
for (uint32_t _offset : _offsets) {
put_fixed32_le(&_buffer, _offset);
}
put_fixed32_le(&_buffer, _offsets.size());
if (_offsets.size() > 0) {
_copy_value_at(0, &_first_value);
_copy_value_at(_offsets.size() - 1, &_last_value);
}
*slice = _buffer.build();
});
return Status::OK();
}

Status reset() override {
Expand Down
23 changes: 13 additions & 10 deletions be/src/olap/rowset/segment_v2/binary_prefix_page.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -88,18 +88,21 @@ Status BinaryPrefixPageBuilder::add(const uint8_t* vals, size_t* add_count) {
return Status::OK();
}

OwnedSlice BinaryPrefixPageBuilder::finish() {
Status BinaryPrefixPageBuilder::finish(OwnedSlice* slice) {
DCHECK(!_finished);
_finished = true;
put_fixed32_le(&_buffer, (uint32_t)_count);
uint8_t restart_point_internal = RESTART_POINT_INTERVAL;
_buffer.append(&restart_point_internal, 1);
auto restart_point_size = _restart_points_offset.size();
for (uint32_t i = 0; i < restart_point_size; ++i) {
put_fixed32_le(&_buffer, _restart_points_offset[i]);
}
put_fixed32_le(&_buffer, restart_point_size);
return _buffer.build();
RETURN_IF_CATCH_EXCEPTION({
put_fixed32_le(&_buffer, (uint32_t)_count);
uint8_t restart_point_internal = RESTART_POINT_INTERVAL;
_buffer.append(&restart_point_internal, 1);
auto restart_point_size = _restart_points_offset.size();
for (uint32_t i = 0; i < restart_point_size; ++i) {
put_fixed32_le(&_buffer, _restart_points_offset[i]);
}
put_fixed32_le(&_buffer, restart_point_size);
*slice = _buffer.build();
});
return Status::OK();
}

const uint8_t* BinaryPrefixPageDecoder::_decode_value_lengths(const uint8_t* ptr, uint32_t* shared,
Expand Down
2 changes: 1 addition & 1 deletion be/src/olap/rowset/segment_v2/binary_prefix_page.h
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ class BinaryPrefixPageBuilder : public PageBuilderHelper<BinaryPrefixPageBuilder

Status add(const uint8_t* vals, size_t* add_count) override;

OwnedSlice finish() override;
Status finish(OwnedSlice* slice) override;

Status reset() override {
_restart_points_offset.clear();
Expand Down
5 changes: 3 additions & 2 deletions be/src/olap/rowset/segment_v2/bitshuffle_page.h
Original file line number Diff line number Diff line change
Expand Up @@ -141,12 +141,13 @@ class BitshufflePageBuilder : public PageBuilderHelper<BitshufflePageBuilder<Typ
return Status::OK();
}

OwnedSlice finish() override {
Status finish(OwnedSlice* slice) override {
if (_count > 0) {
_first_value = cell(0);
_last_value = cell(_count - 1);
}
return _finish(SIZE_OF_TYPE);
RETURN_IF_CATCH_EXCEPTION({ *slice = _finish(SIZE_OF_TYPE); });
return Status::OK();
}

Status reset() override {
Expand Down
10 changes: 6 additions & 4 deletions be/src/olap/rowset/segment_v2/column_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -70,9 +70,10 @@ class NullBitmapBuilder {
// Returns whether the building nullmap contains nullptr
bool has_null() const { return _has_null; }

OwnedSlice finish() {
Status finish(OwnedSlice* slice) {
_rle_encoder.Flush();
return _bitmap_buf.build();
RETURN_IF_CATCH_EXCEPTION({ *slice = _bitmap_buf.build(); });
return Status::OK();
}

void reset() {
Expand Down Expand Up @@ -723,14 +724,15 @@ Status ScalarColumnWriter::finish_current_page() {

// build data page body : encoded values + [nullmap]
std::vector<Slice> body;
OwnedSlice encoded_values = _page_builder->finish();
OwnedSlice encoded_values;
RETURN_IF_ERROR(_page_builder->finish(&encoded_values));
RETURN_IF_ERROR(_page_builder->reset());
body.push_back(encoded_values.slice());

OwnedSlice nullmap;
if (_null_bitmap_builder != nullptr) {
if (is_nullable() && _null_bitmap_builder->has_null()) {
nullmap = _null_bitmap_builder->finish();
RETURN_IF_ERROR(_null_bitmap_builder->finish(&nullmap));
body.push_back(nullmap.slice());
}
_null_bitmap_builder->reset();
Expand Down
5 changes: 3 additions & 2 deletions be/src/olap/rowset/segment_v2/frame_of_reference_page.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,11 +54,12 @@ class FrameOfReferencePageBuilder : public PageBuilderHelper<FrameOfReferencePag
return Status::OK();
}

OwnedSlice finish() override {
Status finish(OwnedSlice* slice) override {
DCHECK(!_finished);
_finished = true;
_encoder->flush();
return _buf.build();
RETURN_IF_CATCH_EXCEPTION({ *slice = _buf.build(); });
return Status::OK();
}

Status reset() override {
Expand Down
3 changes: 2 additions & 1 deletion be/src/olap/rowset/segment_v2/indexed_column_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,8 @@ Status IndexedColumnWriter::_finish_current_data_page(size_t& num_val) {
ordinal_t first_ordinal = _num_values - num_values_in_page;

// IndexedColumn doesn't have NULLs, thus data page body only contains encoded values
OwnedSlice page_body = _data_page_builder->finish();
OwnedSlice page_body;
RETURN_IF_ERROR(_data_page_builder->finish(&page_body));
RETURN_IF_ERROR(_data_page_builder->reset());

PageFooterPB footer;
Expand Down
3 changes: 2 additions & 1 deletion be/src/olap/rowset/segment_v2/page_builder.h
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,8 @@ class PageBuilder {

// Finish building the current page, return the encoded data.
// This api should be followed by reset() before reusing the builder
virtual OwnedSlice finish() = 0;
// It will return error status when memory allocated failed during finish
virtual Status finish(OwnedSlice* owned_slice) = 0;

// Get the dictionary page for dictionary encoding mode column.
virtual Status get_dictionary_page(OwnedSlice* dictionary_page) {
Expand Down
4 changes: 2 additions & 2 deletions be/src/olap/rowset/segment_v2/page_io.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -111,8 +111,8 @@ Status PageIO::write_page(io::FileWriter* writer, const std::vector<Slice>& body
return Status::OK();
}

Status PageIO::read_and_decompress_page(const PageReadOptions& opts, PageHandle* handle,
Slice* body, PageFooterPB* footer) {
Status PageIO::read_and_decompress_page_(const PageReadOptions& opts, PageHandle* handle,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

warning: function 'read_and_decompress_page_' exceeds recommended size/complexity thresholds [readability-function-size]

Status PageIO::read_and_decompress_page_(const PageReadOptions& opts, PageHandle* handle,
               ^
Additional context

be/src/olap/rowset/segment_v2/page_io.cpp:113: 115 lines including whitespace and comments (threshold 80)

Status PageIO::read_and_decompress_page_(const PageReadOptions& opts, PageHandle* handle,
               ^

Slice* body, PageFooterPB* footer) {
opts.sanity_check();
opts.stats->total_pages_num++;

Expand Down
11 changes: 10 additions & 1 deletion be/src/olap/rowset/segment_v2/page_io.h
Original file line number Diff line number Diff line change
Expand Up @@ -123,8 +123,17 @@ class PageIO {
// `handle' holds the memory of page data,
// `body' points to page body,
// `footer' stores the page footer.
// This method is exception safe, it will failed when allocate memory failed.
static Status read_and_decompress_page(const PageReadOptions& opts, PageHandle* handle,
Slice* body, PageFooterPB* footer);
Slice* body, PageFooterPB* footer) {
RETURN_IF_CATCH_EXCEPTION(
{ return read_and_decompress_page_(opts, handle, body, footer); });
}

private:
// An internal method that not deal with exception.
static Status read_and_decompress_page_(const PageReadOptions& opts, PageHandle* handle,
Slice* body, PageFooterPB* footer);
};

} // namespace segment_v2
Expand Down
18 changes: 11 additions & 7 deletions be/src/olap/rowset/segment_v2/plain_page.h
Original file line number Diff line number Diff line change
Expand Up @@ -59,14 +59,18 @@ class PlainPageBuilder : public PageBuilderHelper<PlainPageBuilder<Type> > {
return Status::OK();
}

OwnedSlice finish() override {
Status finish(OwnedSlice* slice) override {
encode_fixed32_le((uint8_t*)&_buffer[0], _count);
if (_count > 0) {
_first_value.assign_copy(&_buffer[PLAIN_PAGE_HEADER_SIZE], SIZE_OF_TYPE);
_last_value.assign_copy(&_buffer[PLAIN_PAGE_HEADER_SIZE + (_count - 1) * SIZE_OF_TYPE],
SIZE_OF_TYPE);
}
return _buffer.build();
RETURN_IF_CATCH_EXCEPTION({
if (_count > 0) {
_first_value.assign_copy(&_buffer[PLAIN_PAGE_HEADER_SIZE], SIZE_OF_TYPE);
_last_value.assign_copy(
&_buffer[PLAIN_PAGE_HEADER_SIZE + (_count - 1) * SIZE_OF_TYPE],
SIZE_OF_TYPE);
}
*slice = _buffer.build();
});
return Status::OK();
}

Status reset() override {
Expand Down
5 changes: 3 additions & 2 deletions be/src/olap/rowset/segment_v2/rle_page.h
Original file line number Diff line number Diff line change
Expand Up @@ -94,14 +94,15 @@ class RlePageBuilder : public PageBuilderHelper<RlePageBuilder<Type> > {
return Status::OK();
}

OwnedSlice finish() override {
Status finish(OwnedSlice* slice) override {
DCHECK(!_finished);
_finished = true;
// here should Flush first and then encode the count header
// or it will lead to a bug if the header is less than 8 byte and the data is small
_rle_encoder->Flush();
encode_fixed32_le(&_buf[0], _count);
return _buf.build();
*slice = _buf.build();
return Status::OK();
}

Status reset() override {
Expand Down
9 changes: 0 additions & 9 deletions be/src/runtime/runtime_state.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -463,15 +463,6 @@ Status RuntimeState::append_error_msg_to_file(std::function<std::string()> line,
return Status::OK();
}

int64_t RuntimeState::get_load_mem_limit() {
// TODO: the code is abandoned, it can be deleted after v1.3
if (_query_options.__isset.load_mem_limit && _query_options.load_mem_limit > 0) {
return _query_options.load_mem_limit;
} else {
return _query_mem_tracker->limit();
}
}

void RuntimeState::resize_op_id_to_local_state(int operator_size) {
_op_id_to_local_state.resize(-operator_size);
}
Expand Down
4 changes: 0 additions & 4 deletions be/src/runtime/runtime_state.h
Original file line number Diff line number Diff line change
Expand Up @@ -466,10 +466,6 @@ class RuntimeState {

std::vector<TErrorTabletInfo>& error_tablet_infos() { return _error_tablet_infos; }

// get mem limit for load channel
// if load mem limit is not set, or is zero, using query mem limit instead.
int64_t get_load_mem_limit();

// local runtime filter mgr, the runtime filter do not have remote target or
// not need local merge should regist here. the instance exec finish, the local
// runtime filter mgr can release the memory of local runtime filter
Expand Down
12 changes: 3 additions & 9 deletions be/src/vec/common/schema_util.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -545,15 +545,9 @@ Status _parse_variant_columns(Block& block, const std::vector<int>& variant_pos,

Status parse_variant_columns(Block& block, const std::vector<int>& variant_pos,
const ParseContext& ctx) {
try {
// Parse each variant column from raw string column
RETURN_IF_ERROR(vectorized::schema_util::_parse_variant_columns(block, variant_pos, ctx));
} catch (const doris::Exception& e) {
// TODO more graceful, max_filter_ratio
LOG(WARNING) << "encounter execption " << e.to_string();
return Status::InternalError(e.to_string());
}
return Status::OK();
// Parse each variant column from raw string column
RETURN_IF_CATCH_EXCEPTION(
{ return vectorized::schema_util::_parse_variant_columns(block, variant_pos, ctx); });
}

void finalize_variant_columns(Block& block, const std::vector<int>& variant_pos,
Expand Down
2 changes: 0 additions & 2 deletions be/src/vec/sink/writer/vtablet_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -413,7 +413,6 @@ void VNodeChannel::_open_internal(bool is_incremental) {

request->set_num_senders(_parent->_num_senders);
request->set_need_gen_rollup(false); // Useless but it is a required field in pb
request->set_load_mem_limit(_parent->_load_mem_limit);
request->set_load_channel_timeout_s(_parent->_load_channel_timeout_s);
request->set_is_high_priority(_parent->_is_high_priority);
request->set_sender_ip(BackendOptions::get_localhost());
Expand Down Expand Up @@ -1245,7 +1244,6 @@ Status VTabletWriter::_init(RuntimeState* state, RuntimeProfile* profile) {
_max_wait_exec_timer = ADD_TIMER(profile, "MaxWaitExecTime");
_add_batch_number = ADD_COUNTER(profile, "NumberBatchAdded", TUnit::UNIT);
_num_node_channels = ADD_COUNTER(profile, "NumberNodeChannels", TUnit::UNIT);
_load_mem_limit = state->get_load_mem_limit();

#ifdef DEBUG
// check: tablet ids should be unique
Expand Down
3 changes: 0 additions & 3 deletions be/src/vec/sink/writer/vtablet_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -660,9 +660,6 @@ class VTabletWriter final : public AsyncResultWriter {
RuntimeProfile::Counter* _add_batch_number = nullptr;
RuntimeProfile::Counter* _num_node_channels = nullptr;

// load mem limit is for remote load channel
int64_t _load_mem_limit = -1;

// the timeout of load channels opened by this tablet sink. in second
int64_t _load_channel_timeout_s = 0;

Expand Down
Loading