Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 0 additions & 7 deletions be/src/pipeline/pipeline_fragment_context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -186,13 +186,6 @@ void PipelineFragmentContext::cancel(const PPlanFragmentCancelReason& reason,
// We need a more detail discussion.
_query_ctx->cancel(msg, Status::Cancelled(msg));

if (reason == PPlanFragmentCancelReason::INTERNAL_ERROR && !msg.empty()) {
if (msg.find("Pipeline task leak.") != std::string::npos) {
LOG_WARNING("PipelineFragmentContext is cancelled due to illegal state : {}",
this->debug_string());
}
}

if (reason == PPlanFragmentCancelReason::LIMIT_REACH) {
_is_report_on_cancel = false;
} else {
Expand Down
8 changes: 8 additions & 0 deletions be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,14 @@ void PipelineXFragmentContext::cancel(const PPlanFragmentCancelReason& reason,
LOG(WARNING) << "PipelineXFragmentContext is cancelled due to timeout : " << debug_string();
}
_query_ctx->cancel(msg, Status::Cancelled(msg), _fragment_id);

if (reason == PPlanFragmentCancelReason::INTERNAL_ERROR && !msg.empty()) {
if (msg.find("Pipeline task leak.") != std::string::npos) {
LOG_WARNING("PipelineFragmentContext is cancelled due to illegal state : {}",
this->debug_string());
}
}

if (reason == PPlanFragmentCancelReason::LIMIT_REACH) {
_is_report_on_cancel = false;
} else {
Expand Down
5 changes: 3 additions & 2 deletions be/src/runtime/fragment_mgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,7 @@ static Status _do_fetch_running_queries_rpc(const FrontendInfo& fe_info,

query_set = std::unordered_set<TUniqueId>(rpc_result.running_queries.begin(),
rpc_result.running_queries.end());

return Status::OK();
};

Expand Down Expand Up @@ -1310,8 +1311,8 @@ void FragmentMgr::cancel_worker() {
itr != running_queries_on_all_fes.end()) {
// Query not found on this frontend, and the query arrives before the last check
if (itr->second.find(q_ctx->query_id()) == itr->second.end() &&
q_ctx->get_query_arrival_timestamp().tv_nsec <
check_invalid_query_last_timestamp.tv_nsec &&
q_ctx->get_query_arrival_timestamp().tv_sec <
check_invalid_query_last_timestamp.tv_sec &&
q_ctx->get_query_source() == QuerySource::INTERNAL_FRONTEND) {
if (q_ctx->enable_pipeline_x_exec()) {
queries_pipeline_task_leak.push_back(q_ctx->query_id());
Expand Down