Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions be/src/common/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,10 @@ DEFINE_mInt64(stacktrace_in_alloc_large_memory_bytes, "2147483648");

DEFINE_mInt64(crash_in_alloc_large_memory_bytes, "-1");

// Default is true. If true, any memory tracked under the Orphan mem tracker will report an error.
// !! Do not modify the default value of this conf !! Otherwise memory errors cannot be detected in time.
// Memory freed through an Allocator does not need this check, because when the thread memory
// tracker label is Orphan, the tracker saved in the Allocator is used instead.
DEFINE_mBool(enable_memory_orphan_check, "true");

// The maximum time a thread waits for full GC. Currently only query will wait for full gc.
Expand Down Expand Up @@ -1097,6 +1101,7 @@ DEFINE_mString(kerberos_krb5_conf_path, "/etc/krb5.conf");

DEFINE_mString(get_stack_trace_tool, "libunwind");
DEFINE_mString(dwarf_location_info_mode, "FAST");
DEFINE_mBool(enable_address_sanitizers_with_stack_trace, "false");

// the ratio of _prefetch_size/_batch_size in AutoIncIDBuffer
DEFINE_mInt64(auto_inc_prefetch_size_ratio, "10");
Expand Down
4 changes: 4 additions & 0 deletions be/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,9 @@ DECLARE_mInt64(stacktrace_in_alloc_large_memory_bytes);
DECLARE_mInt64(crash_in_alloc_large_memory_bytes);

// Default is true. If true, any memory tracked under the Orphan mem tracker will report an error.
// !! Do not modify the default value of this conf !! Otherwise memory errors cannot be detected in time.
// Memory freed through an Allocator does not need this check, because when the thread memory
// tracker label is Orphan, the tracker saved in the Allocator is used instead.
DECLARE_mBool(enable_memory_orphan_check);

// The maximum time a thread waits for a full GC. Currently only query will wait for full gc.
Expand Down Expand Up @@ -1149,6 +1152,7 @@ DECLARE_mString(kerberos_krb5_conf_path);

// Values include `none`, `glog`, `boost`, `glibc`, `libunwind`
DECLARE_mString(get_stack_trace_tool);
DECLARE_mBool(enable_address_sanitizers_with_stack_trace);

