From 1c902bf3b38aff9c89f2332d99902b25830b489b Mon Sep 17 00:00:00 2001 From: zhiqiang-hhhh Date: Wed, 21 Aug 2024 14:14:41 +0800 Subject: [PATCH 1/3] FIX --- be/src/common/config.cpp | 3 ++- be/src/pipeline/pipeline_fragment_context.cpp | 7 ------- .../pipeline_x/pipeline_x_fragment_context.cpp | 8 ++++++++ be/src/runtime/fragment_mgr.cpp | 16 ++++++++++++++++ 4 files changed, 26 insertions(+), 8 deletions(-) diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp index 461e328044d259..fa26d93967ebcc 100644 --- a/be/src/common/config.cpp +++ b/be/src/common/config.cpp @@ -1270,7 +1270,8 @@ DEFINE_mBool(enable_parquet_page_index, "false"); DEFINE_mBool(ignore_not_found_file_in_external_table, "true"); -DEFINE_mInt64(pipeline_task_leakage_detect_period_secs, "60"); +// TODO: Revert to 60s after bug fixed. +DEFINE_mInt64(pipeline_task_leakage_detect_period_secs, "1"); // clang-format off #ifdef BE_TEST diff --git a/be/src/pipeline/pipeline_fragment_context.cpp b/be/src/pipeline/pipeline_fragment_context.cpp index df7cd2ece28025..4c677216e6ad64 100644 --- a/be/src/pipeline/pipeline_fragment_context.cpp +++ b/be/src/pipeline/pipeline_fragment_context.cpp @@ -186,13 +186,6 @@ void PipelineFragmentContext::cancel(const PPlanFragmentCancelReason& reason, // We need a more detail discussion. _query_ctx->cancel(msg, Status::Cancelled(msg)); - if (reason == PPlanFragmentCancelReason::INTERNAL_ERROR && !msg.empty()) { - if (msg.find("Pipeline task leak.") != std::string::npos) { - LOG_WARNING("PipelineFragmentContext is cancelled due to illegal state : {}", - this->debug_string()); - } - } - if (reason == PPlanFragmentCancelReason::LIMIT_REACH) { _is_report_on_cancel = false; } else { diff --git a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp index 55a25718e0043d..d736879f0eb457 100644 --- a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp +++ b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp @@ -147,6 +147,14 @@ void PipelineXFragmentContext::cancel(const PPlanFragmentCancelReason& reason, LOG(WARNING) << "PipelineXFragmentContext is cancelled due to timeout : " << debug_string(); } _query_ctx->cancel(msg, Status::Cancelled(msg), _fragment_id); + + if (reason == PPlanFragmentCancelReason::INTERNAL_ERROR && !msg.empty()) { + if (msg.find("Pipeline task leak.") != std::string::npos) { + LOG_WARNING("PipelineFragmentContext is cancelled due to illegal state : {}", + this->debug_string()); + } + } + if (reason == PPlanFragmentCancelReason::LIMIT_REACH) { _is_report_on_cancel = false; } else { diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp index 74f42a28b8a0c8..be2f97d7c33c69 100644 --- a/be/src/runtime/fragment_mgr.cpp +++ b/be/src/runtime/fragment_mgr.cpp @@ -183,6 +183,10 @@ static Status _do_fetch_running_queries_rpc(const FrontendInfo& fe_info, query_set = std::unordered_set(rpc_result.running_queries.begin(), rpc_result.running_queries.end()); + if (query_set.empty()) { + LOG_INFO("No running queries from {}", + PrintThriftNetworkAddress(fe_info.info.coordinator_address)); + } return Status::OK(); }; @@ -1384,6 +1388,18 @@ void FragmentMgr::cancel_worker() { << print_id(id); } + if (!queries_pipeline_task_leak.empty()) { + // Print running_queries_on_all_fes + for (const auto& [fe_process_uuid, query_ids] : running_queries_on_all_fes) { + std::string msg; + msg += fmt::format("Frontend process uuid: {}, running_queries: ", fe_process_uuid); + for (const auto& qid : query_ids) { + msg += fmt::format("{} ", print_id(qid)); + } + LOG_INFO(msg); + } + } + for (const auto& qid : queries_pipeline_task_leak) { // Cancel the query, and maybe try to report debug info to fe so that we can // collect debug info by sql or http api instead of search log. From 6738ecdf0d6c257fd65075238621bdf09d626b69 Mon Sep 17 00:00:00 2001 From: zhiqiang-hhhh Date: Wed, 21 Aug 2024 22:11:38 +0800 Subject: [PATCH 2/3] FIX --- be/src/common/config.cpp | 3 +-- be/src/runtime/fragment_mgr.cpp | 17 +---------------- 2 files changed, 2 insertions(+), 18 deletions(-) diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp index fa26d93967ebcc..461e328044d259 100644 --- a/be/src/common/config.cpp +++ b/be/src/common/config.cpp @@ -1270,8 +1270,7 @@ DEFINE_mBool(enable_parquet_page_index, "false"); DEFINE_mBool(ignore_not_found_file_in_external_table, "true"); -// TODO: Revert to 60s after bug fixed. -DEFINE_mInt64(pipeline_task_leakage_detect_period_secs, "1"); +DEFINE_mInt64(pipeline_task_leakage_detect_period_secs, "60"); // clang-format off #ifdef BE_TEST diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp index be2f97d7c33c69..b39f37ef9803a1 100644 --- a/be/src/runtime/fragment_mgr.cpp +++ b/be/src/runtime/fragment_mgr.cpp @@ -183,10 +183,7 @@ static Status _do_fetch_running_queries_rpc(const FrontendInfo& fe_info, query_set = std::unordered_set(rpc_result.running_queries.begin(), rpc_result.running_queries.end()); - if (query_set.empty()) { - LOG_INFO("No running queries from {}", - PrintThriftNetworkAddress(fe_info.info.coordinator_address)); - } + return Status::OK(); }; @@ -1388,18 +1385,6 @@ void FragmentMgr::cancel_worker() { << print_id(id); } - if (!queries_pipeline_task_leak.empty()) { - // Print running_queries_on_all_fes - for (const auto& [fe_process_uuid, query_ids] : running_queries_on_all_fes) { - std::string msg; - msg += fmt::format("Frontend process uuid: {}, running_queries: ", fe_process_uuid); - for (const auto& qid : query_ids) { - msg += fmt::format("{} ", print_id(qid)); - } - LOG_INFO(msg); - } - } - for (const auto& qid : queries_pipeline_task_leak) { // Cancel the query, and maybe try to report debug info to fe so that we can // collect debug info by sql or http api instead of search log. From 971200dfcc931e587213c93bd55ebaaea2c8bfea Mon Sep 17 00:00:00 2001 From: zhiqiang-hhhh Date: Wed, 21 Aug 2024 22:13:15 +0800 Subject: [PATCH 3/3] ne --- be/src/runtime/fragment_mgr.cpp | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp index b39f37ef9803a1..b68839a0d6245d 100644 --- a/be/src/runtime/fragment_mgr.cpp +++ b/be/src/runtime/fragment_mgr.cpp @@ -1311,8 +1311,8 @@ void FragmentMgr::cancel_worker() { itr != running_queries_on_all_fes.end()) { // Query not found on this frontend, and the query arrives before the last check if (itr->second.find(q_ctx->query_id()) == itr->second.end() && - q_ctx->get_query_arrival_timestamp().tv_nsec < - check_invalid_query_last_timestamp.tv_nsec && + q_ctx->get_query_arrival_timestamp().tv_sec < + check_invalid_query_last_timestamp.tv_sec && q_ctx->get_query_source() == QuerySource::INTERNAL_FRONTEND) { if (q_ctx->enable_pipeline_x_exec()) { queries_pipeline_task_leak.push_back(q_ctx->query_id());