diff --git a/be/src/pipeline/pipeline_fragment_context.cpp b/be/src/pipeline/pipeline_fragment_context.cpp index df7cd2ece28025..4c677216e6ad64 100644 --- a/be/src/pipeline/pipeline_fragment_context.cpp +++ b/be/src/pipeline/pipeline_fragment_context.cpp @@ -186,13 +186,6 @@ void PipelineFragmentContext::cancel(const PPlanFragmentCancelReason& reason, // We need a more detail discussion. _query_ctx->cancel(msg, Status::Cancelled(msg)); - if (reason == PPlanFragmentCancelReason::INTERNAL_ERROR && !msg.empty()) { - if (msg.find("Pipeline task leak.") != std::string::npos) { - LOG_WARNING("PipelineFragmentContext is cancelled due to illegal state : {}", - this->debug_string()); - } - } - if (reason == PPlanFragmentCancelReason::LIMIT_REACH) { _is_report_on_cancel = false; } else { diff --git a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp index 55a25718e0043d..d736879f0eb457 100644 --- a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp +++ b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp @@ -147,6 +147,14 @@ void PipelineXFragmentContext::cancel(const PPlanFragmentCancelReason& reason, LOG(WARNING) << "PipelineXFragmentContext is cancelled due to timeout : " << debug_string(); } _query_ctx->cancel(msg, Status::Cancelled(msg), _fragment_id); + + if (reason == PPlanFragmentCancelReason::INTERNAL_ERROR && !msg.empty()) { + if (msg.find("Pipeline task leak.") != std::string::npos) { + LOG_WARNING("PipelineFragmentContext is cancelled due to illegal state : {}", + this->debug_string()); + } + } + if (reason == PPlanFragmentCancelReason::LIMIT_REACH) { _is_report_on_cancel = false; } else { diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp index 74f42a28b8a0c8..b68839a0d6245d 100644 --- a/be/src/runtime/fragment_mgr.cpp +++ b/be/src/runtime/fragment_mgr.cpp @@ -183,6 +183,7 @@ static Status _do_fetch_running_queries_rpc(const FrontendInfo& fe_info, query_set = std::unordered_set(rpc_result.running_queries.begin(), rpc_result.running_queries.end()); + return Status::OK(); }; @@ -1310,8 +1311,8 @@ void FragmentMgr::cancel_worker() { itr != running_queries_on_all_fes.end()) { // Query not found on this frontend, and the query arrives before the last check if (itr->second.find(q_ctx->query_id()) == itr->second.end() && - q_ctx->get_query_arrival_timestamp().tv_nsec < - check_invalid_query_last_timestamp.tv_nsec && + q_ctx->get_query_arrival_timestamp().tv_sec < + check_invalid_query_last_timestamp.tv_sec && q_ctx->get_query_source() == QuerySource::INTERNAL_FRONTEND) { if (q_ctx->enable_pipeline_x_exec()) { queries_pipeline_task_leak.push_back(q_ctx->query_id());