diff --git a/be/src/runtime/query_statistics.cpp b/be/src/runtime/query_statistics.cpp index 22c18faa1e6f72..dfe63f576260db 100644 --- a/be/src/runtime/query_statistics.cpp +++ b/be/src/runtime/query_statistics.cpp @@ -88,9 +88,10 @@ void QueryStatistics::merge(QueryStatisticsRecvr* recvr) { } void QueryStatistics::merge(QueryStatisticsRecvr* recvr, int sender_id) { - auto it = recvr->_query_statistics.find(sender_id); - if (it != recvr->_query_statistics.end()) { - merge(*it->second); + DCHECK(recvr != nullptr); + auto QueryStatisticsptr = recvr->find(sender_id); + if (QueryStatisticsptr) { + merge(*QueryStatisticsptr); } } @@ -120,4 +121,13 @@ void QueryStatisticsRecvr::insert(QueryStatisticsPtr statistics, int sender_id) _query_statistics[sender_id] = statistics; } +QueryStatisticsPtr QueryStatisticsRecvr::find(int sender_id) { + std::lock_guard l(_lock); + auto it = _query_statistics.find(sender_id); + if (it != _query_statistics.end()) { + return it->second; + } + return nullptr; +} + } // namespace doris diff --git a/be/src/runtime/query_statistics.h b/be/src/runtime/query_statistics.h index 42c1457472f6ba..d32b7a60e2a7e0 100644 --- a/be/src/runtime/query_statistics.h +++ b/be/src/runtime/query_statistics.h @@ -140,6 +140,8 @@ class QueryStatisticsRecvr { // using local_exchange for transmission, only need to hold a shared pointer. void insert(QueryStatisticsPtr statistics, int sender_id); + QueryStatisticsPtr find(int sender_id); + private: friend class QueryStatistics; diff --git a/be/src/vec/exec/vexchange_node.cpp b/be/src/vec/exec/vexchange_node.cpp index 797ff590ca99fd..17a85767ec793d 100644 --- a/be/src/vec/exec/vexchange_node.cpp +++ b/be/src/vec/exec/vexchange_node.cpp @@ -149,7 +149,10 @@ Status VExchangeNode::collect_query_statistics(QueryStatistics* statistics) { } Status VExchangeNode::collect_query_statistics(QueryStatistics* statistics, int sender_id) { RETURN_IF_ERROR(ExecNode::collect_query_statistics(statistics)); - statistics->merge(_sub_plan_query_statistics_recvr.get(), sender_id); + DCHECK(statistics != nullptr); + if (_sub_plan_query_statistics_recvr) { + statistics->merge(_sub_plan_query_statistics_recvr.get(), sender_id); + } return Status::OK(); } Status VExchangeNode::close(RuntimeState* state) {