diff --git a/be/src/common/utils.h b/be/src/common/utils.h index c0ca4594233a2d..91eb4427c99aea 100644 --- a/be/src/common/utils.h +++ b/be/src/common/utils.h @@ -21,7 +21,9 @@ namespace doris { +#ifndef ARRAY_SIZE #define ARRAY_SIZE(a) (sizeof(a)/sizeof((a)[0])) +#endif struct AuthInfo { std::string user; diff --git a/be/src/exec/exchange_node.cpp b/be/src/exec/exchange_node.cpp index 53473f90a297c1..f1c221b65cb120 100644 --- a/be/src/exec/exchange_node.cpp +++ b/be/src/exec/exchange_node.cpp @@ -80,6 +80,7 @@ Status ExchangeNode::prepare(RuntimeState* state) { Status ExchangeNode::open(RuntimeState* state) { SCOPED_TIMER(_runtime_profile->total_time_counter()); SCOPED_SWITCH_TASK_THREAD_LOCAL_MEM_TRACKER(mem_tracker()); + ADD_THREAD_LOCAL_MEM_TRACKER(_stream_recvr->mem_tracker()); RETURN_IF_ERROR(ExecNode::open(state)); if (_is_merging) { RETURN_IF_ERROR(_sort_exec_exprs.open(state)); @@ -215,7 +216,6 @@ Status ExchangeNode::get_next_merging(RuntimeState* state, RowBatch* output_batc RETURN_IF_CANCELLED(state); RETURN_IF_ERROR(state->check_query_state("Exchange, while merging next.")); - ADD_THREAD_LOCAL_MEM_TRACKER(_stream_recvr->mem_tracker()); RETURN_IF_ERROR(_stream_recvr->get_next(output_batch, eos)); while ((_num_rows_skipped < _offset)) { _num_rows_skipped += output_batch->num_rows(); diff --git a/be/src/exec/olap_scanner.cpp b/be/src/exec/olap_scanner.cpp index 53f9af57a8e5d0..68a522236cf7d2 100644 --- a/be/src/exec/olap_scanner.cpp +++ b/be/src/exec/olap_scanner.cpp @@ -52,7 +52,7 @@ OlapScanner::OlapScanner(RuntimeState* runtime_state, OlapScanNode* parent, bool _version(-1), _mem_tracker(MemTracker::create_tracker( tracker->limit(), - tracker->label() + ":OlapScanner:" + thread_local_ctx.get()->thread_id_str(), + tracker->label() + ":OlapScanner:" + tls_ctx()->thread_id_str(), tracker)) {} Status OlapScanner::prepare( diff --git a/be/src/exec/tablet_sink.cpp b/be/src/exec/tablet_sink.cpp index e8b35c16d19a0d..9928b7e26fda2d 100644 --- a/be/src/exec/tablet_sink.cpp +++ b/be/src/exec/tablet_sink.cpp @@ -50,7 +50,7 @@ NodeChannel::NodeChannel(OlapTableSink* parent, IndexChannel* index_channel, int _tuple_data_buffer_ptr = &_tuple_data_buffer; } _node_channel_tracker = - MemTracker::create_tracker(-1, "NodeChannel" + thread_local_ctx.get()->thread_id_str()); + MemTracker::create_tracker(-1, "NodeChannel" + tls_ctx()->thread_id_str()); } NodeChannel::~NodeChannel() noexcept { @@ -654,6 +654,7 @@ void IndexChannel::add_row(BlockRow& block_row, int64_t tablet_id) { void IndexChannel::mark_as_failed(int64_t node_id, const std::string& host, const std::string& err, int64_t tablet_id) { + SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER(_index_channel_tracker); const auto& it = _tablets_by_channel.find(node_id); if (it == _tablets_by_channel.end()) { return; diff --git a/be/src/exec/tablet_sink.h b/be/src/exec/tablet_sink.h index 2fa75587e707fa..1a902e834bff09 100644 --- a/be/src/exec/tablet_sink.h +++ b/be/src/exec/tablet_sink.h @@ -33,6 +33,7 @@ #include "exec/tablet_info.h" #include "gen_cpp/Types_types.h" #include "gen_cpp/internal_service.pb.h" +#include "runtime/thread_context.h" #include "util/bitmap.h" #include "util/countdown_latch.h" #include "util/ref_count_closure.h" @@ -325,6 +326,7 @@ class IndexChannel { void for_each_node_channel( const std::function&)>& func) { + SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER(_index_channel_tracker); for (auto& it : _node_channels) { func(it.second); } @@ -365,7 +367,7 @@ class IndexChannel { std::unordered_map _failed_channels_msgs; Status _intolerable_failure_status = Status::OK(); - std::shared_ptr _index_channel_tracker; // TODO(zxy) use after + std::shared_ptr _index_channel_tracker; }; // Write data to Olap Table. diff --git a/be/src/olap/byte_buffer.cpp b/be/src/olap/byte_buffer.cpp index a3099e4e25b5ed..822b18cc504a5a 100644 --- a/be/src/olap/byte_buffer.cpp +++ b/be/src/olap/byte_buffer.cpp @@ -20,6 +20,7 @@ #include #include "olap/utils.h" +#include "runtime/thread_context.h" namespace doris { @@ -42,6 +43,8 @@ void StorageByteBuffer::BufDeleter::operator()(char* p) { if (0 != munmap(p, _mmap_length)) { LOG(FATAL) << "fail to munmap: mem=" << p << ", len=" << _mmap_length << ", errno=" << Errno::no() << ", errno_str=" << Errno::str(); + } else { + RELEASE_THREAD_LOCAL_MEM_TRACKER(_mmap_length); } } else { delete[] p; @@ -93,10 +96,12 @@ StorageByteBuffer* StorageByteBuffer::reference_buffer(StorageByteBuffer* refere StorageByteBuffer* StorageByteBuffer::mmap(void* start, uint64_t length, int prot, int flags, int fd, uint64_t offset) { + CONSUME_THREAD_LOCAL_MEM_TRACKER(length); char* memory = (char*)::mmap(start, length, prot, flags, fd, offset); if (MAP_FAILED == memory) { OLAP_LOG_WARNING("fail to mmap. [errno='%d' errno_str='%s']", Errno::no(), Errno::str()); + RELEASE_THREAD_LOCAL_MEM_TRACKER(length); return nullptr; } @@ -108,6 +113,7 @@ StorageByteBuffer* StorageByteBuffer::mmap(void* start, uint64_t length, int pro if (nullptr == buf) { deleter(memory); OLAP_LOG_WARNING("fail to allocate StorageByteBuffer."); + RELEASE_THREAD_LOCAL_MEM_TRACKER(length); return nullptr; } @@ -128,10 +134,12 @@ StorageByteBuffer* StorageByteBuffer::mmap(FileHandler* handler, uint64_t offset size_t length = handler->length(); int fd = handler->fd(); + CONSUME_THREAD_LOCAL_MEM_TRACKER(length); char* memory = (char*)::mmap(nullptr, length, prot, flags, fd, offset); if (MAP_FAILED == memory) { OLAP_LOG_WARNING("fail to mmap. [errno='%d' errno_str='%s']", Errno::no(), Errno::str()); + RELEASE_THREAD_LOCAL_MEM_TRACKER(length); return nullptr; } @@ -143,6 +151,7 @@ StorageByteBuffer* StorageByteBuffer::mmap(FileHandler* handler, uint64_t offset if (nullptr == buf) { deleter(memory); OLAP_LOG_WARNING("fail to allocate StorageByteBuffer."); + RELEASE_THREAD_LOCAL_MEM_TRACKER(length); return nullptr; } @@ -173,7 +182,7 @@ Status StorageByteBuffer::put(uint64_t index, char src) { } Status StorageByteBuffer::put(const char* src, uint64_t src_size, uint64_t offset, - uint64_t length) { + uint64_t length) { //没有足够的空间可以写 if (length > remaining()) { return Status::OLAPInternalError(OLAP_ERR_BUFFER_OVERFLOW); diff --git a/be/src/olap/lru_cache.cpp b/be/src/olap/lru_cache.cpp index 0f5ebaa395194e..1a045b24a3ba41 100644 --- a/be/src/olap/lru_cache.cpp +++ b/be/src/olap/lru_cache.cpp @@ -475,8 +475,7 @@ Cache::Handle* ShardedLRUCache::insert(const CacheKey& key, void* value, size_t CachePriority priority) { // The memory of the parameter value should be recorded in the tls mem tracker, // transfer the memory ownership of the value to ShardedLRUCache::_mem_tracker. - thread_local_ctx.get()->_thread_mem_tracker_mgr->mem_tracker()->transfer_to(_mem_tracker.get(), - charge); + tls_ctx()->_thread_mem_tracker_mgr->mem_tracker()->transfer_to(_mem_tracker.get(), charge); SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER(_mem_tracker); const uint32_t hash = _hash_slice(key); return _shards[_shard(hash)]->insert(key, hash, value, charge, deleter, priority); diff --git a/be/src/olap/out_stream.cpp b/be/src/olap/out_stream.cpp index 475d0ca98fe8a0..b5d19b841a7612 100644 --- a/be/src/olap/out_stream.cpp +++ b/be/src/olap/out_stream.cpp @@ -25,7 +25,7 @@ namespace doris { OutStreamFactory::OutStreamFactory(CompressKind compress_kind, uint32_t stream_buffer_size) - : _compress_kind(compress_kind), _stream_buffer_size(stream_buffer_size) { + : _stream_buffer_size(stream_buffer_size) { switch (compress_kind) { case COMPRESS_NONE: _compressor = nullptr; diff --git a/be/src/olap/out_stream.h b/be/src/olap/out_stream.h index a67a6c52044bf0..b1954ce6cf1571 100644 --- a/be/src/olap/out_stream.h +++ b/be/src/olap/out_stream.h @@ -141,7 +141,6 @@ class OutStreamFactory { private: std::map _streams; // All created streams - CompressKind _compress_kind; Compressor _compressor; uint32_t _stream_buffer_size; diff --git a/be/src/olap/rowset/column_writer.cpp b/be/src/olap/rowset/column_writer.cpp index d750db1ec1ced1..6128ab4f6f597e 100644 --- a/be/src/olap/rowset/column_writer.cpp +++ b/be/src/olap/rowset/column_writer.cpp @@ -488,8 +488,7 @@ void ByteColumnWriter::record_position() { IntegerColumnWriter::IntegerColumnWriter(uint32_t column_id, uint32_t unique_column_id, OutStreamFactory* stream_factory, bool is_singed) - : _column_id(column_id), - _unique_column_id(unique_column_id), + : _unique_column_id(unique_column_id), _stream_factory(stream_factory), _writer(nullptr), _is_signed(is_singed) {} diff --git a/be/src/olap/rowset/column_writer.h b/be/src/olap/rowset/column_writer.h index 9fe2d60f2a5d25..a40a3780ba6580 100644 --- a/be/src/olap/rowset/column_writer.h +++ b/be/src/olap/rowset/column_writer.h @@ -179,7 +179,6 @@ class IntegerColumnWriter { Status flush() { return _writer->flush(); } private: - uint32_t _column_id; uint32_t _unique_column_id; OutStreamFactory* _stream_factory; RunLengthIntegerWriter* _writer; diff --git a/be/src/olap/rowset/segment_v2/segment.cpp b/be/src/olap/rowset/segment_v2/segment.cpp index b06083e996f823..7970d0da5b261a 100644 --- a/be/src/olap/rowset/segment_v2/segment.cpp +++ b/be/src/olap/rowset/segment_v2/segment.cpp @@ -51,10 +51,9 @@ Segment::Segment(const FilePathDesc& path_desc, uint32_t segment_id, const TabletSchema* tablet_schema) : _path_desc(path_desc), _segment_id(segment_id), _tablet_schema(tablet_schema) { #ifndef BE_TEST - _mem_tracker = MemTracker::create_virtual_tracker( - -1, "Segment", StorageEngine::instance()->tablet_mem_tracker()); + _mem_tracker = StorageEngine::instance()->tablet_mem_tracker(); #else - _mem_tracker = MemTracker::create_virtual_tracker(-1, "Segment"); + _mem_tracker = MemTracker::get_process_tracker(); #endif } diff --git a/be/src/runtime/bufferpool/system_allocator.cc b/be/src/runtime/bufferpool/system_allocator.cc index a2dfc394b132db..3fa69e981e561e 100644 --- a/be/src/runtime/bufferpool/system_allocator.cc +++ b/be/src/runtime/bufferpool/system_allocator.cc @@ -22,6 +22,7 @@ #include "common/config.h" #include "gutil/strings/substitute.h" +#include "runtime/thread_context.h" #include "util/bit_util.h" #include "util/error_util.h" @@ -75,9 +76,11 @@ Status SystemAllocator::AllocateViaMMap(int64_t len, uint8_t** buffer_mem) { // Map an extra huge page so we can fix up the alignment if needed. map_len += HUGE_PAGE_SIZE; } + CONSUME_THREAD_LOCAL_MEM_TRACKER(map_len); uint8_t* mem = reinterpret_cast( mmap(nullptr, map_len, PROT_READ | PROT_WRITE, MAP_ANONYMOUS | MAP_PRIVATE, -1, 0)); if (mem == MAP_FAILED) { + RELEASE_THREAD_LOCAL_MEM_TRACKER(map_len); return Status::BufferAllocFailed("mmap failed"); } @@ -89,10 +92,12 @@ Status SystemAllocator::AllocateViaMMap(int64_t len, uint8_t** buffer_mem) { if (misalignment != 0) { uintptr_t fixup = HUGE_PAGE_SIZE - misalignment; munmap(mem, fixup); + RELEASE_THREAD_LOCAL_MEM_TRACKER(fixup); mem += fixup; map_len -= fixup; } munmap(mem + len, map_len - len); + RELEASE_THREAD_LOCAL_MEM_TRACKER(map_len - len); DCHECK_EQ(reinterpret_cast(mem) % HUGE_PAGE_SIZE, 0) << mem; // Mark the buffer as a candidate for promotion to huge pages. The Linux Transparent // Huge Pages implementation will try to back the memory with a huge page if it is @@ -142,6 +147,7 @@ Status SystemAllocator::AllocateViaMalloc(int64_t len, uint8_t** buffer_mem) { void SystemAllocator::Free(BufferPool::BufferHandle&& buffer) { if (config::mmap_buffers) { int rc = munmap(buffer.data(), buffer.len()); + RELEASE_THREAD_LOCAL_MEM_TRACKER(buffer.len()); DCHECK_EQ(rc, 0) << "Unexpected munmap() error: " << errno; } else { bool use_huge_pages = buffer.len() % HUGE_PAGE_SIZE == 0 && config::madvise_huge_pages; diff --git a/be/src/runtime/disk_io_mgr.cc b/be/src/runtime/disk_io_mgr.cc index ebedb58057a19b..1f6585180917b5 100644 --- a/be/src/runtime/disk_io_mgr.cc +++ b/be/src/runtime/disk_io_mgr.cc @@ -220,7 +220,7 @@ void DiskIoMgr::BufferDescriptor::reset(RequestContext* reader, ScanRange* range _eosr = false; _status = Status::OK(); // Consume in the tls mem tracker when the buffer is allocated. - _buffer_mem_tracker = thread_local_ctx.get()->_thread_mem_tracker_mgr->mem_tracker().get(); + _buffer_mem_tracker = tls_ctx()->_thread_mem_tracker_mgr->mem_tracker().get(); } void DiskIoMgr::BufferDescriptor::return_buffer() { @@ -739,7 +739,7 @@ char* DiskIoMgr::get_free_buffer(int64_t* buffer_size) { buffer = new char[*buffer_size]; } else { // This means the buffer's memory ownership is transferred from DiskIoMgr to tls tracker. - _mem_tracker->transfer_to(thread_local_ctx.get()->_thread_mem_tracker_mgr->mem_tracker().get(), *buffer_size); + _mem_tracker->transfer_to(tls_ctx()->_thread_mem_tracker_mgr->mem_tracker().get(), *buffer_size); buffer = _free_buffers[idx].front(); _free_buffers[idx].pop_front(); } @@ -767,7 +767,7 @@ void DiskIoMgr::gc_io_buffers(int64_t bytes_to_free) { // The deleted buffer is released in the tls mem tracker, the deleted buffer belongs to DiskIoMgr, // so the freed memory should be recorded in the DiskIoMgr mem tracker. So if the tls mem tracker // and the DiskIoMgr tracker are different, transfer memory ownership. - _mem_tracker->transfer_to(thread_local_ctx.get()->_thread_mem_tracker_mgr->mem_tracker().get(), bytes_freed); + _mem_tracker->transfer_to(tls_ctx()->_thread_mem_tracker_mgr->mem_tracker().get(), bytes_freed); } void DiskIoMgr::return_free_buffer(BufferDescriptor* desc) { @@ -793,7 +793,7 @@ void DiskIoMgr::return_free_buffer(char* buffer, int64_t buffer_size, MemTracker // The deleted buffer is released in the tls mem tracker. When the buffer was allocated, // it was consumed in BufferDescriptor->buffer_mem_tracker, so if the tls mem tracker and // the tracker in the parameters are different, transfer memory ownership. - tracker->transfer_to(thread_local_ctx.get()->_thread_mem_tracker_mgr->mem_tracker().get(), buffer_size); + tracker->transfer_to(tls_ctx()->_thread_mem_tracker_mgr->mem_tracker().get(), buffer_size); } } diff --git a/be/src/runtime/fold_constant_executor.cpp b/be/src/runtime/fold_constant_executor.cpp index 4a51b91932c1d1..5cdfcb084edb83 100644 --- a/be/src/runtime/fold_constant_executor.cpp +++ b/be/src/runtime/fold_constant_executor.cpp @@ -44,6 +44,7 @@ TUniqueId FoldConstantExecutor::_dummy_id; Status FoldConstantExecutor::fold_constant_expr( const TFoldConstantParams& params, PConstantExprResult* response) { + SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER(_mem_tracker); const auto& expr_map = params.expr_map; auto expr_result_map = response->mutable_expr_result_map(); @@ -53,7 +54,6 @@ Status FoldConstantExecutor::fold_constant_expr( if (UNLIKELY(!status.ok())) { return status; } - SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER(_mem_tracker); for (const auto& m : expr_map) { PExprResultMap pexpr_result_map; diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp index 34ac3569094e5e..d12bc85f2623eb 100644 --- a/be/src/runtime/fragment_mgr.cpp +++ b/be/src/runtime/fragment_mgr.cpp @@ -467,8 +467,7 @@ void FragmentMgr::_exec_actual(std::shared_ptr exec_state, Fi .instance_id(exec_state->fragment_instance_id()) .tag("pthread_id", std::to_string((uintptr_t)pthread_self())); #ifndef BE_TEST - SCOPED_ATTACH_TASK_THREAD(exec_state->executor()->runtime_state()->query_type(), - print_id(exec_state->query_id()), exec_state->fragment_instance_id(), + SCOPED_ATTACH_TASK_THREAD(exec_state->executor()->runtime_state(), exec_state->executor()->runtime_state()->instance_mem_tracker()); #endif exec_state->execute(); diff --git a/be/src/runtime/mem_pool.cpp b/be/src/runtime/mem_pool.cpp index a2b9bf3424208d..adc86f0e67849b 100644 --- a/be/src/runtime/mem_pool.cpp +++ b/be/src/runtime/mem_pool.cpp @@ -65,7 +65,7 @@ MemPool::MemPool() total_allocated_bytes_(0), total_reserved_bytes_(0), peak_allocated_bytes_(0), - _mem_tracker(thread_local_ctx.get()->_thread_mem_tracker_mgr->mem_tracker().get()) {} + _mem_tracker(tls_ctx()->_thread_mem_tracker_mgr->mem_tracker().get()) {} MemPool::ChunkInfo::ChunkInfo(const Chunk& chunk_) : chunk(chunk_), allocated_bytes(0) { DorisMetrics::instance()->memory_pool_bytes_total->increment(chunk.size); diff --git a/be/src/runtime/mem_tracker.cpp b/be/src/runtime/mem_tracker.cpp index 86f7c0370bfa0a..49f8862b0bdef5 100644 --- a/be/src/runtime/mem_tracker.cpp +++ b/be/src/runtime/mem_tracker.cpp @@ -60,6 +60,19 @@ MemTracker* MemTracker::get_raw_process_tracker() { return raw_process_tracker; } +// Track memory for all brpc server responses. +static std::shared_ptr brpc_server_tracker; +static GoogleOnceType brpc_server_tracker_once = GOOGLE_ONCE_INIT; + +void MemTracker::create_brpc_server_tracker() { + brpc_server_tracker = MemTracker::create_tracker(-1, "Brpc", get_process_tracker(), MemTrackerLevel::OVERVIEW); +} + +std::shared_ptr MemTracker::get_brpc_server_tracker() { + GoogleOnceInit(&brpc_server_tracker_once, &MemTracker::create_brpc_server_tracker); + return brpc_server_tracker; +} + void MemTracker::list_process_trackers(std::vector>* trackers) { trackers->clear(); std::deque> to_process; @@ -88,7 +101,8 @@ std::shared_ptr MemTracker::create_tracker(int64_t byte_limit, const const std::shared_ptr& parent, MemTrackerLevel level, RuntimeProfile* profile) { - std::shared_ptr reset_parent = parent ? parent : thread_local_ctx.get()->_thread_mem_tracker_mgr->mem_tracker(); + std::shared_ptr reset_parent = + parent ? parent : tls_ctx()->_thread_mem_tracker_mgr->mem_tracker(); DCHECK(reset_parent); std::shared_ptr tracker( @@ -102,7 +116,8 @@ std::shared_ptr MemTracker::create_tracker(int64_t byte_limit, const std::shared_ptr MemTracker::create_virtual_tracker( int64_t byte_limit, const std::string& label, const std::shared_ptr& parent, MemTrackerLevel level) { - std::shared_ptr reset_parent = parent ? parent : thread_local_ctx.get()->_thread_mem_tracker_mgr->mem_tracker(); + std::shared_ptr reset_parent = + parent ? parent : tls_ctx()->_thread_mem_tracker_mgr->mem_tracker(); DCHECK(reset_parent); std::shared_ptr tracker( @@ -121,6 +136,7 @@ MemTracker::MemTracker(int64_t byte_limit, const std::string& label, RuntimeProfile* profile) : _limit(byte_limit), _label(label), + // Not 100% sure the id is unique. This is generated because it is faster than converting to int after hash. _id((GetCurrentTimeMicros() % 1000000) * 100 + _label.length()), _parent(parent), _level(level) { diff --git a/be/src/runtime/mem_tracker.h b/be/src/runtime/mem_tracker.h index 3d0e3f7271ebd2..74a7b4bef6bb12 100644 --- a/be/src/runtime/mem_tracker.h +++ b/be/src/runtime/mem_tracker.h @@ -97,6 +97,8 @@ class MemTracker { // Gets a shared_ptr to the "process" tracker, creating it if necessary. static std::shared_ptr get_process_tracker(); static MemTracker* get_raw_process_tracker(); + // Gets a shared_ptr to the "brpc server" tracker, creating it if necessary. + static std::shared_ptr get_brpc_server_tracker(); Status check_sys_mem_info(int64_t bytes) { if (MemInfo::initialized() && MemInfo::current_mem() + bytes >= MemInfo::mem_limit()) { @@ -464,6 +466,8 @@ class MemTracker { // Creates the process tracker. static void create_process_tracker(); + // Creates the brpc server tracker. + static void create_brpc_server_tracker(); // Limit on memory consumption, in bytes. If limit_ == -1, there is no consumption limit. int64_t _limit; diff --git a/be/src/runtime/memory/chunk_allocator.cpp b/be/src/runtime/memory/chunk_allocator.cpp index 2d2a9a5c018057..7f8259c03496d8 100644 --- a/be/src/runtime/memory/chunk_allocator.cpp +++ b/be/src/runtime/memory/chunk_allocator.cpp @@ -17,6 +17,8 @@ #include "runtime/memory/chunk_allocator.h" +#include + #include #include @@ -134,8 +136,7 @@ ChunkAllocator::ChunkAllocator(size_t reserve_limit) Status ChunkAllocator::allocate(size_t size, Chunk* chunk, MemTracker* tracker, bool check_limits) { MemTracker* reset_tracker = - tracker ? tracker - : thread_local_ctx.get()->_thread_mem_tracker_mgr->mem_tracker().get(); + tracker ? tracker : tls_ctx()->_thread_mem_tracker_mgr->mem_tracker().get(); // In advance, transfer the memory ownership of allocate from ChunkAllocator::tracker to the parameter tracker. // Next, if the allocate is successful, it will exit normally; // if the allocate fails, return this part of the memory to the parameter tracker. @@ -178,7 +179,7 @@ Status ChunkAllocator::allocate(size_t size, Chunk* chunk, MemTracker* tracker, chunk->data = SystemAllocator::allocate(size); // The allocated chunk is consumed in the tls mem tracker, we want to consume in the ChunkAllocator tracker, // transfer memory ownership. TODO(zxy) replace with switch tls tracker - thread_local_ctx.get()->_thread_mem_tracker_mgr->mem_tracker()->transfer_to(_mem_tracker.get(), size); + tls_ctx()->_thread_mem_tracker_mgr->mem_tracker()->transfer_to(_mem_tracker.get(), size); } chunk_pool_system_alloc_count->increment(1); chunk_pool_system_alloc_cost_ns->increment(cost_ns); @@ -208,9 +209,8 @@ void ChunkAllocator::free(const Chunk& chunk, MemTracker* tracker) { // it was consumed in the parameter tracker, so if the tls mem tracker and the parameter // tracker are different, transfer memory ownership. if (tracker) - tracker->transfer_to( - thread_local_ctx.get()->_thread_mem_tracker_mgr->mem_tracker().get(), - chunk.size); + tracker->transfer_to(tls_ctx()->_thread_mem_tracker_mgr->mem_tracker().get(), + chunk.size); } chunk_pool_system_free_count->increment(1); chunk_pool_system_free_cost_ns->increment(cost_ns); @@ -223,8 +223,8 @@ void ChunkAllocator::free(const Chunk& chunk, MemTracker* tracker) { if (tracker) { tracker->transfer_to(_mem_tracker.get(), chunk.size); } else { - thread_local_ctx.get()->_thread_mem_tracker_mgr->mem_tracker()->transfer_to( - _mem_tracker.get(), chunk.size); + tls_ctx()->_thread_mem_tracker_mgr->mem_tracker()->transfer_to(_mem_tracker.get(), + chunk.size); } _arenas[chunk.core_id]->push_free_chunk(chunk.data, chunk.size); } diff --git a/be/src/runtime/memory/system_allocator.cpp b/be/src/runtime/memory/system_allocator.cpp index 374cec5557ff97..6ed5906f005f7e 100644 --- a/be/src/runtime/memory/system_allocator.cpp +++ b/be/src/runtime/memory/system_allocator.cpp @@ -23,6 +23,7 @@ #include "common/config.h" #include "common/logging.h" +#include "runtime/thread_context.h" namespace doris { @@ -43,6 +44,8 @@ void SystemAllocator::free(uint8_t* ptr, size_t length) { char buf[64]; LOG(ERROR) << "fail to free memory via munmap, errno=" << errno << ", errmsg=" << strerror_r(errno, buf, 64); + } else { + RELEASE_THREAD_LOCAL_MEM_TRACKER(length); } } else { ::free(ptr); @@ -63,12 +66,14 @@ uint8_t* SystemAllocator::allocate_via_malloc(size_t length) { } uint8_t* SystemAllocator::allocate_via_mmap(size_t length) { + CONSUME_THREAD_LOCAL_MEM_TRACKER(length); auto ptr = (uint8_t*)mmap(nullptr, length, PROT_READ | PROT_WRITE, MAP_ANONYMOUS | MAP_PRIVATE, -1, 0); if (ptr == MAP_FAILED) { char buf[64]; LOG(ERROR) << "fail to allocate memory via mmap, errno=" << errno << ", errmsg=" << strerror_r(errno, buf, 64); + RELEASE_THREAD_LOCAL_MEM_TRACKER(length); return nullptr; } return ptr; diff --git a/be/src/runtime/plan_fragment_executor.cpp b/be/src/runtime/plan_fragment_executor.cpp index 6cdbce39d2a428..2745215bfbe4b3 100644 --- a/be/src/runtime/plan_fragment_executor.cpp +++ b/be/src/runtime/plan_fragment_executor.cpp @@ -89,9 +89,7 @@ Status PlanFragmentExecutor::prepare(const TExecPlanFragmentParams& request, _runtime_state->set_query_fragments_ctx(fragments_ctx); RETURN_IF_ERROR(_runtime_state->init_mem_trackers(_query_id)); - SCOPED_ATTACH_TASK_THREAD(_runtime_state->query_type(), print_id(_runtime_state->query_id()), - _runtime_state->fragment_instance_id(), - _runtime_state->instance_mem_tracker()); + SCOPED_ATTACH_TASK_THREAD(_runtime_state.get(), _runtime_state->instance_mem_tracker()); _runtime_state->set_be_number(request.backend_num); if (request.__isset.backend_id) { _runtime_state->set_backend_id(request.backend_id); @@ -442,9 +440,7 @@ void PlanFragmentExecutor::_collect_node_statistics() { } void PlanFragmentExecutor::report_profile() { - SCOPED_ATTACH_TASK_THREAD(_runtime_state->query_type(), print_id(_runtime_state->query_id()), - _runtime_state->fragment_instance_id(), - _runtime_state->instance_mem_tracker()); + SCOPED_ATTACH_TASK_THREAD(_runtime_state.get(), _runtime_state->instance_mem_tracker()); VLOG_FILE << "report_profile(): instance_id=" << _runtime_state->fragment_instance_id(); DCHECK(_report_status_cb); diff --git a/be/src/runtime/row_batch.cpp b/be/src/runtime/row_batch.cpp index 614f7888f1e91f..06306df512c3d0 100644 --- a/be/src/runtime/row_batch.cpp +++ b/be/src/runtime/row_batch.cpp @@ -44,7 +44,7 @@ const int RowBatch::AT_CAPACITY_MEM_USAGE = 8 * 1024 * 1024; const int RowBatch::FIXED_LEN_BUFFER_LIMIT = AT_CAPACITY_MEM_USAGE / 2; RowBatch::RowBatch(const RowDescriptor& row_desc, int capacity) - : _mem_tracker(thread_local_ctx.get()->_thread_mem_tracker_mgr->mem_tracker()), + : _mem_tracker(tls_ctx()->_thread_mem_tracker_mgr->mem_tracker()), _has_in_flight_row(false), _num_rows(0), _num_uncommitted_rows(0), @@ -70,7 +70,7 @@ RowBatch::RowBatch(const RowDescriptor& row_desc, int capacity) // to allocated string data in special mempool // (change via python script that runs over Data_types.cc) RowBatch::RowBatch(const RowDescriptor& row_desc, const PRowBatch& input_batch) - : _mem_tracker(thread_local_ctx.get()->_thread_mem_tracker_mgr->mem_tracker()), + : _mem_tracker(tls_ctx()->_thread_mem_tracker_mgr->mem_tracker()), _has_in_flight_row(false), _num_rows(input_batch.num_rows()), _num_uncommitted_rows(0), diff --git a/be/src/runtime/runtime_filter_mgr.cpp b/be/src/runtime/runtime_filter_mgr.cpp index 4eade51b46757e..008e9d2e588c2a 100644 --- a/be/src/runtime/runtime_filter_mgr.cpp +++ b/be/src/runtime/runtime_filter_mgr.cpp @@ -156,8 +156,8 @@ Status RuntimeFilterMergeControllerEntity::_init_with_desc( // LOG(INFO) << "entity filter id:" << filter_id; cntVal->filter->init_with_desc(&cntVal->runtime_filter_desc, query_options, _fragment_instance_id); cntVal->tracker = MemTracker::create_tracker( - -1, thread_local_ctx.get()->_thread_mem_tracker_mgr->mem_tracker()->label() + ":FilterID:" + filter_id, - thread_local_ctx.get()->_thread_mem_tracker_mgr->mem_tracker()); + -1, tls_ctx()->_thread_mem_tracker_mgr->mem_tracker()->label() + ":FilterID:" + filter_id, + tls_ctx()->_thread_mem_tracker_mgr->mem_tracker()); _filter_map.emplace(filter_id, cntVal); return Status::OK(); } diff --git a/be/src/runtime/sorted_run_merger.cc b/be/src/runtime/sorted_run_merger.cc index 11042a4d9d03e1..5bf518178db14b 100644 --- a/be/src/runtime/sorted_run_merger.cc +++ b/be/src/runtime/sorted_run_merger.cc @@ -129,7 +129,7 @@ class SortedRunMerger::ParallelBatchedRowSupplier : public SortedRunMerger::Batc *done = false; _pull_task_thread = std::thread(&SortedRunMerger::ParallelBatchedRowSupplier::process_sorted_run_task, - this, thread_local_ctx.get()->_thread_mem_tracker_mgr->mem_tracker()); + this, tls_ctx()->_thread_mem_tracker_mgr->mem_tracker()); RETURN_IF_ERROR(next(nullptr, done)); return Status::OK(); diff --git a/be/src/runtime/tcmalloc_hook.h b/be/src/runtime/tcmalloc_hook.h index 9ba55fde8e0fa0..548b8862990b8f 100644 --- a/be/src/runtime/tcmalloc_hook.h +++ b/be/src/runtime/tcmalloc_hook.h @@ -36,11 +36,11 @@ // destructor to control the behavior of consume can lead to unexpected behavior, // like this: if (LIKELY(doris::start_thread_mem_tracker)) { void new_hook(const void* ptr, size_t size) { - doris::thread_local_ctx.get()->consume_mem(tc_nallocx(size, 0)); + doris::tls_ctx()->consume_mem(tc_nallocx(size, 0)); } void delete_hook(const void* ptr) { - doris::thread_local_ctx.get()->release_mem(tc_malloc_size(const_cast(ptr))); + doris::tls_ctx()->release_mem(tc_malloc_size(const_cast(ptr))); } void init_hook() { diff --git a/be/src/runtime/thread_context.cpp b/be/src/runtime/thread_context.cpp index 871cd1ebf0c34d..0b71101d971c28 100644 --- a/be/src/runtime/thread_context.cpp +++ b/be/src/runtime/thread_context.cpp @@ -17,6 +17,9 @@ #include "runtime/thread_context.h" +#include "runtime/runtime_state.h" +#include "util/doris_metrics.h" + namespace doris { DEFINE_STATIC_THREAD_LOCAL(ThreadContext, ThreadContextPtr, thread_local_ctx); @@ -29,4 +32,128 @@ ThreadContext* ThreadContextPtr::get() { return thread_local_ctx; } +AttachTaskThread::AttachTaskThread(const ThreadContext::TaskType& type, const std::string& task_id, + const TUniqueId& fragment_instance_id, + const std::shared_ptr& mem_tracker) { + DCHECK(task_id != ""); + tls_ctx()->attach(type, task_id, fragment_instance_id, mem_tracker); +} + +AttachTaskThread::AttachTaskThread(const ThreadContext::TaskType& type, + const std::shared_ptr& mem_tracker) { +#ifndef BE_TEST + DCHECK(mem_tracker); +#endif + tls_ctx()->attach(type, "", TUniqueId(), mem_tracker); +} + +AttachTaskThread::AttachTaskThread(const TQueryType::type& query_type, + const std::shared_ptr& mem_tracker) { +#ifndef BE_TEST + DCHECK(mem_tracker); +#endif + tls_ctx()->attach(query_to_task_type(query_type), "", TUniqueId(), mem_tracker); +} + +AttachTaskThread::AttachTaskThread(const TQueryType::type& query_type, + const std::shared_ptr& mem_tracker, + const std::string& task_id, + const TUniqueId& fragment_instance_id) { +#ifndef BE_TEST + DCHECK(task_id != ""); + DCHECK(fragment_instance_id != TUniqueId()); + DCHECK(mem_tracker); +#endif + tls_ctx()->attach(query_to_task_type(query_type), task_id, fragment_instance_id, mem_tracker); +} + +AttachTaskThread::AttachTaskThread(const RuntimeState* runtime_state, + const std::shared_ptr& mem_tracker) { +#ifndef BE_TEST + DCHECK(print_id(runtime_state->query_id()) != ""); + DCHECK(runtime_state->fragment_instance_id() != TUniqueId()); + DCHECK(mem_tracker); +#endif + tls_ctx()->attach(query_to_task_type(runtime_state->query_type()), + print_id(runtime_state->query_id()), runtime_state->fragment_instance_id(), + mem_tracker); +} + +AttachTaskThread::~AttachTaskThread() { + tls_ctx()->detach(); + DorisMetrics::instance()->attach_task_thread_count->increment(1); +} + +template +SwitchThreadMemTracker::SwitchThreadMemTracker( + const std::shared_ptr& mem_tracker, bool in_task) { + if (config::memory_verbose_track) { +#ifndef BE_TEST + DCHECK(mem_tracker); + // The thread tracker must be switched after the attach task, otherwise switching + // in the main thread will cause the cached tracker not be cleaned up in time. + DCHECK(in_task == false || tls_ctx()->_thread_mem_tracker_mgr->is_attach_task()); + if (Existed) { + _old_tracker_id = tls_ctx()->_thread_mem_tracker_mgr->update_tracker(mem_tracker); + } else { + _old_tracker_id = + tls_ctx()->_thread_mem_tracker_mgr->update_tracker(mem_tracker); + } +#endif +#ifndef NDEBUG + tls_ctx()->_thread_mem_tracker_mgr->switch_count += 1; +#endif + } +} + +template +SwitchThreadMemTracker::~SwitchThreadMemTracker() { + if (config::memory_verbose_track) { +#ifndef NDEBUG + tls_ctx()->_thread_mem_tracker_mgr->switch_count -= 1; + DorisMetrics::instance()->switch_thread_mem_tracker_count->increment(1); +#endif +#ifndef BE_TEST + tls_ctx()->_thread_mem_tracker_mgr->update_tracker_id(_old_tracker_id); +#endif + } +} + +SwitchThreadMemTrackerErrCallBack::SwitchThreadMemTrackerErrCallBack( + const std::string& action_type, bool cancel_work, ERRCALLBACK err_call_back_func) { + DCHECK(action_type != std::string()); + _old_tracker_cb = tls_ctx()->_thread_mem_tracker_mgr->update_consume_err_cb( + action_type, cancel_work, err_call_back_func); +} + +SwitchThreadMemTrackerErrCallBack::~SwitchThreadMemTrackerErrCallBack() { + tls_ctx()->_thread_mem_tracker_mgr->update_consume_err_cb(_old_tracker_cb); + DorisMetrics::instance()->switch_thread_mem_tracker_err_cb_count->increment(1); +} + +SwitchBthread::SwitchBthread() { + tls = static_cast(bthread_getspecific(btls_key)); + // First call to bthread_getspecific (and before any bthread_setspecific) returns NULL + if (tls == nullptr) { + // Create thread-local data on demand. + tls = new ThreadContext; + tls->_thread_mem_tracker_mgr->init_bthread(); + // set the data so that next time bthread_getspecific in the thread returns the data. + CHECK_EQ(0, bthread_setspecific(btls_key, tls)); + } else { + tls->_thread_mem_tracker_mgr->init_bthread(); + } +} + +SwitchBthread::~SwitchBthread() { + DCHECK(tls != nullptr); + tls->_thread_mem_tracker_mgr->clear_untracked_mems(); +#ifndef NDEBUG + DorisMetrics::instance()->switch_bthread_count->increment(1); +#endif +} + +template class SwitchThreadMemTracker; +template class SwitchThreadMemTracker; + } // namespace doris diff --git a/be/src/runtime/thread_context.h b/be/src/runtime/thread_context.h index 4d9d60078c1543..8ab72be634df0d 100644 --- a/be/src/runtime/thread_context.h +++ b/be/src/runtime/thread_context.h @@ -17,15 +17,17 @@ #pragma once +#include +// After brpc_conflict.h +#include + #include #include #include "common/logging.h" -#include "gen_cpp/Types_types.h" -#include "runtime/runtime_state.h" +#include "gen_cpp/PaloInternalService_types.h" // for TQueryType #include "runtime/thread_mem_tracker_mgr.h" #include "runtime/threadlocal.h" -#include "util/doris_metrics.h" // Attach to task when thread starts #define SCOPED_ATTACH_TASK_THREAD(type, ...) \ @@ -34,34 +36,50 @@ // may be different from the order of execution of instructions, which will cause the position of // the memory track to be unexpected. #define SCOPED_STOP_THREAD_LOCAL_MEM_TRACKER() \ - auto VARNAME_LINENUM(stop_tracker) = StopThreadMemTracker(true) + auto VARNAME_LINENUM(stop_tracker) = doris::StopThreadMemTracker(true) #define GLOBAL_STOP_THREAD_LOCAL_MEM_TRACKER() \ - auto VARNAME_LINENUM(stop_tracker) = StopThreadMemTracker(false) + auto VARNAME_LINENUM(stop_tracker) = doris::StopThreadMemTracker(false) // Switch thread mem tracker during task execution. // After the non-query thread switches the mem tracker, if the thread will not switch the mem // tracker again in the short term, can consider manually clear_untracked_mems. // The query thread will automatically clear_untracked_mems when detach_task. #define SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER(mem_tracker) \ - auto VARNAME_LINENUM(switch_tracker) = SwitchThreadMemTracker(mem_tracker, false) + auto VARNAME_LINENUM(switch_tracker) = doris::SwitchThreadMemTracker(mem_tracker, false) #define SCOPED_SWITCH_TASK_THREAD_LOCAL_MEM_TRACKER(mem_tracker) \ - auto VARNAME_LINENUM(switch_tracker) = SwitchThreadMemTracker(mem_tracker, true); + auto VARNAME_LINENUM(switch_tracker) = doris::SwitchThreadMemTracker(mem_tracker, true); +#define SCOPED_SWITCH_THREAD_LOCAL_EXISTED_MEM_TRACKER(mem_tracker) \ + auto VARNAME_LINENUM(switch_tracker) = doris::SwitchThreadMemTracker(mem_tracker, false) #define SCOPED_SWITCH_TASK_THREAD_LOCAL_EXISTED_MEM_TRACKER(mem_tracker) \ - auto VARNAME_LINENUM(switch_tracker) = SwitchThreadMemTracker(mem_tracker, true) + auto VARNAME_LINENUM(switch_tracker) = doris::SwitchThreadMemTracker(mem_tracker, true) // After the non-query thread switches the mem tracker, if the thread will not switch the mem // tracker again in the short term, can consider manually clear_untracked_mems. // The query thread will automatically clear_untracked_mems when detach_task. #define SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER_END_CLEAR(mem_tracker) \ - auto VARNAME_LINENUM(switch_tracker) = SwitchThreadMemTrackerEndClear(mem_tracker) + auto VARNAME_LINENUM(switch_tracker) = doris::SwitchThreadMemTrackerEndClear(mem_tracker) #define SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER_ERR_CB(action_type, ...) \ auto VARNAME_LINENUM(witch_tracker_cb) = \ - SwitchThreadMemTrackerErrCallBack(action_type, ##__VA_ARGS__) + doris::SwitchThreadMemTrackerErrCallBack(action_type, ##__VA_ARGS__) +#define SCOPED_SWITCH_BTHREAD() auto VARNAME_LINENUM(switch_bthread) = SwitchBthread() +// Before switching the same tracker multiple times, add tracker as early as possible, +// and then call `SCOPED_SWITCH_TASK_THREAD_LOCAL_EXISTED_MEM_TRACKER` to reduce one map find. +// For example, in the exec_node open phase `add tracker`, it is no longer necessary to determine +// whether the tracker exists in TLS when switching the tracker in the exec_node get_next phase. +// TODO(zxy): Duplicate add tracker is currently prohibited, because it will, +// 1. waste time 2. `_untracked_mems[mem_tracker->id()] = 0` will cause the memory track to be lost. +// Some places may have to repeat the add tracker. optimize after. #define ADD_THREAD_LOCAL_MEM_TRACKER(mem_tracker) \ - thread_local_ctx.get()->_thread_mem_tracker_mgr->add_tracker(mem_tracker) + doris::tls_ctx()->_thread_mem_tracker_mgr->add_tracker(mem_tracker) +#define CONSUME_THREAD_LOCAL_MEM_TRACKER(size) \ + doris::tls_ctx()->_thread_mem_tracker_mgr->noncache_consume(size) +#define RELEASE_THREAD_LOCAL_MEM_TRACKER(size) \ + doris::tls_ctx()->_thread_mem_tracker_mgr->noncache_consume(-size) namespace doris { class TUniqueId; +extern bthread_key_t btls_key; + // The thread context saves some info about a working thread. // 2 requried info: // 1. thread_id: Current thread id, Auto generated. @@ -80,19 +98,20 @@ class ThreadContext { STORAGE = 4 // to be added ... }; - inline static const std::string TaskTypeStr[] = {"UNKNOWN", "QUERY", "LOAD", "COMPACTION", "STORAGE"}; + inline static const std::string TaskTypeStr[] = {"UNKNOWN", "QUERY", "LOAD", "COMPACTION", + "STORAGE"}; public: - ThreadContext() : _thread_id(std::this_thread::get_id()), _type(TaskType::UNKNOWN) { + ThreadContext() : _type(TaskType::UNKNOWN) { _thread_mem_tracker_mgr.reset(new ThreadMemTrackerMgr()); - std::stringstream ss; - ss << _thread_id; - _thread_id_str = ss.str(); + _thread_mem_tracker_mgr->init(); + start_thread_mem_tracker = true; + _thread_id = get_thread_id(); } void attach(const TaskType& type, const std::string& task_id, const TUniqueId& fragment_instance_id, - const std::shared_ptr& mem_tracker) { + const std::shared_ptr& mem_tracker) { DCHECK(_type == TaskType::UNKNOWN && _task_id == "") << ",old tracker label: " << mem_tracker->label() << ",new tracker label: " << _thread_mem_tracker_mgr->mem_tracker()->label(); @@ -111,10 +130,15 @@ class ThreadContext { } const std::string& task_id() const { return _task_id; } - const std::thread::id& thread_id() const { return _thread_id; } - const std::string& thread_id_str() const { return _thread_id_str; } + const std::string& thread_id_str() const { return _thread_id; } const TUniqueId& fragment_instance_id() const { return _fragment_instance_id; } + std::string get_thread_id() { + std::stringstream ss; + ss << std::this_thread::get_id(); + return ss.str(); + } + void consume_mem(int64_t size) { if (start_thread_mem_tracker) { _thread_mem_tracker_mgr->cache_consume(size); @@ -136,8 +160,7 @@ class ThreadContext { std::unique_ptr _thread_mem_tracker_mgr; private: - std::thread::id _thread_id; - std::string _thread_id_str; + std::string _thread_id; TaskType _type; std::string _task_id; TUniqueId _fragment_instance_id; @@ -178,55 +201,33 @@ class ThreadContextPtr { inline thread_local ThreadContextPtr thread_local_ctx; +static ThreadContext* tls_ctx() { + ThreadContext* tls = static_cast(bthread_getspecific(btls_key)); + if (tls != nullptr) { + return tls; + } else { + return thread_local_ctx.get(); + } +} + class AttachTaskThread { public: explicit AttachTaskThread(const ThreadContext::TaskType& type, const std::string& task_id, const TUniqueId& fragment_instance_id = TUniqueId(), - const std::shared_ptr& mem_tracker = nullptr) { - DCHECK(task_id != ""); - thread_local_ctx.get()->attach(type, task_id, fragment_instance_id, mem_tracker); - } + const std::shared_ptr& mem_tracker = nullptr); explicit AttachTaskThread(const ThreadContext::TaskType& type, - const std::shared_ptr& mem_tracker) { -#ifndef BE_TEST - DCHECK(mem_tracker); -#endif - thread_local_ctx.get()->attach(type, "", TUniqueId(), mem_tracker); - } + const std::shared_ptr& mem_tracker); explicit AttachTaskThread(const TQueryType::type& query_type, - const std::shared_ptr& mem_tracker) { -#ifndef BE_TEST - DCHECK(mem_tracker); -#endif - thread_local_ctx.get()->attach(query_to_task_type(query_type), "", TUniqueId(), - mem_tracker); - } + const std::shared_ptr& mem_tracker); - explicit AttachTaskThread(const TQueryType::type& query_type, const std::string& task_id, - const TUniqueId& fragment_instance_id, - const std::shared_ptr& mem_tracker) { -#ifndef BE_TEST - DCHECK(task_id != ""); - DCHECK(fragment_instance_id != TUniqueId()); - DCHECK(mem_tracker); -#endif - thread_local_ctx.get()->attach(query_to_task_type(query_type), task_id, - fragment_instance_id, mem_tracker); - } + explicit AttachTaskThread(const TQueryType::type& query_type, + const std::shared_ptr& mem_tracker, + const std::string& task_id, const TUniqueId& fragment_instance_id); explicit AttachTaskThread(const RuntimeState* runtime_state, - const std::shared_ptr& mem_tracker) { -#ifndef BE_TEST - DCHECK(print_id(runtime_state->query_id()) != ""); - DCHECK(runtime_state->fragment_instance_id() != TUniqueId()); - DCHECK(mem_tracker); -#endif - thread_local_ctx.get()->attach(query_to_task_type(runtime_state->query_type()), - print_id(runtime_state->query_id()), - runtime_state->fragment_instance_id(), mem_tracker); - } + const std::shared_ptr& mem_tracker); const ThreadContext::TaskType query_to_task_type(const TQueryType::type& query_type) { switch (query_type) { @@ -240,10 +241,7 @@ class AttachTaskThread { } } - ~AttachTaskThread() { - thread_local_ctx.get()->detach(); - DorisMetrics::instance()->attach_task_thread_count->increment(1); - } + ~AttachTaskThread(); }; class StopThreadMemTracker { @@ -263,36 +261,10 @@ class StopThreadMemTracker { template class SwitchThreadMemTracker { public: - explicit SwitchThreadMemTracker(const std::shared_ptr& mem_tracker, - bool in_task = true) { - if (config::memory_verbose_track) { -#ifndef BE_TEST - DCHECK(mem_tracker); - // The thread tracker must be switched after the attach task, otherwise switching - // in the main thread will cause the cached tracker not be cleaned up in time. - DCHECK(in_task == false || - thread_local_ctx.get()->_thread_mem_tracker_mgr->is_attach_task()); - if (Existed) { - _old_tracker_id = - thread_local_ctx.get()->_thread_mem_tracker_mgr->update_tracker( - mem_tracker); - } else { - _old_tracker_id = - thread_local_ctx.get()->_thread_mem_tracker_mgr->update_tracker( - mem_tracker); - } -#endif - } - } + explicit SwitchThreadMemTracker(const std::shared_ptr& mem_tracker, + bool in_task = true); - ~SwitchThreadMemTracker() { - if (config::memory_verbose_track) { -#ifndef BE_TEST - thread_local_ctx.get()->_thread_mem_tracker_mgr->update_tracker_id(_old_tracker_id); - DorisMetrics::instance()->switch_thread_mem_tracker_count->increment(1); -#endif - } - } + ~SwitchThreadMemTracker(); protected: int64_t _old_tracker_id = 0; @@ -300,11 +272,11 @@ class SwitchThreadMemTracker { class SwitchThreadMemTrackerEndClear : public SwitchThreadMemTracker { public: - explicit SwitchThreadMemTrackerEndClear(const std::shared_ptr& mem_tracker) + explicit SwitchThreadMemTrackerEndClear(const std::shared_ptr& mem_tracker) : SwitchThreadMemTracker(mem_tracker, false) {} ~SwitchThreadMemTrackerEndClear() { - thread_local_ctx.get()->_thread_mem_tracker_mgr->clear_untracked_mems(); + tls_ctx()->_thread_mem_tracker_mgr->clear_untracked_mems(); } }; @@ -312,19 +284,22 @@ class SwitchThreadMemTrackerErrCallBack { public: explicit SwitchThreadMemTrackerErrCallBack(const std::string& action_type, bool cancel_work = true, - ERRCALLBACK err_call_back_func = nullptr) { - DCHECK(action_type != std::string()); - _old_tracker_cb = thread_local_ctx.get()->_thread_mem_tracker_mgr->update_consume_err_cb( - action_type, cancel_work, err_call_back_func); - } + ERRCALLBACK err_call_back_func = nullptr); - ~SwitchThreadMemTrackerErrCallBack() { - thread_local_ctx.get()->_thread_mem_tracker_mgr->update_consume_err_cb(_old_tracker_cb); - DorisMetrics::instance()->switch_thread_mem_tracker_err_cb_count->increment(1); - } + ~SwitchThreadMemTrackerErrCallBack(); private: ConsumeErrCallBackInfo _old_tracker_cb; }; +class SwitchBthread { +public: + explicit SwitchBthread(); + + ~SwitchBthread(); + +private: + ThreadContext* tls; +}; + } // namespace doris diff --git a/be/src/runtime/thread_mem_tracker_mgr.cpp b/be/src/runtime/thread_mem_tracker_mgr.cpp index 06fd521faf1551..e55a4620f00e31 100644 --- a/be/src/runtime/thread_mem_tracker_mgr.cpp +++ b/be/src/runtime/thread_mem_tracker_mgr.cpp @@ -17,6 +17,8 @@ #include "runtime/thread_mem_tracker_mgr.h" +#include "runtime/exec_env.h" +#include "runtime/fragment_mgr.h" #include "runtime/mem_tracker_task_pool.h" #include "service/backend_options.h" @@ -25,6 +27,7 @@ namespace doris { void ThreadMemTrackerMgr::attach_task(const std::string& cancel_msg, const std::string& task_id, const TUniqueId& fragment_instance_id, const std::shared_ptr& mem_tracker) { + DCHECK(switch_count == 0) << print_debug_string(); _task_id = task_id; _fragment_instance_id = fragment_instance_id; _consume_err_cb.cancel_msg = cancel_msg; @@ -44,26 +47,15 @@ void ThreadMemTrackerMgr::attach_task(const std::string& cancel_msg, const std:: } void ThreadMemTrackerMgr::detach_task() { + DCHECK(switch_count == 0) << print_debug_string(); _task_id = ""; _fragment_instance_id = TUniqueId(); _consume_err_cb.init(); clear_untracked_mems(); - _tracker_id = 0; - // The following memory changes for the two map operations of _untracked_mems and _mem_trackers - // will be re-recorded in _untracked_mem. - _untracked_mems.clear(); - _untracked_mems[0] = 0; - _mem_trackers.clear(); - _mem_trackers[0] = MemTracker::get_process_tracker(); - _mem_tracker_labels.clear(); - _mem_tracker_labels[0] = MemTracker::get_process_tracker()->label(); + init(); } void ThreadMemTrackerMgr::exceeded_cancel_task(const std::string& cancel_details) { - _temp_task_mem_tracker = - ExecEnv::GetInstance()->task_pool_mem_tracker_registry()->get_task_mem_tracker( - _task_id); - DCHECK(_temp_task_mem_tracker); if (_fragment_instance_id != TUniqueId()) { ExecEnv::GetInstance()->fragment_mgr()->cancel( _fragment_instance_id, PPlanFragmentCancelReason::MEMORY_LIMIT_EXCEED, diff --git a/be/src/runtime/thread_mem_tracker_mgr.h b/be/src/runtime/thread_mem_tracker_mgr.h index 4ca2adba3ebaa0..404837a73a0aa8 100644 --- a/be/src/runtime/thread_mem_tracker_mgr.h +++ b/be/src/runtime/thread_mem_tracker_mgr.h @@ -20,8 +20,6 @@ #include #include -#include "runtime/exec_env.h" -#include "runtime/fragment_mgr.h" #include "runtime/mem_tracker.h" namespace doris { @@ -61,33 +59,20 @@ inline thread_local bool start_thread_mem_tracker = false; // need to manually call cosume after stop_mem_tracker, and then start_mem_tracker. class ThreadMemTrackerMgr { public: - ThreadMemTrackerMgr() { - _mem_trackers[0] = MemTracker::get_process_tracker(); - _untracked_mems[0] = 0; - _tracker_id = 0; - _mem_tracker_labels[0] = MemTracker::get_process_tracker()->label(); - start_thread_mem_tracker = true; - } + ThreadMemTrackerMgr() {} + ~ThreadMemTrackerMgr() { clear_untracked_mems(); start_thread_mem_tracker = false; } - void clear_untracked_mems() { - for (const auto& untracked_mem : _untracked_mems) { - if (untracked_mem.second != 0) { - DCHECK(_mem_trackers[untracked_mem.first]) - << ", label: " << _mem_tracker_labels[untracked_mem.first]; - if (_mem_trackers[untracked_mem.first]) { - _mem_trackers[untracked_mem.first]->consume(untracked_mem.second); - } else { - MemTracker::get_process_tracker()->consume(untracked_mem.second); - } - } - } - mem_tracker()->consume(_untracked_mem); - _untracked_mem = 0; - } + // After thread initialization, calling `init` again must call `clear_untracked_mems` first + // to avoid memory tracking loss. + void init(); + + void init_bthread(); + + void clear_untracked_mems(); // After attach, the current thread TCMalloc Hook starts to consume/release task mem_tracker void attach_task(const std::string& cancel_msg, const std::string& task_id, @@ -96,21 +81,18 @@ class ThreadMemTrackerMgr { void detach_task(); - // Must be fast enough! - // Thread update_tracker may be called very frequently, adding a memory copy will be slow. + // Must be fast enough! Thread update_tracker may be called very frequently. + // So for performance, add tracker as early as possible, and then call update_tracker. template int64_t update_tracker(const std::shared_ptr& mem_tracker); void update_tracker_id(int64_t tracker_id); - void add_tracker(const std::shared_ptr& mem_tracker) { - _mem_trackers[mem_tracker->id()] = mem_tracker; - DCHECK(_mem_trackers[mem_tracker->id()]); - _untracked_mems[mem_tracker->id()] = 0; - _mem_tracker_labels[_temp_tracker_id] = mem_tracker->label(); - } + // Before switching the same tracker multiple times, add tracker as early as possible, + // update_tracker can reduce one map find. + void add_tracker(const std::shared_ptr& mem_tracker); - ConsumeErrCallBackInfo update_consume_err_cb(const std::string& cancel_msg, - bool cancel_task, ERRCALLBACK cb_func) { + ConsumeErrCallBackInfo update_consume_err_cb(const std::string& cancel_msg, bool cancel_task, + ERRCALLBACK cb_func) { _temp_consume_err_cb = _consume_err_cb; _consume_err_cb.cancel_msg = cancel_msg; _consume_err_cb.cancel_task = cancel_task; @@ -127,17 +109,34 @@ class ThreadMemTrackerMgr { // must increase the control to avoid entering infinite recursion, otherwise it may cause crash or stuck, void cache_consume(int64_t size); - void noncache_consume(); + void noncache_consume(int64_t size); bool is_attach_task() { return _task_id != ""; } - std::shared_ptr mem_tracker() { - DCHECK(_mem_trackers[_tracker_id]) << ", label: " << _mem_tracker_labels[_tracker_id]; - if (_mem_trackers[_tracker_id]) { - return _mem_trackers[_tracker_id]; - } else { - return MemTracker::get_process_tracker(); + std::shared_ptr mem_tracker(); + + int64_t switch_count = 0; + + std::string print_debug_string() { + fmt::memory_buffer mem_trackers_buf; + for (const auto& [key, value] : _mem_trackers) { + fmt::format_to(mem_trackers_buf, "{}_{},", std::to_string(key), value->log_usage(1)); } + fmt::memory_buffer untracked_mems_buf; + for (const auto& [key, value] : _untracked_mems) { + fmt::format_to(untracked_mems_buf, "{}_{},", std::to_string(key), + std::to_string(value)); + } + fmt::memory_buffer mem_tracker_labels_buf; + for (const auto& [key, value] : _mem_tracker_labels) { + fmt::format_to(mem_tracker_labels_buf, "{}_{},", std::to_string(key), value); + } + return fmt::format( + "ThreadMemTrackerMgr debug string, _tracker_id:{}, _untracked_mem:{}, _task_id:{}, " + "_mem_trackers:<{}>, _untracked_mems:<{}>, _mem_tracker_labels:<{}>", + std::to_string(_tracker_id), std::to_string(_untracked_mem), _task_id, + fmt::to_string(mem_trackers_buf), fmt::to_string(untracked_mems_buf), + fmt::to_string(mem_tracker_labels_buf)); } private: @@ -175,39 +174,71 @@ class ThreadMemTrackerMgr { ConsumeErrCallBackInfo _consume_err_cb; }; +inline void ThreadMemTrackerMgr::init() { + _tracker_id = 0; + _mem_trackers.clear(); + _mem_trackers[0] = MemTracker::get_process_tracker(); + _untracked_mems.clear(); + _untracked_mems[0] = 0; + _mem_tracker_labels.clear(); + _mem_tracker_labels[0] = MemTracker::get_process_tracker()->label(); +} + +inline void ThreadMemTrackerMgr::init_bthread() { + init(); + _mem_trackers[1] = MemTracker::get_brpc_server_tracker(); + _untracked_mems[1] = 0; + _mem_tracker_labels[1] = MemTracker::get_brpc_server_tracker()->label(); + _tracker_id = 1; +} + +inline void ThreadMemTrackerMgr::clear_untracked_mems() { + for (const auto& untracked_mem : _untracked_mems) { + if (untracked_mem.second != 0) { + DCHECK(_mem_trackers[untracked_mem.first]) << print_debug_string(); + _mem_trackers[untracked_mem.first]->consume(untracked_mem.second); + } + } + mem_tracker()->consume(_untracked_mem); + _untracked_mem = 0; +} + template inline int64_t ThreadMemTrackerMgr::update_tracker(const std::shared_ptr& mem_tracker) { - DCHECK(mem_tracker); + DCHECK(mem_tracker) << print_debug_string(); _temp_tracker_id = mem_tracker->id(); if (_temp_tracker_id == _tracker_id) { return _tracker_id; } if (Existed) { - DCHECK(_mem_trackers.find(_temp_tracker_id) != _mem_trackers.end()); + DCHECK(_mem_trackers.find(_temp_tracker_id) != _mem_trackers.end()) << print_debug_string(); } else { + // If the tracker has already been added, avoid `_untracked_mems[x] = 0;` again causing the memory track to be lost. if (_mem_trackers.find(_temp_tracker_id) == _mem_trackers.end()) { _mem_trackers[_temp_tracker_id] = mem_tracker; - DCHECK(_mem_trackers[_temp_tracker_id]); + DCHECK(_mem_trackers[_temp_tracker_id]) << print_debug_string(); _untracked_mems[_temp_tracker_id] = 0; _mem_tracker_labels[_temp_tracker_id] = mem_tracker->label(); } } + DCHECK(_mem_trackers.find(_tracker_id) != _mem_trackers.end()) << print_debug_string(); + DCHECK(_mem_trackers[_tracker_id]) << print_debug_string(); _untracked_mems[_tracker_id] += _untracked_mem; _untracked_mem = 0; std::swap(_tracker_id, _temp_tracker_id); - DCHECK(_mem_trackers[_tracker_id]) << ", label: " << _mem_tracker_labels[_tracker_id]; + DCHECK(_mem_trackers[_tracker_id]) << print_debug_string(); return _temp_tracker_id; // old tracker_id } inline void ThreadMemTrackerMgr::update_tracker_id(int64_t tracker_id) { + DCHECK(switch_count >= 0) << print_debug_string(); if (tracker_id != _tracker_id) { _untracked_mems[_tracker_id] += _untracked_mem; _untracked_mem = 0; _tracker_id = tracker_id; - DCHECK(_untracked_mems.find(_tracker_id) != _untracked_mems.end()) - << ", label: " << _mem_tracker_labels[_tracker_id]; - DCHECK(_mem_trackers[_tracker_id]); + DCHECK(_untracked_mems.find(_tracker_id) != _untracked_mems.end()) << print_debug_string(); + DCHECK(_mem_trackers[_tracker_id]) << print_debug_string(); } } @@ -218,7 +249,7 @@ inline void ThreadMemTrackerMgr::cache_consume(int64_t size) { // it will cause tracker->consumption to be temporarily less than 0. if (_untracked_mem >= config::mem_tracker_consume_min_size_bytes || _untracked_mem <= -config::mem_tracker_consume_min_size_bytes) { - DCHECK(_untracked_mems.find(_tracker_id) != _untracked_mems.end()); + DCHECK(_untracked_mems.find(_tracker_id) != _untracked_mems.end()) << print_debug_string(); // Allocating memory in the Hook command causes the TCMalloc Hook to be entered again, infinite recursion. // Needs to ensure that all memory allocated in mem_tracker.consume/try_consume is freed in time to avoid tracking misses. start_thread_mem_tracker = false; @@ -227,21 +258,36 @@ inline void ThreadMemTrackerMgr::cache_consume(int64_t size) { _untracked_mem += _untracked_mems[_tracker_id]; _untracked_mems[_tracker_id] = 0; } - noncache_consume(); + noncache_consume(_untracked_mem); + _untracked_mem = 0; start_thread_mem_tracker = true; } } -inline void ThreadMemTrackerMgr::noncache_consume() { - DCHECK(_mem_trackers[_tracker_id]) << ", label: " << _mem_tracker_labels[_tracker_id]; - Status st = mem_tracker()->try_consume(_untracked_mem); +inline void ThreadMemTrackerMgr::noncache_consume(int64_t size) { + Status st = mem_tracker()->try_consume(size); if (!st) { // The memory has been allocated, so when TryConsume fails, need to continue to complete // the consume to ensure the accuracy of the statistics. - mem_tracker()->consume(_untracked_mem); - exceeded(_untracked_mem, st); + mem_tracker()->consume(size); + exceeded(size, st); } - _untracked_mem = 0; +} + +inline void ThreadMemTrackerMgr::add_tracker(const std::shared_ptr& mem_tracker) { + DCHECK(_mem_trackers.find(mem_tracker->id()) == _mem_trackers.end()) << print_debug_string(); + _mem_trackers[mem_tracker->id()] = mem_tracker; + DCHECK(_mem_trackers[mem_tracker->id()]) << print_debug_string(); + _untracked_mems[mem_tracker->id()] = 0; + _mem_tracker_labels[_temp_tracker_id] = mem_tracker->label(); +} + +inline std::shared_ptr ThreadMemTrackerMgr::mem_tracker() { + // Whether the key _tracker_id exists in _mem_trackers. + DCHECK(_mem_trackers.find(_tracker_id) != _mem_trackers.end()) << print_debug_string(); + // If the key _tracker_id exists in _mem_trackers, check whether the value is null. + DCHECK(_mem_trackers[_tracker_id]) << print_debug_string(); + return _mem_trackers[_tracker_id]; } } // namespace doris diff --git a/be/src/service/brpc.h b/be/src/service/brpc.h index 031a9d6697ef64..6e1b348ac7a4b9 100644 --- a/be/src/service/brpc.h +++ b/be/src/service/brpc.h @@ -17,33 +17,10 @@ #pragma once -// This file is used to fixed macro conflict between butil and gutil // all header need by brpc is contain in this file. -// include this file instead of include -// and this file must put the first include in source file +// include this file instead of include . -#include "gutil/macros.h" -// Macros in the guti/macros.h, use butil's define -#ifdef DISALLOW_IMPLICIT_CONSTRUCTORS -#undef DISALLOW_IMPLICIT_CONSTRUCTORS -#endif - -#ifdef arraysize -#undef arraysize -#endif - -#undef OVERRIDE -#undef FINAL - -// use be/src/gutil/integral_types.h override butil/basictypes.h -#include "gutil/integral_types.h" -#ifdef BASE_INTEGRAL_TYPES_H_ -#define BUTIL_BASICTYPES_H_ -#endif - -#ifdef DEBUG_MODE -#undef DEBUG_MODE -#endif +#include #include #include @@ -51,6 +28,8 @@ #include #include #include +#include +#include #include #include #include diff --git a/be/src/service/brpc_conflict.h b/be/src/service/brpc_conflict.h new file mode 100644 index 00000000000000..35ef1b815c45e2 --- /dev/null +++ b/be/src/service/brpc_conflict.h @@ -0,0 +1,48 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +// This file is used to fixed macro conflict between butil and gutil +// and this file must put the first include in source file + +#include "gutil/macros.h" +// Macros in the guti/macros.h, use butil's define +#ifdef DISALLOW_IMPLICIT_CONSTRUCTORS +#undef DISALLOW_IMPLICIT_CONSTRUCTORS +#endif + +#ifdef arraysize +#undef arraysize +#endif + +#ifdef ARRAY_SIZE +#undef ARRAY_SIZE +#endif + +#undef OVERRIDE +#undef FINAL + +// use be/src/gutil/integral_types.h override butil/basictypes.h +#include "gutil/integral_types.h" +#ifdef BASE_INTEGRAL_TYPES_H_ +#define BUTIL_BASICTYPES_H_ +#endif + +#ifdef DEBUG_MODE +#undef DEBUG_MODE +#endif diff --git a/be/src/service/internal_service.cpp b/be/src/service/internal_service.cpp index 4cb6b8f7ee84c9..5080b1c21825b2 100644 --- a/be/src/service/internal_service.cpp +++ b/be/src/service/internal_service.cpp @@ -29,6 +29,7 @@ #include "runtime/result_buffer_mgr.h" #include "runtime/routine_load/routine_load_task_executor.h" #include "runtime/runtime_state.h" +#include "runtime/thread_context.h" #include "service/brpc.h" #include "util/brpc_client_cache.h" #include "util/md5.h" @@ -42,16 +43,24 @@ namespace doris { DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(add_batch_task_queue_size, MetricUnit::NOUNIT); +bthread_key_t btls_key; + +static void thread_context_deleter(void* d) { + delete static_cast(d); +} + template PInternalServiceImpl::PInternalServiceImpl(ExecEnv* exec_env) : _exec_env(exec_env), _tablet_worker_pool(config::number_tablet_writer_threads, 10240) { REGISTER_HOOK_METRIC(add_batch_task_queue_size, [this]() { return _tablet_worker_pool.get_queue_size(); }); + CHECK_EQ(0, bthread_key_create(&btls_key, thread_context_deleter)); } template PInternalServiceImpl::~PInternalServiceImpl() { DEREGISTER_HOOK_METRIC(add_batch_task_queue_size); + CHECK_EQ(0, bthread_key_delete(btls_key)); } template @@ -59,6 +68,7 @@ void PInternalServiceImpl::transmit_data(google::protobuf::RpcController* cnt const PTransmitDataParams* request, PTransmitDataResult* response, google::protobuf::Closure* done) { + SCOPED_SWITCH_BTHREAD(); VLOG_ROW << "transmit data: fragment_instance_id=" << print_id(request->finst_id()) << " node=" << request->node_id(); brpc::Controller* cntl = static_cast(cntl_base); @@ -84,6 +94,7 @@ void PInternalServiceImpl::tablet_writer_open(google::protobuf::RpcController const PTabletWriterOpenRequest* request, PTabletWriterOpenResult* response, google::protobuf::Closure* done) { + SCOPED_SWITCH_BTHREAD(); VLOG_RPC << "tablet writer open, id=" << request->id() << ", index_id=" << request->index_id() << ", txn_id=" << request->txn_id(); brpc::ClosureGuard closure_guard(done); @@ -101,6 +112,7 @@ void PInternalServiceImpl::exec_plan_fragment(google::protobuf::RpcController const PExecPlanFragmentRequest* request, PExecPlanFragmentResult* response, google::protobuf::Closure* done) { + SCOPED_SWITCH_BTHREAD(); brpc::ClosureGuard closure_guard(done); auto st = Status::OK(); bool compact = request->has_compact() ? request->compact() : false; @@ -116,6 +128,7 @@ void PInternalServiceImpl::tablet_writer_add_batch(google::protobuf::RpcContr const PTabletWriterAddBatchRequest* request, PTabletWriterAddBatchResult* response, google::protobuf::Closure* done) { + SCOPED_SWITCH_BTHREAD(); VLOG_RPC << "tablet writer add batch, id=" << request->id() << ", index_id=" << request->index_id() << ", sender_id=" << request->sender_id() << ", current_queued_size=" << _tablet_worker_pool.get_queue_size(); @@ -150,6 +163,7 @@ void PInternalServiceImpl::tablet_writer_cancel(google::protobuf::RpcControll const PTabletWriterCancelRequest* request, PTabletWriterCancelResult* response, google::protobuf::Closure* done) { + SCOPED_SWITCH_BTHREAD(); VLOG_RPC << "tablet writer cancel, id=" << request->id() << ", index_id=" << request->index_id() << ", sender_id=" << request->sender_id(); brpc::ClosureGuard closure_guard(done); @@ -177,6 +191,7 @@ void PInternalServiceImpl::cancel_plan_fragment(google::protobuf::RpcControll const PCancelPlanFragmentRequest* request, PCancelPlanFragmentResult* result, google::protobuf::Closure* done) { + SCOPED_SWITCH_BTHREAD(); brpc::ClosureGuard closure_guard(done); TUniqueId tid; tid.__set_hi(request->finst_id().hi()); @@ -201,6 +216,7 @@ template void PInternalServiceImpl::fetch_data(google::protobuf::RpcController* cntl_base, const PFetchDataRequest* request, PFetchDataResult* result, google::protobuf::Closure* done) { + SCOPED_SWITCH_BTHREAD(); brpc::Controller* cntl = static_cast(cntl_base); GetResultBatchCtx* ctx = new GetResultBatchCtx(cntl, result, done); _exec_env->result_mgr()->fetch_data(request->finst_id(), ctx); @@ -210,6 +226,7 @@ template void PInternalServiceImpl::get_info(google::protobuf::RpcController* controller, const PProxyRequest* request, PProxyResult* response, google::protobuf::Closure* done) { + SCOPED_SWITCH_BTHREAD(); brpc::ClosureGuard closure_guard(done); // PProxyRequest is defined in gensrc/proto/internal_service.proto // Currently it supports 2 kinds of requests: @@ -272,6 +289,7 @@ void PInternalServiceImpl::update_cache(google::protobuf::RpcController* cont const PUpdateCacheRequest* request, PCacheResponse* response, google::protobuf::Closure* done) { + SCOPED_SWITCH_BTHREAD(); brpc::ClosureGuard closure_guard(done); _exec_env->result_cache()->update(request, response); } @@ -281,6 +299,7 @@ void PInternalServiceImpl::fetch_cache(google::protobuf::RpcController* contr const PFetchCacheRequest* request, PFetchCacheResult* result, google::protobuf::Closure* done) { + SCOPED_SWITCH_BTHREAD(); brpc::ClosureGuard closure_guard(done); _exec_env->result_cache()->fetch(request, result); } @@ -290,6 +309,7 @@ void PInternalServiceImpl::clear_cache(google::protobuf::RpcController* contr const PClearCacheRequest* request, PCacheResponse* response, google::protobuf::Closure* done) { + SCOPED_SWITCH_BTHREAD(); brpc::ClosureGuard closure_guard(done); _exec_env->result_cache()->clear(request, response); } @@ -299,6 +319,7 @@ void PInternalServiceImpl::merge_filter(::google::protobuf::RpcController* co const ::doris::PMergeFilterRequest* request, ::doris::PMergeFilterResponse* response, ::google::protobuf::Closure* done) { + SCOPED_SWITCH_BTHREAD(); brpc::ClosureGuard closure_guard(done); auto buf = static_cast(controller)->request_attachment(); Status st = _exec_env->fragment_mgr()->merge_filter(request, buf.to_string().data()); @@ -313,6 +334,7 @@ void PInternalServiceImpl::apply_filter(::google::protobuf::RpcController* co const ::doris::PPublishFilterRequest* request, ::doris::PPublishFilterResponse* response, ::google::protobuf::Closure* done) { + SCOPED_SWITCH_BTHREAD(); brpc::ClosureGuard closure_guard(done); auto attachment = static_cast(controller)->request_attachment(); UniqueId unique_id(request->query_id()); @@ -329,6 +351,7 @@ template void PInternalServiceImpl::send_data(google::protobuf::RpcController* controller, const PSendDataRequest* request, PSendDataResult* response, google::protobuf::Closure* done) { + SCOPED_SWITCH_BTHREAD(); brpc::ClosureGuard closure_guard(done); TUniqueId fragment_instance_id; fragment_instance_id.hi = request->fragment_instance_id().hi(); @@ -352,6 +375,7 @@ template void PInternalServiceImpl::commit(google::protobuf::RpcController* controller, const PCommitRequest* request, PCommitResult* response, google::protobuf::Closure* done) { + SCOPED_SWITCH_BTHREAD(); brpc::ClosureGuard closure_guard(done); TUniqueId fragment_instance_id; fragment_instance_id.hi = request->fragment_instance_id().hi(); @@ -370,6 +394,7 @@ template void PInternalServiceImpl::rollback(google::protobuf::RpcController* controller, const PRollbackRequest* request, PRollbackResult* response, google::protobuf::Closure* done) { + SCOPED_SWITCH_BTHREAD(); brpc::ClosureGuard closure_guard(done); TUniqueId fragment_instance_id; fragment_instance_id.hi = request->fragment_instance_id().hi(); @@ -389,6 +414,7 @@ void PInternalServiceImpl::fold_constant_expr(google::protobuf::RpcController const PConstantExprRequest* request, PConstantExprResult* response, google::protobuf::Closure* done) { + SCOPED_SWITCH_BTHREAD(); brpc::ClosureGuard closure_guard(done); brpc::Controller* cntl = static_cast(cntl_base); @@ -425,6 +451,7 @@ void PInternalServiceImpl::transmit_block(google::protobuf::RpcController* cn const PTransmitDataParams* request, PTransmitDataResult* response, google::protobuf::Closure* done) { + SCOPED_SWITCH_BTHREAD(); VLOG_ROW << "transmit data: fragment_instance_id=" << print_id(request->finst_id()) << " node=" << request->node_id(); brpc::Controller* cntl = static_cast(cntl_base); @@ -450,6 +477,7 @@ void PInternalServiceImpl::check_rpc_channel(google::protobuf::RpcController* const PCheckRPCChannelRequest* request, PCheckRPCChannelResponse* response, google::protobuf::Closure* done) { + SCOPED_SWITCH_BTHREAD(); brpc::ClosureGuard closure_guard(done); response->mutable_status()->set_status_code(0); if (request->data().size() != request->size()) { @@ -477,6 +505,7 @@ void PInternalServiceImpl::reset_rpc_channel(google::protobuf::RpcController* const PResetRPCChannelRequest* request, PResetRPCChannelResponse* response, google::protobuf::Closure* done) { + SCOPED_SWITCH_BTHREAD(); brpc::ClosureGuard closure_guard(done); response->mutable_status()->set_status_code(0); if (request->all()) { @@ -511,6 +540,7 @@ void PInternalServiceImpl::hand_shake(google::protobuf::RpcController* cntl_b const PHandShakeRequest* request, PHandShakeResponse* response, google::protobuf::Closure* done) { + SCOPED_SWITCH_BTHREAD(); brpc::ClosureGuard closure_guard(done); if (request->has_hello()) { response->set_hello(request->hello()); diff --git a/be/src/util/bit_util.h b/be/src/util/bit_util.h index 7526a7728ac345..dabf87ee0f266a 100644 --- a/be/src/util/bit_util.h +++ b/be/src/util/bit_util.h @@ -25,7 +25,6 @@ #include "common/compiler_util.h" #include "gutil/bits.h" -#include "gutil/port.h" #include "util/cpu_info.h" #ifdef __aarch64__ #include "sse2neon.h" diff --git a/be/src/util/doris_metrics.cpp b/be/src/util/doris_metrics.cpp index 0163815c760ecc..8bd9b058477f55 100644 --- a/be/src/util/doris_metrics.cpp +++ b/be/src/util/doris_metrics.cpp @@ -137,6 +137,7 @@ DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(memtable_flush_duration_us, MetricUnit::MIC DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(attach_task_thread_count, MetricUnit::NOUNIT); DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(switch_thread_mem_tracker_count, MetricUnit::NOUNIT); DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(switch_thread_mem_tracker_err_cb_count, MetricUnit::NOUNIT); +DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(switch_bthread_count, MetricUnit::NOUNIT); DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(memory_pool_bytes_total, MetricUnit::BYTES); DEFINE_GAUGE_CORE_METRIC_PROTOTYPE_2ARG(process_thread_num, MetricUnit::NOUNIT); @@ -286,6 +287,7 @@ DorisMetrics::DorisMetrics() : _metric_registry(_s_registry_name) { INT_COUNTER_METRIC_REGISTER(_server_metric_entity, attach_task_thread_count); INT_COUNTER_METRIC_REGISTER(_server_metric_entity, switch_thread_mem_tracker_count); INT_COUNTER_METRIC_REGISTER(_server_metric_entity, switch_thread_mem_tracker_err_cb_count); + INT_COUNTER_METRIC_REGISTER(_server_metric_entity, switch_bthread_count); _server_metric_entity->register_hook(_s_hook_name, std::bind(&DorisMetrics::_update, this)); diff --git a/be/src/util/doris_metrics.h b/be/src/util/doris_metrics.h index aa59d1770b6a7a..602eb78a7e20ae 100644 --- a/be/src/util/doris_metrics.h +++ b/be/src/util/doris_metrics.h @@ -130,6 +130,8 @@ class DorisMetrics { IntCounter* attach_task_thread_count; IntCounter* switch_thread_mem_tracker_count; IntCounter* switch_thread_mem_tracker_err_cb_count; + // brpc server response count + IntCounter* switch_bthread_count; IntGauge* memory_pool_bytes_total; IntGauge* process_thread_num; diff --git a/be/src/util/file_utils.cpp b/be/src/util/file_utils.cpp index 3fd4a3bba6791d..a971acce864beb 100644 --- a/be/src/util/file_utils.cpp +++ b/be/src/util/file_utils.cpp @@ -34,6 +34,7 @@ #include "gutil/strings/strip.h" #include "gutil/strings/substitute.h" #include "olap/file_helper.h" +#include "runtime/thread_context.h" #include "util/defer_op.h" namespace doris { @@ -196,11 +197,13 @@ Status FileUtils::md5sum(const std::string& file, std::string* md5sum) { return Status::InternalError("failed to stat file"); } size_t file_len = statbuf.st_size; + CONSUME_THREAD_LOCAL_MEM_TRACKER(file_len); void* buf = mmap(0, file_len, PROT_READ, MAP_SHARED, fd, 0); unsigned char result[MD5_DIGEST_LENGTH]; MD5((unsigned char*)buf, file_len, result); munmap(buf, file_len); + RELEASE_THREAD_LOCAL_MEM_TRACKER(file_len); std::stringstream ss; for (int32_t i = 0; i < MD5_DIGEST_LENGTH; i++) { diff --git a/be/src/vec/common/allocator.h b/be/src/vec/common/allocator.h index 2a50dabf5c3745..216864dbb89a61 100644 --- a/be/src/vec/common/allocator.h +++ b/be/src/vec/common/allocator.h @@ -29,6 +29,7 @@ #include #include "common/status.h" +#include "runtime/thread_context.h" #ifdef NDEBUG #define ALLOCATOR_ASLR 0 @@ -137,15 +138,18 @@ class Allocator { } else if (old_size >= MMAP_THRESHOLD && new_size >= MMAP_THRESHOLD) { /// Resize mmap'd memory region. // CurrentMemoryTracker::realloc(old_size, new_size); + CONSUME_THREAD_LOCAL_MEM_TRACKER(new_size - old_size); // On apple and freebsd self-implemented mremap used (common/mremap.h) buf = clickhouse_mremap(buf, old_size, new_size, MREMAP_MAYMOVE, PROT_READ | PROT_WRITE, mmap_flags, -1, 0); - if (MAP_FAILED == buf) + if (MAP_FAILED == buf){ + RELEASE_THREAD_LOCAL_MEM_TRACKER(new_size - old_size); doris::vectorized::throwFromErrno("Allocator: Cannot mremap memory chunk from " + std::to_string(old_size) + " to " + std::to_string(new_size) + ".", doris::TStatusCode::VEC_CANNOT_MREMAP); + } /// No need for zero-fill, because mmap guarantees it. } else if (new_size < MMAP_THRESHOLD) { @@ -197,10 +201,13 @@ class Allocator { alignment, size), doris::TStatusCode::VEC_BAD_ARGUMENTS); + CONSUME_THREAD_LOCAL_MEM_TRACKER(size); buf = mmap(get_mmap_hint(), size, PROT_READ | PROT_WRITE, mmap_flags, -1, 0); - if (MAP_FAILED == buf) + if (MAP_FAILED == buf) { + RELEASE_THREAD_LOCAL_MEM_TRACKER(size); doris::vectorized::throwFromErrno(fmt::format("Allocator: Cannot mmap {}.", size), doris::TStatusCode::VEC_CANNOT_ALLOCATE_MEMORY); + } /// No need for zero-fill, because mmap guarantees it. } else { @@ -231,9 +238,12 @@ class Allocator { void free_no_track(void* buf, size_t size) { if (size >= MMAP_THRESHOLD) { - if (0 != munmap(buf, size)) + if (0 != munmap(buf, size)) { doris::vectorized::throwFromErrno(fmt::format("Allocator: Cannot munmap {}.", size), doris::TStatusCode::VEC_CANNOT_MUNMAP); + } else { + RELEASE_THREAD_LOCAL_MEM_TRACKER(size); + } } else { ::free(buf); } diff --git a/be/src/vec/exec/vexchange_node.cpp b/be/src/vec/exec/vexchange_node.cpp index ea4e61e7fe8eaf..57cbcff921fb46 100644 --- a/be/src/vec/exec/vexchange_node.cpp +++ b/be/src/vec/exec/vexchange_node.cpp @@ -66,6 +66,7 @@ Status VExchangeNode::prepare(RuntimeState* state) { Status VExchangeNode::open(RuntimeState* state) { SCOPED_TIMER(_runtime_profile->total_time_counter()); SCOPED_SWITCH_TASK_THREAD_LOCAL_MEM_TRACKER(mem_tracker()); + ADD_THREAD_LOCAL_MEM_TRACKER(_stream_recvr->mem_tracker()); RETURN_IF_ERROR(ExecNode::open(state)); if (_is_merging) { @@ -84,7 +85,6 @@ Status VExchangeNode::get_next(RuntimeState* state, RowBatch* row_batch, bool* e Status VExchangeNode::get_next(RuntimeState* state, Block* block, bool* eos) { SCOPED_TIMER(runtime_profile()->total_time_counter()); SCOPED_SWITCH_TASK_THREAD_LOCAL_EXISTED_MEM_TRACKER(mem_tracker()); - ADD_THREAD_LOCAL_MEM_TRACKER(_stream_recvr->mem_tracker()); auto status = _stream_recvr->get_next(block, eos); if (block != nullptr) { if (_num_rows_returned + block->rows() < _limit) {