From da929ccf62d817914eee72356e649d104d118541 Mon Sep 17 00:00:00 2001 From: zhiqiang-hhhh Date: Wed, 27 Dec 2023 10:17:50 +0800 Subject: [PATCH 1/3] ref --- be/src/pipeline/pipeline_fragment_context.cpp | 6 ++---- be/src/runtime/fragment_mgr.cpp | 7 +++---- be/src/vec/runtime/vdata_stream_recvr.cpp | 14 ++++---------- be/src/vec/runtime/vdata_stream_recvr.h | 6 ++---- .../java/org/apache/doris/qe/QeProcessorImpl.java | 10 +++------- 5 files changed, 14 insertions(+), 29 deletions(-) diff --git a/be/src/pipeline/pipeline_fragment_context.cpp b/be/src/pipeline/pipeline_fragment_context.cpp index 7957983366373d..e6cd0c8c32a85a 100644 --- a/be/src/pipeline/pipeline_fragment_context.cpp +++ b/be/src/pipeline/pipeline_fragment_context.cpp @@ -161,10 +161,8 @@ void PipelineFragmentContext::cancel(const PPlanFragmentCancelReason& reason, // We need a more detail discussion. if (_query_ctx->cancel(true, msg, Status::Cancelled(msg))) { if (reason != PPlanFragmentCancelReason::LIMIT_REACH) { - LOG(WARNING) << "PipelineFragmentContext " - << PrintInstanceStandardInfo(_query_id, _fragment_instance_id) - << " is canceled, cancel message: " << msg; - + LOG_WARNING("Instance {} cancelled, reason {}", print_id(_fragment_instance_id), + msg.substr(0, 50)); } else { _set_is_report_on_cancel(false); // TODO bug llj fix this not projected by lock } diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp index 65e1ba475ae1c0..c16198267a1d93 100644 --- a/be/src/runtime/fragment_mgr.cpp +++ b/be/src/runtime/fragment_mgr.cpp @@ -402,10 +402,9 @@ void FragmentMgr::coordinator_callback(const ReportStatusRequest& req) { VLOG_DEBUG << "reportExecStatus params is " << apache::thrift::ThriftDebugString(params).c_str(); if (!exec_status.ok()) { - LOG(WARNING) << "report error status: " << exec_status.msg() - << " to coordinator: " << req.coord_addr - << ", query id: " << print_id(req.query_id) - << ", instance id: " << print_id(req.fragment_instance_id); + LOG_WARNING("Query {} instance {} report error status to coor {}:{}, error status: {}", + print_id(req.query_id), print_id(req.fragment_instance_id), + req.coord_addr.hostname, req.coord_addr.port, exec_status.to_string_no_stack()); } try { try { diff --git a/be/src/vec/runtime/vdata_stream_recvr.cpp b/be/src/vec/runtime/vdata_stream_recvr.cpp index 0dc47363a8eef5..2a181dbb4d91a7 100644 --- a/be/src/vec/runtime/vdata_stream_recvr.cpp +++ b/be/src/vec/runtime/vdata_stream_recvr.cpp @@ -472,24 +472,18 @@ Status VDataStreamRecvr::get_next(Block* block, bool* eos) { } } -void VDataStreamRecvr::remove_sender(int sender_id, int be_number, Status exec_status) { +void VDataStreamRecvr::remove_sender(int sender_id, int be_number, Status exec_status, + QueryStatisticsPtr statistics) { if (!exec_status.ok()) { cancel_stream(exec_status); return; } int use_sender_id = _is_merging ? sender_id : 0; _sender_queues[use_sender_id]->decrement_senders(be_number); -} -void VDataStreamRecvr::remove_sender(int sender_id, int be_number, QueryStatisticsPtr statistics, - Status exec_status) { - if (!exec_status.ok()) { - cancel_stream(exec_status); - return; + if (statistics != nullptr) { + _sub_plan_query_statistics_recvr->insert(statistics, sender_id); } - int use_sender_id = _is_merging ? sender_id : 0; - _sender_queues[use_sender_id]->decrement_senders(be_number); - _sub_plan_query_statistics_recvr->insert(statistics, sender_id); } void VDataStreamRecvr::cancel_stream(Status exec_status) { diff --git a/be/src/vec/runtime/vdata_stream_recvr.h b/be/src/vec/runtime/vdata_stream_recvr.h index 122a9d763e1be9..b1fb387a9a7e48 100644 --- a/be/src/vec/runtime/vdata_stream_recvr.h +++ b/be/src/vec/runtime/vdata_stream_recvr.h @@ -110,10 +110,8 @@ class VDataStreamRecvr { // Indicate that a particular sender is done. Delegated to the appropriate // sender queue. Called from DataStreamMgr. - void remove_sender(int sender_id, int be_number, Status exec_status); - - void remove_sender(int sender_id, int be_number, QueryStatisticsPtr statistics, - Status exec_status); + void remove_sender(int sender_id, int be_number, Status exec_status, + QueryStatisticsPtr statistics = nullptr); void cancel_stream(Status exec_status); diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/QeProcessorImpl.java b/fe/fe-core/src/main/java/org/apache/doris/qe/QeProcessorImpl.java index a3ee3c09e4ed12..024878d637d75c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/QeProcessorImpl.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/QeProcessorImpl.java @@ -140,9 +140,7 @@ public Map getInstancesNumPerUser() { public void unregisterQuery(TUniqueId queryId) { QueryInfo queryInfo = coordinatorMap.remove(queryId); if (queryInfo != null) { - if (LOG.isDebugEnabled()) { - LOG.debug("Deregister query id {}", DebugUtil.printId(queryId)); - } + LOG.info("Deregister query id {}", DebugUtil.printId(queryId)); if (queryInfo.getConnectContext() != null && !Strings.isNullOrEmpty(queryInfo.getConnectContext().getQualifiedUser()) @@ -152,7 +150,7 @@ public void unregisterQuery(TUniqueId queryId) { String user = queryInfo.getConnectContext().getQualifiedUser(); AtomicInteger instancesNum = userToInstancesCount.get(user); if (instancesNum == null) { - LOG.warn("WTF?? query {} in queryToInstancesNum but not in userToInstancesCount", + LOG.warn("Query {} in queryToInstancesNum but not in userToInstancesCount", DebugUtil.printId(queryId) ); } else { @@ -161,9 +159,7 @@ public void unregisterQuery(TUniqueId queryId) { } } } else { - if (LOG.isDebugEnabled()) { - LOG.debug("not found query {} when unregisterQuery", DebugUtil.printId(queryId)); - } + LOG.warn("Query {} not found when unregister.", DebugUtil.printId(queryId)); } // commit hive tranaction if needed From c20536c492ab24969218848d557264b385ad3a1e Mon Sep 17 00:00:00 2001 From: zhiqiang-hhhh Date: Wed, 27 Dec 2023 10:34:25 +0800 Subject: [PATCH 2/3] compile --- be/src/vec/sink/vdata_stream_sender.cpp | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/be/src/vec/sink/vdata_stream_sender.cpp b/be/src/vec/sink/vdata_stream_sender.cpp index 53c140ef5a8034..0ad21734738422 100644 --- a/be/src/vec/sink/vdata_stream_sender.cpp +++ b/be/src/vec/sink/vdata_stream_sender.cpp @@ -290,8 +290,8 @@ Status Channel::close_internal(Status exec_status) { if (is_local()) { if (_recvr_is_valid()) { if constexpr (std::is_same_v) { - _local_recvr->remove_sender(_parent->sender_id(), _be_number, - _parent->query_statisticsPtr(), exec_status); + _local_recvr->remove_sender(_parent->sender_id(), _be_number, exec_status, + _parent->query_statisticsPtr()); } else { _local_recvr->remove_sender(_parent->sender_id(), _be_number, exec_status); } From f680418fb1fbccd65a105bbc2796db9cc304ff1d Mon Sep 17 00:00:00 2001 From: zhiqiang-hhhh Date: Wed, 27 Dec 2023 11:41:31 +0800 Subject: [PATCH 3/3] compile 2 --- be/src/vec/sink/vdata_stream_sender.cpp | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/be/src/vec/sink/vdata_stream_sender.cpp b/be/src/vec/sink/vdata_stream_sender.cpp index 0ad21734738422..a38d0167084261 100644 --- a/be/src/vec/sink/vdata_stream_sender.cpp +++ b/be/src/vec/sink/vdata_stream_sender.cpp @@ -154,8 +154,8 @@ Status Channel::send_local_block(Status exec_status, bool eos) { if (eos) { /// TODO: Supported on pipelineX, we can hold QueryStatistics on the fragment instead of on instances. if constexpr (std::is_same_v) { - _local_recvr->remove_sender(_parent->sender_id(), _be_number, - _parent->query_statisticsPtr(), exec_status); + _local_recvr->remove_sender(_parent->sender_id(), _be_number, exec_status, + _parent->query_statisticsPtr()); } else { _local_recvr->remove_sender(_parent->sender_id(), _be_number, exec_status); }