// DISABLED: Don't resolve location info.
// FAST: Perform CU lookup using .debug_aranges (might be incomplete).
Expand Down
23 changes: 17 additions & 6 deletions be/src/http/action/http_stream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -234,26 +234,37 @@ void HttpStreamAction::on_chunk_data(HttpRequest* req) {
struct evhttp_request* ev_req = req->get_evhttp_request();
auto evbuf = evhttp_request_get_input_buffer(ev_req);

SCOPED_ATTACH_TASK(ExecEnv::GetInstance()->stream_load_pipe_tracker());

int64_t start_read_data_time = MonotonicNanos();
Status st = ctx->allocate_schema_buffer();
if (!st.ok()) {
ctx->status = st;
return;
}
while (evbuffer_get_length(evbuf) > 0) {
auto bb = ByteBuffer::allocate(128 * 1024);
ByteBufferPtr bb;
st = ByteBuffer::allocate(128 * 1024, &bb);
if (!st.ok()) {
ctx->status = st;
return;
}
auto remove_bytes = evbuffer_remove(evbuf, bb->ptr, bb->capacity);
bb->pos = remove_bytes;
bb->flip();
auto st = ctx->body_sink->append(bb);
st = ctx->body_sink->append(bb);
// schema_buffer stores 1M of data for parsing column information
// need to determine whether to cache for the first time
if (ctx->is_read_schema) {
if (ctx->schema_buffer->pos + remove_bytes < config::stream_tvf_buffer_size) {
ctx->schema_buffer->put_bytes(bb->ptr, remove_bytes);
if (ctx->schema_buffer()->pos + remove_bytes < config::stream_tvf_buffer_size) {
ctx->schema_buffer()->put_bytes(bb->ptr, remove_bytes);
} else {
LOG(INFO) << "use a portion of data to request fe to obtain column information";
ctx->is_read_schema = false;
ctx->status = process_put(req, ctx);
}
}

if (!st.ok() && !ctx->status.ok()) {
if (!st.ok()) {
LOG(WARNING) << "append body content failed. errmsg=" << st << ", " << ctx->brief();
ctx->status = st;
return;
Expand Down
11 changes: 9 additions & 2 deletions be/src/http/action/stream_load.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -339,13 +339,20 @@ void StreamLoadAction::on_chunk_data(HttpRequest* req) {
struct evhttp_request* ev_req = req->get_evhttp_request();
auto evbuf = evhttp_request_get_input_buffer(ev_req);

SCOPED_ATTACH_TASK(ExecEnv::GetInstance()->stream_load_pipe_tracker());

int64_t start_read_data_time = MonotonicNanos();
while (evbuffer_get_length(evbuf) > 0) {
auto bb = ByteBuffer::allocate(128 * 1024);
ByteBufferPtr bb;
Status st = ByteBuffer::allocate(128 * 1024, &bb);
if (!st.ok()) {
ctx->status = st;
return;
}
auto remove_bytes = evbuffer_remove(evbuf, bb->ptr, bb->capacity);
bb->pos = remove_bytes;
bb->flip();
auto st = ctx->body_sink->append(bb);
st = ctx->body_sink->append(bb);
if (!st.ok()) {
LOG(WARNING) << "append body content failed. errmsg=" << st << ", " << ctx->brief();
ctx->status = st;
Expand Down
9 changes: 5 additions & 4 deletions be/src/io/file_factory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -161,13 +161,14 @@ Status FileFactory::create_pipe_reader(const TUniqueId& load_id, io::FileReaderS
if (!stream_load_ctx) {
return Status::InternalError("unknown stream load id: {}", UniqueId(load_id).to_string());
}
if (need_schema == true) {
if (need_schema) {
RETURN_IF_ERROR(stream_load_ctx->allocate_schema_buffer());
// Here, a portion of the data is processed to parse column information
auto pipe = std::make_shared<io::StreamLoadPipe>(
io::kMaxPipeBufferedBytes /* max_buffered_bytes */, 64 * 1024 /* min_chunk_size */,
stream_load_ctx->schema_buffer->pos /* total_length */);
stream_load_ctx->schema_buffer->flip();
RETURN_IF_ERROR(pipe->append(stream_load_ctx->schema_buffer));
stream_load_ctx->schema_buffer()->pos /* total_length */);
stream_load_ctx->schema_buffer()->flip();
RETURN_IF_ERROR(pipe->append(stream_load_ctx->schema_buffer()));
RETURN_IF_ERROR(pipe->finish());
*file_reader = std::move(pipe);
} else {
Expand Down
7 changes: 5 additions & 2 deletions be/src/io/fs/stream_load_pipe.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,9 @@ Status StreamLoadPipe::read_one_message(std::unique_ptr<uint8_t[]>* data, size_t
}

Status StreamLoadPipe::append_and_flush(const char* data, size_t size, size_t proto_byte_size) {
ByteBufferPtr buf = ByteBuffer::allocate(BitUtil::RoundUpToPowerOfTwo(size + 1));
SCOPED_ATTACH_TASK(ExecEnv::GetInstance()->stream_load_pipe_tracker());
ByteBufferPtr buf;
RETURN_IF_ERROR(ByteBuffer::allocate(128 * 1024, &buf));
buf->put_bytes(data, size);
buf->flip();
return _append(buf, proto_byte_size);
Expand Down Expand Up @@ -145,7 +147,8 @@ Status StreamLoadPipe::append(const char* data, size_t size) {
// need to allocate a new chunk, min chunk is 64k
size_t chunk_size = std::max(_min_chunk_size, size - pos);
chunk_size = BitUtil::RoundUpToPowerOfTwo(chunk_size);
_write_buf = ByteBuffer::allocate(chunk_size);
SCOPED_ATTACH_TASK(ExecEnv::GetInstance()->stream_load_pipe_tracker());
RETURN_IF_ERROR(ByteBuffer::allocate(chunk_size, &_write_buf));
_write_buf->put_bytes(data + pos, size - pos);
return Status::OK();
}
Expand Down
7 changes: 6 additions & 1 deletion be/src/olap/memtable.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -505,7 +505,7 @@ bool MemTable::need_agg() const {
return false;
}

Status MemTable::to_block(std::unique_ptr<vectorized::Block>* res) {
Status MemTable::_to_block(std::unique_ptr<vectorized::Block>* res) {
size_t same_keys_num = _sort();
if (_keys_type == KeysType::DUP_KEYS || same_keys_num == 0) {
if (_keys_type == KeysType::DUP_KEYS && _tablet_schema->num_key_columns() == 0) {
Expand All @@ -529,4 +529,9 @@ Status MemTable::to_block(std::unique_ptr<vectorized::Block>* res) {
return Status::OK();
}

Status MemTable::to_block(std::unique_ptr<vectorized::Block>* res) {
RETURN_IF_ERROR_OR_CATCH_EXCEPTION(_to_block(res));
return Status::OK();
}

} // namespace doris
3 changes: 3 additions & 0 deletions be/src/olap/memtable.h
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,9 @@ class MemTable {
void _aggregate_two_row_in_block(vectorized::MutableBlock& mutable_block, RowInBlock* new_row,
RowInBlock* row_in_skiplist);

// Wrapped by to_block(), which adds the exception-handling logic around it.
Status _to_block(std::unique_ptr<vectorized::Block>* res);

private:
int64_t _tablet_id;
bool _enable_unique_key_mow = false;
Expand Down
2 changes: 1 addition & 1 deletion be/src/olap/rowset/segment_creator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ Status SegmentFlusher::flush_single_block(const vectorized::Block* block, int32_
return Status::OK();
}

Status SegmentFlusher::_parse_variant_columns(vectorized::Block& block) {
Status SegmentFlusher::_internal_parse_variant_columns(vectorized::Block& block) {
size_t num_rows = block.rows();
if (num_rows == 0) {
return Status::OK();
Expand Down
6 changes: 5 additions & 1 deletion be/src/olap/rowset/segment_creator.h
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,11 @@ class SegmentFlusher {
bool need_buffering();

private:
Status _parse_variant_columns(vectorized::Block& block);
// This method catches any exception thrown when memory allocation fails.
Status _parse_variant_columns(vectorized::Block& block) {
RETURN_IF_CATCH_EXCEPTION({ return _internal_parse_variant_columns(block); });
}
Status _internal_parse_variant_columns(vectorized::Block& block);
Status _add_rows(std::unique_ptr<segment_v2::SegmentWriter>& segment_writer,
const vectorized::Block* block, size_t row_offset, size_t row_num);
Status _add_rows(std::unique_ptr<segment_v2::VerticalSegmentWriter>& segment_writer,
Expand Down
14 changes: 8 additions & 6 deletions be/src/olap/rowset/segment_v2/binary_dict_page.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -142,19 +142,22 @@ Status BinaryDictPageBuilder::add(const uint8_t* vals, size_t* count) {
}
}

OwnedSlice BinaryDictPageBuilder::finish() {
Status BinaryDictPageBuilder::finish(OwnedSlice* slice) {
if (VLOG_DEBUG_IS_ON && _encoding_type == DICT_ENCODING) {
VLOG_DEBUG << "dict page size:" << _dict_builder->size();
}

DCHECK(!_finished);
_finished = true;

OwnedSlice data_slice = _data_page_builder->finish();
OwnedSlice data_slice;
RETURN_IF_ERROR(_data_page_builder->finish(&data_slice));
// TODO(gaodayue) separate page header and content to avoid this copy
_buffer.append(data_slice.slice().data, data_slice.slice().size);
RETURN_IF_CATCH_EXCEPTION(
{ _buffer.append(data_slice.slice().data, data_slice.slice().size); });
encode_fixed32_le(&_buffer[0], _encoding_type);
return _buffer.build();
*slice = _buffer.build();
return Status::OK();
}

Status BinaryDictPageBuilder::reset() {
Expand Down Expand Up @@ -183,8 +186,7 @@ uint64_t BinaryDictPageBuilder::size() const {
}

Status BinaryDictPageBuilder::get_dictionary_page(OwnedSlice* dictionary_page) {
*dictionary_page = _dict_builder->finish();
return Status::OK();
return _dict_builder->finish(dictionary_page);
}

Status BinaryDictPageBuilder::get_first_value(void* value) const {
Expand Down
2 changes: 1 addition & 1 deletion be/src/olap/rowset/segment_v2/binary_dict_page.h
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ class BinaryDictPageBuilder : public PageBuilderHelper<BinaryDictPageBuilder> {

Status add(const uint8_t* vals, size_t* count) override;

OwnedSlice finish() override;
Status finish(OwnedSlice* slice) override;

Status reset() override;

Expand Down
25 changes: 14 additions & 11 deletions be/src/olap/rowset/segment_v2/binary_plain_page.h
Original file line number Diff line number Diff line change
Expand Up @@ -93,19 +93,22 @@ class BinaryPlainPageBuilder : public PageBuilderHelper<BinaryPlainPageBuilder<T
return Status::OK();
}

OwnedSlice finish() override {
Status finish(OwnedSlice* slice) override {
DCHECK(!_finished);
_finished = true;
// Set up trailer
for (uint32_t _offset : _offsets) {
put_fixed32_le(&_buffer, _offset);
}
put_fixed32_le(&_buffer, _offsets.size());
if (_offsets.size() > 0) {
_copy_value_at(0, &_first_value);
_copy_value_at(_offsets.size() - 1, &_last_value);
}
return _buffer.build();
RETURN_IF_CATCH_EXCEPTION({
// Set up trailer
for (uint32_t _offset : _offsets) {
put_fixed32_le(&_buffer, _offset);
}
put_fixed32_le(&_buffer, _offsets.size());
if (_offsets.size() > 0) {
_copy_value_at(0, &_first_value);
_copy_value_at(_offsets.size() - 1, &_last_value);
}
*slice = _buffer.build();
});
return Status::OK();
}

Status reset() override {
Expand Down
23 changes: 13 additions & 10 deletions be/src/olap/rowset/segment_v2/binary_prefix_page.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -88,18 +88,21 @@ Status BinaryPrefixPageBuilder::add(const uint8_t* vals, size_t* add_count) {
return Status::OK();
}

OwnedSlice BinaryPrefixPageBuilder::finish() {
Status BinaryPrefixPageBuilder::finish(OwnedSlice* slice) {
DCHECK(!_finished);
_finished = true;
put_fixed32_le(&_buffer, (uint32_t)_count);
uint8_t restart_point_internal = RESTART_POINT_INTERVAL;
_buffer.append(&restart_point_internal, 1);
auto restart_point_size = _restart_points_offset.size();
for (uint32_t i = 0; i < restart_point_size; ++i) {
put_fixed32_le(&_buffer, _restart_points_offset[i]);
}
put_fixed32_le(&_buffer, restart_point_size);
return _buffer.build();
RETURN_IF_CATCH_EXCEPTION({
put_fixed32_le(&_buffer, (uint32_t)_count);
uint8_t restart_point_internal = RESTART_POINT_INTERVAL;
_buffer.append(&restart_point_internal, 1);
auto restart_point_size = _restart_points_offset.size();
for (uint32_t i = 0; i < restart_point_size; ++i) {
put_fixed32_le(&_buffer, _restart_points_offset[i]);
}
put_fixed32_le(&_buffer, restart_point_size);
*slice = _buffer.build();
});
return Status::OK();
}

const uint8_t* BinaryPrefixPageDecoder::_decode_value_lengths(const uint8_t* ptr, uint32_t* shared,
Expand Down
2 changes: 1 addition & 1 deletion be/src/olap/rowset/segment_v2/binary_prefix_page.h
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ class BinaryPrefixPageBuilder : public PageBuilderHelper<BinaryPrefixPageBuilder

Status add(const uint8_t* vals, size_t* add_count) override;

OwnedSlice finish() override;
Status finish(OwnedSlice* slice) override;

Status reset() override {
_restart_points_offset.clear();
Expand Down
5 changes: 3 additions & 2 deletions be/src/olap/rowset/segment_v2/bitshuffle_page.h
Original file line number Diff line number Diff line change
Expand Up @@ -141,12 +141,13 @@ class BitshufflePageBuilder : public PageBuilderHelper<BitshufflePageBuilder<Typ
return Status::OK();
}

OwnedSlice finish() override {
Status finish(OwnedSlice* slice) override {
if (_count > 0) {
_first_value = cell(0);
_last_value = cell(_count - 1);
}
return _finish(SIZE_OF_TYPE);
RETURN_IF_CATCH_EXCEPTION({ *slice = _finish(SIZE_OF_TYPE); });
return Status::OK();
}

Status reset() override {
Expand Down
10 changes: 6 additions & 4 deletions be/src/olap/rowset/segment_v2/column_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -70,9 +70,10 @@ class NullBitmapBuilder {
// Returns whether the building nullmap contains nullptr
bool has_null() const { return _has_null; }

OwnedSlice finish() {
Status finish(OwnedSlice* slice) {
_rle_encoder.Flush();
return _bitmap_buf.build();
RETURN_IF_CATCH_EXCEPTION({ *slice = _bitmap_buf.build(); });
return Status::OK();
}

void reset() {
Expand Down Expand Up @@ -723,14 +724,15 @@ Status ScalarColumnWriter::finish_current_page() {

// build data page body : encoded values + [nullmap]
std::vector<Slice> body;
OwnedSlice encoded_values = _page_builder->finish();
OwnedSlice encoded_values;
RETURN_IF_ERROR(_page_builder->finish(&encoded_values));
RETURN_IF_ERROR(_page_builder->reset());
body.push_back(encoded_values.slice());

OwnedSlice nullmap;
if (_null_bitmap_builder != nullptr) {
if (is_nullable() && _null_bitmap_builder->has_null()) {
nullmap = _null_bitmap_builder->finish();
RETURN_IF_ERROR(_null_bitmap_builder->finish(&nullmap));
body.push_back(nullmap.slice());
}
_null_bitmap_builder->reset();
Expand Down
5 changes: 3 additions & 2 deletions be/src/olap/rowset/segment_v2/frame_of_reference_page.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,11 +54,12 @@ class FrameOfReferencePageBuilder : public PageBuilderHelper<FrameOfReferencePag
return Status::OK();
}

OwnedSlice finish() override {
Status finish(OwnedSlice* slice) override {
DCHECK(!_finished);
_finished = true;
_encoder->flush();
return _buf.build();
RETURN_IF_CATCH_EXCEPTION({ *slice = _buf.build(); });
return Status::OK();
}

Status reset() override {
Expand Down
3 changes: 2 additions & 1 deletion be/src/olap/rowset/segment_v2/indexed_column_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,8 @@ Status IndexedColumnWriter::_finish_current_data_page(size_t& num_val) {
ordinal_t first_ordinal = _num_values - num_values_in_page;

// IndexedColumn doesn't have NULLs, thus data page body only contains encoded values
OwnedSlice page_body = _data_page_builder->finish();
OwnedSlice page_body;
RETURN_IF_ERROR(_data_page_builder->finish(&page_body));
RETURN_IF_ERROR(_data_page_builder->reset());

PageFooterPB footer;
Expand Down
3 changes: 2 additions & 1 deletion be/src/olap/rowset/segment_v2/page_builder.h
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,8 @@ class PageBuilder {

// Finish building the current page, return the encoded data.
// This api should be followed by reset() before reusing the builder
virtual OwnedSlice finish() = 0;
// It returns an error status when memory allocation fails during finish().
virtual Status finish(OwnedSlice* owned_slice) = 0;

// Get the dictionary page for dictionary encoding mode column.
virtual Status get_dictionary_page(OwnedSlice* dictionary_page) {
Expand Down
Loading