diff --git a/be/src/runtime/exec_env.h b/be/src/runtime/exec_env.h index fc70a6d3dbf972..ac9bff86df5812 100644 --- a/be/src/runtime/exec_env.h +++ b/be/src/runtime/exec_env.h @@ -119,16 +119,19 @@ class ExecEnv { std::shared_ptr process_mem_tracker() { return _process_mem_tracker; } void set_global_mem_tracker(const std::shared_ptr& process_tracker, - const std::shared_ptr& orphan_tracker) { + const std::shared_ptr& orphan_tracker, + const std::shared_ptr& bthread_mem_tracker) { _process_mem_tracker = process_tracker; _orphan_mem_tracker = orphan_tracker; _orphan_mem_tracker_raw = orphan_tracker.get(); + _bthread_mem_tracker = bthread_mem_tracker; } std::shared_ptr allocator_cache_mem_tracker() { return _allocator_cache_mem_tracker; } std::shared_ptr orphan_mem_tracker() { return _orphan_mem_tracker; } MemTrackerLimiter* orphan_mem_tracker_raw() { return _orphan_mem_tracker_raw; } + std::shared_ptr bthread_mem_tracker() { return _bthread_mem_tracker; } std::shared_ptr query_pool_mem_tracker() { return _query_pool_mem_tracker; } std::shared_ptr load_pool_mem_tracker() { return _load_pool_mem_tracker; } MemTrackerTaskPool* task_pool_mem_tracker_registry() { return _task_pool_mem_tracker_registry; } @@ -227,6 +230,8 @@ class ExecEnv { // and the consumption of the orphan mem tracker is close to 0, but greater than 0. std::shared_ptr _orphan_mem_tracker; MemTrackerLimiter* _orphan_mem_tracker_raw; + // Bthread default mem tracker + std::shared_ptr _bthread_mem_tracker; // The ancestor for all querys tracker. std::shared_ptr _query_pool_mem_tracker; // The ancestor for all load tracker. diff --git a/be/src/runtime/exec_env_init.cpp b/be/src/runtime/exec_env_init.cpp index a55f5477782980..f2b550498a60de 100644 --- a/be/src/runtime/exec_env_init.cpp +++ b/be/src/runtime/exec_env_init.cpp @@ -208,7 +208,8 @@ Status ExecEnv::_init_mem_tracker() { std::make_shared(global_memory_limit_bytes, "Process"); _orphan_mem_tracker = std::make_shared(-1, "Orphan", _process_mem_tracker); _orphan_mem_tracker_raw = _orphan_mem_tracker.get(); - thread_context()->_thread_mem_tracker_mgr->init_impl(); + _bthread_mem_tracker = std::make_shared(-1, "Bthread", _orphan_mem_tracker); + thread_context()->_thread_mem_tracker_mgr->init(); thread_context()->_thread_mem_tracker_mgr->set_check_attach(false); #if defined(USE_MEM_TRACKER) && !defined(__SANITIZE_ADDRESS__) && !defined(ADDRESS_SANITIZER) && \ !defined(LEAK_SANITIZER) && !defined(THREAD_SANITIZER) && !defined(USE_JEMALLOC) diff --git a/be/src/runtime/memory/mem_tracker_limiter.cpp b/be/src/runtime/memory/mem_tracker_limiter.cpp index 1ca75b2af4d427..20639326f8593e 100644 --- a/be/src/runtime/memory/mem_tracker_limiter.cpp +++ b/be/src/runtime/memory/mem_tracker_limiter.cpp @@ -74,7 +74,7 @@ MemTrackerLimiter::MemTrackerLimiter(int64_t byte_limit, const std::string& labe MemTrackerLimiter::~MemTrackerLimiter() { // TCMalloc hook will be triggered during destructor memtracker, may cause crash. - if (_label == "Process") doris::thread_context_ptr._init = false; + if (_label == "Process") doris::thread_context_ptr.init = false; DCHECK(remain_child_count() == 0 || _label == "Process"); // In order to ensure `consumption of all limiter trackers` + `orphan tracker consumption` = `process tracker consumption` // in real time. Merge its consumption into orphan when parent is process, to avoid repetition. @@ -83,9 +83,7 @@ MemTrackerLimiter::~MemTrackerLimiter() { _consumption->current_value()); } if (_reset_zero) { - ExecEnv::GetInstance()->orphan_mem_tracker_raw()->cache_consume_local( - _consumption->current_value()); - cache_consume_local(-_consumption->current_value()); + reset_zero(); _all_ancestors.clear(); _all_ancestors.push_back(ExecEnv::GetInstance()->orphan_mem_tracker_raw()); } @@ -212,7 +210,6 @@ std::string MemTrackerLimiter::log_usage(int max_recursive_depth, } void MemTrackerLimiter::print_log_usage(const std::string& msg) { - DCHECK(_limit != -1); // only print the tracker log_usage in be log. std::string detail = msg; detail += "\n " + fmt::format( diff --git a/be/src/runtime/memory/mem_tracker_limiter.h b/be/src/runtime/memory/mem_tracker_limiter.h index 7b072e9b9808f0..9a40e79b662e53 100644 --- a/be/src/runtime/memory/mem_tracker_limiter.h +++ b/be/src/runtime/memory/mem_tracker_limiter.h @@ -129,6 +129,12 @@ class MemTrackerLimiter final : public MemTracker { void enable_print_log_usage() { _print_log_usage = true; } void enable_reset_zero() { _reset_zero = true; } + void reset_zero() { + ExecEnv::GetInstance()->orphan_mem_tracker_raw()->cache_consume_local( + _consumption->current_value()); + cache_consume_local(-_consumption->current_value()); + } + // Logs the usage of this tracker limiter and optionally its children (recursively). // If 'logged_consumption' is non-nullptr, sets the consumption value logged. // 'max_recursive_depth' specifies the maximum number of levels of children diff --git a/be/src/runtime/memory/thread_mem_tracker_mgr.h b/be/src/runtime/memory/thread_mem_tracker_mgr.h index 4c9099528d43bd..0b56911a77bff9 100644 --- a/be/src/runtime/memory/thread_mem_tracker_mgr.h +++ b/be/src/runtime/memory/thread_mem_tracker_mgr.h @@ -44,9 +44,15 @@ class ThreadMemTrackerMgr { ThreadMemTrackerMgr() {} ~ThreadMemTrackerMgr() { - flush_untracked_mem(); - DCHECK(_consumer_tracker_stack.empty()); - DCHECK(_limiter_tracker_stack.size() == 1); + // if _init == false, exec env is not initialized when init(). and never consumed mem tracker once. + if (_init) { + flush_untracked_mem(); + if (bthread_self() == 0) { + DCHECK(_consumer_tracker_stack.empty()); + DCHECK(_limiter_tracker_stack.size() == 1) + << ", limiter_tracker_stack.size(): " << _limiter_tracker_stack.size(); + } + } } // only for tcmalloc hook @@ -60,12 +66,15 @@ class ThreadMemTrackerMgr { // to avoid memory tracking loss. void init(); void init_impl(); + void clear(); // After attach, the current thread TCMalloc Hook starts to consume/release task mem_tracker void attach_limiter_tracker(const std::string& task_id, const TUniqueId& fragment_instance_id, const std::shared_ptr& mem_tracker); - void detach_limiter_tracker(); + // Usually there are only two layers, the first is the default trackerOrphan; + // the second is the query tracker or bthread tracker. + int64_t get_attach_layers() { return _limiter_tracker_stack.size(); } // Must be fast enough! Thread update_tracker may be called very frequently. // So for performance, add tracker as early as possible, and then call update_tracker. @@ -88,11 +97,11 @@ class ThreadMemTrackerMgr { bool is_attach_query() { return _fragment_instance_id_stack.back() != TUniqueId(); } std::shared_ptr limiter_mem_tracker() { - if (_limiter_tracker_raw == nullptr) init_impl(); + if (!_init) init(); return _limiter_tracker_stack.back(); } MemTrackerLimiter* limiter_mem_tracker_raw() { - if (_limiter_tracker_raw == nullptr) init_impl(); + if (!_init) init(); return _limiter_tracker_raw; } @@ -119,6 +128,8 @@ class ThreadMemTrackerMgr { void exceeded(const std::string& failed_msg); private: + // is false: ExecEnv::GetInstance()->initialized() = false when thread local is initialized + bool _init = false; // Cache untracked mem, only update to _untracked_mems when switching mem tracker. // Frequent calls to unordered_map _untracked_mems[] in consume will degrade performance. int64_t _untracked_mem = 0; @@ -141,23 +152,28 @@ class ThreadMemTrackerMgr { }; inline void ThreadMemTrackerMgr::init() { + DCHECK(_limiter_tracker_stack.size() == 0); + DCHECK(_limiter_tracker_raw == nullptr); DCHECK(_consumer_tracker_stack.empty()); - // _limiter_tracker_stack[0] = orphan_mem_tracker - DCHECK(_limiter_tracker_stack.size() <= 1) - << "limiter_tracker_stack.size(): " << _limiter_tracker_stack.size(); - if (_limiter_tracker_raw == nullptr && ExecEnv::GetInstance()->initialized()) { - init_impl(); - } + init_impl(); } inline void ThreadMemTrackerMgr::init_impl() { - DCHECK(_limiter_tracker_stack.size() == 0); - DCHECK(_limiter_tracker_raw == nullptr); _limiter_tracker_stack.push_back(ExecEnv::GetInstance()->orphan_mem_tracker()); _limiter_tracker_raw = ExecEnv::GetInstance()->orphan_mem_tracker_raw(); _task_id_stack.push_back(""); _fragment_instance_id_stack.push_back(TUniqueId()); _check_limit = true; + _init = true; +} + +inline void ThreadMemTrackerMgr::clear() { + flush_untracked_mem(); + std::vector>().swap(_limiter_tracker_stack); + std::vector().swap(_consumer_tracker_stack); + std::vector().swap(_task_id_stack); + std::vector().swap(_fragment_instance_id_stack); + init_impl(); } inline void ThreadMemTrackerMgr::push_consumer_tracker(MemTracker* tracker) { @@ -196,7 +212,7 @@ inline void ThreadMemTrackerMgr::flush_untracked_mem() { // Temporary memory may be allocated during the consumption of the mem tracker, which will lead to entering // the TCMalloc Hook again, so suspend consumption to avoid falling into an infinite loop. _stop_consume = true; - if (_limiter_tracker_raw == nullptr) init_impl(); + if (!_init) init(); DCHECK(_limiter_tracker_raw); old_untracked_mem = _untracked_mem; if (CheckLimit) { diff --git a/be/src/runtime/thread_context.cpp b/be/src/runtime/thread_context.cpp index 432c5be8798f1f..7631abaa7b0d96 100644 --- a/be/src/runtime/thread_context.cpp +++ b/be/src/runtime/thread_context.cpp @@ -26,7 +26,7 @@ DEFINE_STATIC_THREAD_LOCAL(ThreadContext, ThreadContextPtr, _ptr); ThreadContextPtr::ThreadContextPtr() { INIT_STATIC_THREAD_LOCAL(ThreadContext, _ptr); - _init = true; + init = true; } AttachTask::AttachTask(const std::shared_ptr& mem_tracker, @@ -70,34 +70,4 @@ AddThreadMemTrackerConsumer::~AddThreadMemTrackerConsumer() { } } -SwitchBthread::SwitchBthread() { - _bthread_context = static_cast(bthread_getspecific(btls_key)); - // First call to bthread_getspecific (and before any bthread_setspecific) returns NULL - if (_bthread_context == nullptr) { - // Create thread-local data on demand. - _bthread_context = new ThreadContext; - // set the data so that next time bthread_getspecific in the thread returns the data. - CHECK_EQ(0, bthread_setspecific(btls_key, _bthread_context)); - } else { - DCHECK(_bthread_context->type() == ThreadContext::TaskType::UNKNOWN); - _bthread_context->_thread_mem_tracker_mgr->flush_untracked_mem(); - } - _bthread_context->_thread_mem_tracker_mgr->init(); - _bthread_context->set_type(ThreadContext::TaskType::BRPC); - bthread_context_key = btls_key; - bthread_context = _bthread_context; -} - -SwitchBthread::~SwitchBthread() { - DCHECK(_bthread_context != nullptr); - _bthread_context->_thread_mem_tracker_mgr->flush_untracked_mem(); - _bthread_context->_thread_mem_tracker_mgr->init(); - _bthread_context->set_type(ThreadContext::TaskType::UNKNOWN); - bthread_context = nullptr; - bthread_context_key = EMPTY_BTLS_KEY; -#ifndef NDEBUG - DorisMetrics::instance()->switch_bthread_count->increment(1); -#endif // NDEBUG -} - } // namespace doris diff --git a/be/src/runtime/thread_context.h b/be/src/runtime/thread_context.h index 46b2e86d44cff9..e15fd8f924355f 100644 --- a/be/src/runtime/thread_context.h +++ b/be/src/runtime/thread_context.h @@ -33,16 +33,12 @@ // Add thread mem tracker consumer during query execution. #define SCOPED_CONSUME_MEM_TRACKER(mem_tracker) \ auto VARNAME_LINENUM(add_mem_consumer) = doris::AddThreadMemTrackerConsumer(mem_tracker) - // Attach to task when thread starts #define SCOPED_ATTACH_TASK(arg1, ...) \ auto VARNAME_LINENUM(attach_task) = AttachTask(arg1, ##__VA_ARGS__) - -#define SCOPED_SWITCH_BTHREAD_TLS() auto VARNAME_LINENUM(switch_bthread) = SwitchBthread() #else #define SCOPED_CONSUME_MEM_TRACKER(mem_tracker) (void)0 #define SCOPED_ATTACH_TASK(arg1, ...) (void)0 -#define SCOPED_SWITCH_BTHREAD_TLS() (void)0 #endif namespace doris { @@ -81,7 +77,7 @@ class ThreadContextPtr { // Cannot add destructor `~ThreadContextPtr`, otherwise it will no longer be of type POD, the reason is as above. // TCMalloc hook is triggered during ThreadContext construction, which may lead to deadlock. - bool _init = false; + bool init = false; DECLARE_STATIC_THREAD_LOCAL(ThreadContext, _ptr); }; @@ -91,7 +87,7 @@ inline thread_local ThreadContextPtr thread_context_ptr; // To avoid performance problems caused by frequently calling `bthread_getspecific` to obtain bthread TLS // in tcmalloc hook, cache the key and value of bthread TLS in pthread TLS. inline thread_local ThreadContext* bthread_context; -inline thread_local bthread_key_t bthread_context_key; +inline thread_local bthread_t bthread_id; // The thread context saves some info about a working thread. // 2 required info: @@ -122,18 +118,18 @@ class ThreadContext { } ~ThreadContext() { - // Restore to the memory state before _init=true to ensure accurate overall memory statistics. + // Restore to the memory state before init=true to ensure accurate overall memory statistics. // Thereby ensuring that the memory alloc size is not tracked during the initialization of the - // ThreadContext before `_init = true in ThreadContextPtr()`, + // ThreadContext before `init = true in ThreadContextPtr()`, // Equal to the size of the memory release that is not tracked during the destruction of the - // ThreadContext after `_init = false in ~ThreadContextPtr()`, - init(); - thread_context_ptr._init = false; + // ThreadContext after `init = false in ~ThreadContextPtr()`, + if (ExecEnv::GetInstance()->initialized()) _thread_mem_tracker_mgr->clear(); + thread_context_ptr.init = false; } void init() { _type = TaskType::UNKNOWN; - _thread_mem_tracker_mgr->init(); + if (ExecEnv::GetInstance()->initialized()) _thread_mem_tracker_mgr->init(); _thread_id = get_thread_id(); } @@ -192,17 +188,42 @@ class ThreadContext { TUniqueId _fragment_instance_id; }; -static void update_bthread_context() { - if (btls_key != bthread_context_key) { - // pthread switch occurs, updating bthread_context and bthread_context_key cached in pthread tls. - bthread_context = static_cast(bthread_getspecific(btls_key)); - bthread_context_key = btls_key; +static void attach_bthread() { + bthread_id = bthread_self(); + bthread_context = static_cast(bthread_getspecific(btls_key)); + if (bthread_context == nullptr) { + // A new bthread starts, two scenarios: + // 1. First call to bthread_getspecific (and before any bthread_setspecific) returns NULL + // 2. There are not enough reusable btls in btls pool. +#ifndef BE_TEST + DCHECK(ExecEnv::GetInstance()->initialized()); +#endif + // Create thread-local data on demand. + bthread_context = new ThreadContext; + std::shared_ptr btls_tracker = + std::make_shared(-1, "Bthread:id=" + std::to_string(bthread_id), + ExecEnv::GetInstance()->bthread_mem_tracker()); + bthread_context->attach_task(ThreadContext::TaskType::BRPC, "", TUniqueId(), btls_tracker); + // set the data so that next time bthread_getspecific in the thread returns the data. + CHECK_EQ(0, bthread_setspecific(btls_key, bthread_context)); + } else { + // two scenarios: + // 1. A new bthread starts, but get a reuses btls. + // 2. A pthread switch occurs. Because the pthread switch cannot be accurately identified at the moment. + // So tracker call reset 0 like reuses btls. + DCHECK(bthread_context->_thread_mem_tracker_mgr->get_attach_layers() == 2); + bthread_context->_thread_mem_tracker_mgr->limiter_mem_tracker_raw()->reset_zero(); } } static ThreadContext* thread_context() { - if (btls_key != EMPTY_BTLS_KEY && bthread_context != nullptr) { - update_bthread_context(); + if (bthread_self() != 0) { + if (bthread_self() != bthread_id) { + // A new bthread starts or pthread switch occurs. + thread_context_ptr.init = false; + attach_bthread(); + thread_context_ptr.init = true; + } return bthread_context; } else { return thread_context_ptr._ptr; @@ -228,16 +249,6 @@ class AddThreadMemTrackerConsumer { ~AddThreadMemTrackerConsumer(); }; -class SwitchBthread { -public: - explicit SwitchBthread(); - - ~SwitchBthread(); - -private: - ThreadContext* _bthread_context; -}; - class StopCheckThreadMemTrackerLimit { public: explicit StopCheckThreadMemTrackerLimit() { @@ -273,30 +284,22 @@ class StopCheckThreadMemTrackerLimit { ->_thread_mem_tracker_mgr->last_consumer_tracker(), \ msg), \ ##__VA_ARGS__); - // Mem Hook to consume thread mem tracker -#define MEM_MALLOC_HOOK(size) \ - do { \ - if (doris::btls_key != doris::EMPTY_BTLS_KEY && doris::bthread_context != nullptr) { \ - doris::update_bthread_context(); \ - doris::bthread_context->_thread_mem_tracker_mgr->consume(size); \ - } else if (LIKELY(doris::thread_context_ptr._init)) { \ - doris::thread_context_ptr._ptr->_thread_mem_tracker_mgr->consume(size); \ - } else { \ - doris::ThreadMemTrackerMgr::consume_no_attach(size); \ - } \ +#define MEM_MALLOC_HOOK(size) \ + do { \ + if (doris::thread_context_ptr.init) { \ + doris::thread_context()->_thread_mem_tracker_mgr->consume(size); \ + } else { \ + doris::ThreadMemTrackerMgr::consume_no_attach(size); \ + } \ } while (0) - -#define MEM_FREE_HOOK(size) \ - do { \ - if (doris::btls_key != doris::EMPTY_BTLS_KEY && doris::bthread_context != nullptr) { \ - doris::update_bthread_context(); \ - doris::bthread_context->_thread_mem_tracker_mgr->consume(-size); \ - } else if (doris::thread_context_ptr._init) { \ - doris::thread_context_ptr._ptr->_thread_mem_tracker_mgr->consume(-size); \ - } else { \ - doris::ThreadMemTrackerMgr::consume_no_attach(-size); \ - } \ +#define MEM_FREE_HOOK(size) \ + do { \ + if (doris::thread_context_ptr.init) { \ + doris::thread_context()->_thread_mem_tracker_mgr->consume(-size); \ + } else { \ + doris::ThreadMemTrackerMgr::consume_no_attach(-size); \ + } \ } while (0) #else #define STOP_CHECK_THREAD_MEM_TRACKER_LIMIT() (void)0 diff --git a/be/src/service/internal_service.cpp b/be/src/service/internal_service.cpp index 87f89f48e00691..d7f1834843c123 100644 --- a/be/src/service/internal_service.cpp +++ b/be/src/service/internal_service.cpp @@ -101,7 +101,6 @@ void PInternalServiceImpl::transmit_data(google::protobuf::RpcController* cntl_b const PTransmitDataParams* request, PTransmitDataResult* response, google::protobuf::Closure* done) { - SCOPED_SWITCH_BTHREAD_TLS(); // TODO(zxy) delete in 1.2 version brpc::Controller* cntl = static_cast(cntl_base); attachment_transfer_request_row_batch(request, cntl); @@ -113,7 +112,6 @@ void PInternalServiceImpl::transmit_data_by_http(google::protobuf::RpcController const PEmptyRequest* request, PTransmitDataResult* response, google::protobuf::Closure* done) { - SCOPED_SWITCH_BTHREAD_TLS(); PTransmitDataParams* request_raw = new PTransmitDataParams(); google::protobuf::Closure* done_raw = new NewHttpClosure(request_raw, done); @@ -169,7 +167,6 @@ void PInternalServiceImpl::tablet_writer_open(google::protobuf::RpcController* c const PTabletWriterOpenRequest* request, PTabletWriterOpenResult* response, google::protobuf::Closure* done) { - SCOPED_SWITCH_BTHREAD_TLS(); VLOG_RPC << "tablet writer open, id=" << request->id() << ", index_id=" << request->index_id() << ", txn_id=" << request->txn_id(); brpc::ClosureGuard closure_guard(done); @@ -188,7 +185,6 @@ void PInternalServiceImpl::exec_plan_fragment(google::protobuf::RpcController* c google::protobuf::Closure* done) { auto span = telemetry::start_rpc_server_span("exec_plan_fragment", cntl_base); auto scope = OpentelemetryScope {span}; - SCOPED_SWITCH_BTHREAD_TLS(); brpc::ClosureGuard closure_guard(done); auto st = Status::OK(); bool compact = request->has_compact() ? request->compact() : false; @@ -214,7 +210,6 @@ void PInternalServiceImpl::exec_plan_fragment_start(google::protobuf::RpcControl google::protobuf::Closure* done) { auto span = telemetry::start_rpc_server_span("exec_plan_fragment_start", controller); auto scope = OpentelemetryScope {span}; - SCOPED_SWITCH_BTHREAD_TLS(); brpc::ClosureGuard closure_guard(done); auto st = _exec_env->fragment_mgr()->start_query_execution(request); st.to_protobuf(result->mutable_status()); @@ -224,7 +219,6 @@ void PInternalServiceImpl::tablet_writer_add_block(google::protobuf::RpcControll const PTabletWriterAddBlockRequest* request, PTabletWriterAddBlockResult* response, google::protobuf::Closure* done) { - SCOPED_SWITCH_BTHREAD_TLS(); // TODO(zxy) delete in 1.2 version brpc::Controller* cntl = static_cast(cntl_base); attachment_transfer_request_block(request, cntl); @@ -235,7 +229,6 @@ void PInternalServiceImpl::tablet_writer_add_block(google::protobuf::RpcControll void PInternalServiceImpl::tablet_writer_add_block_by_http( google::protobuf::RpcController* cntl_base, const ::doris::PEmptyRequest* request, PTabletWriterAddBlockResult* response, google::protobuf::Closure* done) { - SCOPED_SWITCH_BTHREAD_TLS(); PTabletWriterAddBlockRequest* request_raw = new PTabletWriterAddBlockRequest(); google::protobuf::Closure* done_raw = new NewHttpClosure(request_raw, done); @@ -263,7 +256,6 @@ void PInternalServiceImpl::_tablet_writer_add_block(google::protobuf::RpcControl int64_t execution_time_ns = 0; { SCOPED_RAW_TIMER(&execution_time_ns); - SCOPED_SWITCH_BTHREAD_TLS(); auto st = _exec_env->load_channel_mgr()->add_batch(*request, response); if (!st.ok()) { @@ -283,14 +275,12 @@ void PInternalServiceImpl::tablet_writer_add_batch(google::protobuf::RpcControll const PTabletWriterAddBatchRequest* request, PTabletWriterAddBatchResult* response, google::protobuf::Closure* done) { - SCOPED_SWITCH_BTHREAD_TLS(); _tablet_writer_add_batch(cntl_base, request, response, done); } void PInternalServiceImpl::tablet_writer_add_batch_by_http( google::protobuf::RpcController* cntl_base, const ::doris::PEmptyRequest* request, PTabletWriterAddBatchResult* response, google::protobuf::Closure* done) { - SCOPED_SWITCH_BTHREAD_TLS(); PTabletWriterAddBatchRequest* request_raw = new PTabletWriterAddBatchRequest(); google::protobuf::Closure* done_raw = new NewHttpClosure(request_raw, done); @@ -321,7 +311,6 @@ void PInternalServiceImpl::_tablet_writer_add_batch(google::protobuf::RpcControl int64_t execution_time_ns = 0; { SCOPED_RAW_TIMER(&execution_time_ns); - SCOPED_SWITCH_BTHREAD_TLS(); // TODO(zxy) delete in 1.2 version brpc::Controller* cntl = static_cast(cntl_base); @@ -345,7 +334,6 @@ void PInternalServiceImpl::tablet_writer_cancel(google::protobuf::RpcController* const PTabletWriterCancelRequest* request, PTabletWriterCancelResult* response, google::protobuf::Closure* done) { - SCOPED_SWITCH_BTHREAD_TLS(); VLOG_RPC << "tablet writer cancel, id=" << request->id() << ", index_id=" << request->index_id() << ", sender_id=" << request->sender_id(); brpc::ClosureGuard closure_guard(done); @@ -391,7 +379,6 @@ void PInternalServiceImpl::cancel_plan_fragment(google::protobuf::RpcController* google::protobuf::Closure* done) { auto span = telemetry::start_rpc_server_span("exec_plan_fragment_start", cntl_base); auto scope = OpentelemetryScope {span}; - SCOPED_SWITCH_BTHREAD_TLS(); brpc::ClosureGuard closure_guard(done); TUniqueId tid; tid.__set_hi(request->finst_id().hi()); @@ -415,7 +402,6 @@ void PInternalServiceImpl::cancel_plan_fragment(google::protobuf::RpcController* void PInternalServiceImpl::fetch_data(google::protobuf::RpcController* cntl_base, const PFetchDataRequest* request, PFetchDataResult* result, google::protobuf::Closure* done) { - SCOPED_SWITCH_BTHREAD_TLS(); brpc::Controller* cntl = static_cast(cntl_base); GetResultBatchCtx* ctx = new GetResultBatchCtx(cntl, result, done); _exec_env->result_mgr()->fetch_data(request->finst_id(), ctx); @@ -424,7 +410,6 @@ void PInternalServiceImpl::fetch_data(google::protobuf::RpcController* cntl_base void PInternalServiceImpl::get_info(google::protobuf::RpcController* controller, const PProxyRequest* request, PProxyResult* response, google::protobuf::Closure* done) { - SCOPED_SWITCH_BTHREAD_TLS(); brpc::ClosureGuard closure_guard(done); // PProxyRequest is defined in gensrc/proto/internal_service.proto // Currently it supports 2 kinds of requests: @@ -485,7 +470,6 @@ void PInternalServiceImpl::get_info(google::protobuf::RpcController* controller, void PInternalServiceImpl::update_cache(google::protobuf::RpcController* controller, const PUpdateCacheRequest* request, PCacheResponse* response, google::protobuf::Closure* done) { - SCOPED_SWITCH_BTHREAD_TLS(); brpc::ClosureGuard closure_guard(done); _exec_env->result_cache()->update(request, response); } @@ -493,7 +477,6 @@ void PInternalServiceImpl::update_cache(google::protobuf::RpcController* control void PInternalServiceImpl::fetch_cache(google::protobuf::RpcController* controller, const PFetchCacheRequest* request, PFetchCacheResult* result, google::protobuf::Closure* done) { - SCOPED_SWITCH_BTHREAD_TLS(); brpc::ClosureGuard closure_guard(done); _exec_env->result_cache()->fetch(request, result); } @@ -501,7 +484,6 @@ void PInternalServiceImpl::fetch_cache(google::protobuf::RpcController* controll void PInternalServiceImpl::clear_cache(google::protobuf::RpcController* controller, const PClearCacheRequest* request, PCacheResponse* response, google::protobuf::Closure* done) { - SCOPED_SWITCH_BTHREAD_TLS(); brpc::ClosureGuard closure_guard(done); _exec_env->result_cache()->clear(request, response); } @@ -510,7 +492,6 @@ void PInternalServiceImpl::merge_filter(::google::protobuf::RpcController* contr const ::doris::PMergeFilterRequest* request, ::doris::PMergeFilterResponse* response, ::google::protobuf::Closure* done) { - SCOPED_SWITCH_BTHREAD_TLS(); brpc::ClosureGuard closure_guard(done); auto buf = static_cast(controller)->request_attachment(); Status st = _exec_env->fragment_mgr()->merge_filter(request, buf.to_string().data()); @@ -524,7 +505,6 @@ void PInternalServiceImpl::apply_filter(::google::protobuf::RpcController* contr const ::doris::PPublishFilterRequest* request, ::doris::PPublishFilterResponse* response, ::google::protobuf::Closure* done) { - SCOPED_SWITCH_BTHREAD_TLS(); brpc::ClosureGuard closure_guard(done); auto attachment = static_cast(controller)->request_attachment(); UniqueId unique_id(request->query_id()); @@ -540,7 +520,6 @@ void PInternalServiceImpl::apply_filter(::google::protobuf::RpcController* contr void PInternalServiceImpl::send_data(google::protobuf::RpcController* controller, const PSendDataRequest* request, PSendDataResult* response, google::protobuf::Closure* done) { - SCOPED_SWITCH_BTHREAD_TLS(); brpc::ClosureGuard closure_guard(done); TUniqueId fragment_instance_id; fragment_instance_id.hi = request->fragment_instance_id().hi(); @@ -563,7 +542,6 @@ void PInternalServiceImpl::send_data(google::protobuf::RpcController* controller void PInternalServiceImpl::commit(google::protobuf::RpcController* controller, const PCommitRequest* request, PCommitResult* response, google::protobuf::Closure* done) { - SCOPED_SWITCH_BTHREAD_TLS(); brpc::ClosureGuard closure_guard(done); TUniqueId fragment_instance_id; fragment_instance_id.hi = request->fragment_instance_id().hi(); @@ -581,7 +559,6 @@ void PInternalServiceImpl::commit(google::protobuf::RpcController* controller, void PInternalServiceImpl::rollback(google::protobuf::RpcController* controller, const PRollbackRequest* request, PRollbackResult* response, google::protobuf::Closure* done) { - SCOPED_SWITCH_BTHREAD_TLS(); brpc::ClosureGuard closure_guard(done); TUniqueId fragment_instance_id; fragment_instance_id.hi = request->fragment_instance_id().hi(); @@ -600,7 +577,6 @@ void PInternalServiceImpl::fold_constant_expr(google::protobuf::RpcController* c const PConstantExprRequest* request, PConstantExprResult* response, google::protobuf::Closure* done) { - SCOPED_SWITCH_BTHREAD_TLS(); brpc::ClosureGuard closure_guard(done); brpc::Controller* cntl = static_cast(cntl_base); @@ -635,7 +611,6 @@ void PInternalServiceImpl::transmit_block(google::protobuf::RpcController* cntl_ const PTransmitDataParams* request, PTransmitDataResult* response, google::protobuf::Closure* done) { - SCOPED_SWITCH_BTHREAD_TLS(); // TODO(zxy) delete in 1.2 version brpc::Controller* cntl = static_cast(cntl_base); attachment_transfer_request_block(request, cntl); @@ -647,7 +622,6 @@ void PInternalServiceImpl::transmit_block_by_http(google::protobuf::RpcControlle const PEmptyRequest* request, PTransmitDataResult* response, google::protobuf::Closure* done) { - SCOPED_SWITCH_BTHREAD_TLS(); PTransmitDataParams* request_raw = new PTransmitDataParams(); google::protobuf::Closure* done_raw = new NewHttpClosure(request_raw, done); @@ -703,7 +677,6 @@ void PInternalServiceImpl::check_rpc_channel(google::protobuf::RpcController* co const PCheckRPCChannelRequest* request, PCheckRPCChannelResponse* response, google::protobuf::Closure* done) { - SCOPED_SWITCH_BTHREAD_TLS(); brpc::ClosureGuard closure_guard(done); response->mutable_status()->set_status_code(0); if (request->data().size() != request->size()) { @@ -730,7 +703,6 @@ void PInternalServiceImpl::reset_rpc_channel(google::protobuf::RpcController* co const PResetRPCChannelRequest* request, PResetRPCChannelResponse* response, google::protobuf::Closure* done) { - SCOPED_SWITCH_BTHREAD_TLS(); brpc::ClosureGuard closure_guard(done); response->mutable_status()->set_status_code(0); if (request->all()) { @@ -764,7 +736,6 @@ void PInternalServiceImpl::hand_shake(google::protobuf::RpcController* cntl_base const PHandShakeRequest* request, PHandShakeResponse* response, google::protobuf::Closure* done) { - SCOPED_SWITCH_BTHREAD_TLS(); brpc::ClosureGuard closure_guard(done); if (request->has_hello()) { response->set_hello(request->hello()); diff --git a/be/test/testutil/run_all_tests.cpp b/be/test/testutil/run_all_tests.cpp index ea518722113ae4..ffe239cc0f9368 100644 --- a/be/test/testutil/run_all_tests.cpp +++ b/be/test/testutil/run_all_tests.cpp @@ -30,9 +30,12 @@ int main(int argc, char** argv) { std::shared_ptr process_mem_tracker = std::make_shared(-1, "Process"); - std::shared_ptr _orphan_mem_tracker = + std::shared_ptr orphan_mem_tracker = std::make_shared(-1, "Orphan", process_mem_tracker); - doris::ExecEnv::GetInstance()->set_global_mem_tracker(process_mem_tracker, _orphan_mem_tracker); + std::shared_ptr bthread_mem_tracker = + std::make_shared(-1, "Bthread", orphan_mem_tracker); + doris::ExecEnv::GetInstance()->set_global_mem_tracker(process_mem_tracker, orphan_mem_tracker, + bthread_mem_tracker); doris::thread_context()->_thread_mem_tracker_mgr->init(); doris::TabletSchemaCache::create_global_schema_cache(); doris::StoragePageCache::create_global_cache(1 << 30, 10);