From 684b2e69490ab2f2c2e3032e8783f3e51adaaf04 Mon Sep 17 00:00:00 2001 From: Xinyi Zou Date: Mon, 9 Sep 2024 19:42:53 +0800 Subject: [PATCH 01/13] pick exception handle logic to 2.1 --- be/src/http/action/http_stream.cpp | 23 ++++++++++++----- be/src/http/action/stream_load.cpp | 11 ++++++-- be/src/io/file_factory.cpp | 9 ++++--- be/src/io/fs/stream_load_pipe.cpp | 7 ++++-- be/src/olap/memtable.cpp | 7 +++++- be/src/olap/memtable.h | 3 +++ be/src/olap/rowset/segment_creator.cpp | 2 +- be/src/olap/rowset/segment_creator.h | 6 ++++- .../rowset/segment_v2/binary_dict_page.cpp | 14 ++++++----- .../olap/rowset/segment_v2/binary_dict_page.h | 2 +- .../rowset/segment_v2/binary_plain_page.h | 25 +++++++++++-------- .../rowset/segment_v2/binary_prefix_page.cpp | 23 +++++++++-------- .../rowset/segment_v2/binary_prefix_page.h | 2 +- .../olap/rowset/segment_v2/bitshuffle_page.h | 5 ++-- .../olap/rowset/segment_v2/column_writer.cpp | 10 +++++--- .../segment_v2/frame_of_reference_page.h | 5 ++-- .../segment_v2/indexed_column_writer.cpp | 3 ++- be/src/olap/rowset/segment_v2/page_builder.h | 3 ++- be/src/olap/rowset/segment_v2/page_io.cpp | 4 +-- be/src/olap/rowset/segment_v2/page_io.h | 11 +++++++- be/src/olap/rowset/segment_v2/plain_page.h | 18 +++++++------ be/src/olap/rowset/segment_v2/rle_page.h | 5 ++-- be/src/runtime/exec_env.h | 4 +++ be/src/runtime/exec_env_init.cpp | 2 ++ be/src/runtime/runtime_state.cpp | 9 ------- be/src/runtime/runtime_state.h | 4 --- .../runtime/stream_load/stream_load_context.h | 15 +++++++++-- .../stream_load/stream_load_executor.cpp | 4 +++ be/src/util/byte_buffer.h | 20 +++++++++------ be/src/vec/common/schema_util.cpp | 12 +++------ be/src/vec/sink/vtablet_block_convertor.cpp | 11 ++++---- be/src/vec/sink/vtablet_block_convertor.h | 13 +++++++++- be/src/vec/sink/writer/vtablet_writer.cpp | 2 -- be/src/vec/sink/writer/vtablet_writer.h | 3 --- be/test/util/byte_buffer2_test.cpp | 3 ++- .../doris/load/loadv2/LoadLoadingTask.java | 8 ------ 36 files changed, 187 insertions(+), 121 deletions(-) diff --git a/be/src/http/action/http_stream.cpp b/be/src/http/action/http_stream.cpp index 7dd85653002e37..83ce0ce82cc638 100644 --- a/be/src/http/action/http_stream.cpp +++ b/be/src/http/action/http_stream.cpp @@ -234,26 +234,37 @@ void HttpStreamAction::on_chunk_data(HttpRequest* req) { struct evhttp_request* ev_req = req->get_evhttp_request(); auto evbuf = evhttp_request_get_input_buffer(ev_req); + SCOPED_ATTACH_TASK(ExecEnv::GetInstance()->stream_load_pipe_tracker()); + int64_t start_read_data_time = MonotonicNanos(); + Status st = ctx->allocate_schema_buffer(); + if (!st.ok()) { + ctx->status = st; + return; + } while (evbuffer_get_length(evbuf) > 0) { - auto bb = ByteBuffer::allocate(128 * 1024); + ByteBufferPtr bb; + st = ByteBuffer::allocate(128 * 1024, &bb); + if (!st.ok()) { + ctx->status = st; + return; + } auto remove_bytes = evbuffer_remove(evbuf, bb->ptr, bb->capacity); bb->pos = remove_bytes; bb->flip(); - auto st = ctx->body_sink->append(bb); + st = ctx->body_sink->append(bb); // schema_buffer stores 1M of data for parsing column information // need to determine whether to cache for the first time if (ctx->is_read_schema) { - if (ctx->schema_buffer->pos + remove_bytes < config::stream_tvf_buffer_size) { - ctx->schema_buffer->put_bytes(bb->ptr, remove_bytes); + if (ctx->schema_buffer()->pos + remove_bytes < config::stream_tvf_buffer_size) { + ctx->schema_buffer()->put_bytes(bb->ptr, remove_bytes); } else { LOG(INFO) << "use a portion of data to request fe to obtain 
column information"; ctx->is_read_schema = false; ctx->status = process_put(req, ctx); } } - - if (!st.ok() && !ctx->status.ok()) { + if (!st.ok()) { LOG(WARNING) << "append body content failed. errmsg=" << st << ", " << ctx->brief(); ctx->status = st; return; diff --git a/be/src/http/action/stream_load.cpp b/be/src/http/action/stream_load.cpp index 3f32655cf14027..2036043b4d40e4 100644 --- a/be/src/http/action/stream_load.cpp +++ b/be/src/http/action/stream_load.cpp @@ -339,13 +339,20 @@ void StreamLoadAction::on_chunk_data(HttpRequest* req) { struct evhttp_request* ev_req = req->get_evhttp_request(); auto evbuf = evhttp_request_get_input_buffer(ev_req); + SCOPED_ATTACH_TASK(ExecEnv::GetInstance()->stream_load_pipe_tracker()); + int64_t start_read_data_time = MonotonicNanos(); while (evbuffer_get_length(evbuf) > 0) { - auto bb = ByteBuffer::allocate(128 * 1024); + ByteBufferPtr bb; + Status st = ByteBuffer::allocate(128 * 1024, &bb); + if (!st.ok()) { + ctx->status = st; + return; + } auto remove_bytes = evbuffer_remove(evbuf, bb->ptr, bb->capacity); bb->pos = remove_bytes; bb->flip(); - auto st = ctx->body_sink->append(bb); + st = ctx->body_sink->append(bb); if (!st.ok()) { LOG(WARNING) << "append body content failed. errmsg=" << st << ", " << ctx->brief(); ctx->status = st; diff --git a/be/src/io/file_factory.cpp b/be/src/io/file_factory.cpp index 4d6158f8f7e96d..95d537320883e8 100644 --- a/be/src/io/file_factory.cpp +++ b/be/src/io/file_factory.cpp @@ -161,13 +161,14 @@ Status FileFactory::create_pipe_reader(const TUniqueId& load_id, io::FileReaderS if (!stream_load_ctx) { return Status::InternalError("unknown stream load id: {}", UniqueId(load_id).to_string()); } - if (need_schema == true) { + if (need_schema) { + RETURN_IF_ERROR(stream_load_ctx->allocate_schema_buffer()); // Here, a portion of the data is processed to parse column information auto pipe = std::make_shared( io::kMaxPipeBufferedBytes /* max_buffered_bytes */, 64 * 1024 /* min_chunk_size */, - stream_load_ctx->schema_buffer->pos /* total_length */); - stream_load_ctx->schema_buffer->flip(); - RETURN_IF_ERROR(pipe->append(stream_load_ctx->schema_buffer)); + stream_load_ctx->schema_buffer()->pos /* total_length */); + stream_load_ctx->schema_buffer()->flip(); + RETURN_IF_ERROR(pipe->append(stream_load_ctx->schema_buffer())); RETURN_IF_ERROR(pipe->finish()); *file_reader = std::move(pipe); } else { diff --git a/be/src/io/fs/stream_load_pipe.cpp b/be/src/io/fs/stream_load_pipe.cpp index ecce306bdf1ad2..392125e6fc0b86 100644 --- a/be/src/io/fs/stream_load_pipe.cpp +++ b/be/src/io/fs/stream_load_pipe.cpp @@ -111,7 +111,9 @@ Status StreamLoadPipe::read_one_message(std::unique_ptr* data, size_t } Status StreamLoadPipe::append_and_flush(const char* data, size_t size, size_t proto_byte_size) { - ByteBufferPtr buf = ByteBuffer::allocate(BitUtil::RoundUpToPowerOfTwo(size + 1)); + SCOPED_ATTACH_TASK(ExecEnv::GetInstance()->stream_load_pipe_tracker()); + ByteBufferPtr buf; + RETURN_IF_ERROR(ByteBuffer::allocate(128 * 1024, &buf)); buf->put_bytes(data, size); buf->flip(); return _append(buf, proto_byte_size); @@ -145,7 +147,8 @@ Status StreamLoadPipe::append(const char* data, size_t size) { // need to allocate a new chunk, min chunk is 64k size_t chunk_size = std::max(_min_chunk_size, size - pos); chunk_size = BitUtil::RoundUpToPowerOfTwo(chunk_size); - _write_buf = ByteBuffer::allocate(chunk_size); + SCOPED_ATTACH_TASK(ExecEnv::GetInstance()->stream_load_pipe_tracker()); + RETURN_IF_ERROR(ByteBuffer::allocate(chunk_size, 
&_write_buf)); _write_buf->put_bytes(data + pos, size - pos); return Status::OK(); } diff --git a/be/src/olap/memtable.cpp b/be/src/olap/memtable.cpp index 923849162db331..a4df4b8f6742aa 100644 --- a/be/src/olap/memtable.cpp +++ b/be/src/olap/memtable.cpp @@ -505,7 +505,7 @@ bool MemTable::need_agg() const { return false; } -Status MemTable::to_block(std::unique_ptr* res) { +Status MemTable::_to_block(std::unique_ptr* res) { size_t same_keys_num = _sort(); if (_keys_type == KeysType::DUP_KEYS || same_keys_num == 0) { if (_keys_type == KeysType::DUP_KEYS && _tablet_schema->num_key_columns() == 0) { @@ -529,4 +529,9 @@ Status MemTable::to_block(std::unique_ptr* res) { return Status::OK(); } +Status MemTable::to_block(std::unique_ptr* res) { + RETURN_IF_ERROR_OR_CATCH_EXCEPTION(_to_block(res)); + return Status::OK(); +} + } // namespace doris diff --git a/be/src/olap/memtable.h b/be/src/olap/memtable.h index 916067ba1193d2..70f7a9f22a0aa8 100644 --- a/be/src/olap/memtable.h +++ b/be/src/olap/memtable.h @@ -205,6 +205,9 @@ class MemTable { void _aggregate_two_row_in_block(vectorized::MutableBlock& mutable_block, RowInBlock* new_row, RowInBlock* row_in_skiplist); + // Used to wrapped by to_block to do exception handle logic + Status _to_block(std::unique_ptr* res); + private: int64_t _tablet_id; bool _enable_unique_key_mow = false; diff --git a/be/src/olap/rowset/segment_creator.cpp b/be/src/olap/rowset/segment_creator.cpp index 641b32535561cc..bf10ff3f1ed880 100644 --- a/be/src/olap/rowset/segment_creator.cpp +++ b/be/src/olap/rowset/segment_creator.cpp @@ -85,7 +85,7 @@ Status SegmentFlusher::flush_single_block(const vectorized::Block* block, int32_ return Status::OK(); } -Status SegmentFlusher::_parse_variant_columns(vectorized::Block& block) { +Status SegmentFlusher::_internal_parse_variant_columns(vectorized::Block& block) { size_t num_rows = block.rows(); if (num_rows == 0) { return Status::OK(); diff --git a/be/src/olap/rowset/segment_creator.h b/be/src/olap/rowset/segment_creator.h index 93508e9629ddbb..7fa69b2c57c718 100644 --- a/be/src/olap/rowset/segment_creator.h +++ b/be/src/olap/rowset/segment_creator.h @@ -138,7 +138,11 @@ class SegmentFlusher { bool need_buffering(); private: - Status _parse_variant_columns(vectorized::Block& block); + // This method will catch exception when allocate memory failed + Status _parse_variant_columns(vectorized::Block& block) { + RETURN_IF_CATCH_EXCEPTION({ return _internal_parse_variant_columns(block); }); + } + Status _internal_parse_variant_columns(vectorized::Block& block); Status _add_rows(std::unique_ptr& segment_writer, const vectorized::Block* block, size_t row_offset, size_t row_num); Status _add_rows(std::unique_ptr& segment_writer, diff --git a/be/src/olap/rowset/segment_v2/binary_dict_page.cpp b/be/src/olap/rowset/segment_v2/binary_dict_page.cpp index 52795f0338a79f..8270adfbde8bf1 100644 --- a/be/src/olap/rowset/segment_v2/binary_dict_page.cpp +++ b/be/src/olap/rowset/segment_v2/binary_dict_page.cpp @@ -142,7 +142,7 @@ Status BinaryDictPageBuilder::add(const uint8_t* vals, size_t* count) { } } -OwnedSlice BinaryDictPageBuilder::finish() { +Status BinaryDictPageBuilder::finish(OwnedSlice* slice) { if (VLOG_DEBUG_IS_ON && _encoding_type == DICT_ENCODING) { VLOG_DEBUG << "dict page size:" << _dict_builder->size(); } @@ -150,11 +150,14 @@ OwnedSlice BinaryDictPageBuilder::finish() { DCHECK(!_finished); _finished = true; - OwnedSlice data_slice = _data_page_builder->finish(); + OwnedSlice data_slice; + 
RETURN_IF_ERROR(_data_page_builder->finish(&data_slice)); // TODO(gaodayue) separate page header and content to avoid this copy - _buffer.append(data_slice.slice().data, data_slice.slice().size); + RETURN_IF_CATCH_EXCEPTION( + { _buffer.append(data_slice.slice().data, data_slice.slice().size); }); encode_fixed32_le(&_buffer[0], _encoding_type); - return _buffer.build(); + *slice = _buffer.build(); + return Status::OK(); } Status BinaryDictPageBuilder::reset() { @@ -183,8 +186,7 @@ uint64_t BinaryDictPageBuilder::size() const { } Status BinaryDictPageBuilder::get_dictionary_page(OwnedSlice* dictionary_page) { - *dictionary_page = _dict_builder->finish(); - return Status::OK(); + return _dict_builder->finish(dictionary_page); } Status BinaryDictPageBuilder::get_first_value(void* value) const { diff --git a/be/src/olap/rowset/segment_v2/binary_dict_page.h b/be/src/olap/rowset/segment_v2/binary_dict_page.h index 2a8467e7def516..d069eb9f7edc98 100644 --- a/be/src/olap/rowset/segment_v2/binary_dict_page.h +++ b/be/src/olap/rowset/segment_v2/binary_dict_page.h @@ -68,7 +68,7 @@ class BinaryDictPageBuilder : public PageBuilderHelper { Status add(const uint8_t* vals, size_t* count) override; - OwnedSlice finish() override; + Status finish(OwnedSlice* slice) override; Status reset() override; diff --git a/be/src/olap/rowset/segment_v2/binary_plain_page.h b/be/src/olap/rowset/segment_v2/binary_plain_page.h index 3fe76c5d3aee84..69d79fbcc5b3e3 100644 --- a/be/src/olap/rowset/segment_v2/binary_plain_page.h +++ b/be/src/olap/rowset/segment_v2/binary_plain_page.h @@ -93,19 +93,22 @@ class BinaryPlainPageBuilder : public PageBuilderHelper 0) { - _copy_value_at(0, &_first_value); - _copy_value_at(_offsets.size() - 1, &_last_value); - } - return _buffer.build(); + RETURN_IF_CATCH_EXCEPTION({ + // Set up trailer + for (uint32_t _offset : _offsets) { + put_fixed32_le(&_buffer, _offset); + } + put_fixed32_le(&_buffer, _offsets.size()); + if (_offsets.size() > 0) { + _copy_value_at(0, &_first_value); + _copy_value_at(_offsets.size() - 1, &_last_value); + } + *slice = _buffer.build(); + }); + return Status::OK(); } Status reset() override { diff --git a/be/src/olap/rowset/segment_v2/binary_prefix_page.cpp b/be/src/olap/rowset/segment_v2/binary_prefix_page.cpp index 9d1ecdb9470778..34eb14951aeb32 100644 --- a/be/src/olap/rowset/segment_v2/binary_prefix_page.cpp +++ b/be/src/olap/rowset/segment_v2/binary_prefix_page.cpp @@ -88,18 +88,21 @@ Status BinaryPrefixPageBuilder::add(const uint8_t* vals, size_t* add_count) { return Status::OK(); } -OwnedSlice BinaryPrefixPageBuilder::finish() { +Status BinaryPrefixPageBuilder::finish(OwnedSlice* slice) { DCHECK(!_finished); _finished = true; - put_fixed32_le(&_buffer, (uint32_t)_count); - uint8_t restart_point_internal = RESTART_POINT_INTERVAL; - _buffer.append(&restart_point_internal, 1); - auto restart_point_size = _restart_points_offset.size(); - for (uint32_t i = 0; i < restart_point_size; ++i) { - put_fixed32_le(&_buffer, _restart_points_offset[i]); - } - put_fixed32_le(&_buffer, restart_point_size); - return _buffer.build(); + RETURN_IF_CATCH_EXCEPTION({ + put_fixed32_le(&_buffer, (uint32_t)_count); + uint8_t restart_point_internal = RESTART_POINT_INTERVAL; + _buffer.append(&restart_point_internal, 1); + auto restart_point_size = _restart_points_offset.size(); + for (uint32_t i = 0; i < restart_point_size; ++i) { + put_fixed32_le(&_buffer, _restart_points_offset[i]); + } + put_fixed32_le(&_buffer, restart_point_size); + *slice = _buffer.build(); + }); + return 
Status::OK(); } const uint8_t* BinaryPrefixPageDecoder::_decode_value_lengths(const uint8_t* ptr, uint32_t* shared, diff --git a/be/src/olap/rowset/segment_v2/binary_prefix_page.h b/be/src/olap/rowset/segment_v2/binary_prefix_page.h index de4ec60070bad6..41deb4e6c1fe1a 100644 --- a/be/src/olap/rowset/segment_v2/binary_prefix_page.h +++ b/be/src/olap/rowset/segment_v2/binary_prefix_page.h @@ -52,7 +52,7 @@ class BinaryPrefixPageBuilder : public PageBuilderHelper 0) { _first_value = cell(0); _last_value = cell(_count - 1); } - return _finish(SIZE_OF_TYPE); + RETURN_IF_CATCH_EXCEPTION({ *slice = _finish(SIZE_OF_TYPE); }); + return Status::OK(); } Status reset() override { diff --git a/be/src/olap/rowset/segment_v2/column_writer.cpp b/be/src/olap/rowset/segment_v2/column_writer.cpp index e463b883fd206d..bdbfcdc2d41621 100644 --- a/be/src/olap/rowset/segment_v2/column_writer.cpp +++ b/be/src/olap/rowset/segment_v2/column_writer.cpp @@ -70,9 +70,10 @@ class NullBitmapBuilder { // Returns whether the building nullmap contains nullptr bool has_null() const { return _has_null; } - OwnedSlice finish() { + Status finish(OwnedSlice* slice) { _rle_encoder.Flush(); - return _bitmap_buf.build(); + RETURN_IF_CATCH_EXCEPTION({ *slice = _bitmap_buf.build(); }); + return Status::OK(); } void reset() { @@ -723,14 +724,15 @@ Status ScalarColumnWriter::finish_current_page() { // build data page body : encoded values + [nullmap] std::vector body; - OwnedSlice encoded_values = _page_builder->finish(); + OwnedSlice encoded_values; + RETURN_IF_ERROR(_page_builder->finish(&encoded_values)); RETURN_IF_ERROR(_page_builder->reset()); body.push_back(encoded_values.slice()); OwnedSlice nullmap; if (_null_bitmap_builder != nullptr) { if (is_nullable() && _null_bitmap_builder->has_null()) { - nullmap = _null_bitmap_builder->finish(); + RETURN_IF_ERROR(_null_bitmap_builder->finish(&nullmap)); body.push_back(nullmap.slice()); } _null_bitmap_builder->reset(); diff --git a/be/src/olap/rowset/segment_v2/frame_of_reference_page.h b/be/src/olap/rowset/segment_v2/frame_of_reference_page.h index 4477912803b3bb..5aedf126b55ce2 100644 --- a/be/src/olap/rowset/segment_v2/frame_of_reference_page.h +++ b/be/src/olap/rowset/segment_v2/frame_of_reference_page.h @@ -54,11 +54,12 @@ class FrameOfReferencePageBuilder : public PageBuilderHelperflush(); - return _buf.build(); + RETURN_IF_CATCH_EXCEPTION({ *slice = _buf.build(); }); + return Status::OK(); } Status reset() override { diff --git a/be/src/olap/rowset/segment_v2/indexed_column_writer.cpp b/be/src/olap/rowset/segment_v2/indexed_column_writer.cpp index e1b238084a9ce5..51606d818899ec 100644 --- a/be/src/olap/rowset/segment_v2/indexed_column_writer.cpp +++ b/be/src/olap/rowset/segment_v2/indexed_column_writer.cpp @@ -117,7 +117,8 @@ Status IndexedColumnWriter::_finish_current_data_page(size_t& num_val) { ordinal_t first_ordinal = _num_values - num_values_in_page; // IndexedColumn doesn't have NULLs, thus data page body only contains encoded values - OwnedSlice page_body = _data_page_builder->finish(); + OwnedSlice page_body; + RETURN_IF_ERROR(_data_page_builder->finish(&page_body)); RETURN_IF_ERROR(_data_page_builder->reset()); PageFooterPB footer; diff --git a/be/src/olap/rowset/segment_v2/page_builder.h b/be/src/olap/rowset/segment_v2/page_builder.h index 61fa2eaf8e1d66..7e24c56796cbb5 100644 --- a/be/src/olap/rowset/segment_v2/page_builder.h +++ b/be/src/olap/rowset/segment_v2/page_builder.h @@ -63,7 +63,8 @@ class PageBuilder { // Finish building the current page, return the 
encoded data. // This api should be followed by reset() before reusing the builder - virtual OwnedSlice finish() = 0; + // It will return error status when memory allocated failed during finish + virtual Status finish(OwnedSlice* owned_slice) = 0; // Get the dictionary page for dictionary encoding mode column. virtual Status get_dictionary_page(OwnedSlice* dictionary_page) { diff --git a/be/src/olap/rowset/segment_v2/page_io.cpp b/be/src/olap/rowset/segment_v2/page_io.cpp index cea4a23f742178..07d5656ee8a44b 100644 --- a/be/src/olap/rowset/segment_v2/page_io.cpp +++ b/be/src/olap/rowset/segment_v2/page_io.cpp @@ -111,8 +111,8 @@ Status PageIO::write_page(io::FileWriter* writer, const std::vector& body return Status::OK(); } -Status PageIO::read_and_decompress_page(const PageReadOptions& opts, PageHandle* handle, - Slice* body, PageFooterPB* footer) { +Status PageIO::read_and_decompress_page_(const PageReadOptions& opts, PageHandle* handle, + Slice* body, PageFooterPB* footer) { opts.sanity_check(); opts.stats->total_pages_num++; diff --git a/be/src/olap/rowset/segment_v2/page_io.h b/be/src/olap/rowset/segment_v2/page_io.h index 31c81880dac650..889dae6d34efe6 100644 --- a/be/src/olap/rowset/segment_v2/page_io.h +++ b/be/src/olap/rowset/segment_v2/page_io.h @@ -123,8 +123,17 @@ class PageIO { // `handle' holds the memory of page data, // `body' points to page body, // `footer' stores the page footer. + // This method is exception safe, it will failed when allocate memory failed. static Status read_and_decompress_page(const PageReadOptions& opts, PageHandle* handle, - Slice* body, PageFooterPB* footer); + Slice* body, PageFooterPB* footer) { + RETURN_IF_CATCH_EXCEPTION( + { return read_and_decompress_page_(opts, handle, body, footer); }); + } + +private: + // An internal method that not deal with exception. 
+ static Status read_and_decompress_page_(const PageReadOptions& opts, PageHandle* handle, + Slice* body, PageFooterPB* footer); }; } // namespace segment_v2 diff --git a/be/src/olap/rowset/segment_v2/plain_page.h b/be/src/olap/rowset/segment_v2/plain_page.h index af31275002ad3c..28b1e96d206fbb 100644 --- a/be/src/olap/rowset/segment_v2/plain_page.h +++ b/be/src/olap/rowset/segment_v2/plain_page.h @@ -59,14 +59,18 @@ class PlainPageBuilder : public PageBuilderHelper > { return Status::OK(); } - OwnedSlice finish() override { + Status finish(OwnedSlice* slice) override { encode_fixed32_le((uint8_t*)&_buffer[0], _count); - if (_count > 0) { - _first_value.assign_copy(&_buffer[PLAIN_PAGE_HEADER_SIZE], SIZE_OF_TYPE); - _last_value.assign_copy(&_buffer[PLAIN_PAGE_HEADER_SIZE + (_count - 1) * SIZE_OF_TYPE], - SIZE_OF_TYPE); - } - return _buffer.build(); + RETURN_IF_CATCH_EXCEPTION({ + if (_count > 0) { + _first_value.assign_copy(&_buffer[PLAIN_PAGE_HEADER_SIZE], SIZE_OF_TYPE); + _last_value.assign_copy( + &_buffer[PLAIN_PAGE_HEADER_SIZE + (_count - 1) * SIZE_OF_TYPE], + SIZE_OF_TYPE); + } + *slice = _buffer.build(); + }); + return Status::OK(); } Status reset() override { diff --git a/be/src/olap/rowset/segment_v2/rle_page.h b/be/src/olap/rowset/segment_v2/rle_page.h index 40ec587743c1a2..d1974f18d39ddc 100644 --- a/be/src/olap/rowset/segment_v2/rle_page.h +++ b/be/src/olap/rowset/segment_v2/rle_page.h @@ -94,14 +94,15 @@ class RlePageBuilder : public PageBuilderHelper > { return Status::OK(); } - OwnedSlice finish() override { + Status finish(OwnedSlice* slice) override { DCHECK(!_finished); _finished = true; // here should Flush first and then encode the count header // or it will lead to a bug if the header is less than 8 byte and the data is small _rle_encoder->Flush(); encode_fixed32_le(&_buf[0], _count); - return _buf.build(); + *slice = _buf.build(); + return Status::OK(); } Status reset() override { diff --git a/be/src/runtime/exec_env.h b/be/src/runtime/exec_env.h index 4b2478ccf99b4b..83929cd999feeb 100644 --- a/be/src/runtime/exec_env.h +++ b/be/src/runtime/exec_env.h @@ -185,6 +185,9 @@ class ExecEnv { std::shared_ptr segcompaction_mem_tracker() { return _segcompaction_mem_tracker; } + std::shared_ptr stream_load_pipe_tracker() { + return _stream_load_pipe_tracker; + } std::shared_ptr rowid_storage_reader_tracker() { return _rowid_storage_reader_tracker; } @@ -362,6 +365,7 @@ class ExecEnv { std::shared_ptr _brpc_iobuf_block_memory_tracker; // Count the memory consumption of segment compaction tasks. std::shared_ptr _segcompaction_mem_tracker; + std::shared_ptr _stream_load_pipe_tracker; // TODO, looking forward to more accurate tracking. 
std::shared_ptr _rowid_storage_reader_tracker; diff --git a/be/src/runtime/exec_env_init.cpp b/be/src/runtime/exec_env_init.cpp index bbd6bbc9447fbf..bce801a7a3dd52 100644 --- a/be/src/runtime/exec_env_init.cpp +++ b/be/src/runtime/exec_env_init.cpp @@ -535,6 +535,8 @@ void ExecEnv::init_mem_tracker() { MemTrackerLimiter::create_shared(MemTrackerLimiter::Type::GLOBAL, "SubcolumnsTree"); _s3_file_buffer_tracker = MemTrackerLimiter::create_shared(MemTrackerLimiter::Type::GLOBAL, "S3FileBuffer"); + _stream_load_pipe_tracker = + MemTrackerLimiter::create_shared(MemTrackerLimiter::Type::GLOBAL, "StreamLoadPipe"); } void ExecEnv::_register_metrics() { diff --git a/be/src/runtime/runtime_state.cpp b/be/src/runtime/runtime_state.cpp index 2713ee441dd0df..cdb5a65a977147 100644 --- a/be/src/runtime/runtime_state.cpp +++ b/be/src/runtime/runtime_state.cpp @@ -463,15 +463,6 @@ Status RuntimeState::append_error_msg_to_file(std::function line, return Status::OK(); } -int64_t RuntimeState::get_load_mem_limit() { - // TODO: the code is abandoned, it can be deleted after v1.3 - if (_query_options.__isset.load_mem_limit && _query_options.load_mem_limit > 0) { - return _query_options.load_mem_limit; - } else { - return _query_mem_tracker->limit(); - } -} - void RuntimeState::resize_op_id_to_local_state(int operator_size) { _op_id_to_local_state.resize(-operator_size); } diff --git a/be/src/runtime/runtime_state.h b/be/src/runtime/runtime_state.h index 8b8cbd85f0f117..8243faa37aeec5 100644 --- a/be/src/runtime/runtime_state.h +++ b/be/src/runtime/runtime_state.h @@ -466,10 +466,6 @@ class RuntimeState { std::vector& error_tablet_infos() { return _error_tablet_infos; } - // get mem limit for load channel - // if load mem limit is not set, or is zero, using query mem limit instead. - int64_t get_load_mem_limit(); - // local runtime filter mgr, the runtime filter do not have remote target or // not need local merge should regist here. 
the instance exec finish, the local // runtime filter mgr can release the memory of local runtime filter diff --git a/be/src/runtime/stream_load/stream_load_context.h b/be/src/runtime/stream_load/stream_load_context.h index 1dc7ccf73ba18b..2ccf8ce5014a88 100644 --- a/be/src/runtime/stream_load/stream_load_context.h +++ b/be/src/runtime/stream_load/stream_load_context.h @@ -37,6 +37,7 @@ #include "common/utils.h" #include "runtime/exec_env.h" #include "runtime/stream_load/stream_load_executor.h" +#include "runtime/thread_context.h" #include "util/byte_buffer.h" #include "util/time.h" #include "util/uid_util.h" @@ -118,6 +119,17 @@ class StreamLoadContext { // also print the load source info if detail is set to true std::string brief(bool detail = false) const; + Status allocate_schema_buffer() { + if (_schema_buffer == nullptr) { + SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER( + ExecEnv::GetInstance()->stream_load_pipe_tracker()); + return ByteBuffer::allocate(config::stream_tvf_buffer_size, &_schema_buffer); + } + return Status::OK(); + } + + ByteBufferPtr schema_buffer() { return _schema_buffer; } + public: static const int default_txn_id = -1; // load type, eg: ROUTINE LOAD/MANUAL LOAD @@ -182,8 +194,6 @@ class StreamLoadContext { std::shared_ptr body_sink; std::shared_ptr pipe; - ByteBufferPtr schema_buffer = ByteBuffer::allocate(config::stream_tvf_buffer_size); - TStreamLoadPutResult put_result; TStreamLoadMultiTablePutResult multi_table_put_result; @@ -241,6 +251,7 @@ class StreamLoadContext { private: ExecEnv* _exec_env = nullptr; + ByteBufferPtr _schema_buffer; }; } // namespace doris diff --git a/be/src/runtime/stream_load/stream_load_executor.cpp b/be/src/runtime/stream_load/stream_load_executor.cpp index 0761b445bee084..0616c6474aad32 100644 --- a/be/src/runtime/stream_load/stream_load_executor.cpp +++ b/be/src/runtime/stream_load/stream_load_executor.cpp @@ -142,6 +142,10 @@ Status StreamLoadExecutor::execute_plan_fragment(std::shared_ptr; -struct ByteBuffer { - static ByteBufferPtr allocate(size_t size) { - ByteBufferPtr ptr(new ByteBuffer(size)); - return ptr; +struct ByteBuffer : private Allocator { + static Status allocate(const size_t size, ByteBufferPtr* ptr) { + RETURN_IF_CATCH_EXCEPTION({ *ptr = ByteBufferPtr(new ByteBuffer(size)); }); + return Status::OK(); } - ~ByteBuffer() { delete[] ptr; } + ~ByteBuffer() { Allocator::free(ptr, capacity); } void put_bytes(const char* data, size_t size) { memcpy(ptr + pos, data, size); @@ -56,14 +59,15 @@ struct ByteBuffer { size_t remaining() const { return limit - pos; } bool has_remaining() const { return limit > pos; } - char* const ptr; + char* ptr; size_t pos; size_t limit; size_t capacity; private: - ByteBuffer(size_t capacity_) - : ptr(new char[capacity_]), pos(0), limit(capacity_), capacity(capacity_) {} + ByteBuffer(size_t capacity_) : pos(0), limit(capacity_), capacity(capacity_) { + ptr = reinterpret_cast(Allocator::alloc(capacity_)); + } }; } // namespace doris diff --git a/be/src/vec/common/schema_util.cpp b/be/src/vec/common/schema_util.cpp index bb7b64992dee27..61b98bafd48ef6 100644 --- a/be/src/vec/common/schema_util.cpp +++ b/be/src/vec/common/schema_util.cpp @@ -545,15 +545,9 @@ Status _parse_variant_columns(Block& block, const std::vector& variant_pos, Status parse_variant_columns(Block& block, const std::vector& variant_pos, const ParseContext& ctx) { - try { - // Parse each variant column from raw string column - RETURN_IF_ERROR(vectorized::schema_util::_parse_variant_columns(block, variant_pos, ctx)); - } catch 
(const doris::Exception& e) { - // TODO more graceful, max_filter_ratio - LOG(WARNING) << "encounter execption " << e.to_string(); - return Status::InternalError(e.to_string()); - } - return Status::OK(); + // Parse each variant column from raw string column + RETURN_IF_CATCH_EXCEPTION( + { return vectorized::schema_util::_parse_variant_columns(block, variant_pos, ctx); }); } void finalize_variant_columns(Block& block, const std::vector& variant_pos, diff --git a/be/src/vec/sink/vtablet_block_convertor.cpp b/be/src/vec/sink/vtablet_block_convertor.cpp index 96de68f597677c..086c9a3ddd04d9 100644 --- a/be/src/vec/sink/vtablet_block_convertor.cpp +++ b/be/src/vec/sink/vtablet_block_convertor.cpp @@ -182,12 +182,11 @@ DecimalType OlapTableBlockConvertor::_get_decimalv3_min_or_max(const TypeDescrip return DecimalType(value); } -Status OlapTableBlockConvertor::_validate_column(RuntimeState* state, const TypeDescriptor& type, - bool is_nullable, vectorized::ColumnPtr column, - size_t slot_index, bool* stop_processing, - fmt::memory_buffer& error_prefix, - const uint32_t row_count, - vectorized::IColumn::Permutation* rows) { +Status OlapTableBlockConvertor::_internal_validate_column( + RuntimeState* state, const TypeDescriptor& type, bool is_nullable, + vectorized::ColumnPtr column, size_t slot_index, bool* stop_processing, + fmt::memory_buffer& error_prefix, const uint32_t row_count, + vectorized::IColumn::Permutation* rows) { DCHECK((rows == nullptr) || (rows->size() == row_count)); fmt::memory_buffer error_msg; auto set_invalid_and_append_error_msg = [&](int row) { diff --git a/be/src/vec/sink/vtablet_block_convertor.h b/be/src/vec/sink/vtablet_block_convertor.h index 0db340ce6c27d4..7f866c38032775 100644 --- a/be/src/vec/sink/vtablet_block_convertor.h +++ b/be/src/vec/sink/vtablet_block_convertor.h @@ -69,7 +69,18 @@ class OlapTableBlockConvertor { Status _validate_column(RuntimeState* state, const TypeDescriptor& type, bool is_nullable, vectorized::ColumnPtr column, size_t slot_index, bool* stop_processing, fmt::memory_buffer& error_prefix, const uint32_t row_count, - vectorized::IColumn::Permutation* rows = nullptr); + vectorized::IColumn::Permutation* rows = nullptr) { + RETURN_IF_CATCH_EXCEPTION({ + return _internal_validate_column(state, type, is_nullable, column, slot_index, + stop_processing, error_prefix, row_count, rows); + }); + } + + Status _internal_validate_column(RuntimeState* state, const TypeDescriptor& type, + bool is_nullable, vectorized::ColumnPtr column, + size_t slot_index, bool* stop_processing, + fmt::memory_buffer& error_prefix, const uint32_t row_count, + vectorized::IColumn::Permutation* rows = nullptr); // make input data valid for OLAP table // return number of invalid/filtered rows. 
diff --git a/be/src/vec/sink/writer/vtablet_writer.cpp b/be/src/vec/sink/writer/vtablet_writer.cpp index 1e6b8f7b8687b6..d1651bb3a92419 100644 --- a/be/src/vec/sink/writer/vtablet_writer.cpp +++ b/be/src/vec/sink/writer/vtablet_writer.cpp @@ -413,7 +413,6 @@ void VNodeChannel::_open_internal(bool is_incremental) { request->set_num_senders(_parent->_num_senders); request->set_need_gen_rollup(false); // Useless but it is a required field in pb - request->set_load_mem_limit(_parent->_load_mem_limit); request->set_load_channel_timeout_s(_parent->_load_channel_timeout_s); request->set_is_high_priority(_parent->_is_high_priority); request->set_sender_ip(BackendOptions::get_localhost()); @@ -1226,7 +1225,6 @@ Status VTabletWriter::_init(RuntimeState* state, RuntimeProfile* profile) { _max_wait_exec_timer = ADD_TIMER(profile, "MaxWaitExecTime"); _add_batch_number = ADD_COUNTER(profile, "NumberBatchAdded", TUnit::UNIT); _num_node_channels = ADD_COUNTER(profile, "NumberNodeChannels", TUnit::UNIT); - _load_mem_limit = state->get_load_mem_limit(); #ifdef DEBUG // check: tablet ids should be unique diff --git a/be/src/vec/sink/writer/vtablet_writer.h b/be/src/vec/sink/writer/vtablet_writer.h index 603034cea6d7a5..ba986fbc6d4e0f 100644 --- a/be/src/vec/sink/writer/vtablet_writer.h +++ b/be/src/vec/sink/writer/vtablet_writer.h @@ -660,9 +660,6 @@ class VTabletWriter final : public AsyncResultWriter { RuntimeProfile::Counter* _add_batch_number = nullptr; RuntimeProfile::Counter* _num_node_channels = nullptr; - // load mem limit is for remote load channel - int64_t _load_mem_limit = -1; - // the timeout of load channels opened by this tablet sink. in second int64_t _load_channel_timeout_s = 0; diff --git a/be/test/util/byte_buffer2_test.cpp b/be/test/util/byte_buffer2_test.cpp index 04b62cd5fe8f0e..73c38c9e404340 100644 --- a/be/test/util/byte_buffer2_test.cpp +++ b/be/test/util/byte_buffer2_test.cpp @@ -32,7 +32,8 @@ class ByteBufferTest : public testing::Test { }; TEST_F(ByteBufferTest, normal) { - auto buf = ByteBuffer::allocate(4); + ByteBufferPtr buf; + Status st = ByteBuffer::allocate(4, &buf); EXPECT_EQ(0, buf->pos); EXPECT_EQ(4, buf->limit); EXPECT_EQ(4, buf->capacity); diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadLoadingTask.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadLoadingTask.java index d60c17233d7a08..14cd742be45cd8 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadLoadingTask.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadLoadingTask.java @@ -152,14 +152,6 @@ private void executeOnce() throws Exception { curCoordinator.setExecMemoryLimit(execMemLimit); curCoordinator.setExecPipEngine(Config.enable_pipeline_load); - /* - * For broker load job, user only need to set mem limit by 'exec_mem_limit' property. - * And the variable 'load_mem_limit' does not make any effect. - * However, in order to ensure the consistency of semantics when executing on the BE side, - * and to prevent subsequent modification from incorrectly setting the load_mem_limit, - * here we use exec_mem_limit to directly override the load_mem_limit property. 
- */ - curCoordinator.setLoadMemLimit(execMemLimit); curCoordinator.setMemTableOnSinkNode(enableMemTableOnSinkNode); long leftTimeMs = getLeftTimeMs(); From d43e873882678cb3a784b67e5b79a8d232260b1d Mon Sep 17 00:00:00 2001 From: Xinyi Zou Date: Wed, 28 Aug 2024 01:12:55 +0800 Subject: [PATCH 02/13] [fix](memory) Fix Allocator release memory to correct MemTracker after TLS attach task ends (#39908) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Allocator save TLS MemTracker during first alloc, which is used to release memory after TLS attach task ends. ``` 23:00:15  F20240824 22:56:49.773799 66432 thread_context.h:238] Check failed: doris::k_doris_exit || !doris::config::enable_memory_orphan_check || thread_mem_tracker()->label() != "Orphan" If you crash here, it means that SCOPED_ATTACH_TASK and SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER are not used correctly. starting position of each thread is expected to use SCOPED_ATTACH_TASK to bind a MemTrackerLimiter belonging to Query/Load/Compaction/Other Tasks, otherwise memory alloc using Doris Allocator in the thread will crash. If you want to switch MemTrackerLimiter during thread execution, please use SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER, do not repeat Attach. Of course, you can modify enable_memory_orphan_check=false in be.conf to avoid this crash. 23:00:15  *** Check failure stack trace: *** 23:00:15  @ 0x55645d3388e6 google::LogMessageFatal::~LogMessageFatal() 23:00:15  @ 0x5564439637c2 doris::ThreadContext::consume_memory() 23:00:15  @ 0x5564439914fe Allocator<>::release_memory() 23:00:15  @ 0x5564354be11e std::_Sp_counted_ptr<>::_M_dispose() 23:00:15  @ 0x55643557bc3b std::deque<>::pop_front() 23:00:15  @ 0x5564355756b1 doris::io::StreamLoadPipe::~StreamLoadPipe() 23:00:15  @ 0x5564354bfa77 doris::StreamLoadContext::~StreamLoadContext() 23:00:15  @ 0x556436ee5114 doris::HttpRequest::~HttpRequest() ``` --- be/src/common/config.cpp | 4 +++ be/src/common/config.h | 3 ++ be/src/olap/page_cache.cpp | 10 +++--- be/src/olap/page_cache.h | 1 - .../runtime/stream_load/stream_load_context.h | 9 +++++ be/src/runtime/thread_context.h | 28 ++++++++-------- be/src/service/internal_service.cpp | 5 +-- be/src/util/byte_buffer.h | 6 +++- be/src/vec/common/allocator.cpp | 33 +++++++++++++++++-- be/src/vec/common/allocator.h | 8 ++++- 10 files changed, 80 insertions(+), 27 deletions(-) diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp index 61c0ecdea2f823..16a1ff075eac02 100644 --- a/be/src/common/config.cpp +++ b/be/src/common/config.cpp @@ -137,6 +137,10 @@ DEFINE_mInt64(stacktrace_in_alloc_large_memory_bytes, "2147483648"); DEFINE_mInt64(crash_in_alloc_large_memory_bytes, "-1"); +// default is true. if any memory tracking in Orphan mem tracker will report error. +// !! not modify the default value of this conf!! otherwise memory errors cannot be detected in time. +// allocator free memory not need to check, because when the thread memory tracker label is Orphan, +// use the tracker saved in Allocator. DEFINE_mBool(enable_memory_orphan_check, "true"); // The maximum time a thread waits for full GC. Currently only query will wait for full gc. diff --git a/be/src/common/config.h b/be/src/common/config.h index 4b39be5f4c48fd..91b2a72e9081e8 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -191,6 +191,9 @@ DECLARE_mInt64(stacktrace_in_alloc_large_memory_bytes); DECLARE_mInt64(crash_in_alloc_large_memory_bytes); // default is true. 
if any memory tracking in Orphan mem tracker will report error. +// !! not modify the default value of this conf!! otherwise memory errors cannot be detected in time. +// allocator free memory not need to check, because when the thread memory tracker label is Orphan, +// use the tracker saved in Allocator. DECLARE_mBool(enable_memory_orphan_check); // The maximum time a thread waits for a full GC. Currently only query will wait for full gc. diff --git a/be/src/olap/page_cache.cpp b/be/src/olap/page_cache.cpp index 1f0556f4642110..b70dadc5b431ea 100644 --- a/be/src/olap/page_cache.cpp +++ b/be/src/olap/page_cache.cpp @@ -28,12 +28,10 @@ template PageBase::PageBase(size_t b, bool use_cache, segment_v2::PageTypePB page_type) : LRUCacheValueBase(), _size(b), _capacity(b) { if (use_cache) { - _mem_tracker_by_allocator = StoragePageCache::instance()->mem_tracker(page_type); + SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER( + StoragePageCache::instance()->mem_tracker(page_type)); + _data = reinterpret_cast(TAllocator::alloc(_capacity, ALLOCATOR_ALIGNMENT_16)); } else { - _mem_tracker_by_allocator = thread_context()->thread_mem_tracker_mgr->limiter_mem_tracker(); - } - { - SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(_mem_tracker_by_allocator); _data = reinterpret_cast(TAllocator::alloc(_capacity, ALLOCATOR_ALIGNMENT_16)); } } @@ -42,7 +40,7 @@ template PageBase::~PageBase() { if (_data != nullptr) { DCHECK(_capacity != 0 && _size != 0); - SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(_mem_tracker_by_allocator); + SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(TAllocator::mem_tracker_); TAllocator::free(_data, _capacity); } } diff --git a/be/src/olap/page_cache.h b/be/src/olap/page_cache.h index 09fc689959ce4c..ef25de7bc30c63 100644 --- a/be/src/olap/page_cache.h +++ b/be/src/olap/page_cache.h @@ -60,7 +60,6 @@ class PageBase : private TAllocator, public LRUCacheValueBase { // Effective size, smaller than capacity, such as data page remove checksum suffix. size_t _size = 0; size_t _capacity = 0; - std::shared_ptr _mem_tracker_by_allocator; }; using DataPage = PageBase>; diff --git a/be/src/runtime/stream_load/stream_load_context.h b/be/src/runtime/stream_load/stream_load_context.h index 2ccf8ce5014a88..918bc5924d3ca5 100644 --- a/be/src/runtime/stream_load/stream_load_context.h +++ b/be/src/runtime/stream_load/stream_load_context.h @@ -130,6 +130,15 @@ class StreamLoadContext { ByteBufferPtr schema_buffer() { return _schema_buffer; } + ByteBufferPtr schema_buffer() { + if (_schema_buffer == nullptr) { + SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER( + ExecEnv::GetInstance()->stream_load_pipe_tracker()); + _schema_buffer = ByteBuffer::allocate(config::stream_tvf_buffer_size); + } + return _schema_buffer; + } + public: static const int default_txn_id = -1; // load type, eg: ROUTINE LOAD/MANUAL LOAD diff --git a/be/src/runtime/thread_context.h b/be/src/runtime/thread_context.h index d1aede848ab6bd..7b305a5313dbc4 100644 --- a/be/src/runtime/thread_context.h +++ b/be/src/runtime/thread_context.h @@ -50,7 +50,7 @@ // Used after SCOPED_ATTACH_TASK, in order to count the memory into another // MemTrackerLimiter instead of the MemTrackerLimiter added by the attach task. #define SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(arg1) \ - auto VARNAME_LINENUM(switch_mem_tracker) = SwitchThreadMemTrackerLimiter(arg1) + auto VARNAME_LINENUM(switch_mem_tracker) = doris::SwitchThreadMemTrackerLimiter(arg1) // Looking forward to tracking memory during thread execution into MemTracker. 
// Usually used to record query more detailed memory, including ExecNode operators. @@ -167,8 +167,7 @@ static std::string memory_orphan_check_msg = "each thread is expected to use SCOPED_ATTACH_TASK to bind a MemTrackerLimiter belonging " "to Query/Load/Compaction/Other Tasks, otherwise memory alloc using Doris Allocator in the " "thread will crash. If you want to switch MemTrackerLimiter during thread execution, " - "please use SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER, do not repeat Attach. Of course, you " - "can modify enable_memory_orphan_check=false in be.conf to avoid this crash."; + "please use SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER, do not repeat Attach."; // The thread context saves some info about a working thread. // 2 required info: @@ -219,9 +218,9 @@ class ThreadContext { ss << std::this_thread::get_id(); return ss.str(); } - // After thread_mem_tracker_mgr is initialized, the current thread Hook starts to - // consume/release mem_tracker. - // Note that the use of shared_ptr will cause a crash. The guess is that there is an + // Note that if set global Memory Hook, After thread_mem_tracker_mgr is initialized, + // the current thread Hook starts to consume/release mem_tracker. + // the use of shared_ptr will cause a crash. The guess is that there is an // intermediate state during the copy construction of shared_ptr. Shared_ptr is not equal // to nullptr, but the object it points to is not initialized. At this time, when the memory // is released somewhere, the hook is triggered to cause the crash. @@ -315,7 +314,7 @@ class ThreadLocalHandle { // The brpc server should respond as quickly as possible. bthread_context->thread_mem_tracker_mgr->disable_wait_gc(); // set the data so that next time bthread_getspecific in the thread returns the data. - CHECK(0 == bthread_setspecific(btls_key, bthread_context) || k_doris_exit); + CHECK(0 == bthread_setspecific(btls_key, bthread_context) || doris::k_doris_exit); } DCHECK(bthread_context != nullptr); bthread_context->thread_local_handle_count++; @@ -357,7 +356,7 @@ static ThreadContext* thread_context(bool allow_return_null = false) { // in bthread // bthread switching pthread may be very frequent, remember not to use lock or other time-consuming operations. 
auto* bthread_context = static_cast(bthread_getspecific(btls_key)); - DCHECK(bthread_context != nullptr); + DCHECK(bthread_context != nullptr && bthread_context->thread_local_handle_count > 0); return bthread_context; } if (allow_return_null) { @@ -443,15 +442,16 @@ class AttachTask { class SwitchThreadMemTrackerLimiter { public: - explicit SwitchThreadMemTrackerLimiter(const std::shared_ptr& mem_tracker) { + explicit SwitchThreadMemTrackerLimiter( + const std::shared_ptr& mem_tracker) { DCHECK(mem_tracker); - ThreadLocalHandle::create_thread_local_if_not_exits(); + doris::ThreadLocalHandle::create_thread_local_if_not_exits(); _old_mem_tracker = thread_context()->thread_mem_tracker_mgr->limiter_mem_tracker(); thread_context()->thread_mem_tracker_mgr->attach_limiter_tracker(mem_tracker); } - explicit SwitchThreadMemTrackerLimiter(const QueryThreadContext& query_thread_context) { - ThreadLocalHandle::create_thread_local_if_not_exits(); + explicit SwitchThreadMemTrackerLimiter(const doris::QueryThreadContext& query_thread_context) { + doris::ThreadLocalHandle::create_thread_local_if_not_exits(); DCHECK(thread_context()->task_id() == query_thread_context.query_id); // workload group alse not change DCHECK(query_thread_context.query_mem_tracker); @@ -462,11 +462,11 @@ class SwitchThreadMemTrackerLimiter { ~SwitchThreadMemTrackerLimiter() { thread_context()->thread_mem_tracker_mgr->detach_limiter_tracker(_old_mem_tracker); - ThreadLocalHandle::del_thread_local_if_count_is_zero(); + doris::ThreadLocalHandle::del_thread_local_if_count_is_zero(); } private: - std::shared_ptr _old_mem_tracker; + std::shared_ptr _old_mem_tracker; }; class AddThreadMemTrackerConsumer { diff --git a/be/src/service/internal_service.cpp b/be/src/service/internal_service.cpp index 0801f30fb2e265..099236153a396e 100644 --- a/be/src/service/internal_service.cpp +++ b/be/src/service/internal_service.cpp @@ -784,8 +784,9 @@ void PInternalServiceImpl::fetch_table_schema(google::protobuf::RpcController* c const TFileScanRangeParams& params = file_scan_range.params; std::shared_ptr mem_tracker = MemTrackerLimiter::create_shared( - MemTrackerLimiter::Type::SCHEMA_CHANGE, - fmt::format("{}#{}", params.format_type, params.file_type)); + MemTrackerLimiter::Type::OTHER, + fmt::format("InternalService::fetch_table_schema:{}#{}", params.format_type, + params.file_type)); SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(mem_tracker); // make sure profile is desctructed after reader cause PrefetchBufferedReader diff --git a/be/src/util/byte_buffer.h b/be/src/util/byte_buffer.h index 6bb19dd2a88f2d..aafd4506087d76 100644 --- a/be/src/util/byte_buffer.h +++ b/be/src/util/byte_buffer.h @@ -24,6 +24,7 @@ #include "common/logging.h" #include "common/status.h" +#include "runtime/thread_context.h" #include "vec/common/allocator.h" #include "vec/common/allocator_fwd.h" @@ -38,7 +39,10 @@ struct ByteBuffer : private Allocator { return Status::OK(); } - ~ByteBuffer() { Allocator::free(ptr, capacity); } + ~ByteBuffer() { + SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(mem_tracker_); + Allocator::free(ptr, capacity); + } void put_bytes(const char* data, size_t size) { memcpy(ptr + pos, data, size); diff --git a/be/src/vec/common/allocator.cpp b/be/src/vec/common/allocator.cpp index 34d71d8df3e16c..73f9165f571713 100644 --- a/be/src/vec/common/allocator.cpp +++ b/be/src/vec/common/allocator.cpp @@ -189,14 +189,43 @@ void Allocator::memory_ template void Allocator::consume_memory( - size_t size) const { + size_t size) { + // Usually, an object that inherits 
Allocator has the same TLS tracker for each alloc. + // If an object that inherits Allocator needs to be reused by multiple queries, + // it is necessary to switch the same tracker to TLS when calling alloc. + // However, in ORC Reader, ORC DataBuffer will be reused, but we cannot switch TLS tracker, + // so we update the Allocator tracker when the TLS tracker changes. + // note that the tracker in thread context when object that inherit Allocator is constructed may be + // no attach memory tracker in tls. usually the memory tracker is attached in tls only during the first alloc. + if (mem_tracker_ == nullptr || + mem_tracker_->label() != doris::thread_context()->thread_mem_tracker()->label()) { + mem_tracker_ = doris::thread_context()->thread_mem_tracker_mgr->limiter_mem_tracker(); + } CONSUME_THREAD_MEM_TRACKER(size); } template void Allocator::release_memory( size_t size) const { - RELEASE_THREAD_MEM_TRACKER(size); + doris::ThreadContext* thread_context = doris::thread_context(true); + if ((thread_context && thread_context->thread_mem_tracker()->label() != "Orphan") || + mem_tracker_ == nullptr) { + // If thread_context exist and the label of thread_mem_tracker not equal to `Orphan`, + // this means that in the scope of SCOPED_ATTACH_TASK, + // so thread_mem_tracker should be used to release memory. + // If mem_tracker_ is nullptr there is a scenario where an object that inherits Allocator + // has never called alloc, but free memory. + // in phmap, the memory alloced by an object may be transferred to another object and then free. + // in this case, thread context must attach a memory tracker other than Orphan, + // otherwise memory tracking will be wrong. + RELEASE_THREAD_MEM_TRACKER(size); + } else { + // if thread_context does not exist or the label of thread_mem_tracker is equal to + // `Orphan`, it usually happens during object destruction. This means that + // the scope of SCOPED_ATTACH_TASK has been left, so release memory using Allocator tracker. + SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(mem_tracker_); + RELEASE_THREAD_MEM_TRACKER(size); + } } template diff --git a/be/src/vec/common/allocator.h b/be/src/vec/common/allocator.h index 83cb6eddb7ddec..df4dd7852b07d8 100644 --- a/be/src/vec/common/allocator.h +++ b/be/src/vec/common/allocator.h @@ -87,6 +87,10 @@ static constexpr size_t MALLOC_MIN_ALIGNMENT = 8; // is always a multiple of sixteen. (https://www.gnu.org/software/libc/manual/html_node/Aligned-Memory-Blocks.html) static constexpr int ALLOCATOR_ALIGNMENT_16 = 16; +namespace doris { +class MemTrackerLimiter; +} + class DefaultMemoryAllocator { public: static void* malloc(size_t size) __THROW { return std::malloc(size); } @@ -228,7 +232,7 @@ class Allocator { // alloc will continue to execute, so the consume memtracker is forced. void memory_check(size_t size) const; // Increases consumption of this tracker by 'bytes'. - void consume_memory(size_t size) const; + void consume_memory(size_t size); void release_memory(size_t size) const; void throw_bad_alloc(const std::string& err) const; @@ -374,6 +378,8 @@ class Allocator { static constexpr bool clear_memory = clear_memory_; + std::shared_ptr mem_tracker_ {nullptr}; + // Freshly mmapped pages are copy-on-write references to a global zero page. // On the first write, a page fault occurs, and an actual writable page is // allocated. 
If we are going to use this memory soon, such as when resizing From c765804a06030739f68b3076df4682b3f6d4c66a Mon Sep 17 00:00:00 2001 From: Xinyi Zou Date: Wed, 28 Aug 2024 22:05:11 +0800 Subject: [PATCH 03/13] [fix](memory) Fix OwnedSlice free memory (#40043) ``` I20240828 11:41:58.529152 20190 mem_tracker_limiter.cpp:193] [Address Sanitizer] memory buf not exist, mem tracker label: Load#Id=ac42f13dd1430d2c-bd60f5a4829d0792, consumption: 13054587, peak consumption: 13054587, buf: 0, size: 0, stack_trace: 0# doris::OwnedSlice::~OwnedSlice() 1# doris::segment_v2::ScalarColumnWriter::finish_current_page() 2# doris::segment_v2::ScalarColumnWriter::finish() 3# doris::segment_v2::VerticalSegmentWriter::write_batch() 4# doris::SegmentFlusher::_add_rows(std::unique_ptr >&, doris::vectorized::Block const*, unsigned long, unsigned long) 5# doris::SegmentFlusher::flush_single_block(doris::vectorized::Block const*, int, long*) 6# doris::SegmentCreator::flush_single_block(doris::vectorized::Block const*, int, long*) 7# doris::BaseBetaRowsetWriter::flush_memtable(doris::vectorized::Block*, int, long*) 8# doris::FlushToken::_do_flush_memtable(doris::MemTable*, int, long*) 9# doris::FlushToken::_flush_memtable(std::unique_ptr >, int, long) 10# doris::MemtableFlushTask::run() 11# doris::ThreadPool::dispatch_thread() 12# doris::Thread::supervise_thread(void*) 13# ? 14# clone ``` ``` I20240828 11:41:58.613629 20183 mem_tracker_limiter.cpp:182] [Address Sanitizer] free memory buf size inaccurate, mem tracker label: Load#Id=433657e8b3834e94-ac178bb7ab8ff661, consumption: 3239536, peak consumption: 6385184, buf: 0x6030015390a0, size: 32, old buf: 0x6030015390a0, old size: 20, new stack_trace: 0# doris::OwnedSlice::~OwnedSlice() 1# doris::segment_v2::IndexedColumnWriter::_finish_current_data_page(unsigned long&) 2# doris::segment_v2::IndexedColumnWriter::finish(doris::segment_v2::IndexedColumnMetaPB*) 3# doris::PrimaryKeyIndexBuilder::finalize(doris::segment_v2::PrimaryKeyIndexMetaPB*) 4# doris::segment_v2::VerticalSegmentWriter::_write_primary_key_index() 5# doris::segment_v2::VerticalSegmentWriter::finalize_columns_index(unsigned long*) 6# doris::segment_v2::VerticalSegmentWriter::finalize(unsigned long*, unsigned long*) 7# doris::SegmentFlusher::_flush_segment_writer(std::unique_ptr >&, std::shared_ptr, long*) 8# doris::SegmentFlusher::flush_single_block(doris::vectorized::Block const*, int, long*) 9# doris::SegmentCreator::flush_single_block(doris::vectorized::Block const*, int, long*) 10# doris::BetaRowsetWriterV2::flush_memtable(doris::vectorized::Block*, int, long*) 11# doris::FlushToken::_do_flush_memtable(doris::MemTable*, int, long*) 12# doris::FlushToken::_flush_memtable(std::unique_ptr >, int, long) 13# doris::MemtableFlushTask::run() 14# doris::ThreadPool::dispatch_thread() 15# doris::Thread::supervise_thread(void*) 16# ? 
17# clone , old stack_trace: 0# Allocator::alloc_impl(unsigned long, unsigned long) 1# doris::faststring::build() 2# doris::segment_v2::BinaryPrefixPageBuilder::finish(doris::OwnedSlice*) 3# doris::segment_v2::IndexedColumnWriter::_finish_current_data_page(unsigned long&) 4# doris::segment_v2::IndexedColumnWriter::finish(doris::segment_v2::IndexedColumnMetaPB*) 5# doris::PrimaryKeyIndexBuilder::finalize(doris::segment_v2::PrimaryKeyIndexMetaPB*) 6# doris::segment_v2::VerticalSegmentWriter::_write_primary_key_index() 7# doris::segment_v2::VerticalSegmentWriter::finalize_columns_index(unsigned long*) 8# doris::segment_v2::VerticalSegmentWriter::finalize(unsigned long*, unsigned long*) 9# doris::SegmentFlusher::_flush_segment_writer(std::unique_ptr >&, std::shared_ptr, long*) 10# doris::SegmentFlusher::flush_single_block(doris::vectorized::Block const*, int, long*) 11# doris::SegmentCreator::flush_single_block(doris::vectorized::Block const*, int, long*) 12# doris::BetaRowsetWriterV2::flush_memtable(doris::vectorized::Block*, int, long*) 13# doris::FlushToken::_do_flush_memtable(doris::MemTable*, int, long*) 14# doris::FlushToken::_flush_memtable(std::unique_ptr >, int, long) 15# doris::MemtableFlushTask::run() 16# doris::ThreadPool::dispatch_thread() 17# doris::Thread::supervise_thread(void*) 18# ? 19# clon ``` --- be/src/util/faststring.h | 3 ++- be/src/util/slice.h | 11 ++++++++++- 2 files changed, 12 insertions(+), 2 deletions(-) diff --git a/be/src/util/faststring.h b/be/src/util/faststring.h index 9308a4d20bbb1c..eae7db536252ca 100644 --- a/be/src/util/faststring.h +++ b/be/src/util/faststring.h @@ -85,7 +85,8 @@ class faststring : private Allocator(Allocator::alloc(len_)); + ret = reinterpret_cast(Allocator::alloc(capacity_)); + DCHECK(len_ <= capacity_); memcpy(ret, data_, len_); } OwnedSlice result(ret, len_, capacity_); diff --git a/be/src/util/slice.h b/be/src/util/slice.h index bae33d4ee75010..a81593dc913906 100644 --- a/be/src/util/slice.h +++ b/be/src/util/slice.h @@ -358,7 +358,16 @@ class OwnedSlice : private Allocator Date: Thu, 18 Apr 2024 13:21:03 +0800 Subject: [PATCH 04/13] [enhancement](memory) Allocator support address sanitizers (#33396) If DEBUG build type, record size of each memory alloc and free, print no free size or no alloc size when query MemTracker is destructed, if necessary, record stack trace. add global PointQueryExecutor memory tracker in ExecEnv, because memory may be shared between PointQueryExecutors of different pointer queries, but memory will not be shared between PointQueryExecutor and Fragment of the same pointer query. If DEBUG build type, if query memory tracker not equal to 0 when query ends, BE will crash. 
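To make the intent of the DEBUG-only bookkeeping easier to follow before reading the diff: the idea is simply to remember the size recorded at alloc time for every live buffer, and to complain on a repeated add, on a free whose size does not match, or on entries still alive when the tracker is destroyed. The sketch below is only an illustration of that pattern; the names (AllocationLedger, on_alloc, on_free) are invented for this example and are not the patch's API, the real hooks are MemTrackerLimiter::add_address_sanitizers / remove_address_sanitizers / print_address_sanitizers in the diff further down, which additionally capture stack traces when enable_address_sanitizers_with_stack_trace is set.

```
#include <cstddef>
#include <cstdio>
#include <mutex>
#include <unordered_map>

// Minimal sketch of the per-allocation ledger described above (illustrative
// names, not the Doris API). Records the size of every live allocation so a
// mismatched free, a double add, or a leftover entry at destruction can be
// reported, which is what the DEBUG-build checks in this patch rely on.
class AllocationLedger {
public:
    void on_alloc(void* buf, size_t size) {
        std::lock_guard<std::mutex> l(_mtx);
        auto [it, inserted] = _live.emplace(buf, size);
        if (!inserted) {
            std::fprintf(stderr, "repeat add: %p old=%zu new=%zu\n",
                         buf, it->second, size);
        }
    }

    void on_free(void* buf, size_t size) {
        std::lock_guard<std::mutex> l(_mtx);
        auto it = _live.find(buf);
        if (it == _live.end()) {
            std::fprintf(stderr, "free of untracked buf: %p size=%zu\n", buf, size);
            return;
        }
        if (it->second != size) {
            std::fprintf(stderr, "size mismatch on free: %p alloc=%zu free=%zu\n",
                         buf, it->second, size);
        }
        _live.erase(it);
    }

    ~AllocationLedger() {
        // Anything still recorded here corresponds to memory the tracker was
        // never told about being freed, i.e. the "not equal to 0 when query
        // ends" condition the patch turns into a crash in DEBUG builds.
        for (const auto& [buf, size] : _live) {
            std::fprintf(stderr, "leaked entry: %p size=%zu\n", buf, size);
        }
    }

private:
    std::mutex _mtx;
    std::unordered_map<void*, size_t> _live;
};
```

In the patch itself this ledger lives inside the query MemTrackerLimiter and is only compiled in non-NDEBUG builds, so release builds pay no cost for it.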
--- be/src/common/config.cpp | 1 + be/src/common/config.h | 1 + be/src/runtime/exec_env.h | 3 + be/src/runtime/exec_env_init.cpp | 2 + be/src/runtime/memory/mem_tracker_limiter.cpp | 82 +++++++++++++++++-- be/src/runtime/memory/mem_tracker_limiter.h | 16 ++++ be/src/runtime/thread_context.h | 2 +- be/src/service/point_query_executor.cpp | 11 ++- be/src/service/point_query_executor.h | 1 - be/src/util/block_compression.cpp | 7 ++ be/src/vec/common/allocator.cpp | 24 ++++++ be/src/vec/common/allocator.h | 25 ++++++ be/src/vec/common/pod_array_fwd.h | 7 +- bin/run-fs-benchmark.sh | 2 +- bin/start_be.sh | 3 +- run-be-ut.sh | 3 +- 16 files changed, 172 insertions(+), 18 deletions(-) diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp index 16a1ff075eac02..021d7a9144b277 100644 --- a/be/src/common/config.cpp +++ b/be/src/common/config.cpp @@ -1101,6 +1101,7 @@ DEFINE_mString(kerberos_krb5_conf_path, "/etc/krb5.conf"); DEFINE_mString(get_stack_trace_tool, "libunwind"); DEFINE_mString(dwarf_location_info_mode, "FAST"); +DEFINE_mBool(enable_address_sanitizers_with_stack_trace, "false"); // the ratio of _prefetch_size/_batch_size in AutoIncIDBuffer DEFINE_mInt64(auto_inc_prefetch_size_ratio, "10"); diff --git a/be/src/common/config.h b/be/src/common/config.h index 91b2a72e9081e8..b1600ed5af38c0 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -1152,6 +1152,7 @@ DECLARE_mString(kerberos_krb5_conf_path); // Values include `none`, `glog`, `boost`, `glibc`, `libunwind` DECLARE_mString(get_stack_trace_tool); +DECLARE_mBool(enable_address_sanitizers_with_stack_trace); // DISABLED: Don't resolve location info. // FAST: Perform CU lookup using .debug_aranges (might be incomplete). diff --git a/be/src/runtime/exec_env.h b/be/src/runtime/exec_env.h index 83929cd999feeb..ca8e6936ebd45c 100644 --- a/be/src/runtime/exec_env.h +++ b/be/src/runtime/exec_env.h @@ -187,6 +187,8 @@ class ExecEnv { } std::shared_ptr stream_load_pipe_tracker() { return _stream_load_pipe_tracker; + std::shared_ptr point_query_executor_mem_tracker() { + return _point_query_executor_mem_tracker; } std::shared_ptr rowid_storage_reader_tracker() { return _rowid_storage_reader_tracker; @@ -366,6 +368,7 @@ class ExecEnv { // Count the memory consumption of segment compaction tasks. std::shared_ptr _segcompaction_mem_tracker; std::shared_ptr _stream_load_pipe_tracker; + std::shared_ptr _point_query_executor_mem_tracker; // TODO, looking forward to more accurate tracking. 
std::shared_ptr _rowid_storage_reader_tracker; diff --git a/be/src/runtime/exec_env_init.cpp b/be/src/runtime/exec_env_init.cpp index bce801a7a3dd52..0688bb64289aef 100644 --- a/be/src/runtime/exec_env_init.cpp +++ b/be/src/runtime/exec_env_init.cpp @@ -529,6 +529,8 @@ void ExecEnv::init_mem_tracker() { std::make_shared("IOBufBlockMemory", _details_mem_tracker_set.get()); _segcompaction_mem_tracker = MemTrackerLimiter::create_shared(MemTrackerLimiter::Type::GLOBAL, "SegCompaction"); + _point_query_executor_mem_tracker = + MemTrackerLimiter::create_shared(MemTrackerLimiter::Type::GLOBAL, "PointQueryExecutor"); _rowid_storage_reader_tracker = MemTrackerLimiter::create_shared(MemTrackerLimiter::Type::GLOBAL, "RowIdStorageReader"); _subcolumns_tree_tracker = diff --git a/be/src/runtime/memory/mem_tracker_limiter.cpp b/be/src/runtime/memory/mem_tracker_limiter.cpp index 494645d56b5533..60454031e33ddf 100644 --- a/be/src/runtime/memory/mem_tracker_limiter.cpp +++ b/be/src/runtime/memory/mem_tracker_limiter.cpp @@ -39,6 +39,7 @@ #include "util/perf_counters.h" #include "util/pretty_printer.h" #include "util/runtime_profile.h" +#include "util/stack_util.h" namespace doris { @@ -110,7 +111,7 @@ std::shared_ptr MemTrackerLimiter::create_shared(MemTrackerLi MemTrackerLimiter::~MemTrackerLimiter() { consume(_untracked_mem); static std::string mem_tracker_inaccurate_msg = - ", mem tracker not equal to 0 when mem tracker destruct, this usually means that " + "mem tracker not equal to 0 when mem tracker destruct, this usually means that " "memory tracking is inaccurate and SCOPED_ATTACH_TASK and " "SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER are not used correctly. " "1. For query and load, memory leaks may have occurred, it is expected that the query " @@ -126,19 +127,90 @@ MemTrackerLimiter::~MemTrackerLimiter() { if (_consumption->current_value() != 0) { // TODO, expect mem tracker equal to 0 at the task end. if (doris::config::enable_memory_orphan_check && _type == Type::QUERY) { - LOG(INFO) << "mem tracker label: " << _label - << ", consumption: " << _consumption->current_value() - << ", peak consumption: " << _consumption->peak_value() - << mem_tracker_inaccurate_msg; + std::string err_msg = + fmt::format("mem tracker label: {}, consumption: {}, peak consumption: {}, {}.", + label(), _consumption->current_value(), _consumption->peak_value(), + mem_tracker_inaccurate_msg); +#ifdef NDEBUG + LOG(INFO) << err_msg; +#else + LOG(FATAL) << err_msg << print_address_sanitizers(); +#endif } if (ExecEnv::tracking_memory()) { ExecEnv::GetInstance()->orphan_mem_tracker()->consume(_consumption->current_value()); } _consumption->set(0); +#ifndef NDEBUG + } else if (!_address_sanitizers.empty()) { + LOG(FATAL) << "[Address Sanitizer] consumption is 0, but address sanitizers not empty. 
" + << ", mem tracker label: " << _label + << ", peak consumption: " << _consumption->peak_value() + << print_address_sanitizers(); +#endif } memory_memtrackerlimiter_cnt << -1; } +#ifndef NDEBUG +void MemTrackerLimiter::add_address_sanitizers(void* buf, size_t size) { + if (_type == Type::QUERY) { + std::lock_guard l(_address_sanitizers_mtx); + auto it = _address_sanitizers.find(buf); + if (it != _address_sanitizers.end()) { + LOG(FATAL) << "[Address Sanitizer] memory buf repeat add, mem tracker label: " << _label + << ", consumption: " << _consumption->current_value() + << ", peak consumption: " << _consumption->peak_value() << ", buf: " << buf + << ", size: " << size << ", old buf: " << it->first + << ", old size: " << it->second.size + << ", new stack_trace: " << get_stack_trace() + << ", old stack_trace: " << it->second.stack_trace; + } + + // if alignment not equal to 0, maybe usable_size > size. + AddressSanitizer as = {size, doris::config::enable_address_sanitizers_with_stack_trace + ? get_stack_trace() + : ""}; + _address_sanitizers.emplace(buf, as); + } +} + +void MemTrackerLimiter::remove_address_sanitizers(void* buf, size_t size) { + if (_type == Type::QUERY) { + std::lock_guard l(_address_sanitizers_mtx); + auto it = _address_sanitizers.find(buf); + if (it != _address_sanitizers.end()) { + if (it->second.size != size) { + LOG(FATAL) << "[Address Sanitizer] free memory buf size inaccurate, mem tracker " + "label: " + << _label << ", consumption: " << _consumption->current_value() + << ", peak consumption: " << _consumption->peak_value() + << ", buf: " << buf << ", size: " << size << ", old buf: " << it->first + << ", old size: " << it->second.size + << ", new stack_trace: " << get_stack_trace() + << ", old stack_trace: " << it->second.stack_trace; + } + _address_sanitizers.erase(buf); + } else { + LOG(FATAL) << "[Address Sanitizer] memory buf not exist, mem tracker label: " << _label + << ", consumption: " << _consumption->current_value() + << ", peak consumption: " << _consumption->peak_value() << ", buf: " << buf + << ", size: " << size << ", stack_trace: " << get_stack_trace(); + } + } +} + +std::string MemTrackerLimiter::print_address_sanitizers() { + std::lock_guard l(_address_sanitizers_mtx); + std::string detail = "[Address Sanitizer]:"; + for (const auto& it : _address_sanitizers) { + detail += fmt::format("\n {}, size {}, strack trace: {}", it.first, it.second.size, + it.second.stack_trace); + } + return detail; +} +#endif + MemTracker::Snapshot MemTrackerLimiter::make_snapshot() const { Snapshot snapshot; snapshot.type = type_string(_type); diff --git a/be/src/runtime/memory/mem_tracker_limiter.h b/be/src/runtime/memory/mem_tracker_limiter.h index 67c40e1f6c52b9..e5c5cb1bc0369c 100644 --- a/be/src/runtime/memory/mem_tracker_limiter.h +++ b/be/src/runtime/memory/mem_tracker_limiter.h @@ -205,6 +205,12 @@ class MemTrackerLimiter final : public MemTracker { // Log the memory usage when memory limit is exceeded. std::string tracker_limit_exceeded_str(); +#ifndef NDEBUG + void add_address_sanitizers(void* buf, size_t size); + void remove_address_sanitizers(void* buf, size_t size); + std::string print_address_sanitizers(); +#endif + std::string debug_string() override { std::stringstream msg; msg << "limit: " << _limit << "; " @@ -245,6 +251,16 @@ class MemTrackerLimiter final : public MemTracker { // Avoid frequent printing. 
bool _enable_print_log_usage = false; static std::atomic _enable_print_log_process_usage; + +#ifndef NDEBUG + struct AddressSanitizer { + size_t size; + std::string stack_trace; + }; + + std::mutex _address_sanitizers_mtx; + std::unordered_map _address_sanitizers; +#endif }; inline int64_t MemTrackerLimiter::add_untracked_mem(int64_t bytes) { diff --git a/be/src/runtime/thread_context.h b/be/src/runtime/thread_context.h index 7b305a5313dbc4..fa2f177b9dffc9 100644 --- a/be/src/runtime/thread_context.h +++ b/be/src/runtime/thread_context.h @@ -549,7 +549,7 @@ class ScopeSkipMemoryCheck { // must call create_thread_local_if_not_exits() before use thread_context(). #define CONSUME_THREAD_MEM_TRACKER(size) \ do { \ - if (doris::use_mem_hook || size == 0) { \ + if (size == 0 || doris::use_mem_hook) { \ break; \ } \ if (doris::pthread_context_ptr_init) { \ diff --git a/be/src/service/point_query_executor.cpp b/be/src/service/point_query_executor.cpp index c89bc52115a786..4e9295ed53dcee 100644 --- a/be/src/service/point_query_executor.cpp +++ b/be/src/service/point_query_executor.cpp @@ -37,6 +37,7 @@ #include "olap/tablet_schema.h" #include "runtime/exec_env.h" #include "runtime/runtime_state.h" +#include "runtime/thread_context.h" #include "util/key_util.h" #include "util/runtime_profile.h" #include "util/simd/bits.h" @@ -166,7 +167,8 @@ void RowCache::erase(const RowCacheKey& key) { } PointQueryExecutor::~PointQueryExecutor() { - SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(_mem_tracker); + SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER( + ExecEnv::GetInstance()->point_query_executor_mem_tracker()); _tablet.reset(); _reusable.reset(); _result_block.reset(); @@ -180,10 +182,7 @@ Status PointQueryExecutor::init(const PTabletKeyLookupRequest* request, // using cache __int128_t uuid = static_cast<__int128_t>(request->uuid().uuid_high()) << 64 | request->uuid().uuid_low(); - _mem_tracker = MemTrackerLimiter::create_shared( - MemTrackerLimiter::Type::QUERY, - fmt::format("PointQueryExecutor:{}#{}", uuid, request->tablet_id())); - SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(_mem_tracker); + SCOPED_ATTACH_TASK(ExecEnv::GetInstance()->point_query_executor_mem_tracker()); auto cache_handle = LookupConnectionCache::instance()->get(uuid); _binary_row_format = request->is_binary_row(); _tablet = StorageEngine::instance()->tablet_manager()->get_tablet(request->tablet_id()); @@ -234,7 +233,7 @@ Status PointQueryExecutor::init(const PTabletKeyLookupRequest* request, } Status PointQueryExecutor::lookup_up() { - SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(_mem_tracker); + SCOPED_ATTACH_TASK(ExecEnv::GetInstance()->point_query_executor_mem_tracker()); RETURN_IF_ERROR(_lookup_row_key()); RETURN_IF_ERROR(_lookup_row_data()); RETURN_IF_ERROR(_output_data()); diff --git a/be/src/service/point_query_executor.h b/be/src/service/point_query_executor.h index 565a585d322afe..e168ef16ad798a 100644 --- a/be/src/service/point_query_executor.h +++ b/be/src/service/point_query_executor.h @@ -323,7 +323,6 @@ class PointQueryExecutor { std::vector _row_read_ctxs; std::shared_ptr _reusable; std::unique_ptr _result_block; - std::shared_ptr _mem_tracker; Metrics _profile_metrics; bool _binary_row_format = false; }; diff --git a/be/src/util/block_compression.cpp b/be/src/util/block_compression.cpp index 10f975451d3d1e..bd7ba224586d6c 100644 --- a/be/src/util/block_compression.cpp +++ b/be/src/util/block_compression.cpp @@ -53,6 +53,7 @@ #include "gutil/endian.h" #include "gutil/strings/substitute.h" #include "orc/OrcFile.hh" +#include 
"runtime/thread_context.h" #include "util/bit_util.h" #include "util/defer_op.h" #include "util/faststring.h" @@ -810,6 +811,7 @@ class ZstdBlockCompression : public BlockCompressionCodec { return &s_instance; } ~ZstdBlockCompression() { + SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(_query_thread_context); _ctx_c_pool.clear(); _ctx_d_pool.clear(); } @@ -825,6 +827,7 @@ class ZstdBlockCompression : public BlockCompressionCodec { // https://github.com/facebook/zstd/blob/dev/examples/streaming_compression.c Status compress(const std::vector& inputs, size_t uncompressed_size, faststring* output) override { + _query_thread_context.init(); std::unique_ptr context; RETURN_IF_ERROR(_acquire_compression_ctx(context)); bool compress_failed = false; @@ -909,6 +912,7 @@ class ZstdBlockCompression : public BlockCompressionCodec { } Status decompress(const Slice& input, Slice* output) override { + _query_thread_context.init(); std::unique_ptr context; bool decompress_failed = false; RETURN_IF_ERROR(_acquire_decompression_ctx(context)); @@ -993,6 +997,9 @@ class ZstdBlockCompression : public BlockCompressionCodec { mutable std::mutex _ctx_d_mutex; mutable std::vector> _ctx_d_pool; + mutable std::vector _ctx_d_pool; + + QueryThreadContext _query_thread_context; }; class GzipBlockCompression : public ZlibBlockCompression { diff --git a/be/src/vec/common/allocator.cpp b/be/src/vec/common/allocator.cpp index 73f9165f571713..42712c05e28c47 100644 --- a/be/src/vec/common/allocator.cpp +++ b/be/src/vec/common/allocator.cpp @@ -239,6 +239,30 @@ void Allocator::throw_b throw doris::Exception(doris::ErrorCode::MEM_ALLOC_FAILED, err); } +#ifndef NDEBUG +template +void Allocator::add_address_sanitizers(void* buf, + size_t size) const { +#ifdef BE_TEST + if (!doris::ExecEnv::ready()) { + return; + } +#endif + doris::thread_context()->thread_mem_tracker()->add_address_sanitizers(buf, size); +} + +template +void Allocator::remove_address_sanitizers( + void* buf, size_t size) const { +#ifdef BE_TEST + if (!doris::ExecEnv::ready()) { + return; + } +#endif + doris::thread_context()->thread_mem_tracker()->remove_address_sanitizers(buf, size); +} +#endif + template void* Allocator::alloc(size_t size, size_t alignment) { diff --git a/be/src/vec/common/allocator.h b/be/src/vec/common/allocator.h index df4dd7852b07d8..f0a490c5da29d9 100644 --- a/be/src/vec/common/allocator.h +++ b/be/src/vec/common/allocator.h @@ -235,6 +235,10 @@ class Allocator { void consume_memory(size_t size); void release_memory(size_t size) const; void throw_bad_alloc(const std::string& err) const; +#ifndef NDEBUG + void add_address_sanitizers(void* buf, size_t size) const; + void remove_address_sanitizers(void* buf, size_t size) const; +#endif void* alloc(size_t size, size_t alignment = 0); void* realloc(void* buf, size_t old_size, size_t new_size, size_t alignment = 0); @@ -242,6 +246,7 @@ class Allocator { /// Allocate memory range. void* alloc_impl(size_t size, size_t alignment = 0) { memory_check(size); + // consume memory in tracker before alloc, similar to early declaration. 
consume_memory(size); void* buf; size_t record_size = size; @@ -277,6 +282,9 @@ class Allocator { if constexpr (MemoryAllocator::need_record_actual_size()) { record_size = MemoryAllocator::allocated_size(buf); } +#ifndef NDEBUG + add_address_sanitizers(buf, record_size); +#endif } else { buf = nullptr; int res = MemoryAllocator::posix_memalign(&buf, alignment, size); @@ -286,6 +294,9 @@ class Allocator { throw_bad_alloc( fmt::format("Cannot allocate memory (posix_memalign) {}.", size)); } +#ifndef NDEBUG + add_address_sanitizers(buf, size); +#endif if constexpr (clear_memory) memset(buf, 0, size); @@ -307,6 +318,9 @@ class Allocator { throw_bad_alloc(fmt::format("Allocator: Cannot munmap {}.", size)); } } else { +#ifndef NDEBUG + remove_address_sanitizers(buf, size); +#endif MemoryAllocator::free(buf); } release_memory(size); @@ -330,6 +344,9 @@ class Allocator { if (!use_mmap || (old_size < doris::config::mmap_threshold && new_size < doris::config::mmap_threshold && alignment <= MALLOC_MIN_ALIGNMENT)) { +#ifndef NDEBUG + remove_address_sanitizers(buf, old_size); +#endif /// Resize malloc'd memory region with no special alignment requirement. void* new_buf = MemoryAllocator::realloc(buf, new_size); if (nullptr == new_buf) { @@ -337,6 +354,10 @@ class Allocator { throw_bad_alloc(fmt::format("Allocator: Cannot realloc from {} to {}.", old_size, new_size)); } +#ifndef NDEBUG + add_address_sanitizers( + new_buf, new_size); // usually, buf addr = new_buf addr, asan maybe not equal. +#endif buf = new_buf; if constexpr (clear_memory) @@ -366,6 +387,10 @@ class Allocator { // Big allocs that requires a copy. void* new_buf = alloc(new_size, alignment); memcpy(new_buf, buf, std::min(old_size, new_size)); +#ifndef NDEBUG + add_address_sanitizers(new_buf, new_size); + remove_address_sanitizers(buf, old_size); +#endif free(buf, old_size); buf = new_buf; } diff --git a/be/src/vec/common/pod_array_fwd.h b/be/src/vec/common/pod_array_fwd.h index ff00b312575a63..e1a428eda9dafb 100644 --- a/be/src/vec/common/pod_array_fwd.h +++ b/be/src/vec/common/pod_array_fwd.h @@ -36,9 +36,12 @@ template class PODArray; -/** For columns. Padding is enough to read and write xmm-register at the address of the last element. */ +/** For columns. Padding is enough to read and write xmm-register at the address of the last element. + * TODO, pad_right is temporarily changed from 15 to 16, will waste 1 bytes, + * can rollback after fix wrong reinterpret_cast column and PODArray swap. + */ template > -using PaddedPODArray = PODArray; +using PaddedPODArray = PODArray; /** A helper for declaring PODArray that uses inline memory. 
* The initial size is set to use all the inline bytes, since using less would diff --git a/bin/run-fs-benchmark.sh b/bin/run-fs-benchmark.sh index 3076a96876116b..f4edd4117d01e8 100755 --- a/bin/run-fs-benchmark.sh +++ b/bin/run-fs-benchmark.sh @@ -189,7 +189,7 @@ else fi ## set asan and ubsan env to generate core file -export ASAN_OPTIONS=symbolize=1:abort_on_error=1:disable_coredump=0:unmap_shadow_on_exit=1:detect_container_overflow=0 +export ASAN_OPTIONS=symbolize=1:abort_on_error=1:disable_coredump=0:unmap_shadow_on_exit=1:detect_container_overflow=0:check_malloc_usable_size=0 export UBSAN_OPTIONS=print_stacktrace=1 ## set TCMALLOC_HEAP_LIMIT_MB to limit memory used by tcmalloc diff --git a/bin/start_be.sh b/bin/start_be.sh index d9800e1b047fcb..396e75a5d484c4 100755 --- a/bin/start_be.sh +++ b/bin/start_be.sh @@ -267,7 +267,8 @@ fi export AWS_MAX_ATTEMPTS=2 ## set asan and ubsan env to generate core file -export ASAN_OPTIONS=symbolize=1:abort_on_error=1:disable_coredump=0:unmap_shadow_on_exit=1:detect_container_overflow=0 +## detect_container_overflow=0, https://github.com/google/sanitizers/issues/193 +export ASAN_OPTIONS=symbolize=1:abort_on_error=1:disable_coredump=0:unmap_shadow_on_exit=1:detect_container_overflow=0:check_malloc_usable_size=0 export UBSAN_OPTIONS=print_stacktrace=1 ## set TCMALLOC_HEAP_LIMIT_MB to limit memory used by tcmalloc diff --git a/run-be-ut.sh b/run-be-ut.sh index 346d5cd1ecb10e..f9fcf9e9d53eed 100755 --- a/run-be-ut.sh +++ b/run-be-ut.sh @@ -405,7 +405,8 @@ export ORC_EXAMPLE_DIR="${DORIS_HOME}/be/src/apache-orc/examples" # set asan and ubsan env to generate core file export DORIS_HOME="${DORIS_TEST_BINARY_DIR}/" -export ASAN_OPTIONS=symbolize=1:abort_on_error=1:disable_coredump=0:unmap_shadow_on_exit=1:detect_container_overflow=0 +## detect_container_overflow=0, https://github.com/google/sanitizers/issues/193 +export ASAN_OPTIONS=symbolize=1:abort_on_error=1:disable_coredump=0:unmap_shadow_on_exit=1:detect_container_overflow=0:check_malloc_usable_size=0 export UBSAN_OPTIONS=print_stacktrace=1 export JAVA_OPTS="-Xmx1024m -DlogPath=${DORIS_HOME}/log/jni.log -Xloggc:${DORIS_HOME}/log/be.gc.log.${CUR_DATE} -Dsun.java.command=DorisBE -XX:-CriticalJNINatives -DJDBC_MIN_POOL=1 -DJDBC_MAX_POOL=100 -DJDBC_MAX_IDLE_TIME=300000" From 772b1c3158edeefb118d15093cbd8bec2446c095 Mon Sep 17 00:00:00 2001 From: Xinyi Zou Date: Sat, 20 Apr 2024 00:44:11 +0800 Subject: [PATCH 05/13] [fix](memory) Fix BlockCompression memory tracking #33841 --- be/src/runtime/exec_env.h | 6 ++++ be/src/runtime/exec_env_init.cpp | 2 ++ be/src/util/block_compression.cpp | 54 ++++++++++++++++++++++++++----- 3 files changed, 54 insertions(+), 8 deletions(-) diff --git a/be/src/runtime/exec_env.h b/be/src/runtime/exec_env.h index ca8e6936ebd45c..584f0d70c6688f 100644 --- a/be/src/runtime/exec_env.h +++ b/be/src/runtime/exec_env.h @@ -190,6 +190,9 @@ class ExecEnv { std::shared_ptr point_query_executor_mem_tracker() { return _point_query_executor_mem_tracker; } + std::shared_ptr block_compression_mem_tracker() { + return _block_compression_mem_tracker; + } std::shared_ptr rowid_storage_reader_tracker() { return _rowid_storage_reader_tracker; } @@ -368,7 +371,10 @@ class ExecEnv { // Count the memory consumption of segment compaction tasks. std::shared_ptr _segcompaction_mem_tracker; std::shared_ptr _stream_load_pipe_tracker; + + // Tracking memory may be shared between multiple queries. 
std::shared_ptr _point_query_executor_mem_tracker; + std::shared_ptr _block_compression_mem_tracker; // TODO, looking forward to more accurate tracking. std::shared_ptr _rowid_storage_reader_tracker; diff --git a/be/src/runtime/exec_env_init.cpp b/be/src/runtime/exec_env_init.cpp index 0688bb64289aef..5d2ab598b33538 100644 --- a/be/src/runtime/exec_env_init.cpp +++ b/be/src/runtime/exec_env_init.cpp @@ -531,6 +531,8 @@ void ExecEnv::init_mem_tracker() { MemTrackerLimiter::create_shared(MemTrackerLimiter::Type::GLOBAL, "SegCompaction"); _point_query_executor_mem_tracker = MemTrackerLimiter::create_shared(MemTrackerLimiter::Type::GLOBAL, "PointQueryExecutor"); + _block_compression_mem_tracker = + MemTrackerLimiter::create_shared(MemTrackerLimiter::Type::GLOBAL, "BlockCompression"); _rowid_storage_reader_tracker = MemTrackerLimiter::create_shared(MemTrackerLimiter::Type::GLOBAL, "RowIdStorageReader"); _subcolumns_tree_tracker = diff --git a/be/src/util/block_compression.cpp b/be/src/util/block_compression.cpp index bd7ba224586d6c..7079c7021bba20 100644 --- a/be/src/util/block_compression.cpp +++ b/be/src/util/block_compression.cpp @@ -116,9 +116,15 @@ class Lz4BlockCompression : public BlockCompressionCodec { static Lz4BlockCompression s_instance; return &s_instance; } - ~Lz4BlockCompression() { _ctx_pool.clear(); } + ~Lz4BlockCompression() { + SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER( + ExecEnv::GetInstance()->block_compression_mem_tracker()); + _ctx_pool.clear(); + } Status compress(const Slice& input, faststring* output) override { + SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER( + ExecEnv::GetInstance()->block_compression_mem_tracker()); if (input.size > INT_MAX) { return Status::InvalidArgument( "LZ4 not support those case(input.size>INT_MAX), maybe you should change " @@ -172,6 +178,8 @@ class Lz4BlockCompression : public BlockCompressionCodec { } Status decompress(const Slice& input, Slice* output) override { + SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER( + ExecEnv::GetInstance()->block_compression_mem_tracker()); auto decompressed_len = LZ4_decompress_safe(input.data, output->data, input.size, output->size); if (decompressed_len < 0) { @@ -283,6 +291,8 @@ class Lz4fBlockCompression : public BlockCompressionCodec { return &s_instance; } ~Lz4fBlockCompression() { + SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER( + ExecEnv::GetInstance()->block_compression_mem_tracker()); _ctx_c_pool.clear(); _ctx_d_pool.clear(); } @@ -309,6 +319,8 @@ class Lz4fBlockCompression : public BlockCompressionCodec { private: Status _compress(const std::vector& inputs, size_t uncompressed_size, faststring* output) { + SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER( + ExecEnv::GetInstance()->block_compression_mem_tracker()); std::unique_ptr context; RETURN_IF_ERROR(_acquire_compression_ctx(context)); bool compress_failed = false; @@ -374,6 +386,8 @@ class Lz4fBlockCompression : public BlockCompressionCodec { } Status _decompress(const Slice& input, Slice* output) { + SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER( + ExecEnv::GetInstance()->block_compression_mem_tracker()); bool decompress_failed = false; std::unique_ptr context; RETURN_IF_ERROR(_acquire_decompression_ctx(context)); @@ -499,9 +513,15 @@ class Lz4HCBlockCompression : public BlockCompressionCodec { static Lz4HCBlockCompression s_instance; return &s_instance; } - ~Lz4HCBlockCompression() { _ctx_pool.clear(); } + ~Lz4HCBlockCompression() { + SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER( + ExecEnv::GetInstance()->block_compression_mem_tracker()); + _ctx_pool.clear(); + } Status 
compress(const Slice& input, faststring* output) override { + SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER( + ExecEnv::GetInstance()->block_compression_mem_tracker()); std::unique_ptr context; RETURN_IF_ERROR(_acquire_compression_ctx(context)); bool compress_failed = false; @@ -546,6 +566,8 @@ class Lz4HCBlockCompression : public BlockCompressionCodec { } Status decompress(const Slice& input, Slice* output) override { + SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER( + ExecEnv::GetInstance()->block_compression_mem_tracker()); auto decompressed_len = LZ4_decompress_safe(input.data, output->data, input.size, output->size); if (decompressed_len < 0) { @@ -665,6 +687,8 @@ class SnappyBlockCompression : public BlockCompressionCodec { ~SnappyBlockCompression() override {} Status compress(const Slice& input, faststring* output) override { + SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER( + ExecEnv::GetInstance()->block_compression_mem_tracker()); size_t max_len = max_compressed_len(input.size); output->resize(max_len); Slice s(*output); @@ -675,6 +699,8 @@ class SnappyBlockCompression : public BlockCompressionCodec { } Status decompress(const Slice& input, Slice* output) override { + SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER( + ExecEnv::GetInstance()->block_compression_mem_tracker()); if (!snappy::RawUncompress(input.data, input.size, output->data)) { return Status::InvalidArgument("Fail to do Snappy decompress"); } @@ -706,6 +732,8 @@ class ZlibBlockCompression : public BlockCompressionCodec { ~ZlibBlockCompression() {} Status compress(const Slice& input, faststring* output) override { + SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER( + ExecEnv::GetInstance()->block_compression_mem_tracker()); size_t max_len = max_compressed_len(input.size); output->resize(max_len); Slice s(*output); @@ -720,6 +748,8 @@ class ZlibBlockCompression : public BlockCompressionCodec { Status compress(const std::vector& inputs, size_t uncompressed_size, faststring* output) override { + SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER( + ExecEnv::GetInstance()->block_compression_mem_tracker()); size_t max_len = max_compressed_len(uncompressed_size); output->resize(max_len); @@ -760,6 +790,8 @@ class ZlibBlockCompression : public BlockCompressionCodec { } Status decompress(const Slice& input, Slice* output) override { + SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER( + ExecEnv::GetInstance()->block_compression_mem_tracker()); size_t input_size = input.size; auto zres = ::uncompress2((Bytef*)output->data, &output->size, (Bytef*)input.data, &input_size); @@ -811,7 +843,8 @@ class ZstdBlockCompression : public BlockCompressionCodec { return &s_instance; } ~ZstdBlockCompression() { - SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(_query_thread_context); + SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER( + ExecEnv::GetInstance()->block_compression_mem_tracker()); _ctx_c_pool.clear(); _ctx_d_pool.clear(); } @@ -827,7 +860,8 @@ class ZstdBlockCompression : public BlockCompressionCodec { // https://github.com/facebook/zstd/blob/dev/examples/streaming_compression.c Status compress(const std::vector& inputs, size_t uncompressed_size, faststring* output) override { - _query_thread_context.init(); + SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER( + ExecEnv::GetInstance()->block_compression_mem_tracker()); std::unique_ptr context; RETURN_IF_ERROR(_acquire_compression_ctx(context)); bool compress_failed = false; @@ -912,7 +946,8 @@ class ZstdBlockCompression : public BlockCompressionCodec { } Status decompress(const Slice& input, Slice* output) override { - _query_thread_context.init(); + 
SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER( + ExecEnv::GetInstance()->block_compression_mem_tracker()); std::unique_ptr context; bool decompress_failed = false; RETURN_IF_ERROR(_acquire_decompression_ctx(context)); @@ -997,9 +1032,6 @@ class ZstdBlockCompression : public BlockCompressionCodec { mutable std::mutex _ctx_d_mutex; mutable std::vector> _ctx_d_pool; - mutable std::vector _ctx_d_pool; - - QueryThreadContext _query_thread_context; }; class GzipBlockCompression : public ZlibBlockCompression { @@ -1011,6 +1043,8 @@ class GzipBlockCompression : public ZlibBlockCompression { ~GzipBlockCompression() override = default; Status decompress(const Slice& input, Slice* output) override { + SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER( + ExecEnv::GetInstance()->block_compression_mem_tracker()); z_stream z_strm = {}; z_strm.zalloc = Z_NULL; z_strm.zfree = Z_NULL; @@ -1092,6 +1126,8 @@ class GzipBlockCompressionByLibdeflate final : public GzipBlockCompression { ~GzipBlockCompressionByLibdeflate() override = default; Status decompress(const Slice& input, Slice* output) override { + SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER( + ExecEnv::GetInstance()->block_compression_mem_tracker()); if (input.empty()) { output->size = 0; return Status::OK(); @@ -1124,6 +1160,8 @@ class LzoBlockCompression final : public BlockCompressionCodec { } size_t max_compressed_len(size_t len) override { return 0; }; Status decompress(const Slice& input, Slice* output) override { + SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER( + ExecEnv::GetInstance()->block_compression_mem_tracker()); auto* input_ptr = input.data; auto remain_input_size = input.size; auto* output_ptr = output->data; From 92766d6f7136a77ffa5bae898b40a456d4a1eaa9 Mon Sep 17 00:00:00 2001 From: yiguolei <676222867@qq.com> Date: Mon, 22 Apr 2024 00:10:03 +0800 Subject: [PATCH 06/13] [memtracker](accuracy) should not account resuable buffer to query memtracker (#33933) Co-authored-by: yiguolei --- be/src/util/block_compression.cpp | 95 +++++++++++++++++-------------- 1 file changed, 53 insertions(+), 42 deletions(-) diff --git a/be/src/util/block_compression.cpp b/be/src/util/block_compression.cpp index 7079c7021bba20..e71a890142155d 100644 --- a/be/src/util/block_compression.cpp +++ b/be/src/util/block_compression.cpp @@ -100,10 +100,16 @@ class Lz4BlockCompression : public BlockCompressionCodec { ENABLE_FACTORY_CREATOR(Context); public: - Context() : ctx(nullptr) { buffer = std::make_unique(); } + Context() : ctx(nullptr) { + SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER( + ExecEnv::GetInstance()->block_compression_mem_tracker()); + buffer = std::make_unique(); + } LZ4_stream_t* ctx; std::unique_ptr buffer; ~Context() { + SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER( + ExecEnv::GetInstance()->block_compression_mem_tracker()); if (ctx) { LZ4_freeStream(ctx); } @@ -123,8 +129,6 @@ class Lz4BlockCompression : public BlockCompressionCodec { } Status compress(const Slice& input, faststring* output) override { - SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER( - ExecEnv::GetInstance()->block_compression_mem_tracker()); if (input.size > INT_MAX) { return Status::InvalidArgument( "LZ4 not support those case(input.size>INT_MAX), maybe you should change " @@ -151,7 +155,13 @@ class Lz4BlockCompression : public BlockCompressionCodec { compressed_buf.size = max_len; } else { // reuse context buffer if max_len <= MAX_COMPRESSION_BUFFER_FOR_REUSE - context->buffer->resize(max_len); + { + // context->buffer is resuable between queries, should accouting to + // global tracker. 
+ SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER( + ExecEnv::GetInstance()->block_compression_mem_tracker()); + context->buffer->resize(max_len); + } compressed_buf.data = reinterpret_cast(context->buffer->data()); compressed_buf.size = max_len; } @@ -178,8 +188,6 @@ class Lz4BlockCompression : public BlockCompressionCodec { } Status decompress(const Slice& input, Slice* output) override { - SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER( - ExecEnv::GetInstance()->block_compression_mem_tracker()); auto decompressed_len = LZ4_decompress_safe(input.data, output->data, input.size, output->size); if (decompressed_len < 0) { @@ -262,10 +270,16 @@ class Lz4fBlockCompression : public BlockCompressionCodec { ENABLE_FACTORY_CREATOR(CContext); public: - CContext() : ctx(nullptr) { buffer = std::make_unique(); } + CContext() : ctx(nullptr) { + SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER( + ExecEnv::GetInstance()->block_compression_mem_tracker()); + buffer = std::make_unique(); + } LZ4F_compressionContext_t ctx; std::unique_ptr buffer; ~CContext() { + SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER( + ExecEnv::GetInstance()->block_compression_mem_tracker()); if (ctx) { LZ4F_freeCompressionContext(ctx); } @@ -319,8 +333,6 @@ class Lz4fBlockCompression : public BlockCompressionCodec { private: Status _compress(const std::vector& inputs, size_t uncompressed_size, faststring* output) { - SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER( - ExecEnv::GetInstance()->block_compression_mem_tracker()); std::unique_ptr context; RETURN_IF_ERROR(_acquire_compression_ctx(context)); bool compress_failed = false; @@ -339,8 +351,12 @@ class Lz4fBlockCompression : public BlockCompressionCodec { compressed_buf.data = reinterpret_cast(output->data()); compressed_buf.size = max_len; } else { - // reuse context buffer if max_len <= MAX_COMPRESSION_BUFFER_FOR_REUSE - context->buffer->resize(max_len); + { + SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER( + ExecEnv::GetInstance()->block_compression_mem_tracker()); + // reuse context buffer if max_len <= MAX_COMPRESSION_BUFFER_FOR_REUSE + context->buffer->resize(max_len); + } compressed_buf.data = reinterpret_cast(context->buffer->data()); compressed_buf.size = max_len; } @@ -386,8 +402,6 @@ class Lz4fBlockCompression : public BlockCompressionCodec { } Status _decompress(const Slice& input, Slice* output) { - SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER( - ExecEnv::GetInstance()->block_compression_mem_tracker()); bool decompress_failed = false; std::unique_ptr context; RETURN_IF_ERROR(_acquire_decompression_ctx(context)); @@ -497,10 +511,16 @@ class Lz4HCBlockCompression : public BlockCompressionCodec { ENABLE_FACTORY_CREATOR(Context); public: - Context() : ctx(nullptr) { buffer = std::make_unique(); } + Context() : ctx(nullptr) { + SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER( + ExecEnv::GetInstance()->block_compression_mem_tracker()); + buffer = std::make_unique(); + } LZ4_streamHC_t* ctx; std::unique_ptr buffer; ~Context() { + SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER( + ExecEnv::GetInstance()->block_compression_mem_tracker()); if (ctx) { LZ4_freeStreamHC(ctx); } @@ -520,8 +540,6 @@ class Lz4HCBlockCompression : public BlockCompressionCodec { } Status compress(const Slice& input, faststring* output) override { - SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER( - ExecEnv::GetInstance()->block_compression_mem_tracker()); std::unique_ptr context; RETURN_IF_ERROR(_acquire_compression_ctx(context)); bool compress_failed = false; @@ -540,7 +558,12 @@ class Lz4HCBlockCompression : public BlockCompressionCodec { compressed_buf.data = 
reinterpret_cast(output->data()); compressed_buf.size = max_len; } else { - context->buffer->resize(max_len); + { + SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER( + ExecEnv::GetInstance()->block_compression_mem_tracker()); + // reuse context buffer if max_len <= MAX_COMPRESSION_BUFFER_FOR_REUSE + context->buffer->resize(max_len); + } compressed_buf.data = reinterpret_cast(context->buffer->data()); compressed_buf.size = max_len; } @@ -566,8 +589,6 @@ class Lz4HCBlockCompression : public BlockCompressionCodec { } Status decompress(const Slice& input, Slice* output) override { - SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER( - ExecEnv::GetInstance()->block_compression_mem_tracker()); auto decompressed_len = LZ4_decompress_safe(input.data, output->data, input.size, output->size); if (decompressed_len < 0) { @@ -687,8 +708,6 @@ class SnappyBlockCompression : public BlockCompressionCodec { ~SnappyBlockCompression() override {} Status compress(const Slice& input, faststring* output) override { - SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER( - ExecEnv::GetInstance()->block_compression_mem_tracker()); size_t max_len = max_compressed_len(input.size); output->resize(max_len); Slice s(*output); @@ -699,8 +718,6 @@ class SnappyBlockCompression : public BlockCompressionCodec { } Status decompress(const Slice& input, Slice* output) override { - SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER( - ExecEnv::GetInstance()->block_compression_mem_tracker()); if (!snappy::RawUncompress(input.data, input.size, output->data)) { return Status::InvalidArgument("Fail to do Snappy decompress"); } @@ -732,8 +749,6 @@ class ZlibBlockCompression : public BlockCompressionCodec { ~ZlibBlockCompression() {} Status compress(const Slice& input, faststring* output) override { - SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER( - ExecEnv::GetInstance()->block_compression_mem_tracker()); size_t max_len = max_compressed_len(input.size); output->resize(max_len); Slice s(*output); @@ -748,8 +763,6 @@ class ZlibBlockCompression : public BlockCompressionCodec { Status compress(const std::vector& inputs, size_t uncompressed_size, faststring* output) override { - SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER( - ExecEnv::GetInstance()->block_compression_mem_tracker()); size_t max_len = max_compressed_len(uncompressed_size); output->resize(max_len); @@ -790,8 +803,6 @@ class ZlibBlockCompression : public BlockCompressionCodec { } Status decompress(const Slice& input, Slice* output) override { - SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER( - ExecEnv::GetInstance()->block_compression_mem_tracker()); size_t input_size = input.size; auto zres = ::uncompress2((Bytef*)output->data, &output->size, (Bytef*)input.data, &input_size); @@ -814,10 +825,16 @@ class ZstdBlockCompression : public BlockCompressionCodec { ENABLE_FACTORY_CREATOR(CContext); public: - CContext() : ctx(nullptr) { buffer = std::make_unique(); } + CContext() : ctx(nullptr) { + SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER( + ExecEnv::GetInstance()->block_compression_mem_tracker()); + buffer = std::make_unique(); + } ZSTD_CCtx* ctx; std::unique_ptr buffer; ~CContext() { + SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER( + ExecEnv::GetInstance()->block_compression_mem_tracker()); if (ctx) { ZSTD_freeCCtx(ctx); } @@ -860,8 +877,6 @@ class ZstdBlockCompression : public BlockCompressionCodec { // https://github.com/facebook/zstd/blob/dev/examples/streaming_compression.c Status compress(const std::vector& inputs, size_t uncompressed_size, faststring* output) override { - SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER( - 
ExecEnv::GetInstance()->block_compression_mem_tracker()); std::unique_ptr context; RETURN_IF_ERROR(_acquire_compression_ctx(context)); bool compress_failed = false; @@ -880,8 +895,12 @@ class ZstdBlockCompression : public BlockCompressionCodec { compressed_buf.data = reinterpret_cast(output->data()); compressed_buf.size = max_len; } else { - // reuse context buffer if max_len <= MAX_COMPRESSION_BUFFER_FOR_REUSE - context->buffer->resize(max_len); + { + SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER( + ExecEnv::GetInstance()->block_compression_mem_tracker()); + // reuse context buffer if max_len <= MAX_COMPRESSION_BUFFER_FOR_REUSE + context->buffer->resize(max_len); + } compressed_buf.data = reinterpret_cast(context->buffer->data()); compressed_buf.size = max_len; } @@ -946,8 +965,6 @@ class ZstdBlockCompression : public BlockCompressionCodec { } Status decompress(const Slice& input, Slice* output) override { - SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER( - ExecEnv::GetInstance()->block_compression_mem_tracker()); std::unique_ptr context; bool decompress_failed = false; RETURN_IF_ERROR(_acquire_decompression_ctx(context)); @@ -1043,8 +1060,6 @@ class GzipBlockCompression : public ZlibBlockCompression { ~GzipBlockCompression() override = default; Status decompress(const Slice& input, Slice* output) override { - SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER( - ExecEnv::GetInstance()->block_compression_mem_tracker()); z_stream z_strm = {}; z_strm.zalloc = Z_NULL; z_strm.zfree = Z_NULL; @@ -1126,8 +1141,6 @@ class GzipBlockCompressionByLibdeflate final : public GzipBlockCompression { ~GzipBlockCompressionByLibdeflate() override = default; Status decompress(const Slice& input, Slice* output) override { - SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER( - ExecEnv::GetInstance()->block_compression_mem_tracker()); if (input.empty()) { output->size = 0; return Status::OK(); @@ -1160,8 +1173,6 @@ class LzoBlockCompression final : public BlockCompressionCodec { } size_t max_compressed_len(size_t len) override { return 0; }; Status decompress(const Slice& input, Slice* output) override { - SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER( - ExecEnv::GetInstance()->block_compression_mem_tracker()); auto* input_ptr = input.data; auto remain_input_size = input.size; auto* output_ptr = output->data; From 47fd19ff90eb70df39959e23b8e6467beb78ec5d Mon Sep 17 00:00:00 2001 From: Xinyi Zou Date: Thu, 16 May 2024 23:47:55 +0800 Subject: [PATCH 07/13] [fix](memory) Fix query mem tracker log when destruction not equal to 0 #34901 --- be/src/runtime/memory/mem_tracker_limiter.cpp | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/be/src/runtime/memory/mem_tracker_limiter.cpp b/be/src/runtime/memory/mem_tracker_limiter.cpp index 60454031e33ddf..f7906c0cfac6a7 100644 --- a/be/src/runtime/memory/mem_tracker_limiter.cpp +++ b/be/src/runtime/memory/mem_tracker_limiter.cpp @@ -126,17 +126,15 @@ MemTrackerLimiter::~MemTrackerLimiter() { "transfer memory tracking value between two trackers, can use transfer_to."; if (_consumption->current_value() != 0) { // TODO, expect mem tracker equal to 0 at the task end. 
- if (doris::config::enable_memory_orphan_check && _type == Type::QUERY) { +#ifndef NDEBUG + if (_type == Type::QUERY) { std::string err_msg = fmt::format("mem tracker label: {}, consumption: {}, peak consumption: {}, {}.", label(), _consumption->current_value(), _consumption->peak_value(), mem_tracker_inaccurate_msg); -#ifdef NDEBUG - LOG(INFO) << err_msg; -#else LOG(FATAL) << err_msg << print_address_sanitizers(); -#endif } +#endif if (ExecEnv::tracking_memory()) { ExecEnv::GetInstance()->orphan_mem_tracker()->consume(_consumption->current_value()); } From 72a1cd0d48fe3a248eab91aaa4fff393b234aed5 Mon Sep 17 00:00:00 2001 From: Xinyi Zou Date: Fri, 17 May 2024 19:39:14 +0800 Subject: [PATCH 08/13] [opt](memory) QueryMemTracker not equal to 0 will crash at query end when Debug compile (#35014) to eliminate query memory leak !!! --- be/src/runtime/memory/mem_tracker_limiter.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/be/src/runtime/memory/mem_tracker_limiter.cpp b/be/src/runtime/memory/mem_tracker_limiter.cpp index f7906c0cfac6a7..77d25fc69196c6 100644 --- a/be/src/runtime/memory/mem_tracker_limiter.cpp +++ b/be/src/runtime/memory/mem_tracker_limiter.cpp @@ -125,7 +125,7 @@ MemTrackerLimiter::~MemTrackerLimiter() { "4. If you need to " "transfer memory tracking value between two trackers, can use transfer_to."; if (_consumption->current_value() != 0) { - // TODO, expect mem tracker equal to 0 at the task end. + // TODO, expect mem tracker equal to 0 at the load/compaction/etc. task end. #ifndef NDEBUG if (_type == Type::QUERY) { std::string err_msg = From 5d169ad2ea4450a5b34552c9b21591fa542dd423 Mon Sep 17 00:00:00 2001 From: Xinyi Zou Date: Tue, 3 Sep 2024 12:43:46 +0800 Subject: [PATCH 09/13] [fix](memory) When Load ends, check memory tracker value returns is equal to 0 (#40016) Check all memory is freed when Load is finished. --- be/src/pipeline/pipeline_fragment_context.cpp | 4 ++ be/src/runtime/memory/mem_tracker_limiter.cpp | 63 +++++++++++-------- be/src/runtime/memory/mem_tracker_limiter.h | 2 + .../vec/exec/scan/group_commit_scan_node.cpp | 4 ++ 4 files changed, 48 insertions(+), 25 deletions(-) diff --git a/be/src/pipeline/pipeline_fragment_context.cpp b/be/src/pipeline/pipeline_fragment_context.cpp index dab359ed0408c6..550e60c210c080 100644 --- a/be/src/pipeline/pipeline_fragment_context.cpp +++ b/be/src/pipeline/pipeline_fragment_context.cpp @@ -846,6 +846,10 @@ Status PipelineFragmentContext::_create_sink(int sender_id, const TDataSink& thr break; } case TDataSinkType::GROUP_COMMIT_BLOCK_SINK: { +#ifndef NDEBUG + DCHECK(state->get_query_ctx() != nullptr); + state->get_query_ctx()->query_mem_tracker->is_group_commit_load = true; +#endif sink_ = std::make_shared(next_operator_builder_id(), _sink.get()); break; diff --git a/be/src/runtime/memory/mem_tracker_limiter.cpp b/be/src/runtime/memory/mem_tracker_limiter.cpp index 77d25fc69196c6..a1e89145d37b14 100644 --- a/be/src/runtime/memory/mem_tracker_limiter.cpp +++ b/be/src/runtime/memory/mem_tracker_limiter.cpp @@ -114,6 +114,8 @@ MemTrackerLimiter::~MemTrackerLimiter() { "mem tracker not equal to 0 when mem tracker destruct, this usually means that " "memory tracking is inaccurate and SCOPED_ATTACH_TASK and " "SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER are not used correctly. " + "If the log is truncated, search for `Address Sanitizer` in the be.INFO log to see " + "more information." "1. 
For query and load, memory leaks may have occurred, it is expected that the query " "mem tracker will be bound to the thread context using SCOPED_ATTACH_TASK and " "SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER before all memory alloc and free. " @@ -127,7 +129,7 @@ MemTrackerLimiter::~MemTrackerLimiter() { if (_consumption->current_value() != 0) { // TODO, expect mem tracker equal to 0 at the load/compaction/etc. task end. #ifndef NDEBUG - if (_type == Type::QUERY) { + if (_type == Type::QUERY || (_type == Type::LOAD && !is_group_commit_load)) { std::string err_msg = fmt::format("mem tracker label: {}, consumption: {}, peak consumption: {}, {}.", label(), _consumption->current_value(), _consumption->peak_value(), @@ -140,7 +142,7 @@ MemTrackerLimiter::~MemTrackerLimiter() { } _consumption->set(0); #ifndef NDEBUG - } else if (!_address_sanitizers.empty()) { + } else if (!_address_sanitizers.empty() && !is_group_commit_load) { LOG(FATAL) << "[Address Sanitizer] consumption is 0, but address sanitizers not empty. " << ", mem tracker label: " << _label << ", peak consumption: " << _consumption->peak_value() @@ -152,17 +154,17 @@ MemTrackerLimiter::~MemTrackerLimiter() { #ifndef NDEBUG void MemTrackerLimiter::add_address_sanitizers(void* buf, size_t size) { - if (_type == Type::QUERY) { + if (_type == Type::QUERY || (_type == Type::LOAD && !is_group_commit_load)) { std::lock_guard l(_address_sanitizers_mtx); auto it = _address_sanitizers.find(buf); if (it != _address_sanitizers.end()) { - LOG(FATAL) << "[Address Sanitizer] memory buf repeat add, mem tracker label: " << _label - << ", consumption: " << _consumption->current_value() - << ", peak consumption: " << _consumption->peak_value() << ", buf: " << buf - << ", size: " << size << ", old buf: " << it->first - << ", old size: " << it->second.size - << ", new stack_trace: " << get_stack_trace() - << ", old stack_trace: " << it->second.stack_trace; + _error_address_sanitizers.emplace_back( + fmt::format("[Address Sanitizer] memory buf repeat add, mem tracker label: {}, " + "consumption: {}, peak consumption: {}, buf: {}, size: {}, old " + "buf: {}, old size: {}, new stack_trace: {}, old stack_trace: {}.", + _label, _consumption->current_value(), _consumption->peak_value(), + buf, size, it->first, it->second.size, + get_stack_trace(1, "FULL_WITH_INLINE"), it->second.stack_trace)); } // if alignment not equal to 0, maybe usable_size > size. 
@@ -174,26 +176,26 @@ void MemTrackerLimiter::add_address_sanitizers(void* buf, size_t size) { } void MemTrackerLimiter::remove_address_sanitizers(void* buf, size_t size) { - if (_type == Type::QUERY) { + if (_type == Type::QUERY || (_type == Type::LOAD && !is_group_commit_load)) { std::lock_guard l(_address_sanitizers_mtx); auto it = _address_sanitizers.find(buf); if (it != _address_sanitizers.end()) { if (it->second.size != size) { - LOG(FATAL) << "[Address Sanitizer] free memory buf size inaccurate, mem tracker " - "label: " - << _label << ", consumption: " << _consumption->current_value() - << ", peak consumption: " << _consumption->peak_value() - << ", buf: " << buf << ", size: " << size << ", old buf: " << it->first - << ", old size: " << it->second.size - << ", new stack_trace: " << get_stack_trace() - << ", old stack_trace: " << it->second.stack_trace; + _error_address_sanitizers.emplace_back(fmt::format( + "[Address Sanitizer] free memory buf size inaccurate, mem tracker label: " + "{}, consumption: {}, peak consumption: {}, buf: {}, size: {}, old buf: " + "{}, old size: {}, new stack_trace: {}, old stack_trace: {}.", + _label, _consumption->current_value(), _consumption->peak_value(), buf, + size, it->first, it->second.size, get_stack_trace(1, "FULL_WITH_INLINE"), + it->second.stack_trace)); } _address_sanitizers.erase(buf); } else { - LOG(FATAL) << "[Address Sanitizer] memory buf not exist, mem tracker label: " << _label - << ", consumption: " << _consumption->current_value() - << ", peak consumption: " << _consumption->peak_value() << ", buf: " << buf - << ", size: " << size << ", stack_trace: " << get_stack_trace(); + _error_address_sanitizers.emplace_back(fmt::format( + "[Address Sanitizer] memory buf not exist, mem tracker label: {}, consumption: " + "{}, peak consumption: {}, buf: {}, size: {}, stack_trace: {}.", + _label, _consumption->current_value(), _consumption->peak_value(), buf, size, + get_stack_trace(1, "FULL_WITH_INLINE"))); } } } @@ -201,9 +203,20 @@ void MemTrackerLimiter::remove_address_sanitizers(void* buf, size_t size) { std::string MemTrackerLimiter::print_address_sanitizers() { std::lock_guard l(_address_sanitizers_mtx); std::string detail = "[Address Sanitizer]:"; + detail += "\n memory not be freed:"; for (const auto& it : _address_sanitizers) { - detail += fmt::format("\n {}, size {}, strack trace: {}", it.first, it.second.size, - it.second.stack_trace); + auto msg = fmt::format( + "\n [Address Sanitizer] buf not be freed, mem tracker label: {}, consumption: " + "{}, peak consumption: {}, buf: {}, size {}, strack trace: {}", + _label, _consumption->current_value(), _consumption->peak_value(), it.first, + it.second.size, it.second.stack_trace); + LOG(INFO) << msg; + detail += msg; + } + detail += "\n incorrect memory alloc and free:"; + for (const auto& err_msg : _error_address_sanitizers) { + LOG(INFO) << err_msg; + detail += fmt::format("\n {}", err_msg); } return detail; } diff --git a/be/src/runtime/memory/mem_tracker_limiter.h b/be/src/runtime/memory/mem_tracker_limiter.h index e5c5cb1bc0369c..344f3dc92b6670 100644 --- a/be/src/runtime/memory/mem_tracker_limiter.h +++ b/be/src/runtime/memory/mem_tracker_limiter.h @@ -209,6 +209,7 @@ class MemTrackerLimiter final : public MemTracker { void add_address_sanitizers(void* buf, size_t size); void remove_address_sanitizers(void* buf, size_t size); std::string print_address_sanitizers(); + bool is_group_commit_load {false}; #endif std::string debug_string() override { @@ -260,6 +261,7 @@ class 
MemTrackerLimiter final : public MemTracker { std::mutex _address_sanitizers_mtx; std::unordered_map _address_sanitizers; + std::vector _error_address_sanitizers; #endif }; diff --git a/be/src/vec/exec/scan/group_commit_scan_node.cpp b/be/src/vec/exec/scan/group_commit_scan_node.cpp index 50ba8e31be468f..9f6e6a041eceb8 100644 --- a/be/src/vec/exec/scan/group_commit_scan_node.cpp +++ b/be/src/vec/exec/scan/group_commit_scan_node.cpp @@ -42,6 +42,10 @@ Status GroupCommitScanNode::get_next(RuntimeState* state, vectorized::Block* blo Status GroupCommitScanNode::init(const TPlanNode& tnode, RuntimeState* state) { RETURN_IF_ERROR(VScanNode::init(tnode, state)); +#ifndef NDEBUG + DCHECK(state->get_query_ctx() != nullptr); + state->get_query_ctx()->query_mem_tracker->is_group_commit_load = true; +#endif return state->exec_env()->group_commit_mgr()->get_load_block_queue( _table_id, state->fragment_instance_id(), load_block_queue); } From 81dbc984887b6d254c4140f30e5422562bc838f2 Mon Sep 17 00:00:00 2001 From: Xinyi Zou Date: Thu, 5 Sep 2024 22:15:51 +0800 Subject: [PATCH 10/13] [fix](memory) Reflect Allocator memory tracker (#40439) fix: ![img_v3_02e7_e9847bd3-89ad-4e9e-8fa1-76109a5386fg](https://github.com/user-attachments/assets/0a7d627e-23d0-40f1-bec9-b9d9f64fed02) --- be/src/olap/page_cache.cpp | 10 ++++++---- be/src/olap/page_cache.h | 1 + be/src/runtime/thread_context.h | 21 ++++++++++++++------- be/src/util/byte_buffer.h | 8 +++++++- be/src/vec/common/allocator.cpp | 33 ++------------------------------- be/src/vec/common/allocator.h | 11 ++++++++--- 6 files changed, 38 insertions(+), 46 deletions(-) diff --git a/be/src/olap/page_cache.cpp b/be/src/olap/page_cache.cpp index b70dadc5b431ea..1f0556f4642110 100644 --- a/be/src/olap/page_cache.cpp +++ b/be/src/olap/page_cache.cpp @@ -28,10 +28,12 @@ template PageBase::PageBase(size_t b, bool use_cache, segment_v2::PageTypePB page_type) : LRUCacheValueBase(), _size(b), _capacity(b) { if (use_cache) { - SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER( - StoragePageCache::instance()->mem_tracker(page_type)); - _data = reinterpret_cast(TAllocator::alloc(_capacity, ALLOCATOR_ALIGNMENT_16)); + _mem_tracker_by_allocator = StoragePageCache::instance()->mem_tracker(page_type); } else { + _mem_tracker_by_allocator = thread_context()->thread_mem_tracker_mgr->limiter_mem_tracker(); + } + { + SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(_mem_tracker_by_allocator); _data = reinterpret_cast(TAllocator::alloc(_capacity, ALLOCATOR_ALIGNMENT_16)); } } @@ -40,7 +42,7 @@ template PageBase::~PageBase() { if (_data != nullptr) { DCHECK(_capacity != 0 && _size != 0); - SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(TAllocator::mem_tracker_); + SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(_mem_tracker_by_allocator); TAllocator::free(_data, _capacity); } } diff --git a/be/src/olap/page_cache.h b/be/src/olap/page_cache.h index ef25de7bc30c63..09fc689959ce4c 100644 --- a/be/src/olap/page_cache.h +++ b/be/src/olap/page_cache.h @@ -60,6 +60,7 @@ class PageBase : private TAllocator, public LRUCacheValueBase { // Effective size, smaller than capacity, such as data page remove checksum suffix. 
size_t _size = 0; size_t _capacity = 0; + std::shared_ptr _mem_tracker_by_allocator; }; using DataPage = PageBase>; diff --git a/be/src/runtime/thread_context.h b/be/src/runtime/thread_context.h index fa2f177b9dffc9..885d616eb06470 100644 --- a/be/src/runtime/thread_context.h +++ b/be/src/runtime/thread_context.h @@ -446,8 +446,10 @@ class SwitchThreadMemTrackerLimiter { const std::shared_ptr& mem_tracker) { DCHECK(mem_tracker); doris::ThreadLocalHandle::create_thread_local_if_not_exits(); - _old_mem_tracker = thread_context()->thread_mem_tracker_mgr->limiter_mem_tracker(); - thread_context()->thread_mem_tracker_mgr->attach_limiter_tracker(mem_tracker); + if (mem_tracker != thread_context()->thread_mem_tracker_mgr->limiter_mem_tracker()) { + _old_mem_tracker = thread_context()->thread_mem_tracker_mgr->limiter_mem_tracker(); + thread_context()->thread_mem_tracker_mgr->attach_limiter_tracker(mem_tracker); + } } explicit SwitchThreadMemTrackerLimiter(const doris::QueryThreadContext& query_thread_context) { @@ -455,18 +457,23 @@ class SwitchThreadMemTrackerLimiter { DCHECK(thread_context()->task_id() == query_thread_context.query_id); // workload group alse not change DCHECK(query_thread_context.query_mem_tracker); - _old_mem_tracker = thread_context()->thread_mem_tracker_mgr->limiter_mem_tracker(); - thread_context()->thread_mem_tracker_mgr->attach_limiter_tracker( - query_thread_context.query_mem_tracker); + if (query_thread_context.query_mem_tracker != + thread_context()->thread_mem_tracker_mgr->limiter_mem_tracker()) { + _old_mem_tracker = thread_context()->thread_mem_tracker_mgr->limiter_mem_tracker(); + thread_context()->thread_mem_tracker_mgr->attach_limiter_tracker( + query_thread_context.query_mem_tracker); + } } ~SwitchThreadMemTrackerLimiter() { - thread_context()->thread_mem_tracker_mgr->detach_limiter_tracker(_old_mem_tracker); + if (_old_mem_tracker != nullptr) { + thread_context()->thread_mem_tracker_mgr->detach_limiter_tracker(_old_mem_tracker); + } doris::ThreadLocalHandle::del_thread_local_if_count_is_zero(); } private: - std::shared_ptr _old_mem_tracker; + std::shared_ptr _old_mem_tracker {nullptr}; }; class AddThreadMemTrackerConsumer { diff --git a/be/src/util/byte_buffer.h b/be/src/util/byte_buffer.h index aafd4506087d76..17764b9e4f6ec1 100644 --- a/be/src/util/byte_buffer.h +++ b/be/src/util/byte_buffer.h @@ -69,9 +69,15 @@ struct ByteBuffer : private Allocator { size_t capacity; private: - ByteBuffer(size_t capacity_) : pos(0), limit(capacity_), capacity(capacity_) { + ByteBuffer(size_t capacity_) + : pos(0), + limit(capacity_), + capacity(capacity_), + mem_tracker_(doris::thread_context()->thread_mem_tracker_mgr->limiter_mem_tracker()) { ptr = reinterpret_cast(Allocator::alloc(capacity_)); } + + std::shared_ptr mem_tracker_; }; } // namespace doris diff --git a/be/src/vec/common/allocator.cpp b/be/src/vec/common/allocator.cpp index 42712c05e28c47..43c212ae3a53d5 100644 --- a/be/src/vec/common/allocator.cpp +++ b/be/src/vec/common/allocator.cpp @@ -189,43 +189,14 @@ void Allocator::memory_ template void Allocator::consume_memory( - size_t size) { - // Usually, an object that inherits Allocator has the same TLS tracker for each alloc. - // If an object that inherits Allocator needs to be reused by multiple queries, - // it is necessary to switch the same tracker to TLS when calling alloc. - // However, in ORC Reader, ORC DataBuffer will be reused, but we cannot switch TLS tracker, - // so we update the Allocator tracker when the TLS tracker changes. 
- // note that the tracker in thread context when object that inherit Allocator is constructed may be - // no attach memory tracker in tls. usually the memory tracker is attached in tls only during the first alloc. - if (mem_tracker_ == nullptr || - mem_tracker_->label() != doris::thread_context()->thread_mem_tracker()->label()) { - mem_tracker_ = doris::thread_context()->thread_mem_tracker_mgr->limiter_mem_tracker(); - } + size_t size) const { CONSUME_THREAD_MEM_TRACKER(size); } template void Allocator::release_memory( size_t size) const { - doris::ThreadContext* thread_context = doris::thread_context(true); - if ((thread_context && thread_context->thread_mem_tracker()->label() != "Orphan") || - mem_tracker_ == nullptr) { - // If thread_context exist and the label of thread_mem_tracker not equal to `Orphan`, - // this means that in the scope of SCOPED_ATTACH_TASK, - // so thread_mem_tracker should be used to release memory. - // If mem_tracker_ is nullptr there is a scenario where an object that inherits Allocator - // has never called alloc, but free memory. - // in phmap, the memory alloced by an object may be transferred to another object and then free. - // in this case, thread context must attach a memory tracker other than Orphan, - // otherwise memory tracking will be wrong. - RELEASE_THREAD_MEM_TRACKER(size); - } else { - // if thread_context does not exist or the label of thread_mem_tracker is equal to - // `Orphan`, it usually happens during object destruction. This means that - // the scope of SCOPED_ATTACH_TASK has been left, so release memory using Allocator tracker. - SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(mem_tracker_); - RELEASE_THREAD_MEM_TRACKER(size); - } + RELEASE_THREAD_MEM_TRACKER(size); } template diff --git a/be/src/vec/common/allocator.h b/be/src/vec/common/allocator.h index f0a490c5da29d9..026d398882feb0 100644 --- a/be/src/vec/common/allocator.h +++ b/be/src/vec/common/allocator.h @@ -232,7 +232,14 @@ class Allocator { // alloc will continue to execute, so the consume memtracker is forced. void memory_check(size_t size) const; // Increases consumption of this tracker by 'bytes'. - void consume_memory(size_t size); + // some special cases: + // 1. objects that inherit Allocator will not be shared by multiple queries. + // non-compliant: page cache, ORC ByteBuffer. + // 2. objects that inherit Allocator will only free memory allocated by themselves. + // non-compliant: phmap, the memory alloced by an object may be transferred to another object and then free. + // 3. the memory tracker in TLS is the same during the construction of objects that inherit Allocator + // and during subsequent memory allocation. + void consume_memory(size_t size) const; void release_memory(size_t size) const; void throw_bad_alloc(const std::string& err) const; #ifndef NDEBUG @@ -403,8 +410,6 @@ class Allocator { static constexpr bool clear_memory = clear_memory_; - std::shared_ptr mem_tracker_ {nullptr}; - // Freshly mmapped pages are copy-on-write references to a global zero page. // On the first write, a page fault occurs, and an actual writable page is // allocated. 
If we are going to use this memory soon, such as when resizing From f1c108deba609b6afbd5f419324d7025caee3ed1 Mon Sep 17 00:00:00 2001 From: Kaijie Chen Date: Wed, 6 Dec 2023 00:26:34 +0800 Subject: [PATCH 11/13] fix owned slice capacity (#28002) --- be/src/util/slice.h | 5 ----- 1 file changed, 5 deletions(-) diff --git a/be/src/util/slice.h b/be/src/util/slice.h index a81593dc913906..b38b1147894f9e 100644 --- a/be/src/util/slice.h +++ b/be/src/util/slice.h @@ -378,11 +378,6 @@ class OwnedSlice : private Allocator Date: Mon, 9 Sep 2024 22:03:53 +0800 Subject: [PATCH 12/13] fix --- be/src/runtime/exec_env.h | 1 + be/src/runtime/stream_load/stream_load_context.h | 9 --------- be/src/vec/common/allocator.cpp | 10 +++++----- be/src/vec/exec/scan/group_commit_scan_node.cpp | 4 ++-- 4 files changed, 8 insertions(+), 16 deletions(-) diff --git a/be/src/runtime/exec_env.h b/be/src/runtime/exec_env.h index 584f0d70c6688f..18061e04528c05 100644 --- a/be/src/runtime/exec_env.h +++ b/be/src/runtime/exec_env.h @@ -187,6 +187,7 @@ class ExecEnv { } std::shared_ptr stream_load_pipe_tracker() { return _stream_load_pipe_tracker; + } std::shared_ptr point_query_executor_mem_tracker() { return _point_query_executor_mem_tracker; } diff --git a/be/src/runtime/stream_load/stream_load_context.h b/be/src/runtime/stream_load/stream_load_context.h index 918bc5924d3ca5..2ccf8ce5014a88 100644 --- a/be/src/runtime/stream_load/stream_load_context.h +++ b/be/src/runtime/stream_load/stream_load_context.h @@ -130,15 +130,6 @@ class StreamLoadContext { ByteBufferPtr schema_buffer() { return _schema_buffer; } - ByteBufferPtr schema_buffer() { - if (_schema_buffer == nullptr) { - SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER( - ExecEnv::GetInstance()->stream_load_pipe_tracker()); - _schema_buffer = ByteBuffer::allocate(config::stream_tvf_buffer_size); - } - return _schema_buffer; - } - public: static const int default_txn_id = -1; // load type, eg: ROUTINE LOAD/MANUAL LOAD diff --git a/be/src/vec/common/allocator.cpp b/be/src/vec/common/allocator.cpp index 43c212ae3a53d5..480734dcade4c5 100644 --- a/be/src/vec/common/allocator.cpp +++ b/be/src/vec/common/allocator.cpp @@ -211,9 +211,9 @@ void Allocator::throw_b } #ifndef NDEBUG -template -void Allocator::add_address_sanitizers(void* buf, - size_t size) const { +template +void Allocator::add_address_sanitizers( + void* buf, size_t size) const { #ifdef BE_TEST if (!doris::ExecEnv::ready()) { return; @@ -222,8 +222,8 @@ void Allocator::add_address_sanitizers(v doris::thread_context()->thread_mem_tracker()->add_address_sanitizers(buf, size); } -template -void Allocator::remove_address_sanitizers( +template +void Allocator::remove_address_sanitizers( void* buf, size_t size) const { #ifdef BE_TEST if (!doris::ExecEnv::ready()) { diff --git a/be/src/vec/exec/scan/group_commit_scan_node.cpp b/be/src/vec/exec/scan/group_commit_scan_node.cpp index 9f6e6a041eceb8..be48bf7bf31bae 100644 --- a/be/src/vec/exec/scan/group_commit_scan_node.cpp +++ b/be/src/vec/exec/scan/group_commit_scan_node.cpp @@ -43,8 +43,8 @@ Status GroupCommitScanNode::get_next(RuntimeState* state, vectorized::Block* blo Status GroupCommitScanNode::init(const TPlanNode& tnode, RuntimeState* state) { RETURN_IF_ERROR(VScanNode::init(tnode, state)); #ifndef NDEBUG - DCHECK(state->get_query_ctx() != nullptr); - state->get_query_ctx()->query_mem_tracker->is_group_commit_load = true; + DCHECK(state->get_query_ctx() != nullptr); + state->get_query_ctx()->query_mem_tracker->is_group_commit_load = true; #endif return 
state->exec_env()->group_commit_mgr()->get_load_block_queue(
            _table_id, state->fragment_instance_id(), load_block_queue);

From 8f137117165851f01679590a05bc32b976128e04 Mon Sep 17 00:00:00 2001
From: Xinyi Zou
Date: Tue, 10 Sep 2024 11:11:55 +0800
Subject: [PATCH 13/13] fix2

---
 be/src/runtime/memory/mem_tracker_limiter.cpp | 10 +++++-----
 1 file changed, 5 insertions(+), 5 deletions(-)

diff --git a/be/src/runtime/memory/mem_tracker_limiter.cpp b/be/src/runtime/memory/mem_tracker_limiter.cpp
index a1e89145d37b14..49e9064938114f 100644
--- a/be/src/runtime/memory/mem_tracker_limiter.cpp
+++ b/be/src/runtime/memory/mem_tracker_limiter.cpp
@@ -134,7 +134,7 @@ MemTrackerLimiter::~MemTrackerLimiter() {
                 fmt::format("mem tracker label: {}, consumption: {}, peak consumption: {}, {}.",
                             label(), _consumption->current_value(), _consumption->peak_value(),
                             mem_tracker_inaccurate_msg);
-        LOG(FATAL) << err_msg << print_address_sanitizers();
+        LOG(INFO) << err_msg << print_address_sanitizers();
     }
 #endif
     if (ExecEnv::tracking_memory()) {
@@ -143,10 +143,10 @@ MemTrackerLimiter::~MemTrackerLimiter() {
         _consumption->set(0);
 #ifndef NDEBUG
     } else if (!_address_sanitizers.empty() && !is_group_commit_load) {
-        LOG(FATAL) << "[Address Sanitizer] consumption is 0, but address sanitizers not empty. "
-                   << ", mem tracker label: " << _label
-                   << ", peak consumption: " << _consumption->peak_value()
-                   << print_address_sanitizers();
+        LOG(INFO) << "[Address Sanitizer] consumption is 0, but address sanitizers not empty. "
+                  << ", mem tracker label: " << _label
+                  << ", peak consumption: " << _consumption->peak_value()
+                  << print_address_sanitizers();
 #endif
     }
     memory_memtrackerlimiter_cnt << -1;
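
A note on the SwitchThreadMemTrackerLimiter change in be/src/runtime/thread_context.h above: the constructor now swaps the thread-local limiter tracker only when the requested tracker differs from the one already attached, and the destructor restores the saved tracker only when a swap actually happened. Below is a minimal, self-contained sketch of that guarded-swap RAII pattern; Tracker, g_current_tracker, and ScopedTrackerSwitch are illustrative stand-ins, not the Doris types.

#include <memory>
#include <utility>

// Stand-in for a memory tracker; only its identity matters for the switch logic.
struct Tracker {};

// Stand-in for the tracker currently attached to this thread.
thread_local std::shared_ptr<Tracker> g_current_tracker;

class ScopedTrackerSwitch {
public:
    explicit ScopedTrackerSwitch(std::shared_ptr<Tracker> target) {
        // Swap only when the target differs from what is already attached,
        // mirroring the guarded attach_limiter_tracker() call in the patch.
        if (target != g_current_tracker) {
            _old = g_current_tracker;
            g_current_tracker = std::move(target);
            _switched = true;
        }
    }

    ~ScopedTrackerSwitch() {
        // Restore only if the constructor actually performed a swap.
        if (_switched) {
            g_current_tracker = std::move(_old);
        }
    }

    ScopedTrackerSwitch(const ScopedTrackerSwitch&) = delete;
    ScopedTrackerSwitch& operator=(const ScopedTrackerSwitch&) = delete;

private:
    std::shared_ptr<Tracker> _old;
    bool _switched = false;
};

The patch itself keys the restore off _old_mem_tracker being non-null rather than a separate flag, which is equivalent as long as the previously attached tracker can never be null.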
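
The be/src/util/byte_buffer.h hunk gives ByteBuffer a mem_tracker_ member initialized from the limiter tracker that is attached when the buffer is constructed. A rough sketch of the underlying idea (charge at construction, release against the same pinned tracker later) follows; Tracker, current_thread_tracker(), and TrackedBuffer are placeholder names, not the Doris ByteBuffer API.

#include <atomic>
#include <cstddef>
#include <cstdlib>
#include <memory>

// Stand-in tracker: just counts bytes.
struct Tracker {
    std::atomic<long> consumed {0};
    void consume(std::size_t n) { consumed.fetch_add(static_cast<long>(n)); }
    void release(std::size_t n) { consumed.fetch_sub(static_cast<long>(n)); }
};

// Stand-in for the limiter tracker attached to the current thread.
inline std::shared_ptr<Tracker>& current_thread_tracker() {
    thread_local std::shared_ptr<Tracker> t = std::make_shared<Tracker>();
    return t;
}

class TrackedBuffer {
public:
    explicit TrackedBuffer(std::size_t capacity)
            : _capacity(capacity),
              _tracker(current_thread_tracker()) {  // pin the tracker active right now
        _ptr = static_cast<char*>(::malloc(capacity));  // allocation failure ignored in this sketch
        _tracker->consume(capacity);
    }

    ~TrackedBuffer() {
        ::free(_ptr);
        // Release against the pinned tracker, not whatever happens to be
        // attached on the thread that destroys the buffer.
        _tracker->release(_capacity);
    }

    TrackedBuffer(const TrackedBuffer&) = delete;
    TrackedBuffer& operator=(const TrackedBuffer&) = delete;

private:
    char* _ptr = nullptr;
    std::size_t _capacity = 0;
    std::shared_ptr<Tracker> _tracker;
};

Pinning the tracker this way keeps the accounting consistent even if the buffer outlives the scope that created it and is freed under a different task context.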
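
With the Allocator-level mem_tracker_ removed in be/src/vec/common/allocator.cpp, consume_memory() and release_memory() now charge whichever tracker is attached to the calling thread, and the new comment in allocator.h spells out the preconditions that make this safe: the owning object is not shared across queries, it frees only memory it allocated itself, and the same tracker is attached at construction time and at every later allocation. A compilable toy version of that contract, using the invented names ToyTracker, tls_tracker(), and ThreadCharged, and simplifying "attached tracker" down to a plain thread_local:

#include <atomic>
#include <cstddef>
#include <cstdlib>

// Stand-in for a per-task memory tracker.
struct ToyTracker {
    std::atomic<long> consumed {0};
};

// Stand-in for the tracker attached to the current thread.
inline ToyTracker& tls_tracker() {
    thread_local ToyTracker t;
    return t;
}

// Toy allocator mixin: every alloc/free charges the current thread's tracker.
// This is only correct under the preconditions listed in the allocator.h
// comment above: the owning object is not shared across queries, it frees
// only memory it allocated itself, and the same tracker is attached whenever
// it allocates or frees.
struct ThreadCharged {
    static void* alloc(std::size_t n) {
        tls_tracker().consumed.fetch_add(static_cast<long>(n));
        return ::malloc(n);
    }
    static void free(void* p, std::size_t n) {
        tls_tracker().consumed.fetch_sub(static_cast<long>(n));
        ::free(p);
    }
};

If any of the three preconditions is violated, for example memory allocated under one tracker but freed under another, the counters drift; that mismatch is what the removed fallback logic in consume_memory()/release_memory() used to compensate for.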
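
The be/src/runtime/stream_load/stream_load_context.h hunk drops the second, lazily-allocating schema_buffer() overload and keeps only the plain accessor, so buffer creation has to happen through an explicit, failure-reporting step before the accessor is used. A rough illustration of that split, with invented names (ToyStatus, SchemaHolder, ensure_schema_buffer); the real StreamLoadContext API may differ.

#include <cstddef>
#include <memory>
#include <string>
#include <vector>

// Invented stand-ins for Status and the buffer type.
struct ToyStatus {
    bool ok = true;
    std::string msg;
};

using Buffer = std::vector<char>;

class SchemaHolder {
public:
    // Explicit allocation step: the only place that can fail.
    ToyStatus ensure_schema_buffer(std::size_t capacity) {
        if (_schema_buffer == nullptr) {
            _schema_buffer = std::make_shared<Buffer>();
            _schema_buffer->reserve(capacity);  // a real implementation would report failure here
        }
        return {};
    }

    // Plain accessor, mirroring `ByteBufferPtr schema_buffer() { return _schema_buffer; }`:
    // it no longer allocates, so callers must run ensure_schema_buffer() first.
    std::shared_ptr<Buffer> schema_buffer() { return _schema_buffer; }

private:
    std::shared_ptr<Buffer> _schema_buffer;
};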