diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index 61c0ecdea2f823..021d7a9144b277 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -137,6 +137,10 @@ DEFINE_mInt64(stacktrace_in_alloc_large_memory_bytes, "2147483648");
 DEFINE_mInt64(crash_in_alloc_large_memory_bytes, "-1");
+// Default is true. When enabled, any memory tracked by the Orphan mem tracker is reported as an error.
+// !! Do not modify the default value of this conf !! Otherwise memory errors cannot be detected in time.
+// Memory freed through an Allocator does not need this check, because when the thread memory tracker
+// label is Orphan, the tracker saved in the Allocator is used instead.
 DEFINE_mBool(enable_memory_orphan_check, "true");
 // The maximum time a thread waits for full GC. Currently only query will wait for full gc.
@@ -1097,6 +1101,7 @@ DEFINE_mString(kerberos_krb5_conf_path, "/etc/krb5.conf");
 DEFINE_mString(get_stack_trace_tool, "libunwind");
 DEFINE_mString(dwarf_location_info_mode, "FAST");
+DEFINE_mBool(enable_address_sanitizers_with_stack_trace, "false");
 // the ratio of _prefetch_size/_batch_size in AutoIncIDBuffer
 DEFINE_mInt64(auto_inc_prefetch_size_ratio, "10");
diff --git a/be/src/common/config.h b/be/src/common/config.h
index 4b39be5f4c48fd..b1600ed5af38c0 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -191,6 +191,9 @@ DECLARE_mInt64(stacktrace_in_alloc_large_memory_bytes);
 DECLARE_mInt64(crash_in_alloc_large_memory_bytes);
 // Default is true. When enabled, any memory tracked by the Orphan mem tracker is reported as an error.
+// !! Do not modify the default value of this conf !! Otherwise memory errors cannot be detected in time.
+// Memory freed through an Allocator does not need this check, because when the thread memory tracker
+// label is Orphan, the tracker saved in the Allocator is used instead.
 DECLARE_mBool(enable_memory_orphan_check);
 // The maximum time a thread waits for a full GC. Currently only query will wait for full gc.
@@ -1149,6 +1152,7 @@ DECLARE_mString(kerberos_krb5_conf_path);
 // Values include `none`, `glog`, `boost`, `glibc`, `libunwind`
 DECLARE_mString(get_stack_trace_tool);
+DECLARE_mBool(enable_address_sanitizers_with_stack_trace);
 // DISABLED: Don't resolve location info.
 // FAST: Perform CU lookup using .debug_aranges (might be incomplete).
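Note on the two configs above: enable_memory_orphan_check keeps the existing orphan-tracker error check on by default, while the new enable_address_sanitizers_with_stack_trace only controls whether the debug-only allocation bookkeeping added to MemTrackerLimiter later in this patch (add_address_sanitizers / remove_address_sanitizers / print_address_sanitizers) also records a stack trace per allocation. A minimal, self-contained sketch of that bookkeeping idea follows; it is illustrative only, not the actual MemTrackerLimiter code, and assumes fmt is available as elsewhere in the BE:

#include <cstddef>
#include <mutex>
#include <string>
#include <unordered_map>
#include <vector>

#include <fmt/format.h> // assumed available, as in the rest of the BE code

// Sketch of the debug-only bookkeeping: every tracked alloc registers its pointer
// and size, every free must match a registered entry, and whatever is left at
// destruction time is reported as a suspected leak.
class AllocationSanitizer {
public:
    void on_alloc(void* buf, size_t size, std::string stack_trace) {
        std::lock_guard<std::mutex> l(_mtx);
        if (!_live.emplace(buf, Entry {size, std::move(stack_trace)}).second) {
            _errors.push_back(fmt::format("repeat add of buf {}, size {}", buf, size));
        }
    }

    void on_free(void* buf, size_t size) {
        std::lock_guard<std::mutex> l(_mtx);
        auto it = _live.find(buf);
        if (it == _live.end()) {
            _errors.push_back(fmt::format("free of untracked buf {}, size {}", buf, size));
            return;
        }
        if (it->second.size != size) {
            _errors.push_back(fmt::format("size mismatch on free of buf {}: tracked {}, freed {}",
                                          buf, it->second.size, size));
        }
        _live.erase(it);
    }

    std::string report() const {
        std::lock_guard<std::mutex> l(_mtx);
        std::string detail = "[Address Sanitizer]:";
        for (const auto& [buf, entry] : _live) {
            detail += fmt::format("\n  buf not freed: {}, size {}, stack trace: {}", buf,
                                  entry.size, entry.stack_trace);
        }
        for (const auto& err : _errors) {
            detail += fmt::format("\n  {}", err);
        }
        return detail;
    }

private:
    struct Entry {
        size_t size;
        std::string stack_trace; // empty unless enable_address_sanitizers_with_stack_trace is true
    };

    mutable std::mutex _mtx;
    std::unordered_map<void*, Entry> _live;
    std::vector<std::string> _errors;
};

In the patch itself, the equivalents of on_alloc/on_free are invoked from Allocator::alloc_impl, free and realloc under #ifndef NDEBUG, and the equivalent of report() is print_address_sanitizers(), called from the MemTrackerLimiter destructor when consumption is non-zero or the map is not empty.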
diff --git a/be/src/http/action/http_stream.cpp b/be/src/http/action/http_stream.cpp index 7dd85653002e37..83ce0ce82cc638 100644 --- a/be/src/http/action/http_stream.cpp +++ b/be/src/http/action/http_stream.cpp @@ -234,26 +234,37 @@ void HttpStreamAction::on_chunk_data(HttpRequest* req) { struct evhttp_request* ev_req = req->get_evhttp_request(); auto evbuf = evhttp_request_get_input_buffer(ev_req); + SCOPED_ATTACH_TASK(ExecEnv::GetInstance()->stream_load_pipe_tracker()); + int64_t start_read_data_time = MonotonicNanos(); + Status st = ctx->allocate_schema_buffer(); + if (!st.ok()) { + ctx->status = st; + return; + } while (evbuffer_get_length(evbuf) > 0) { - auto bb = ByteBuffer::allocate(128 * 1024); + ByteBufferPtr bb; + st = ByteBuffer::allocate(128 * 1024, &bb); + if (!st.ok()) { + ctx->status = st; + return; + } auto remove_bytes = evbuffer_remove(evbuf, bb->ptr, bb->capacity); bb->pos = remove_bytes; bb->flip(); - auto st = ctx->body_sink->append(bb); + st = ctx->body_sink->append(bb); // schema_buffer stores 1M of data for parsing column information // need to determine whether to cache for the first time if (ctx->is_read_schema) { - if (ctx->schema_buffer->pos + remove_bytes < config::stream_tvf_buffer_size) { - ctx->schema_buffer->put_bytes(bb->ptr, remove_bytes); + if (ctx->schema_buffer()->pos + remove_bytes < config::stream_tvf_buffer_size) { + ctx->schema_buffer()->put_bytes(bb->ptr, remove_bytes); } else { LOG(INFO) << "use a portion of data to request fe to obtain column information"; ctx->is_read_schema = false; ctx->status = process_put(req, ctx); } } - - if (!st.ok() && !ctx->status.ok()) { + if (!st.ok()) { LOG(WARNING) << "append body content failed. errmsg=" << st << ", " << ctx->brief(); ctx->status = st; return; diff --git a/be/src/http/action/stream_load.cpp b/be/src/http/action/stream_load.cpp index 3f32655cf14027..2036043b4d40e4 100644 --- a/be/src/http/action/stream_load.cpp +++ b/be/src/http/action/stream_load.cpp @@ -339,13 +339,20 @@ void StreamLoadAction::on_chunk_data(HttpRequest* req) { struct evhttp_request* ev_req = req->get_evhttp_request(); auto evbuf = evhttp_request_get_input_buffer(ev_req); + SCOPED_ATTACH_TASK(ExecEnv::GetInstance()->stream_load_pipe_tracker()); + int64_t start_read_data_time = MonotonicNanos(); while (evbuffer_get_length(evbuf) > 0) { - auto bb = ByteBuffer::allocate(128 * 1024); + ByteBufferPtr bb; + Status st = ByteBuffer::allocate(128 * 1024, &bb); + if (!st.ok()) { + ctx->status = st; + return; + } auto remove_bytes = evbuffer_remove(evbuf, bb->ptr, bb->capacity); bb->pos = remove_bytes; bb->flip(); - auto st = ctx->body_sink->append(bb); + st = ctx->body_sink->append(bb); if (!st.ok()) { LOG(WARNING) << "append body content failed. 
errmsg=" << st << ", " << ctx->brief(); ctx->status = st; diff --git a/be/src/io/file_factory.cpp b/be/src/io/file_factory.cpp index 4d6158f8f7e96d..95d537320883e8 100644 --- a/be/src/io/file_factory.cpp +++ b/be/src/io/file_factory.cpp @@ -161,13 +161,14 @@ Status FileFactory::create_pipe_reader(const TUniqueId& load_id, io::FileReaderS if (!stream_load_ctx) { return Status::InternalError("unknown stream load id: {}", UniqueId(load_id).to_string()); } - if (need_schema == true) { + if (need_schema) { + RETURN_IF_ERROR(stream_load_ctx->allocate_schema_buffer()); // Here, a portion of the data is processed to parse column information auto pipe = std::make_shared( io::kMaxPipeBufferedBytes /* max_buffered_bytes */, 64 * 1024 /* min_chunk_size */, - stream_load_ctx->schema_buffer->pos /* total_length */); - stream_load_ctx->schema_buffer->flip(); - RETURN_IF_ERROR(pipe->append(stream_load_ctx->schema_buffer)); + stream_load_ctx->schema_buffer()->pos /* total_length */); + stream_load_ctx->schema_buffer()->flip(); + RETURN_IF_ERROR(pipe->append(stream_load_ctx->schema_buffer())); RETURN_IF_ERROR(pipe->finish()); *file_reader = std::move(pipe); } else { diff --git a/be/src/io/fs/stream_load_pipe.cpp b/be/src/io/fs/stream_load_pipe.cpp index ecce306bdf1ad2..392125e6fc0b86 100644 --- a/be/src/io/fs/stream_load_pipe.cpp +++ b/be/src/io/fs/stream_load_pipe.cpp @@ -111,7 +111,9 @@ Status StreamLoadPipe::read_one_message(std::unique_ptr* data, size_t } Status StreamLoadPipe::append_and_flush(const char* data, size_t size, size_t proto_byte_size) { - ByteBufferPtr buf = ByteBuffer::allocate(BitUtil::RoundUpToPowerOfTwo(size + 1)); + SCOPED_ATTACH_TASK(ExecEnv::GetInstance()->stream_load_pipe_tracker()); + ByteBufferPtr buf; + RETURN_IF_ERROR(ByteBuffer::allocate(128 * 1024, &buf)); buf->put_bytes(data, size); buf->flip(); return _append(buf, proto_byte_size); @@ -145,7 +147,8 @@ Status StreamLoadPipe::append(const char* data, size_t size) { // need to allocate a new chunk, min chunk is 64k size_t chunk_size = std::max(_min_chunk_size, size - pos); chunk_size = BitUtil::RoundUpToPowerOfTwo(chunk_size); - _write_buf = ByteBuffer::allocate(chunk_size); + SCOPED_ATTACH_TASK(ExecEnv::GetInstance()->stream_load_pipe_tracker()); + RETURN_IF_ERROR(ByteBuffer::allocate(chunk_size, &_write_buf)); _write_buf->put_bytes(data + pos, size - pos); return Status::OK(); } diff --git a/be/src/olap/memtable.cpp b/be/src/olap/memtable.cpp index 923849162db331..a4df4b8f6742aa 100644 --- a/be/src/olap/memtable.cpp +++ b/be/src/olap/memtable.cpp @@ -505,7 +505,7 @@ bool MemTable::need_agg() const { return false; } -Status MemTable::to_block(std::unique_ptr* res) { +Status MemTable::_to_block(std::unique_ptr* res) { size_t same_keys_num = _sort(); if (_keys_type == KeysType::DUP_KEYS || same_keys_num == 0) { if (_keys_type == KeysType::DUP_KEYS && _tablet_schema->num_key_columns() == 0) { @@ -529,4 +529,9 @@ Status MemTable::to_block(std::unique_ptr* res) { return Status::OK(); } +Status MemTable::to_block(std::unique_ptr* res) { + RETURN_IF_ERROR_OR_CATCH_EXCEPTION(_to_block(res)); + return Status::OK(); +} + } // namespace doris diff --git a/be/src/olap/memtable.h b/be/src/olap/memtable.h index 916067ba1193d2..70f7a9f22a0aa8 100644 --- a/be/src/olap/memtable.h +++ b/be/src/olap/memtable.h @@ -205,6 +205,9 @@ class MemTable { void _aggregate_two_row_in_block(vectorized::MutableBlock& mutable_block, RowInBlock* new_row, RowInBlock* row_in_skiplist); + // Used to wrapped by to_block to do exception handle logic + Status 
_to_block(std::unique_ptr* res); + private: int64_t _tablet_id; bool _enable_unique_key_mow = false; diff --git a/be/src/olap/rowset/segment_creator.cpp b/be/src/olap/rowset/segment_creator.cpp index 641b32535561cc..bf10ff3f1ed880 100644 --- a/be/src/olap/rowset/segment_creator.cpp +++ b/be/src/olap/rowset/segment_creator.cpp @@ -85,7 +85,7 @@ Status SegmentFlusher::flush_single_block(const vectorized::Block* block, int32_ return Status::OK(); } -Status SegmentFlusher::_parse_variant_columns(vectorized::Block& block) { +Status SegmentFlusher::_internal_parse_variant_columns(vectorized::Block& block) { size_t num_rows = block.rows(); if (num_rows == 0) { return Status::OK(); diff --git a/be/src/olap/rowset/segment_creator.h b/be/src/olap/rowset/segment_creator.h index 93508e9629ddbb..7fa69b2c57c718 100644 --- a/be/src/olap/rowset/segment_creator.h +++ b/be/src/olap/rowset/segment_creator.h @@ -138,7 +138,11 @@ class SegmentFlusher { bool need_buffering(); private: - Status _parse_variant_columns(vectorized::Block& block); + // This method will catch exception when allocate memory failed + Status _parse_variant_columns(vectorized::Block& block) { + RETURN_IF_CATCH_EXCEPTION({ return _internal_parse_variant_columns(block); }); + } + Status _internal_parse_variant_columns(vectorized::Block& block); Status _add_rows(std::unique_ptr& segment_writer, const vectorized::Block* block, size_t row_offset, size_t row_num); Status _add_rows(std::unique_ptr& segment_writer, diff --git a/be/src/olap/rowset/segment_v2/binary_dict_page.cpp b/be/src/olap/rowset/segment_v2/binary_dict_page.cpp index 52795f0338a79f..8270adfbde8bf1 100644 --- a/be/src/olap/rowset/segment_v2/binary_dict_page.cpp +++ b/be/src/olap/rowset/segment_v2/binary_dict_page.cpp @@ -142,7 +142,7 @@ Status BinaryDictPageBuilder::add(const uint8_t* vals, size_t* count) { } } -OwnedSlice BinaryDictPageBuilder::finish() { +Status BinaryDictPageBuilder::finish(OwnedSlice* slice) { if (VLOG_DEBUG_IS_ON && _encoding_type == DICT_ENCODING) { VLOG_DEBUG << "dict page size:" << _dict_builder->size(); } @@ -150,11 +150,14 @@ OwnedSlice BinaryDictPageBuilder::finish() { DCHECK(!_finished); _finished = true; - OwnedSlice data_slice = _data_page_builder->finish(); + OwnedSlice data_slice; + RETURN_IF_ERROR(_data_page_builder->finish(&data_slice)); // TODO(gaodayue) separate page header and content to avoid this copy - _buffer.append(data_slice.slice().data, data_slice.slice().size); + RETURN_IF_CATCH_EXCEPTION( + { _buffer.append(data_slice.slice().data, data_slice.slice().size); }); encode_fixed32_le(&_buffer[0], _encoding_type); - return _buffer.build(); + *slice = _buffer.build(); + return Status::OK(); } Status BinaryDictPageBuilder::reset() { @@ -183,8 +186,7 @@ uint64_t BinaryDictPageBuilder::size() const { } Status BinaryDictPageBuilder::get_dictionary_page(OwnedSlice* dictionary_page) { - *dictionary_page = _dict_builder->finish(); - return Status::OK(); + return _dict_builder->finish(dictionary_page); } Status BinaryDictPageBuilder::get_first_value(void* value) const { diff --git a/be/src/olap/rowset/segment_v2/binary_dict_page.h b/be/src/olap/rowset/segment_v2/binary_dict_page.h index 2a8467e7def516..d069eb9f7edc98 100644 --- a/be/src/olap/rowset/segment_v2/binary_dict_page.h +++ b/be/src/olap/rowset/segment_v2/binary_dict_page.h @@ -68,7 +68,7 @@ class BinaryDictPageBuilder : public PageBuilderHelper { Status add(const uint8_t* vals, size_t* count) override; - OwnedSlice finish() override; + Status finish(OwnedSlice* slice) override; Status 
reset() override; diff --git a/be/src/olap/rowset/segment_v2/binary_plain_page.h b/be/src/olap/rowset/segment_v2/binary_plain_page.h index 3fe76c5d3aee84..69d79fbcc5b3e3 100644 --- a/be/src/olap/rowset/segment_v2/binary_plain_page.h +++ b/be/src/olap/rowset/segment_v2/binary_plain_page.h @@ -93,19 +93,22 @@ class BinaryPlainPageBuilder : public PageBuilderHelper 0) { - _copy_value_at(0, &_first_value); - _copy_value_at(_offsets.size() - 1, &_last_value); - } - return _buffer.build(); + RETURN_IF_CATCH_EXCEPTION({ + // Set up trailer + for (uint32_t _offset : _offsets) { + put_fixed32_le(&_buffer, _offset); + } + put_fixed32_le(&_buffer, _offsets.size()); + if (_offsets.size() > 0) { + _copy_value_at(0, &_first_value); + _copy_value_at(_offsets.size() - 1, &_last_value); + } + *slice = _buffer.build(); + }); + return Status::OK(); } Status reset() override { diff --git a/be/src/olap/rowset/segment_v2/binary_prefix_page.cpp b/be/src/olap/rowset/segment_v2/binary_prefix_page.cpp index 9d1ecdb9470778..34eb14951aeb32 100644 --- a/be/src/olap/rowset/segment_v2/binary_prefix_page.cpp +++ b/be/src/olap/rowset/segment_v2/binary_prefix_page.cpp @@ -88,18 +88,21 @@ Status BinaryPrefixPageBuilder::add(const uint8_t* vals, size_t* add_count) { return Status::OK(); } -OwnedSlice BinaryPrefixPageBuilder::finish() { +Status BinaryPrefixPageBuilder::finish(OwnedSlice* slice) { DCHECK(!_finished); _finished = true; - put_fixed32_le(&_buffer, (uint32_t)_count); - uint8_t restart_point_internal = RESTART_POINT_INTERVAL; - _buffer.append(&restart_point_internal, 1); - auto restart_point_size = _restart_points_offset.size(); - for (uint32_t i = 0; i < restart_point_size; ++i) { - put_fixed32_le(&_buffer, _restart_points_offset[i]); - } - put_fixed32_le(&_buffer, restart_point_size); - return _buffer.build(); + RETURN_IF_CATCH_EXCEPTION({ + put_fixed32_le(&_buffer, (uint32_t)_count); + uint8_t restart_point_internal = RESTART_POINT_INTERVAL; + _buffer.append(&restart_point_internal, 1); + auto restart_point_size = _restart_points_offset.size(); + for (uint32_t i = 0; i < restart_point_size; ++i) { + put_fixed32_le(&_buffer, _restart_points_offset[i]); + } + put_fixed32_le(&_buffer, restart_point_size); + *slice = _buffer.build(); + }); + return Status::OK(); } const uint8_t* BinaryPrefixPageDecoder::_decode_value_lengths(const uint8_t* ptr, uint32_t* shared, diff --git a/be/src/olap/rowset/segment_v2/binary_prefix_page.h b/be/src/olap/rowset/segment_v2/binary_prefix_page.h index de4ec60070bad6..41deb4e6c1fe1a 100644 --- a/be/src/olap/rowset/segment_v2/binary_prefix_page.h +++ b/be/src/olap/rowset/segment_v2/binary_prefix_page.h @@ -52,7 +52,7 @@ class BinaryPrefixPageBuilder : public PageBuilderHelper 0) { _first_value = cell(0); _last_value = cell(_count - 1); } - return _finish(SIZE_OF_TYPE); + RETURN_IF_CATCH_EXCEPTION({ *slice = _finish(SIZE_OF_TYPE); }); + return Status::OK(); } Status reset() override { diff --git a/be/src/olap/rowset/segment_v2/column_writer.cpp b/be/src/olap/rowset/segment_v2/column_writer.cpp index e463b883fd206d..bdbfcdc2d41621 100644 --- a/be/src/olap/rowset/segment_v2/column_writer.cpp +++ b/be/src/olap/rowset/segment_v2/column_writer.cpp @@ -70,9 +70,10 @@ class NullBitmapBuilder { // Returns whether the building nullmap contains nullptr bool has_null() const { return _has_null; } - OwnedSlice finish() { + Status finish(OwnedSlice* slice) { _rle_encoder.Flush(); - return _bitmap_buf.build(); + RETURN_IF_CATCH_EXCEPTION({ *slice = _bitmap_buf.build(); }); + return Status::OK(); } 
void reset() { @@ -723,14 +724,15 @@ Status ScalarColumnWriter::finish_current_page() { // build data page body : encoded values + [nullmap] std::vector body; - OwnedSlice encoded_values = _page_builder->finish(); + OwnedSlice encoded_values; + RETURN_IF_ERROR(_page_builder->finish(&encoded_values)); RETURN_IF_ERROR(_page_builder->reset()); body.push_back(encoded_values.slice()); OwnedSlice nullmap; if (_null_bitmap_builder != nullptr) { if (is_nullable() && _null_bitmap_builder->has_null()) { - nullmap = _null_bitmap_builder->finish(); + RETURN_IF_ERROR(_null_bitmap_builder->finish(&nullmap)); body.push_back(nullmap.slice()); } _null_bitmap_builder->reset(); diff --git a/be/src/olap/rowset/segment_v2/frame_of_reference_page.h b/be/src/olap/rowset/segment_v2/frame_of_reference_page.h index 4477912803b3bb..5aedf126b55ce2 100644 --- a/be/src/olap/rowset/segment_v2/frame_of_reference_page.h +++ b/be/src/olap/rowset/segment_v2/frame_of_reference_page.h @@ -54,11 +54,12 @@ class FrameOfReferencePageBuilder : public PageBuilderHelperflush(); - return _buf.build(); + RETURN_IF_CATCH_EXCEPTION({ *slice = _buf.build(); }); + return Status::OK(); } Status reset() override { diff --git a/be/src/olap/rowset/segment_v2/indexed_column_writer.cpp b/be/src/olap/rowset/segment_v2/indexed_column_writer.cpp index e1b238084a9ce5..51606d818899ec 100644 --- a/be/src/olap/rowset/segment_v2/indexed_column_writer.cpp +++ b/be/src/olap/rowset/segment_v2/indexed_column_writer.cpp @@ -117,7 +117,8 @@ Status IndexedColumnWriter::_finish_current_data_page(size_t& num_val) { ordinal_t first_ordinal = _num_values - num_values_in_page; // IndexedColumn doesn't have NULLs, thus data page body only contains encoded values - OwnedSlice page_body = _data_page_builder->finish(); + OwnedSlice page_body; + RETURN_IF_ERROR(_data_page_builder->finish(&page_body)); RETURN_IF_ERROR(_data_page_builder->reset()); PageFooterPB footer; diff --git a/be/src/olap/rowset/segment_v2/page_builder.h b/be/src/olap/rowset/segment_v2/page_builder.h index 61fa2eaf8e1d66..7e24c56796cbb5 100644 --- a/be/src/olap/rowset/segment_v2/page_builder.h +++ b/be/src/olap/rowset/segment_v2/page_builder.h @@ -63,7 +63,8 @@ class PageBuilder { // Finish building the current page, return the encoded data. // This api should be followed by reset() before reusing the builder - virtual OwnedSlice finish() = 0; + // It will return error status when memory allocated failed during finish + virtual Status finish(OwnedSlice* owned_slice) = 0; // Get the dictionary page for dictionary encoding mode column. 
virtual Status get_dictionary_page(OwnedSlice* dictionary_page) { diff --git a/be/src/olap/rowset/segment_v2/page_io.cpp b/be/src/olap/rowset/segment_v2/page_io.cpp index cea4a23f742178..07d5656ee8a44b 100644 --- a/be/src/olap/rowset/segment_v2/page_io.cpp +++ b/be/src/olap/rowset/segment_v2/page_io.cpp @@ -111,8 +111,8 @@ Status PageIO::write_page(io::FileWriter* writer, const std::vector& body return Status::OK(); } -Status PageIO::read_and_decompress_page(const PageReadOptions& opts, PageHandle* handle, - Slice* body, PageFooterPB* footer) { +Status PageIO::read_and_decompress_page_(const PageReadOptions& opts, PageHandle* handle, + Slice* body, PageFooterPB* footer) { opts.sanity_check(); opts.stats->total_pages_num++; diff --git a/be/src/olap/rowset/segment_v2/page_io.h b/be/src/olap/rowset/segment_v2/page_io.h index 31c81880dac650..889dae6d34efe6 100644 --- a/be/src/olap/rowset/segment_v2/page_io.h +++ b/be/src/olap/rowset/segment_v2/page_io.h @@ -123,8 +123,17 @@ class PageIO { // `handle' holds the memory of page data, // `body' points to page body, // `footer' stores the page footer. + // This method is exception safe, it will failed when allocate memory failed. static Status read_and_decompress_page(const PageReadOptions& opts, PageHandle* handle, - Slice* body, PageFooterPB* footer); + Slice* body, PageFooterPB* footer) { + RETURN_IF_CATCH_EXCEPTION( + { return read_and_decompress_page_(opts, handle, body, footer); }); + } + +private: + // An internal method that not deal with exception. + static Status read_and_decompress_page_(const PageReadOptions& opts, PageHandle* handle, + Slice* body, PageFooterPB* footer); }; } // namespace segment_v2 diff --git a/be/src/olap/rowset/segment_v2/plain_page.h b/be/src/olap/rowset/segment_v2/plain_page.h index af31275002ad3c..28b1e96d206fbb 100644 --- a/be/src/olap/rowset/segment_v2/plain_page.h +++ b/be/src/olap/rowset/segment_v2/plain_page.h @@ -59,14 +59,18 @@ class PlainPageBuilder : public PageBuilderHelper > { return Status::OK(); } - OwnedSlice finish() override { + Status finish(OwnedSlice* slice) override { encode_fixed32_le((uint8_t*)&_buffer[0], _count); - if (_count > 0) { - _first_value.assign_copy(&_buffer[PLAIN_PAGE_HEADER_SIZE], SIZE_OF_TYPE); - _last_value.assign_copy(&_buffer[PLAIN_PAGE_HEADER_SIZE + (_count - 1) * SIZE_OF_TYPE], - SIZE_OF_TYPE); - } - return _buffer.build(); + RETURN_IF_CATCH_EXCEPTION({ + if (_count > 0) { + _first_value.assign_copy(&_buffer[PLAIN_PAGE_HEADER_SIZE], SIZE_OF_TYPE); + _last_value.assign_copy( + &_buffer[PLAIN_PAGE_HEADER_SIZE + (_count - 1) * SIZE_OF_TYPE], + SIZE_OF_TYPE); + } + *slice = _buffer.build(); + }); + return Status::OK(); } Status reset() override { diff --git a/be/src/olap/rowset/segment_v2/rle_page.h b/be/src/olap/rowset/segment_v2/rle_page.h index 40ec587743c1a2..d1974f18d39ddc 100644 --- a/be/src/olap/rowset/segment_v2/rle_page.h +++ b/be/src/olap/rowset/segment_v2/rle_page.h @@ -94,14 +94,15 @@ class RlePageBuilder : public PageBuilderHelper > { return Status::OK(); } - OwnedSlice finish() override { + Status finish(OwnedSlice* slice) override { DCHECK(!_finished); _finished = true; // here should Flush first and then encode the count header // or it will lead to a bug if the header is less than 8 byte and the data is small _rle_encoder->Flush(); encode_fixed32_le(&_buf[0], _count); - return _buf.build(); + *slice = _buf.build(); + return Status::OK(); } Status reset() override { diff --git a/be/src/pipeline/pipeline_fragment_context.cpp 
b/be/src/pipeline/pipeline_fragment_context.cpp index dab359ed0408c6..550e60c210c080 100644 --- a/be/src/pipeline/pipeline_fragment_context.cpp +++ b/be/src/pipeline/pipeline_fragment_context.cpp @@ -846,6 +846,10 @@ Status PipelineFragmentContext::_create_sink(int sender_id, const TDataSink& thr break; } case TDataSinkType::GROUP_COMMIT_BLOCK_SINK: { +#ifndef NDEBUG + DCHECK(state->get_query_ctx() != nullptr); + state->get_query_ctx()->query_mem_tracker->is_group_commit_load = true; +#endif sink_ = std::make_shared(next_operator_builder_id(), _sink.get()); break; diff --git a/be/src/runtime/exec_env.h b/be/src/runtime/exec_env.h index 4b2478ccf99b4b..18061e04528c05 100644 --- a/be/src/runtime/exec_env.h +++ b/be/src/runtime/exec_env.h @@ -185,6 +185,15 @@ class ExecEnv { std::shared_ptr segcompaction_mem_tracker() { return _segcompaction_mem_tracker; } + std::shared_ptr stream_load_pipe_tracker() { + return _stream_load_pipe_tracker; + } + std::shared_ptr point_query_executor_mem_tracker() { + return _point_query_executor_mem_tracker; + } + std::shared_ptr block_compression_mem_tracker() { + return _block_compression_mem_tracker; + } std::shared_ptr rowid_storage_reader_tracker() { return _rowid_storage_reader_tracker; } @@ -362,6 +371,11 @@ class ExecEnv { std::shared_ptr _brpc_iobuf_block_memory_tracker; // Count the memory consumption of segment compaction tasks. std::shared_ptr _segcompaction_mem_tracker; + std::shared_ptr _stream_load_pipe_tracker; + + // Tracking memory may be shared between multiple queries. + std::shared_ptr _point_query_executor_mem_tracker; + std::shared_ptr _block_compression_mem_tracker; // TODO, looking forward to more accurate tracking. std::shared_ptr _rowid_storage_reader_tracker; diff --git a/be/src/runtime/exec_env_init.cpp b/be/src/runtime/exec_env_init.cpp index bbd6bbc9447fbf..5d2ab598b33538 100644 --- a/be/src/runtime/exec_env_init.cpp +++ b/be/src/runtime/exec_env_init.cpp @@ -529,12 +529,18 @@ void ExecEnv::init_mem_tracker() { std::make_shared("IOBufBlockMemory", _details_mem_tracker_set.get()); _segcompaction_mem_tracker = MemTrackerLimiter::create_shared(MemTrackerLimiter::Type::GLOBAL, "SegCompaction"); + _point_query_executor_mem_tracker = + MemTrackerLimiter::create_shared(MemTrackerLimiter::Type::GLOBAL, "PointQueryExecutor"); + _block_compression_mem_tracker = + MemTrackerLimiter::create_shared(MemTrackerLimiter::Type::GLOBAL, "BlockCompression"); _rowid_storage_reader_tracker = MemTrackerLimiter::create_shared(MemTrackerLimiter::Type::GLOBAL, "RowIdStorageReader"); _subcolumns_tree_tracker = MemTrackerLimiter::create_shared(MemTrackerLimiter::Type::GLOBAL, "SubcolumnsTree"); _s3_file_buffer_tracker = MemTrackerLimiter::create_shared(MemTrackerLimiter::Type::GLOBAL, "S3FileBuffer"); + _stream_load_pipe_tracker = + MemTrackerLimiter::create_shared(MemTrackerLimiter::Type::GLOBAL, "StreamLoadPipe"); } void ExecEnv::_register_metrics() { diff --git a/be/src/runtime/memory/mem_tracker_limiter.cpp b/be/src/runtime/memory/mem_tracker_limiter.cpp index 494645d56b5533..49e9064938114f 100644 --- a/be/src/runtime/memory/mem_tracker_limiter.cpp +++ b/be/src/runtime/memory/mem_tracker_limiter.cpp @@ -39,6 +39,7 @@ #include "util/perf_counters.h" #include "util/pretty_printer.h" #include "util/runtime_profile.h" +#include "util/stack_util.h" namespace doris { @@ -110,9 +111,11 @@ std::shared_ptr MemTrackerLimiter::create_shared(MemTrackerLi MemTrackerLimiter::~MemTrackerLimiter() { consume(_untracked_mem); static std::string mem_tracker_inaccurate_msg = 
- ", mem tracker not equal to 0 when mem tracker destruct, this usually means that " + "mem tracker not equal to 0 when mem tracker destruct, this usually means that " "memory tracking is inaccurate and SCOPED_ATTACH_TASK and " "SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER are not used correctly. " + "If the log is truncated, search for `Address Sanitizer` in the be.INFO log to see " + "more information." "1. For query and load, memory leaks may have occurred, it is expected that the query " "mem tracker will be bound to the thread context using SCOPED_ATTACH_TASK and " "SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER before all memory alloc and free. " @@ -124,21 +127,101 @@ MemTrackerLimiter::~MemTrackerLimiter() { "4. If you need to " "transfer memory tracking value between two trackers, can use transfer_to."; if (_consumption->current_value() != 0) { - // TODO, expect mem tracker equal to 0 at the task end. - if (doris::config::enable_memory_orphan_check && _type == Type::QUERY) { - LOG(INFO) << "mem tracker label: " << _label - << ", consumption: " << _consumption->current_value() - << ", peak consumption: " << _consumption->peak_value() - << mem_tracker_inaccurate_msg; + // TODO, expect mem tracker equal to 0 at the load/compaction/etc. task end. +#ifndef NDEBUG + if (_type == Type::QUERY || (_type == Type::LOAD && !is_group_commit_load)) { + std::string err_msg = + fmt::format("mem tracker label: {}, consumption: {}, peak consumption: {}, {}.", + label(), _consumption->current_value(), _consumption->peak_value(), + mem_tracker_inaccurate_msg); + LOG(INFO) << err_msg << print_address_sanitizers(); } +#endif if (ExecEnv::tracking_memory()) { ExecEnv::GetInstance()->orphan_mem_tracker()->consume(_consumption->current_value()); } _consumption->set(0); +#ifndef NDEBUG + } else if (!_address_sanitizers.empty() && !is_group_commit_load) { + LOG(INFO) << "[Address Sanitizer] consumption is 0, but address sanitizers not empty. " + << ", mem tracker label: " << _label + << ", peak consumption: " << _consumption->peak_value() + << print_address_sanitizers(); +#endif } memory_memtrackerlimiter_cnt << -1; } +#ifndef NDEBUG +void MemTrackerLimiter::add_address_sanitizers(void* buf, size_t size) { + if (_type == Type::QUERY || (_type == Type::LOAD && !is_group_commit_load)) { + std::lock_guard l(_address_sanitizers_mtx); + auto it = _address_sanitizers.find(buf); + if (it != _address_sanitizers.end()) { + _error_address_sanitizers.emplace_back( + fmt::format("[Address Sanitizer] memory buf repeat add, mem tracker label: {}, " + "consumption: {}, peak consumption: {}, buf: {}, size: {}, old " + "buf: {}, old size: {}, new stack_trace: {}, old stack_trace: {}.", + _label, _consumption->current_value(), _consumption->peak_value(), + buf, size, it->first, it->second.size, + get_stack_trace(1, "FULL_WITH_INLINE"), it->second.stack_trace)); + } + + // if alignment not equal to 0, maybe usable_size > size. + AddressSanitizer as = {size, doris::config::enable_address_sanitizers_with_stack_trace + ? 
get_stack_trace() + : ""}; + _address_sanitizers.emplace(buf, as); + } +} + +void MemTrackerLimiter::remove_address_sanitizers(void* buf, size_t size) { + if (_type == Type::QUERY || (_type == Type::LOAD && !is_group_commit_load)) { + std::lock_guard l(_address_sanitizers_mtx); + auto it = _address_sanitizers.find(buf); + if (it != _address_sanitizers.end()) { + if (it->second.size != size) { + _error_address_sanitizers.emplace_back(fmt::format( + "[Address Sanitizer] free memory buf size inaccurate, mem tracker label: " + "{}, consumption: {}, peak consumption: {}, buf: {}, size: {}, old buf: " + "{}, old size: {}, new stack_trace: {}, old stack_trace: {}.", + _label, _consumption->current_value(), _consumption->peak_value(), buf, + size, it->first, it->second.size, get_stack_trace(1, "FULL_WITH_INLINE"), + it->second.stack_trace)); + } + _address_sanitizers.erase(buf); + } else { + _error_address_sanitizers.emplace_back(fmt::format( + "[Address Sanitizer] memory buf not exist, mem tracker label: {}, consumption: " + "{}, peak consumption: {}, buf: {}, size: {}, stack_trace: {}.", + _label, _consumption->current_value(), _consumption->peak_value(), buf, size, + get_stack_trace(1, "FULL_WITH_INLINE"))); + } + } +} + +std::string MemTrackerLimiter::print_address_sanitizers() { + std::lock_guard l(_address_sanitizers_mtx); + std::string detail = "[Address Sanitizer]:"; + detail += "\n memory not be freed:"; + for (const auto& it : _address_sanitizers) { + auto msg = fmt::format( + "\n [Address Sanitizer] buf not be freed, mem tracker label: {}, consumption: " + "{}, peak consumption: {}, buf: {}, size {}, strack trace: {}", + _label, _consumption->current_value(), _consumption->peak_value(), it.first, + it.second.size, it.second.stack_trace); + LOG(INFO) << msg; + detail += msg; + } + detail += "\n incorrect memory alloc and free:"; + for (const auto& err_msg : _error_address_sanitizers) { + LOG(INFO) << err_msg; + detail += fmt::format("\n {}", err_msg); + } + return detail; +} +#endif + MemTracker::Snapshot MemTrackerLimiter::make_snapshot() const { Snapshot snapshot; snapshot.type = type_string(_type); diff --git a/be/src/runtime/memory/mem_tracker_limiter.h b/be/src/runtime/memory/mem_tracker_limiter.h index 67c40e1f6c52b9..344f3dc92b6670 100644 --- a/be/src/runtime/memory/mem_tracker_limiter.h +++ b/be/src/runtime/memory/mem_tracker_limiter.h @@ -205,6 +205,13 @@ class MemTrackerLimiter final : public MemTracker { // Log the memory usage when memory limit is exceeded. std::string tracker_limit_exceeded_str(); +#ifndef NDEBUG + void add_address_sanitizers(void* buf, size_t size); + void remove_address_sanitizers(void* buf, size_t size); + std::string print_address_sanitizers(); + bool is_group_commit_load {false}; +#endif + std::string debug_string() override { std::stringstream msg; msg << "limit: " << _limit << "; " @@ -245,6 +252,17 @@ class MemTrackerLimiter final : public MemTracker { // Avoid frequent printing. 
bool _enable_print_log_usage = false; static std::atomic _enable_print_log_process_usage; + +#ifndef NDEBUG + struct AddressSanitizer { + size_t size; + std::string stack_trace; + }; + + std::mutex _address_sanitizers_mtx; + std::unordered_map _address_sanitizers; + std::vector _error_address_sanitizers; +#endif }; inline int64_t MemTrackerLimiter::add_untracked_mem(int64_t bytes) { diff --git a/be/src/runtime/runtime_state.cpp b/be/src/runtime/runtime_state.cpp index 2713ee441dd0df..cdb5a65a977147 100644 --- a/be/src/runtime/runtime_state.cpp +++ b/be/src/runtime/runtime_state.cpp @@ -463,15 +463,6 @@ Status RuntimeState::append_error_msg_to_file(std::function line, return Status::OK(); } -int64_t RuntimeState::get_load_mem_limit() { - // TODO: the code is abandoned, it can be deleted after v1.3 - if (_query_options.__isset.load_mem_limit && _query_options.load_mem_limit > 0) { - return _query_options.load_mem_limit; - } else { - return _query_mem_tracker->limit(); - } -} - void RuntimeState::resize_op_id_to_local_state(int operator_size) { _op_id_to_local_state.resize(-operator_size); } diff --git a/be/src/runtime/runtime_state.h b/be/src/runtime/runtime_state.h index 8b8cbd85f0f117..8243faa37aeec5 100644 --- a/be/src/runtime/runtime_state.h +++ b/be/src/runtime/runtime_state.h @@ -466,10 +466,6 @@ class RuntimeState { std::vector& error_tablet_infos() { return _error_tablet_infos; } - // get mem limit for load channel - // if load mem limit is not set, or is zero, using query mem limit instead. - int64_t get_load_mem_limit(); - // local runtime filter mgr, the runtime filter do not have remote target or // not need local merge should regist here. the instance exec finish, the local // runtime filter mgr can release the memory of local runtime filter diff --git a/be/src/runtime/stream_load/stream_load_context.h b/be/src/runtime/stream_load/stream_load_context.h index 1dc7ccf73ba18b..2ccf8ce5014a88 100644 --- a/be/src/runtime/stream_load/stream_load_context.h +++ b/be/src/runtime/stream_load/stream_load_context.h @@ -37,6 +37,7 @@ #include "common/utils.h" #include "runtime/exec_env.h" #include "runtime/stream_load/stream_load_executor.h" +#include "runtime/thread_context.h" #include "util/byte_buffer.h" #include "util/time.h" #include "util/uid_util.h" @@ -118,6 +119,17 @@ class StreamLoadContext { // also print the load source info if detail is set to true std::string brief(bool detail = false) const; + Status allocate_schema_buffer() { + if (_schema_buffer == nullptr) { + SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER( + ExecEnv::GetInstance()->stream_load_pipe_tracker()); + return ByteBuffer::allocate(config::stream_tvf_buffer_size, &_schema_buffer); + } + return Status::OK(); + } + + ByteBufferPtr schema_buffer() { return _schema_buffer; } + public: static const int default_txn_id = -1; // load type, eg: ROUTINE LOAD/MANUAL LOAD @@ -182,8 +194,6 @@ class StreamLoadContext { std::shared_ptr body_sink; std::shared_ptr pipe; - ByteBufferPtr schema_buffer = ByteBuffer::allocate(config::stream_tvf_buffer_size); - TStreamLoadPutResult put_result; TStreamLoadMultiTablePutResult multi_table_put_result; @@ -241,6 +251,7 @@ class StreamLoadContext { private: ExecEnv* _exec_env = nullptr; + ByteBufferPtr _schema_buffer; }; } // namespace doris diff --git a/be/src/runtime/stream_load/stream_load_executor.cpp b/be/src/runtime/stream_load/stream_load_executor.cpp index 0761b445bee084..0616c6474aad32 100644 --- a/be/src/runtime/stream_load/stream_load_executor.cpp +++ 
b/be/src/runtime/stream_load/stream_load_executor.cpp @@ -142,6 +142,10 @@ Status StreamLoadExecutor::execute_plan_fragment(std::shared_ptrthread_mem_tracker_mgr->disable_wait_gc(); // set the data so that next time bthread_getspecific in the thread returns the data. - CHECK(0 == bthread_setspecific(btls_key, bthread_context) || k_doris_exit); + CHECK(0 == bthread_setspecific(btls_key, bthread_context) || doris::k_doris_exit); } DCHECK(bthread_context != nullptr); bthread_context->thread_local_handle_count++; @@ -357,7 +356,7 @@ static ThreadContext* thread_context(bool allow_return_null = false) { // in bthread // bthread switching pthread may be very frequent, remember not to use lock or other time-consuming operations. auto* bthread_context = static_cast(bthread_getspecific(btls_key)); - DCHECK(bthread_context != nullptr); + DCHECK(bthread_context != nullptr && bthread_context->thread_local_handle_count > 0); return bthread_context; } if (allow_return_null) { @@ -443,30 +442,38 @@ class AttachTask { class SwitchThreadMemTrackerLimiter { public: - explicit SwitchThreadMemTrackerLimiter(const std::shared_ptr& mem_tracker) { + explicit SwitchThreadMemTrackerLimiter( + const std::shared_ptr& mem_tracker) { DCHECK(mem_tracker); - ThreadLocalHandle::create_thread_local_if_not_exits(); - _old_mem_tracker = thread_context()->thread_mem_tracker_mgr->limiter_mem_tracker(); - thread_context()->thread_mem_tracker_mgr->attach_limiter_tracker(mem_tracker); + doris::ThreadLocalHandle::create_thread_local_if_not_exits(); + if (mem_tracker != thread_context()->thread_mem_tracker_mgr->limiter_mem_tracker()) { + _old_mem_tracker = thread_context()->thread_mem_tracker_mgr->limiter_mem_tracker(); + thread_context()->thread_mem_tracker_mgr->attach_limiter_tracker(mem_tracker); + } } - explicit SwitchThreadMemTrackerLimiter(const QueryThreadContext& query_thread_context) { - ThreadLocalHandle::create_thread_local_if_not_exits(); + explicit SwitchThreadMemTrackerLimiter(const doris::QueryThreadContext& query_thread_context) { + doris::ThreadLocalHandle::create_thread_local_if_not_exits(); DCHECK(thread_context()->task_id() == query_thread_context.query_id); // workload group alse not change DCHECK(query_thread_context.query_mem_tracker); - _old_mem_tracker = thread_context()->thread_mem_tracker_mgr->limiter_mem_tracker(); - thread_context()->thread_mem_tracker_mgr->attach_limiter_tracker( - query_thread_context.query_mem_tracker); + if (query_thread_context.query_mem_tracker != + thread_context()->thread_mem_tracker_mgr->limiter_mem_tracker()) { + _old_mem_tracker = thread_context()->thread_mem_tracker_mgr->limiter_mem_tracker(); + thread_context()->thread_mem_tracker_mgr->attach_limiter_tracker( + query_thread_context.query_mem_tracker); + } } ~SwitchThreadMemTrackerLimiter() { - thread_context()->thread_mem_tracker_mgr->detach_limiter_tracker(_old_mem_tracker); - ThreadLocalHandle::del_thread_local_if_count_is_zero(); + if (_old_mem_tracker != nullptr) { + thread_context()->thread_mem_tracker_mgr->detach_limiter_tracker(_old_mem_tracker); + } + doris::ThreadLocalHandle::del_thread_local_if_count_is_zero(); } private: - std::shared_ptr _old_mem_tracker; + std::shared_ptr _old_mem_tracker {nullptr}; }; class AddThreadMemTrackerConsumer { @@ -549,7 +556,7 @@ class ScopeSkipMemoryCheck { // must call create_thread_local_if_not_exits() before use thread_context(). 
#define CONSUME_THREAD_MEM_TRACKER(size) \ do { \ - if (doris::use_mem_hook || size == 0) { \ + if (size == 0 || doris::use_mem_hook) { \ break; \ } \ if (doris::pthread_context_ptr_init) { \ diff --git a/be/src/service/internal_service.cpp b/be/src/service/internal_service.cpp index 0801f30fb2e265..099236153a396e 100644 --- a/be/src/service/internal_service.cpp +++ b/be/src/service/internal_service.cpp @@ -784,8 +784,9 @@ void PInternalServiceImpl::fetch_table_schema(google::protobuf::RpcController* c const TFileScanRangeParams& params = file_scan_range.params; std::shared_ptr mem_tracker = MemTrackerLimiter::create_shared( - MemTrackerLimiter::Type::SCHEMA_CHANGE, - fmt::format("{}#{}", params.format_type, params.file_type)); + MemTrackerLimiter::Type::OTHER, + fmt::format("InternalService::fetch_table_schema:{}#{}", params.format_type, + params.file_type)); SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(mem_tracker); // make sure profile is desctructed after reader cause PrefetchBufferedReader diff --git a/be/src/service/point_query_executor.cpp b/be/src/service/point_query_executor.cpp index c89bc52115a786..4e9295ed53dcee 100644 --- a/be/src/service/point_query_executor.cpp +++ b/be/src/service/point_query_executor.cpp @@ -37,6 +37,7 @@ #include "olap/tablet_schema.h" #include "runtime/exec_env.h" #include "runtime/runtime_state.h" +#include "runtime/thread_context.h" #include "util/key_util.h" #include "util/runtime_profile.h" #include "util/simd/bits.h" @@ -166,7 +167,8 @@ void RowCache::erase(const RowCacheKey& key) { } PointQueryExecutor::~PointQueryExecutor() { - SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(_mem_tracker); + SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER( + ExecEnv::GetInstance()->point_query_executor_mem_tracker()); _tablet.reset(); _reusable.reset(); _result_block.reset(); @@ -180,10 +182,7 @@ Status PointQueryExecutor::init(const PTabletKeyLookupRequest* request, // using cache __int128_t uuid = static_cast<__int128_t>(request->uuid().uuid_high()) << 64 | request->uuid().uuid_low(); - _mem_tracker = MemTrackerLimiter::create_shared( - MemTrackerLimiter::Type::QUERY, - fmt::format("PointQueryExecutor:{}#{}", uuid, request->tablet_id())); - SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(_mem_tracker); + SCOPED_ATTACH_TASK(ExecEnv::GetInstance()->point_query_executor_mem_tracker()); auto cache_handle = LookupConnectionCache::instance()->get(uuid); _binary_row_format = request->is_binary_row(); _tablet = StorageEngine::instance()->tablet_manager()->get_tablet(request->tablet_id()); @@ -234,7 +233,7 @@ Status PointQueryExecutor::init(const PTabletKeyLookupRequest* request, } Status PointQueryExecutor::lookup_up() { - SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(_mem_tracker); + SCOPED_ATTACH_TASK(ExecEnv::GetInstance()->point_query_executor_mem_tracker()); RETURN_IF_ERROR(_lookup_row_key()); RETURN_IF_ERROR(_lookup_row_data()); RETURN_IF_ERROR(_output_data()); diff --git a/be/src/service/point_query_executor.h b/be/src/service/point_query_executor.h index 565a585d322afe..e168ef16ad798a 100644 --- a/be/src/service/point_query_executor.h +++ b/be/src/service/point_query_executor.h @@ -323,7 +323,6 @@ class PointQueryExecutor { std::vector _row_read_ctxs; std::shared_ptr _reusable; std::unique_ptr _result_block; - std::shared_ptr _mem_tracker; Metrics _profile_metrics; bool _binary_row_format = false; }; diff --git a/be/src/util/block_compression.cpp b/be/src/util/block_compression.cpp index 10f975451d3d1e..e71a890142155d 100644 --- a/be/src/util/block_compression.cpp +++ 
b/be/src/util/block_compression.cpp @@ -53,6 +53,7 @@ #include "gutil/endian.h" #include "gutil/strings/substitute.h" #include "orc/OrcFile.hh" +#include "runtime/thread_context.h" #include "util/bit_util.h" #include "util/defer_op.h" #include "util/faststring.h" @@ -99,10 +100,16 @@ class Lz4BlockCompression : public BlockCompressionCodec { ENABLE_FACTORY_CREATOR(Context); public: - Context() : ctx(nullptr) { buffer = std::make_unique(); } + Context() : ctx(nullptr) { + SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER( + ExecEnv::GetInstance()->block_compression_mem_tracker()); + buffer = std::make_unique(); + } LZ4_stream_t* ctx; std::unique_ptr buffer; ~Context() { + SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER( + ExecEnv::GetInstance()->block_compression_mem_tracker()); if (ctx) { LZ4_freeStream(ctx); } @@ -115,7 +122,11 @@ class Lz4BlockCompression : public BlockCompressionCodec { static Lz4BlockCompression s_instance; return &s_instance; } - ~Lz4BlockCompression() { _ctx_pool.clear(); } + ~Lz4BlockCompression() { + SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER( + ExecEnv::GetInstance()->block_compression_mem_tracker()); + _ctx_pool.clear(); + } Status compress(const Slice& input, faststring* output) override { if (input.size > INT_MAX) { @@ -144,7 +155,13 @@ class Lz4BlockCompression : public BlockCompressionCodec { compressed_buf.size = max_len; } else { // reuse context buffer if max_len <= MAX_COMPRESSION_BUFFER_FOR_REUSE - context->buffer->resize(max_len); + { + // context->buffer is resuable between queries, should accouting to + // global tracker. + SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER( + ExecEnv::GetInstance()->block_compression_mem_tracker()); + context->buffer->resize(max_len); + } compressed_buf.data = reinterpret_cast(context->buffer->data()); compressed_buf.size = max_len; } @@ -253,10 +270,16 @@ class Lz4fBlockCompression : public BlockCompressionCodec { ENABLE_FACTORY_CREATOR(CContext); public: - CContext() : ctx(nullptr) { buffer = std::make_unique(); } + CContext() : ctx(nullptr) { + SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER( + ExecEnv::GetInstance()->block_compression_mem_tracker()); + buffer = std::make_unique(); + } LZ4F_compressionContext_t ctx; std::unique_ptr buffer; ~CContext() { + SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER( + ExecEnv::GetInstance()->block_compression_mem_tracker()); if (ctx) { LZ4F_freeCompressionContext(ctx); } @@ -282,6 +305,8 @@ class Lz4fBlockCompression : public BlockCompressionCodec { return &s_instance; } ~Lz4fBlockCompression() { + SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER( + ExecEnv::GetInstance()->block_compression_mem_tracker()); _ctx_c_pool.clear(); _ctx_d_pool.clear(); } @@ -326,8 +351,12 @@ class Lz4fBlockCompression : public BlockCompressionCodec { compressed_buf.data = reinterpret_cast(output->data()); compressed_buf.size = max_len; } else { - // reuse context buffer if max_len <= MAX_COMPRESSION_BUFFER_FOR_REUSE - context->buffer->resize(max_len); + { + SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER( + ExecEnv::GetInstance()->block_compression_mem_tracker()); + // reuse context buffer if max_len <= MAX_COMPRESSION_BUFFER_FOR_REUSE + context->buffer->resize(max_len); + } compressed_buf.data = reinterpret_cast(context->buffer->data()); compressed_buf.size = max_len; } @@ -482,10 +511,16 @@ class Lz4HCBlockCompression : public BlockCompressionCodec { ENABLE_FACTORY_CREATOR(Context); public: - Context() : ctx(nullptr) { buffer = std::make_unique(); } + Context() : ctx(nullptr) { + SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER( + 
ExecEnv::GetInstance()->block_compression_mem_tracker()); + buffer = std::make_unique(); + } LZ4_streamHC_t* ctx; std::unique_ptr buffer; ~Context() { + SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER( + ExecEnv::GetInstance()->block_compression_mem_tracker()); if (ctx) { LZ4_freeStreamHC(ctx); } @@ -498,7 +533,11 @@ class Lz4HCBlockCompression : public BlockCompressionCodec { static Lz4HCBlockCompression s_instance; return &s_instance; } - ~Lz4HCBlockCompression() { _ctx_pool.clear(); } + ~Lz4HCBlockCompression() { + SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER( + ExecEnv::GetInstance()->block_compression_mem_tracker()); + _ctx_pool.clear(); + } Status compress(const Slice& input, faststring* output) override { std::unique_ptr context; @@ -519,7 +558,12 @@ class Lz4HCBlockCompression : public BlockCompressionCodec { compressed_buf.data = reinterpret_cast(output->data()); compressed_buf.size = max_len; } else { - context->buffer->resize(max_len); + { + SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER( + ExecEnv::GetInstance()->block_compression_mem_tracker()); + // reuse context buffer if max_len <= MAX_COMPRESSION_BUFFER_FOR_REUSE + context->buffer->resize(max_len); + } compressed_buf.data = reinterpret_cast(context->buffer->data()); compressed_buf.size = max_len; } @@ -781,10 +825,16 @@ class ZstdBlockCompression : public BlockCompressionCodec { ENABLE_FACTORY_CREATOR(CContext); public: - CContext() : ctx(nullptr) { buffer = std::make_unique(); } + CContext() : ctx(nullptr) { + SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER( + ExecEnv::GetInstance()->block_compression_mem_tracker()); + buffer = std::make_unique(); + } ZSTD_CCtx* ctx; std::unique_ptr buffer; ~CContext() { + SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER( + ExecEnv::GetInstance()->block_compression_mem_tracker()); if (ctx) { ZSTD_freeCCtx(ctx); } @@ -810,6 +860,8 @@ class ZstdBlockCompression : public BlockCompressionCodec { return &s_instance; } ~ZstdBlockCompression() { + SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER( + ExecEnv::GetInstance()->block_compression_mem_tracker()); _ctx_c_pool.clear(); _ctx_d_pool.clear(); } @@ -843,8 +895,12 @@ class ZstdBlockCompression : public BlockCompressionCodec { compressed_buf.data = reinterpret_cast(output->data()); compressed_buf.size = max_len; } else { - // reuse context buffer if max_len <= MAX_COMPRESSION_BUFFER_FOR_REUSE - context->buffer->resize(max_len); + { + SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER( + ExecEnv::GetInstance()->block_compression_mem_tracker()); + // reuse context buffer if max_len <= MAX_COMPRESSION_BUFFER_FOR_REUSE + context->buffer->resize(max_len); + } compressed_buf.data = reinterpret_cast(context->buffer->data()); compressed_buf.size = max_len; } diff --git a/be/src/util/byte_buffer.h b/be/src/util/byte_buffer.h index aab8fd42db6e3b..17764b9e4f6ec1 100644 --- a/be/src/util/byte_buffer.h +++ b/be/src/util/byte_buffer.h @@ -23,19 +23,26 @@ #include #include "common/logging.h" +#include "common/status.h" +#include "runtime/thread_context.h" +#include "vec/common/allocator.h" +#include "vec/common/allocator_fwd.h" namespace doris { struct ByteBuffer; using ByteBufferPtr = std::shared_ptr; -struct ByteBuffer { - static ByteBufferPtr allocate(size_t size) { - ByteBufferPtr ptr(new ByteBuffer(size)); - return ptr; +struct ByteBuffer : private Allocator { + static Status allocate(const size_t size, ByteBufferPtr* ptr) { + RETURN_IF_CATCH_EXCEPTION({ *ptr = ByteBufferPtr(new ByteBuffer(size)); }); + return Status::OK(); } - ~ByteBuffer() { delete[] ptr; } + ~ByteBuffer() { + 
SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(mem_tracker_); + Allocator::free(ptr, capacity); + } void put_bytes(const char* data, size_t size) { memcpy(ptr + pos, data, size); @@ -56,14 +63,21 @@ struct ByteBuffer { size_t remaining() const { return limit - pos; } bool has_remaining() const { return limit > pos; } - char* const ptr; + char* ptr; size_t pos; size_t limit; size_t capacity; private: ByteBuffer(size_t capacity_) - : ptr(new char[capacity_]), pos(0), limit(capacity_), capacity(capacity_) {} + : pos(0), + limit(capacity_), + capacity(capacity_), + mem_tracker_(doris::thread_context()->thread_mem_tracker_mgr->limiter_mem_tracker()) { + ptr = reinterpret_cast(Allocator::alloc(capacity_)); + } + + std::shared_ptr mem_tracker_; }; } // namespace doris diff --git a/be/src/util/faststring.h b/be/src/util/faststring.h index 9308a4d20bbb1c..eae7db536252ca 100644 --- a/be/src/util/faststring.h +++ b/be/src/util/faststring.h @@ -85,7 +85,8 @@ class faststring : private Allocator(Allocator::alloc(len_)); + ret = reinterpret_cast(Allocator::alloc(capacity_)); + DCHECK(len_ <= capacity_); memcpy(ret, data_, len_); } OwnedSlice result(ret, len_, capacity_); diff --git a/be/src/util/slice.h b/be/src/util/slice.h index bae33d4ee75010..b38b1147894f9e 100644 --- a/be/src/util/slice.h +++ b/be/src/util/slice.h @@ -358,7 +358,16 @@ class OwnedSlice : private Allocator::throw_b throw doris::Exception(doris::ErrorCode::MEM_ALLOC_FAILED, err); } +#ifndef NDEBUG +template +void Allocator::add_address_sanitizers( + void* buf, size_t size) const { +#ifdef BE_TEST + if (!doris::ExecEnv::ready()) { + return; + } +#endif + doris::thread_context()->thread_mem_tracker()->add_address_sanitizers(buf, size); +} + +template +void Allocator::remove_address_sanitizers( + void* buf, size_t size) const { +#ifdef BE_TEST + if (!doris::ExecEnv::ready()) { + return; + } +#endif + doris::thread_context()->thread_mem_tracker()->remove_address_sanitizers(buf, size); +} +#endif + template void* Allocator::alloc(size_t size, size_t alignment) { diff --git a/be/src/vec/common/allocator.h b/be/src/vec/common/allocator.h index 83cb6eddb7ddec..026d398882feb0 100644 --- a/be/src/vec/common/allocator.h +++ b/be/src/vec/common/allocator.h @@ -87,6 +87,10 @@ static constexpr size_t MALLOC_MIN_ALIGNMENT = 8; // is always a multiple of sixteen. (https://www.gnu.org/software/libc/manual/html_node/Aligned-Memory-Blocks.html) static constexpr int ALLOCATOR_ALIGNMENT_16 = 16; +namespace doris { +class MemTrackerLimiter; +} + class DefaultMemoryAllocator { public: static void* malloc(size_t size) __THROW { return std::malloc(size); } @@ -228,9 +232,20 @@ class Allocator { // alloc will continue to execute, so the consume memtracker is forced. void memory_check(size_t size) const; // Increases consumption of this tracker by 'bytes'. + // some special cases: + // 1. objects that inherit Allocator will not be shared by multiple queries. + // non-compliant: page cache, ORC ByteBuffer. + // 2. objects that inherit Allocator will only free memory allocated by themselves. + // non-compliant: phmap, the memory alloced by an object may be transferred to another object and then free. + // 3. the memory tracker in TLS is the same during the construction of objects that inherit Allocator + // and during subsequent memory allocation. 
void consume_memory(size_t size) const; void release_memory(size_t size) const; void throw_bad_alloc(const std::string& err) const; +#ifndef NDEBUG + void add_address_sanitizers(void* buf, size_t size) const; + void remove_address_sanitizers(void* buf, size_t size) const; +#endif void* alloc(size_t size, size_t alignment = 0); void* realloc(void* buf, size_t old_size, size_t new_size, size_t alignment = 0); @@ -238,6 +253,7 @@ class Allocator { /// Allocate memory range. void* alloc_impl(size_t size, size_t alignment = 0) { memory_check(size); + // consume memory in tracker before alloc, similar to early declaration. consume_memory(size); void* buf; size_t record_size = size; @@ -273,6 +289,9 @@ class Allocator { if constexpr (MemoryAllocator::need_record_actual_size()) { record_size = MemoryAllocator::allocated_size(buf); } +#ifndef NDEBUG + add_address_sanitizers(buf, record_size); +#endif } else { buf = nullptr; int res = MemoryAllocator::posix_memalign(&buf, alignment, size); @@ -282,6 +301,9 @@ class Allocator { throw_bad_alloc( fmt::format("Cannot allocate memory (posix_memalign) {}.", size)); } +#ifndef NDEBUG + add_address_sanitizers(buf, size); +#endif if constexpr (clear_memory) memset(buf, 0, size); @@ -303,6 +325,9 @@ class Allocator { throw_bad_alloc(fmt::format("Allocator: Cannot munmap {}.", size)); } } else { +#ifndef NDEBUG + remove_address_sanitizers(buf, size); +#endif MemoryAllocator::free(buf); } release_memory(size); @@ -326,6 +351,9 @@ class Allocator { if (!use_mmap || (old_size < doris::config::mmap_threshold && new_size < doris::config::mmap_threshold && alignment <= MALLOC_MIN_ALIGNMENT)) { +#ifndef NDEBUG + remove_address_sanitizers(buf, old_size); +#endif /// Resize malloc'd memory region with no special alignment requirement. void* new_buf = MemoryAllocator::realloc(buf, new_size); if (nullptr == new_buf) { @@ -333,6 +361,10 @@ class Allocator { throw_bad_alloc(fmt::format("Allocator: Cannot realloc from {} to {}.", old_size, new_size)); } +#ifndef NDEBUG + add_address_sanitizers( + new_buf, new_size); // usually, buf addr = new_buf addr, asan maybe not equal. +#endif buf = new_buf; if constexpr (clear_memory) @@ -362,6 +394,10 @@ class Allocator { // Big allocs that requires a copy. void* new_buf = alloc(new_size, alignment); memcpy(new_buf, buf, std::min(old_size, new_size)); +#ifndef NDEBUG + add_address_sanitizers(new_buf, new_size); + remove_address_sanitizers(buf, old_size); +#endif free(buf, old_size); buf = new_buf; } diff --git a/be/src/vec/common/pod_array_fwd.h b/be/src/vec/common/pod_array_fwd.h index ff00b312575a63..e1a428eda9dafb 100644 --- a/be/src/vec/common/pod_array_fwd.h +++ b/be/src/vec/common/pod_array_fwd.h @@ -36,9 +36,12 @@ template class PODArray; -/** For columns. Padding is enough to read and write xmm-register at the address of the last element. */ +/** For columns. Padding is enough to read and write xmm-register at the address of the last element. + * TODO, pad_right is temporarily changed from 15 to 16, will waste 1 bytes, + * can rollback after fix wrong reinterpret_cast column and PODArray swap. + */ template > -using PaddedPODArray = PODArray; +using PaddedPODArray = PODArray; /** A helper for declaring PODArray that uses inline memory. 
* The initial size is set to use all the inline bytes, since using less would diff --git a/be/src/vec/common/schema_util.cpp b/be/src/vec/common/schema_util.cpp index bb7b64992dee27..61b98bafd48ef6 100644 --- a/be/src/vec/common/schema_util.cpp +++ b/be/src/vec/common/schema_util.cpp @@ -545,15 +545,9 @@ Status _parse_variant_columns(Block& block, const std::vector& variant_pos, Status parse_variant_columns(Block& block, const std::vector& variant_pos, const ParseContext& ctx) { - try { - // Parse each variant column from raw string column - RETURN_IF_ERROR(vectorized::schema_util::_parse_variant_columns(block, variant_pos, ctx)); - } catch (const doris::Exception& e) { - // TODO more graceful, max_filter_ratio - LOG(WARNING) << "encounter execption " << e.to_string(); - return Status::InternalError(e.to_string()); - } - return Status::OK(); + // Parse each variant column from raw string column + RETURN_IF_CATCH_EXCEPTION( + { return vectorized::schema_util::_parse_variant_columns(block, variant_pos, ctx); }); } void finalize_variant_columns(Block& block, const std::vector& variant_pos, diff --git a/be/src/vec/exec/scan/group_commit_scan_node.cpp b/be/src/vec/exec/scan/group_commit_scan_node.cpp index 50ba8e31be468f..be48bf7bf31bae 100644 --- a/be/src/vec/exec/scan/group_commit_scan_node.cpp +++ b/be/src/vec/exec/scan/group_commit_scan_node.cpp @@ -42,6 +42,10 @@ Status GroupCommitScanNode::get_next(RuntimeState* state, vectorized::Block* blo Status GroupCommitScanNode::init(const TPlanNode& tnode, RuntimeState* state) { RETURN_IF_ERROR(VScanNode::init(tnode, state)); +#ifndef NDEBUG + DCHECK(state->get_query_ctx() != nullptr); + state->get_query_ctx()->query_mem_tracker->is_group_commit_load = true; +#endif return state->exec_env()->group_commit_mgr()->get_load_block_queue( _table_id, state->fragment_instance_id(), load_block_queue); } diff --git a/be/src/vec/sink/vtablet_block_convertor.cpp b/be/src/vec/sink/vtablet_block_convertor.cpp index 96de68f597677c..086c9a3ddd04d9 100644 --- a/be/src/vec/sink/vtablet_block_convertor.cpp +++ b/be/src/vec/sink/vtablet_block_convertor.cpp @@ -182,12 +182,11 @@ DecimalType OlapTableBlockConvertor::_get_decimalv3_min_or_max(const TypeDescrip return DecimalType(value); } -Status OlapTableBlockConvertor::_validate_column(RuntimeState* state, const TypeDescriptor& type, - bool is_nullable, vectorized::ColumnPtr column, - size_t slot_index, bool* stop_processing, - fmt::memory_buffer& error_prefix, - const uint32_t row_count, - vectorized::IColumn::Permutation* rows) { +Status OlapTableBlockConvertor::_internal_validate_column( + RuntimeState* state, const TypeDescriptor& type, bool is_nullable, + vectorized::ColumnPtr column, size_t slot_index, bool* stop_processing, + fmt::memory_buffer& error_prefix, const uint32_t row_count, + vectorized::IColumn::Permutation* rows) { DCHECK((rows == nullptr) || (rows->size() == row_count)); fmt::memory_buffer error_msg; auto set_invalid_and_append_error_msg = [&](int row) { diff --git a/be/src/vec/sink/vtablet_block_convertor.h b/be/src/vec/sink/vtablet_block_convertor.h index 0db340ce6c27d4..7f866c38032775 100644 --- a/be/src/vec/sink/vtablet_block_convertor.h +++ b/be/src/vec/sink/vtablet_block_convertor.h @@ -69,7 +69,18 @@ class OlapTableBlockConvertor { Status _validate_column(RuntimeState* state, const TypeDescriptor& type, bool is_nullable, vectorized::ColumnPtr column, size_t slot_index, bool* stop_processing, fmt::memory_buffer& error_prefix, const uint32_t row_count, - vectorized::IColumn::Permutation* rows 
diff --git a/be/src/vec/exec/scan/group_commit_scan_node.cpp b/be/src/vec/exec/scan/group_commit_scan_node.cpp
index 50ba8e31be468f..be48bf7bf31bae 100644
--- a/be/src/vec/exec/scan/group_commit_scan_node.cpp
+++ b/be/src/vec/exec/scan/group_commit_scan_node.cpp
@@ -42,6 +42,10 @@ Status GroupCommitScanNode::get_next(RuntimeState* state, vectorized::Block* blo
 
 Status GroupCommitScanNode::init(const TPlanNode& tnode, RuntimeState* state) {
     RETURN_IF_ERROR(VScanNode::init(tnode, state));
+#ifndef NDEBUG
+    DCHECK(state->get_query_ctx() != nullptr);
+    state->get_query_ctx()->query_mem_tracker->is_group_commit_load = true;
+#endif
     return state->exec_env()->group_commit_mgr()->get_load_block_queue(
             _table_id, state->fragment_instance_id(), load_block_queue);
 }
diff --git a/be/src/vec/sink/vtablet_block_convertor.cpp b/be/src/vec/sink/vtablet_block_convertor.cpp
index 96de68f597677c..086c9a3ddd04d9 100644
--- a/be/src/vec/sink/vtablet_block_convertor.cpp
+++ b/be/src/vec/sink/vtablet_block_convertor.cpp
@@ -182,12 +182,11 @@ DecimalType OlapTableBlockConvertor::_get_decimalv3_min_or_max(const TypeDescrip
     return DecimalType(value);
 }
 
-Status OlapTableBlockConvertor::_validate_column(RuntimeState* state, const TypeDescriptor& type,
-                                                 bool is_nullable, vectorized::ColumnPtr column,
-                                                 size_t slot_index, bool* stop_processing,
-                                                 fmt::memory_buffer& error_prefix,
-                                                 const uint32_t row_count,
-                                                 vectorized::IColumn::Permutation* rows) {
+Status OlapTableBlockConvertor::_internal_validate_column(
+        RuntimeState* state, const TypeDescriptor& type, bool is_nullable,
+        vectorized::ColumnPtr column, size_t slot_index, bool* stop_processing,
+        fmt::memory_buffer& error_prefix, const uint32_t row_count,
+        vectorized::IColumn::Permutation* rows) {
     DCHECK((rows == nullptr) || (rows->size() == row_count));
     fmt::memory_buffer error_msg;
     auto set_invalid_and_append_error_msg = [&](int row) {
diff --git a/be/src/vec/sink/vtablet_block_convertor.h b/be/src/vec/sink/vtablet_block_convertor.h
index 0db340ce6c27d4..7f866c38032775 100644
--- a/be/src/vec/sink/vtablet_block_convertor.h
+++ b/be/src/vec/sink/vtablet_block_convertor.h
@@ -69,7 +69,18 @@ class OlapTableBlockConvertor {
     Status _validate_column(RuntimeState* state, const TypeDescriptor& type, bool is_nullable,
                             vectorized::ColumnPtr column, size_t slot_index, bool* stop_processing,
                             fmt::memory_buffer& error_prefix, const uint32_t row_count,
-                            vectorized::IColumn::Permutation* rows = nullptr);
+                            vectorized::IColumn::Permutation* rows = nullptr) {
+        RETURN_IF_CATCH_EXCEPTION({
+            return _internal_validate_column(state, type, is_nullable, column, slot_index,
+                                             stop_processing, error_prefix, row_count, rows);
+        });
+    }
+
+    Status _internal_validate_column(RuntimeState* state, const TypeDescriptor& type,
+                                     bool is_nullable, vectorized::ColumnPtr column,
+                                     size_t slot_index, bool* stop_processing,
+                                     fmt::memory_buffer& error_prefix, const uint32_t row_count,
+                                     vectorized::IColumn::Permutation* rows = nullptr);
 
     // make input data valid for OLAP table
     // return number of invalid/filtered rows.
diff --git a/be/src/vec/sink/writer/vtablet_writer.cpp b/be/src/vec/sink/writer/vtablet_writer.cpp
index 1e6b8f7b8687b6..d1651bb3a92419 100644
--- a/be/src/vec/sink/writer/vtablet_writer.cpp
+++ b/be/src/vec/sink/writer/vtablet_writer.cpp
@@ -413,7 +413,6 @@ void VNodeChannel::_open_internal(bool is_incremental) {
     request->set_num_senders(_parent->_num_senders);
     request->set_need_gen_rollup(false); // Useless but it is a required field in pb
-    request->set_load_mem_limit(_parent->_load_mem_limit);
     request->set_load_channel_timeout_s(_parent->_load_channel_timeout_s);
     request->set_is_high_priority(_parent->_is_high_priority);
     request->set_sender_ip(BackendOptions::get_localhost());
@@ -1226,7 +1225,6 @@ Status VTabletWriter::_init(RuntimeState* state, RuntimeProfile* profile) {
     _max_wait_exec_timer = ADD_TIMER(profile, "MaxWaitExecTime");
     _add_batch_number = ADD_COUNTER(profile, "NumberBatchAdded", TUnit::UNIT);
     _num_node_channels = ADD_COUNTER(profile, "NumberNodeChannels", TUnit::UNIT);
-    _load_mem_limit = state->get_load_mem_limit();
 
 #ifdef DEBUG
     // check: tablet ids should be unique
diff --git a/be/src/vec/sink/writer/vtablet_writer.h b/be/src/vec/sink/writer/vtablet_writer.h
index 603034cea6d7a5..ba986fbc6d4e0f 100644
--- a/be/src/vec/sink/writer/vtablet_writer.h
+++ b/be/src/vec/sink/writer/vtablet_writer.h
@@ -660,9 +660,6 @@ class VTabletWriter final : public AsyncResultWriter {
     RuntimeProfile::Counter* _add_batch_number = nullptr;
    RuntimeProfile::Counter* _num_node_channels = nullptr;
 
-    // load mem limit is for remote load channel
-    int64_t _load_mem_limit = -1;
-
     // the timeout of load channels opened by this tablet sink. in second
     int64_t _load_channel_timeout_s = 0;
diff --git a/be/test/util/byte_buffer2_test.cpp b/be/test/util/byte_buffer2_test.cpp
index 04b62cd5fe8f0e..73c38c9e404340 100644
--- a/be/test/util/byte_buffer2_test.cpp
+++ b/be/test/util/byte_buffer2_test.cpp
@@ -32,7 +32,8 @@ class ByteBufferTest : public testing::Test {
 };
 
 TEST_F(ByteBufferTest, normal) {
-    auto buf = ByteBuffer::allocate(4);
+    ByteBufferPtr buf;
+    Status st = ByteBuffer::allocate(4, &buf);
     EXPECT_EQ(0, buf->pos);
     EXPECT_EQ(4, buf->limit);
     EXPECT_EQ(4, buf->capacity);
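The test change above exercises the new two-step shape of ByteBuffer::allocate introduced by this patch: a Status return plus an out ByteBufferPtr instead of a returned pointer. Below is a minimal caller-side sketch of checking that Status; the wrapper name make_buffer and the 128 KB size are illustrative, and the include paths assume the usual be/src layout. Only ByteBuffer::allocate, ByteBufferPtr, and Status come from the patch itself.

// Sketch of a call site for the Status-returning allocate(); names and paths
// outside the patch are assumptions.
#include "common/status.h"     // doris::Status (assumed path in the BE tree)
#include "util/byte_buffer.h"  // doris::ByteBuffer / ByteBufferPtr (assumed path)

doris::Status make_buffer(doris::ByteBufferPtr* out) {
    doris::ByteBufferPtr bb;
    doris::Status st = doris::ByteBuffer::allocate(128 * 1024, &bb);
    if (!st.ok()) {
        return st;  // allocation failure is now reported as a Status, not thrown
    }
    *out = bb;
    return doris::Status::OK();
}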
diff --git a/bin/run-fs-benchmark.sh b/bin/run-fs-benchmark.sh
index 3076a96876116b..f4edd4117d01e8 100755
--- a/bin/run-fs-benchmark.sh
+++ b/bin/run-fs-benchmark.sh
@@ -189,7 +189,7 @@ else
 fi
 
 ## set asan and ubsan env to generate core file
-export ASAN_OPTIONS=symbolize=1:abort_on_error=1:disable_coredump=0:unmap_shadow_on_exit=1:detect_container_overflow=0
+export ASAN_OPTIONS=symbolize=1:abort_on_error=1:disable_coredump=0:unmap_shadow_on_exit=1:detect_container_overflow=0:check_malloc_usable_size=0
 export UBSAN_OPTIONS=print_stacktrace=1
 
 ## set TCMALLOC_HEAP_LIMIT_MB to limit memory used by tcmalloc
diff --git a/bin/start_be.sh b/bin/start_be.sh
index d9800e1b047fcb..396e75a5d484c4 100755
--- a/bin/start_be.sh
+++ b/bin/start_be.sh
@@ -267,7 +267,8 @@ fi
 export AWS_MAX_ATTEMPTS=2
 
 ## set asan and ubsan env to generate core file
-export ASAN_OPTIONS=symbolize=1:abort_on_error=1:disable_coredump=0:unmap_shadow_on_exit=1:detect_container_overflow=0
+## detect_container_overflow=0, https://github.com/google/sanitizers/issues/193
+export ASAN_OPTIONS=symbolize=1:abort_on_error=1:disable_coredump=0:unmap_shadow_on_exit=1:detect_container_overflow=0:check_malloc_usable_size=0
 export UBSAN_OPTIONS=print_stacktrace=1
 
 ## set TCMALLOC_HEAP_LIMIT_MB to limit memory used by tcmalloc
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadLoadingTask.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadLoadingTask.java
index d60c17233d7a08..14cd742be45cd8 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadLoadingTask.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadLoadingTask.java
@@ -152,14 +152,6 @@ private void executeOnce() throws Exception {
         curCoordinator.setExecMemoryLimit(execMemLimit);
         curCoordinator.setExecPipEngine(Config.enable_pipeline_load);
 
-        /*
-         * For broker load job, user only need to set mem limit by 'exec_mem_limit' property.
-         * And the variable 'load_mem_limit' does not make any effect.
-         * However, in order to ensure the consistency of semantics when executing on the BE side,
-         * and to prevent subsequent modification from incorrectly setting the load_mem_limit,
-         * here we use exec_mem_limit to directly override the load_mem_limit property.
-         */
-        curCoordinator.setLoadMemLimit(execMemLimit);
         curCoordinator.setMemTableOnSinkNode(enableMemTableOnSinkNode);
 
         long leftTimeMs = getLeftTimeMs();
diff --git a/run-be-ut.sh b/run-be-ut.sh
index 346d5cd1ecb10e..f9fcf9e9d53eed 100755
--- a/run-be-ut.sh
+++ b/run-be-ut.sh
@@ -405,7 +405,8 @@ export ORC_EXAMPLE_DIR="${DORIS_HOME}/be/src/apache-orc/examples"
 
 # set asan and ubsan env to generate core file
 export DORIS_HOME="${DORIS_TEST_BINARY_DIR}/"
-export ASAN_OPTIONS=symbolize=1:abort_on_error=1:disable_coredump=0:unmap_shadow_on_exit=1:detect_container_overflow=0
+## detect_container_overflow=0, https://github.com/google/sanitizers/issues/193
+export ASAN_OPTIONS=symbolize=1:abort_on_error=1:disable_coredump=0:unmap_shadow_on_exit=1:detect_container_overflow=0:check_malloc_usable_size=0
 export UBSAN_OPTIONS=print_stacktrace=1
 export JAVA_OPTS="-Xmx1024m -DlogPath=${DORIS_HOME}/log/jni.log -Xloggc:${DORIS_HOME}/log/be.gc.log.${CUR_DATE} -Dsun.java.command=DorisBE -XX:-CriticalJNINatives -DJDBC_MIN_POOL=1 -DJDBC_MAX_POOL=100 -DJDBC_MAX_IDLE_TIME=300000"