From 3e4cd1afa76f9130da73ad84c7dfc111e93aa8d1 Mon Sep 17 00:00:00 2001 From: liulijia Date: Thu, 17 Aug 2023 22:52:44 +0800 Subject: [PATCH 1/8] pipeline report --- be/src/common/status.h | 2 +- be/src/pipeline/pipeline_fragment_context.cpp | 37 +++++++++++++++++-- be/src/pipeline/pipeline_fragment_context.h | 1 + .../pipeline_x_fragment_context.cpp | 17 ++++++++- be/src/runtime/fragment_mgr.cpp | 2 + be/src/service/internal_service.cpp | 7 +++- 6 files changed, 58 insertions(+), 8 deletions(-) diff --git a/be/src/common/status.h b/be/src/common/status.h index 162d3b493bddb6..95e684bd33ce7b 100644 --- a/be/src/common/status.h +++ b/be/src/common/status.h @@ -375,7 +375,7 @@ class Status { status._err_msg->_msg = fmt::format(msg, std::forward(args)...); } #ifdef ENABLE_STACKTRACE - if constexpr (stacktrace && capture_stacktrace(code)) { + if constexpr (stacktrace && capture_stacktrace(code) && code != 0) { status._err_msg->_stack = get_stack_trace(); LOG(WARNING) << "meet error status: " << status; // may print too many stacks. } diff --git a/be/src/pipeline/pipeline_fragment_context.cpp b/be/src/pipeline/pipeline_fragment_context.cpp index 65cd718949466c..99b462a56d55ae 100644 --- a/be/src/pipeline/pipeline_fragment_context.cpp +++ b/be/src/pipeline/pipeline_fragment_context.cpp @@ -151,11 +151,25 @@ PipelineFragmentContext::~PipelineFragmentContext() { void PipelineFragmentContext::cancel(const PPlanFragmentCancelReason& reason, const std::string& msg) { + LOG_INFO("PipelineFragmentContext::cancel") + .tag("query_id", print_id(_query_ctx->query_id())) + .tag("fragment_id", _fragment_id) + .tag("instance_id", print_id(_runtime_state->fragment_instance_id())) + .tag("reason", PPlanFragmentCancelReason_Name(reason)) + .tag("message", msg); + if (_query_ctx->cancel(true, msg, Status::Cancelled(msg))) { - LOG(WARNING) << "PipelineFragmentContext " - << PrintInstanceStandardInfo(_query_id, _fragment_id, _fragment_instance_id) - << " is canceled, cancel message: " << msg; + if (reason != PPlanFragmentCancelReason::LIMIT_REACH) { + LOG(WARNING) << "PipelineFragmentContext " + << PrintInstanceStandardInfo(_query_id, _fragment_id, _fragment_instance_id) + << " is canceled, cancel message: " << msg; + + } else { + _set_is_report_on_cancel(false); + } + _runtime_state->set_is_cancelled(true, msg); + _runtime_state->set_process_status(_query_ctx->exec_status()); // Get pipe from new load stream manager and send cancel to it or the fragment may hang to wait read from pipe // For stream load the fragment's query_id == load id, it is set in FE. 
auto stream_load_ctx = _exec_env->new_load_stream_mgr()->get(_query_id); @@ -199,6 +213,7 @@ Status PipelineFragmentContext::prepare(const doris::TPipelineFragmentParams& re LOG_INFO("PipelineFragmentContext::prepare") .tag("query_id", print_id(_query_id)) + .tag("fragment_id", _fragment_id) .tag("instance_id", print_id(local_params.fragment_instance_id)) .tag("backend_num", local_params.backend_num) .tag("pthread_id", (uintptr_t)pthread_self()); @@ -311,6 +326,8 @@ Status PipelineFragmentContext::prepare(const doris::TPipelineFragmentParams& re _runtime_state->runtime_profile()->add_child(_root_plan->runtime_profile(), true, nullptr); _runtime_state->runtime_profile()->add_child(_runtime_profile.get(), true, nullptr); + _start_report_thread(); + _prepared = true; return Status::OK(); } @@ -344,6 +361,19 @@ Status PipelineFragmentContext::_build_pipeline_tasks( return Status::OK(); } +void PipelineFragmentContext::_start_report_thread() { + if (_is_report_success && config::status_report_interval > 0) { + std::unique_lock l(_report_thread_lock); + _exec_env->send_report_thread_pool()->submit_func([this] { + Defer defer {[&]() { this->_report_thread_promise.set_value(true); }}; + this->report_profile(); + }); + // make sure the thread started up, otherwise report_profile() might get into a race + // with stop_report_thread() + _report_thread_started_cv.wait(l); + } +} + void PipelineFragmentContext::_stop_report_thread() { if (!_report_thread_active) { return; @@ -705,6 +735,7 @@ void PipelineFragmentContext::close_sink() { } void PipelineFragmentContext::close_if_prepare_failed() { + LOG(WARNING) << "close PipelineFragmentContext because prepare failed"; if (_tasks.empty()) { if (_root_plan) { _root_plan->close(_runtime_state.get()); diff --git a/be/src/pipeline/pipeline_fragment_context.h b/be/src/pipeline/pipeline_fragment_context.h index 8e80a73143f9f9..ff314bd3c44372 100644 --- a/be/src/pipeline/pipeline_fragment_context.h +++ b/be/src/pipeline/pipeline_fragment_context.h @@ -147,6 +147,7 @@ class PipelineFragmentContext : public std::enable_shared_from_this Status _build_operators_for_set_operation_node(ExecNode*, PipelinePtr); virtual void _close_action(); + void _start_report_thread(); void _stop_report_thread(); void _set_is_report_on_cancel(bool val) { _is_report_on_cancel = val; } diff --git a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp index 7a0cc805568b24..251c1c68f6474d 100644 --- a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp +++ b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp @@ -112,7 +112,8 @@ PipelineXFragmentContext::~PipelineXFragmentContext() { if (!_runtime_states.empty()) { // The memory released by the query end is recorded in the query mem tracker, main memory in _runtime_state. 
SCOPED_ATTACH_TASK(_runtime_state.get()); - FOR_EACH_RUNTIME_STATE(_call_back(runtime_state.get(), &st); runtime_state.reset();) + FOR_EACH_RUNTIME_STATE(_call_back(runtime_state.get(), &st); + runtime_state.reset();) } else { _call_back(nullptr, &st); } @@ -122,8 +123,18 @@ PipelineXFragmentContext::~PipelineXFragmentContext() { void PipelineXFragmentContext::cancel(const PPlanFragmentCancelReason& reason, const std::string& msg) { + LOG_INFO("PipelineXFragmentContext::cancel") + .tag("query_id", print_id(_query_ctx->query_id())) + .tag("fragment_id", _fragment_id) + .tag("reason", reason) + .tag("error message", msg); if (_query_ctx->cancel(true, msg, Status::Cancelled(msg))) { - LOG(WARNING) << "PipelineFragmentContext Canceled. reason=" << msg; + if (reason != PPlanFragmentCancelReason::LIMIT_REACH) { + FOR_EACH_RUNTIME_STATE(LOG(WARNING) << "PipelineXFragmentContext cancel instance: " + << print_id(runtime_state->fragment_instance_id());) + } else { + _set_is_report_on_cancel(false); + } // Get pipe from new load stream manager and send cancel to it or the fragment may hang to wait read from pipe // For stream load the fragment's query_id == load id, it is set in FE. auto stream_load_ctx = _exec_env->new_load_stream_mgr()->get(_query_id); @@ -213,6 +224,8 @@ Status PipelineXFragmentContext::prepare(const doris::TPipelineFragmentParams& r // 5. Build pipeline tasks and initialize local state. RETURN_IF_ERROR(_build_pipeline_tasks(request)); + _start_report_thread(); + _prepared = true; return Status::OK(); } diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp index 79e67503e112ba..ed7afbc2d15d0e 100644 --- a/be/src/runtime/fragment_mgr.cpp +++ b/be/src/runtime/fragment_mgr.cpp @@ -539,9 +539,11 @@ void FragmentMgr::remove_pipeline_context( f_context->instance_ids(ins_ids); bool all_done = q_context->countdown(ins_ids.size()); for (const auto& ins_id : ins_ids) { + LOG(INFO) << "remove pipeline context " << print_id(ins_id); _pipeline_map.erase(ins_id); } if (all_done) { + LOG(INFO) << "remove query context " << print_id(query_id); _query_ctx_map.erase(query_id); } } diff --git a/be/src/service/internal_service.cpp b/be/src/service/internal_service.cpp index 5367135c134ada..6ef5066d3b8975 100644 --- a/be/src/service/internal_service.cpp +++ b/be/src/service/internal_service.cpp @@ -556,7 +556,7 @@ void PInternalServiceImpl::cancel_plan_fragment(google::protobuf::RpcController* TUniqueId tid; tid.__set_hi(request->finst_id().hi()); tid.__set_lo(request->finst_id().lo()); - + signal::set_signal_task_id(tid); Status st = Status::OK(); if (request->has_cancel_reason()) { LOG(INFO) << "cancel fragment, fragment_instance_id=" << print_id(tid) @@ -1184,7 +1184,10 @@ void PInternalServiceImpl::_transmit_block(google::protobuf::RpcController* cont if (!st.ok()) { LOG(WARNING) << "transmit_block failed, message=" << st << ", fragment_instance_id=" << print_id(request->finst_id()) - << ", node=" << request->node_id(); + << ", node=" << request->node_id() + << ", from sender_id: " << request->sender_id() + << ", be_number: " << request->be_number() + << ", packet_seq: " << request->packet_seq(); } } else { st = extract_st; From 793d246377d6aedf0805851e6de646fba8f20e4f Mon Sep 17 00:00:00 2001 From: liulijia Date: Mon, 4 Sep 2023 15:38:23 +0800 Subject: [PATCH 2/8] pipeline report async --- be/src/common/status.h | 2 +- .../pipeline_context_report_executor.cpp | 78 +++++++++++++++ .../pipeline_context_report_executor.h | 46 +++++++++ 
be/src/pipeline/pipeline_fragment_context.cpp | 99 ++++++------------- be/src/pipeline/pipeline_fragment_context.h | 30 +++--- .../pipeline_x_fragment_context.cpp | 69 +------------ .../pipeline_x/pipeline_x_fragment_context.h | 10 +- be/src/pipeline/task_scheduler.cpp | 1 + be/src/runtime/fragment_mgr.cpp | 20 +++- be/src/runtime/fragment_mgr.h | 5 + be/src/runtime/query_context.h | 2 +- 11 files changed, 202 insertions(+), 160 deletions(-) create mode 100644 be/src/pipeline/pipeline_context_report_executor.cpp create mode 100644 be/src/pipeline/pipeline_context_report_executor.h diff --git a/be/src/common/status.h b/be/src/common/status.h index 95e684bd33ce7b..162d3b493bddb6 100644 --- a/be/src/common/status.h +++ b/be/src/common/status.h @@ -375,7 +375,7 @@ class Status { status._err_msg->_msg = fmt::format(msg, std::forward(args)...); } #ifdef ENABLE_STACKTRACE - if constexpr (stacktrace && capture_stacktrace(code) && code != 0) { + if constexpr (stacktrace && capture_stacktrace(code)) { status._err_msg->_stack = get_stack_trace(); LOG(WARNING) << "meet error status: " << status; // may print too many stacks. } diff --git a/be/src/pipeline/pipeline_context_report_executor.cpp b/be/src/pipeline/pipeline_context_report_executor.cpp new file mode 100644 index 00000000000000..151ce87eac1ccc --- /dev/null +++ b/be/src/pipeline/pipeline_context_report_executor.cpp @@ -0,0 +1,78 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#include "pipeline_context_report_executor.h" + +namespace doris::pipeline { + +PipelineContextReportExecutor::PipelineContextReportExecutor() { + auto st = ThreadPoolBuilder("FragmentInstanceReportThreadPool") + .set_min_threads(48) + .set_max_threads(512) + .set_max_queue_size(102400) + .build(&_report_thread_pool); + CHECK(st.ok()) << st.to_string(); +} + +Status PipelineContextReportExecutor::submit_report_task(doris::pipeline::ReportTask&& report_task, + bool uniq) { + std::unique_lock w_lock(_report_task_lock); + if (_closed) { + return Status::Aborted("PipelineContextReportExecutor already closed"); + } + auto instance_id = report_task.instance_id; + if (uniq && _report_tasks.count(instance_id) > 0) { + return Status::AlreadyExist("Reporting"); + } + _report_tasks[instance_id].push_back(report_task); + auto st = _report_thread_pool->submit_func([this, instance_id, report_task]() { + report_task.work_func(); + std::unique_lock w_lock(_report_task_lock); + _report_tasks[report_task.instance_id].pop_front(); + if (_report_tasks[report_task.instance_id].empty()) { + _report_tasks.erase(report_task.instance_id); + } + }); + if (!st.ok()) { + _report_tasks[report_task.instance_id].pop_back(); + if (_report_tasks[report_task.instance_id].empty()) { + _report_tasks.erase(report_task.instance_id); + } + return st; + } else { + return Status::OK(); + } +} + +bool PipelineContextReportExecutor::is_reporting(TUniqueId instance_id) { + std::shared_lock r_lock(_report_task_lock); + return _report_tasks.count(instance_id) > 0; +} + +void PipelineContextReportExecutor::close() { + { + std::unique_lock w_lock(_report_task_lock); + if (_closed) { + return; + } else { + _closed = true; + } + } + _report_thread_pool->shutdown(); +} + +} // namespace doris::pipeline diff --git a/be/src/pipeline/pipeline_context_report_executor.h b/be/src/pipeline/pipeline_context_report_executor.h new file mode 100644 index 00000000000000..08a182b533bce9 --- /dev/null +++ b/be/src/pipeline/pipeline_context_report_executor.h @@ -0,0 +1,46 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+#pragma once + +#include + +#include "pipeline/pipeline_fragment_context.h" +#include "pipeline_x/pipeline_x_fragment_context.h" + +namespace doris::pipeline { +using WorkFunction = std::function; + +struct ReportTask { + TUniqueId instance_id; + WorkFunction work_func; + std::shared_ptr pipeline_fragment_ctx; +}; + +class PipelineContextReportExecutor { +public: + PipelineContextReportExecutor(); + Status submit_report_task(ReportTask&&, bool uniq = false); + bool is_reporting(TUniqueId instance_id); + void close(); + +private: + std::unique_ptr _report_thread_pool; + std::unordered_map> _report_tasks; // 任务 + std::shared_mutex _report_task_lock; + bool _closed = false; +}; +} // namespace doris::pipeline diff --git a/be/src/pipeline/pipeline_fragment_context.cpp b/be/src/pipeline/pipeline_fragment_context.cpp index 99b462a56d55ae..02144751d67155 100644 --- a/be/src/pipeline/pipeline_fragment_context.cpp +++ b/be/src/pipeline/pipeline_fragment_context.cpp @@ -28,6 +28,9 @@ #include #include // IWYU pragma: no_include +#include +#include + #include // IWYU pragma: keep #include #include @@ -125,14 +128,12 @@ PipelineFragmentContext::PipelineFragmentContext( _exec_env(exec_env), _query_ctx(std::move(query_ctx)), _call_back(call_back), - _report_thread_active(false), - _report_status_cb(report_status_cb), _is_report_on_cancel(true), + _report_status_cb(report_status_cb), _group_commit(group_commit) { if (_query_ctx->get_task_group()) { _task_group_entity = _query_ctx->get_task_group()->task_entity(); } - _report_thread_future = _report_thread_promise.get_future(); _fragment_watcher.start(); } @@ -146,7 +147,6 @@ PipelineFragmentContext::~PipelineFragmentContext() { } else { _call_back(_runtime_state.get(), &st); } - DCHECK(!_report_thread_active); } void PipelineFragmentContext::cancel(const PPlanFragmentCancelReason& reason, @@ -161,14 +161,14 @@ void PipelineFragmentContext::cancel(const PPlanFragmentCancelReason& reason, if (_query_ctx->cancel(true, msg, Status::Cancelled(msg))) { if (reason != PPlanFragmentCancelReason::LIMIT_REACH) { LOG(WARNING) << "PipelineFragmentContext " - << PrintInstanceStandardInfo(_query_id, _fragment_id, _fragment_instance_id) + << PrintInstanceStandardInfo(_query_id, _fragment_id, + _fragment_instance_id) << " is canceled, cancel message: " << msg; } else { _set_is_report_on_cancel(false); } - _runtime_state->set_is_cancelled(true, msg); _runtime_state->set_process_status(_query_ctx->exec_status()); // Get pipe from new load stream manager and send cancel to it or the fragment may hang to wait read from pipe // For stream load the fragment's query_id == load id, it is set in FE. 
@@ -326,7 +326,7 @@ Status PipelineFragmentContext::prepare(const doris::TPipelineFragmentParams& re _runtime_state->runtime_profile()->add_child(_root_plan->runtime_profile(), true, nullptr); _runtime_state->runtime_profile()->add_child(_runtime_profile.get(), true, nullptr); - _start_report_thread(); + _init_next_report_time(); _prepared = true; return Status::OK(); @@ -361,66 +361,34 @@ Status PipelineFragmentContext::_build_pipeline_tasks( return Status::OK(); } -void PipelineFragmentContext::_start_report_thread() { +void PipelineFragmentContext::_init_next_report_time() { if (_is_report_success && config::status_report_interval > 0) { - std::unique_lock l(_report_thread_lock); - _exec_env->send_report_thread_pool()->submit_func([this] { - Defer defer {[&]() { this->_report_thread_promise.set_value(true); }}; - this->report_profile(); - }); - // make sure the thread started up, otherwise report_profile() might get into a race - // with stop_report_thread() - _report_thread_started_cv.wait(l); + std::vector ins_ids; + instance_ids(ins_ids); + VLOG_FILE << "report_profile(): instance_id=" + << fmt::format("{}", fmt::join(ins_ids, ", ")); + uint64_t report_fragment_offset = + (uint64_t)(rand() % config::status_report_interval) * NANOS_PER_SEC; + // We don't want to wait longer than it takes to run the entire fragment. + _next_report_time = MonotonicNanos() + report_fragment_offset; } } -void PipelineFragmentContext::_stop_report_thread() { - if (!_report_thread_active) { +void PipelineFragmentContext::trigger_report_if_necessary() { + if (_next_report_time == 0) { return; } - - _report_thread_active = false; - - _stop_report_thread_cv.notify_one(); - // Wait infinitly to ensure that the report task is finished and the this variable - // is not used in report thread. - _report_thread_future.wait(); -} - -void PipelineFragmentContext::report_profile() { - SCOPED_ATTACH_TASK(_runtime_state.get()); - VLOG_FILE << "report_profile(): instance_id=" << _runtime_state->fragment_instance_id(); - - _report_thread_active = true; - - std::unique_lock l(_report_thread_lock); - // tell Open() that we started - _report_thread_started_cv.notify_one(); - - // Jitter the reporting time of remote fragments by a random amount between - // 0 and the report_interval. This way, the coordinator doesn't get all the - // updates at once so its better for contention as well as smoother progress - // reporting. - int report_fragment_offset = rand() % config::status_report_interval; - // We don't want to wait longer than it takes to run the entire fragment. - _stop_report_thread_cv.wait_for(l, std::chrono::seconds(report_fragment_offset)); - while (_report_thread_active) { - if (config::status_report_interval > 0) { - // wait_for can return because the timeout occurred or the condition variable - // was signaled. We can't rely on its return value to distinguish between the - // two cases (e.g. there is a race here where the wait timed out but before grabbing - // the lock, the condition variable was signaled). Instead, we will use an external - // flag, _report_thread_active, to coordinate this. 
- _stop_report_thread_cv.wait_for(l, - std::chrono::seconds(config::status_report_interval)); - } else { - LOG(WARNING) << "config::status_report_interval is equal to or less than zero, exiting " - "reporting thread."; - break; - } - + auto interval_s = config::status_report_interval; + if (interval_s <= 0) { + LOG(WARNING) + << "config::status_report_interval is equal to or less than zero, do not trigger " + "report."; + } + auto now = MonotonicNanos(); + if (now > _next_report_time) { + _next_report_time = now + (uint64_t)(interval_s)*NANOS_PER_SEC; if (VLOG_FILE_IS_ON) { - VLOG_FILE << "Reporting " << (!_report_thread_active ? "final " : " ") + VLOG_FILE << "Reporting " << "profile for instance " << _runtime_state->fragment_instance_id(); std::stringstream ss; _runtime_state->runtime_profile()->compute_time_in_profile(); @@ -431,15 +399,8 @@ void PipelineFragmentContext::report_profile() { } VLOG_FILE << ss.str(); } - - if (!_report_thread_active) { - break; - } - send_report(false); } - - VLOG_FILE << "exiting reporting thread: instance_id=" << _runtime_state->fragment_instance_id(); } // TODO: use virtual function to do abstruct @@ -846,7 +807,6 @@ Status PipelineFragmentContext::_create_sink(int sender_id, const TDataSink& thr void PipelineFragmentContext::_close_action() { _runtime_profile->total_time_counter()->update(_fragment_watcher.elapsed_time()); send_report(true); - _stop_report_thread(); // all submitted tasks done _exec_env->fragment_mgr()->remove_pipeline_context(shared_from_this()); } @@ -895,7 +855,8 @@ void PipelineFragmentContext::send_report(bool done) { _runtime_state.get(), std::bind(&PipelineFragmentContext::update_status, this, std::placeholders::_1), std::bind(&PipelineFragmentContext::cancel, this, std::placeholders::_1, - std::placeholders::_2)}); + std::placeholders::_2)}, + shared_from_this()); } } // namespace doris::pipeline diff --git a/be/src/pipeline/pipeline_fragment_context.h b/be/src/pipeline/pipeline_fragment_context.h index ff314bd3c44372..aef06d64cf98eb 100644 --- a/be/src/pipeline/pipeline_fragment_context.h +++ b/be/src/pipeline/pipeline_fragment_context.h @@ -58,7 +58,8 @@ class PipelineFragmentContext : public std::enable_shared_from_this; + using report_status_callback = std::function&&)>; PipelineFragmentContext(const TUniqueId& query_id, const TUniqueId& instance_id, const int fragment_id, int backend_num, std::shared_ptr query_ctx, ExecEnv* exec_env, @@ -120,8 +121,6 @@ class PipelineFragmentContext : public std::enable_shared_from_this l(_status_lock); if (!status.ok() && _query_ctx->exec_status().ok()) { @@ -133,12 +132,17 @@ class PipelineFragmentContext : public std::enable_shared_from_this& ins_ids) const { ins_ids.resize(1); ins_ids[0] = _fragment_instance_id; } + virtual void instance_ids(std::vector& ins_ids) const { + ins_ids.resize(1); + ins_ids[0] = print_id(_fragment_instance_id); + } protected: Status _create_sink(int sender_id, const TDataSink& t_data_sink, RuntimeState* state); @@ -147,8 +151,7 @@ class PipelineFragmentContext : public std::enable_shared_from_this Status _build_operators_for_set_operation_node(ExecNode*, PipelinePtr); virtual void _close_action(); - void _start_report_thread(); - void _stop_report_thread(); + void _init_next_report_time(); void _set_is_report_on_cancel(bool val) { _is_report_on_cancel = val; } // Id of this query @@ -201,22 +204,15 @@ class PipelineFragmentContext : public std::enable_shared_from_this _call_back; std::once_flag _close_once_flag; - std::condition_variable 
_report_thread_started_cv; - // true if we started the thread - bool _report_thread_active; - // profile reporting-related - report_status_callback _report_status_cb; - std::promise _report_thread_promise; - std::future _report_thread_future; - std::mutex _report_thread_lock; - - // Indicates that profile reporting thread should stop. - // Tied to _report_thread_lock. - std::condition_variable _stop_report_thread_cv; // If this is set to false, and '_is_report_success' is false as well, // This executor will not report status to FE on being cancelled. bool _is_report_on_cancel; + std::atomic_uint64_t _next_report_time = 0; + + // profile reporting-related + report_status_callback _report_status_cb; + private: std::vector> _tasks; bool _group_commit; diff --git a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp index 251c1c68f6474d..99cbf85b1065a5 100644 --- a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp +++ b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp @@ -112,13 +112,11 @@ PipelineXFragmentContext::~PipelineXFragmentContext() { if (!_runtime_states.empty()) { // The memory released by the query end is recorded in the query mem tracker, main memory in _runtime_state. SCOPED_ATTACH_TASK(_runtime_state.get()); - FOR_EACH_RUNTIME_STATE(_call_back(runtime_state.get(), &st); - runtime_state.reset();) + FOR_EACH_RUNTIME_STATE(_call_back(runtime_state.get(), &st); runtime_state.reset();) } else { _call_back(nullptr, &st); } _runtime_state.reset(); - DCHECK(!_report_thread_active); } void PipelineXFragmentContext::cancel(const PPlanFragmentCancelReason& reason, @@ -224,7 +222,7 @@ Status PipelineXFragmentContext::prepare(const doris::TPipelineFragmentParams& r // 5. Build pipeline tasks and initialize local state. RETURN_IF_ERROR(_build_pipeline_tasks(request)); - _start_report_thread(); + _init_next_report_time(); _prepared = true; return Status::OK(); @@ -426,64 +424,6 @@ Status PipelineXFragmentContext::_build_pipeline_tasks( return Status::OK(); } -void PipelineXFragmentContext::report_profile() { - FOR_EACH_RUNTIME_STATE( - SCOPED_ATTACH_TASK(runtime_state.get()); - VLOG_FILE << "report_profile(): instance_id=" << runtime_state->fragment_instance_id(); - - _report_thread_active = true; - - std::unique_lock l(_report_thread_lock); - // tell Open() that we started - _report_thread_started_cv.notify_one(); - - // Jitter the reporting time of remote fragments by a random amount between - // 0 and the report_interval. This way, the coordinator doesn't get all the - // updates at once so its better for contention as well as smoother progress - // reporting. - int report_fragment_offset = rand() % config::status_report_interval; - // We don't want to wait longer than it takes to run the entire fragment. - _stop_report_thread_cv.wait_for(l, std::chrono::seconds(report_fragment_offset)); - while (_report_thread_active) { - if (config::status_report_interval > 0) { - // wait_for can return because the timeout occurred or the condition variable - // was signaled. We can't rely on its return value to distinguish between the - // two cases (e.g. there is a race here where the wait timed out but before grabbing - // the lock, the condition variable was signaled). Instead, we will use an external - // flag, _report_thread_active, to coordinate this. 
- _stop_report_thread_cv.wait_for( - l, std::chrono::seconds(config::status_report_interval)); - } else { - LOG(WARNING) << "config::status_report_interval is equal to or less than zero, " - "exiting " - "reporting thread."; - break; - } - - if (VLOG_FILE_IS_ON) { - VLOG_FILE << "Reporting " << (!_report_thread_active ? "final " : " ") - << "profile for instance " << runtime_state->fragment_instance_id(); - std::stringstream ss; - runtime_state->runtime_profile()->compute_time_in_profile(); - runtime_state->runtime_profile()->pretty_print(&ss); - if (runtime_state->load_channel_profile()) { - // runtime_state->load_channel_profile()->compute_time_in_profile(); // TODO load channel profile add timer - runtime_state->load_channel_profile()->pretty_print(&ss); - } - VLOG_FILE << ss.str(); - } - - if (!_report_thread_active) { - break; - } - - send_report(false); - } - - VLOG_FILE - << "exiting reporting thread: instance_id=" << runtime_state->fragment_instance_id();) -} - Status PipelineXFragmentContext::_build_pipelines(ObjectPool* pool, const doris::TPipelineFragmentParams& request, const DescriptorTbl& descs, OperatorXPtr* root, @@ -857,7 +797,6 @@ void PipelineXFragmentContext::close_if_prepare_failed() { void PipelineXFragmentContext::_close_action() { _runtime_profile->total_time_counter()->update(_fragment_watcher.elapsed_time()); send_report(true); - _stop_report_thread(); // all submitted tasks done _exec_env->fragment_mgr()->remove_pipeline_context(shared_from_this()); } @@ -894,7 +833,7 @@ void PipelineXFragmentContext::send_report(bool done) { _runtime_state.get(), std::bind(&PipelineFragmentContext::update_status, this, std::placeholders::_1), std::bind(&PipelineFragmentContext::cancel, this, std::placeholders::_1, - std::placeholders::_2)}); + std::placeholders::_2)}, + shared_from_this()); } - } // namespace doris::pipeline diff --git a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h index 7c54a411c6e07a..2468cdde0d05d5 100644 --- a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h +++ b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h @@ -60,7 +60,6 @@ class PipelineXFragmentContext : public PipelineFragmentContext { // Note: this does not take a const RuntimeProfile&, because it might need to call // functions like PrettyPrint() or to_thrift(), neither of which is const // because they take locks. 
- using report_status_callback = std::function; PipelineXFragmentContext(const TUniqueId& query_id, const int fragment_id, std::shared_ptr query_ctx, ExecEnv* exec_env, const std::function& call_back, @@ -76,6 +75,13 @@ class PipelineXFragmentContext : public PipelineFragmentContext { } } + void instance_ids(std::vector& ins_ids) const override { + ins_ids.resize(_runtime_states.size()); + for (size_t i = 0; i < _runtime_states.size(); i++) { + ins_ids[i] = print_id(_runtime_states[i]->fragment_instance_id()); + } + } + void add_merge_controller_handler( std::shared_ptr& handler) override { _merge_controller_handlers.emplace_back(handler); @@ -96,8 +102,6 @@ class PipelineXFragmentContext : public PipelineFragmentContext { void send_report(bool) override; - void report_profile() override; - RuntimeState* get_runtime_state(UniqueId fragment_instance_id) override { std::lock_guard l(_state_map_lock); if (_instance_id_to_runtime_state.count(fragment_instance_id) > 0) { diff --git a/be/src/pipeline/task_scheduler.cpp b/be/src/pipeline/task_scheduler.cpp index e71d8da0cdc7c3..95f8d322b17234 100644 --- a/be/src/pipeline/task_scheduler.cpp +++ b/be/src/pipeline/task_scheduler.cpp @@ -280,6 +280,7 @@ void TaskScheduler::_do_work(size_t index) { _try_close_task(task, PipelineTaskState::CANCELED, status); continue; } + fragment_ctx->trigger_report_if_necessary(); if (eos) { task->set_eos_time(); diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp index ed7afbc2d15d0e..03413e7fa699f2 100644 --- a/be/src/runtime/fragment_mgr.cpp +++ b/be/src/runtime/fragment_mgr.cpp @@ -59,6 +59,7 @@ #include "gutil/strings/substitute.h" #include "io/fs/stream_load_pipe.h" #include "opentelemetry/trace/scope.h" +#include "pipeline/pipeline_context_report_executor.h" #include "pipeline/pipeline_fragment_context.h" #include "runtime/client_cache.h" #include "runtime/descriptors.h" @@ -137,6 +138,8 @@ FragmentMgr::FragmentMgr(ExecEnv* exec_env) REGISTER_HOOK_METRIC(fragment_thread_pool_queue_size, [this]() { return _thread_pool->get_queue_size(); }); CHECK(s.ok()) << s.to_string(); + + _pipeline_ctx_report_executor = std::make_unique(); } FragmentMgr::~FragmentMgr() = default; @@ -162,6 +165,7 @@ void FragmentMgr::stop() { } _pipeline_map.clear(); } + _pipeline_ctx_report_executor->close(); } std::string FragmentMgr::to_http_path(const std::string& file_name) { @@ -172,6 +176,13 @@ std::string FragmentMgr::to_http_path(const std::string& file_name) { return url.str(); } +Status FragmentMgr::trigger_pipeline_context_report( + const ReportStatusRequest& req, std::shared_ptr&& ctx) { + return _pipeline_ctx_report_executor->submit_report_task( + {req.fragment_instance_id, [this, req]() { coordinator_callback(req); }, ctx}, + !req.done); +} + // There can only be one of these callbacks in-flight at any moment, because // it is only invoked from the executor's reporting thread. 
// Also, the reported status will always reflect the most recent execution status, @@ -775,8 +786,8 @@ Status FragmentMgr::exec_plan_fragment(const TPipelineFragmentParams& params, std::shared_ptr context = std::make_shared( query_ctx->query_id(), params.fragment_id, query_ctx, _exec_env, cb, - std::bind(std::mem_fn(&FragmentMgr::coordinator_callback), this, - std::placeholders::_1), + std::bind(std::mem_fn(&FragmentMgr::trigger_pipeline_context_report), + this, std::placeholders::_1, std::placeholders::_2), params.group_commit); { SCOPED_RAW_TIMER(&duration_ns); @@ -853,8 +864,9 @@ Status FragmentMgr::exec_plan_fragment(const TPipelineFragmentParams& params, std::make_shared( query_ctx->query_id(), fragment_instance_id, params.fragment_id, local_params.backend_num, query_ctx, _exec_env, cb, - std::bind(std::mem_fn(&FragmentMgr::coordinator_callback), this, - std::placeholders::_1)); + std::bind( + std::mem_fn(&FragmentMgr::trigger_pipeline_context_report), + this, std::placeholders::_1, std::placeholders::_2)); { SCOPED_RAW_TIMER(&duration_ns); auto prepare_st = context->prepare(params, i); diff --git a/be/src/runtime/fragment_mgr.h b/be/src/runtime/fragment_mgr.h index 14c63c559bbd3f..20817fefa4d30e 100644 --- a/be/src/runtime/fragment_mgr.h +++ b/be/src/runtime/fragment_mgr.h @@ -48,6 +48,8 @@ namespace doris { namespace pipeline { class PipelineFragmentContext; class PipelineXFragmentContext; +struct ReportTask; +class PipelineContextReportExecutor; } // namespace pipeline class QueryContext; class ExecEnv; @@ -99,6 +101,8 @@ class FragmentMgr : public RestMonitorIface { const PPlanFragmentCancelReason& reason, const std::unique_lock& state_lock, const std::string& msg = ""); + Status trigger_pipeline_context_report(const ReportStatusRequest&, + std::shared_ptr&&); // Pipeline version, cancel a fragment instance. 
void cancel_instance(const TUniqueId& instance_id, const PPlanFragmentCancelReason& reason, @@ -203,6 +207,7 @@ class FragmentMgr : public RestMonitorIface { UIntGauge* timeout_canceled_fragment_count = nullptr; RuntimeFilterMergeController _runtimefilter_controller; + std::unique_ptr _pipeline_ctx_report_executor; }; } // namespace doris diff --git a/be/src/runtime/query_context.h b/be/src/runtime/query_context.h index 88a8367ff9fa82..95541c0e079f80 100644 --- a/be/src/runtime/query_context.h +++ b/be/src/runtime/query_context.h @@ -42,7 +42,7 @@ namespace doris { struct ReportStatusRequest { bool is_pipeline_x; - const Status& status; + const Status status; std::vector runtime_states; RuntimeProfile* profile; RuntimeProfile* load_channel_profile; From b779092b2f19e6a27e903e92e5fd2c43a90187eb Mon Sep 17 00:00:00 2001 From: liulijia Date: Tue, 19 Sep 2023 19:08:00 +0800 Subject: [PATCH 3/8] refactor --- be/src/common/config.cpp | 2 + be/src/common/config.h | 1 + be/src/exec/exec_node.cpp | 3 +- be/src/pipeline/exec/exchange_sink_buffer.cpp | 3 + be/src/pipeline/exec/exchange_sink_buffer.h | 1 + .../pipeline/exec/exchange_sink_operator.cpp | 14 ++-- .../pipeline_context_report_executor.cpp | 78 ------------------- .../pipeline_context_report_executor.h | 46 ----------- be/src/pipeline/pipeline_fragment_context.cpp | 64 ++++++++++----- be/src/pipeline/pipeline_fragment_context.h | 11 ++- .../pipeline_x_fragment_context.cpp | 12 +-- .../pipeline_x/pipeline_x_fragment_context.h | 2 +- be/src/pipeline/task_scheduler.cpp | 3 +- be/src/runtime/fragment_mgr.cpp | 30 ++++--- be/src/runtime/fragment_mgr.h | 4 +- be/src/runtime/plan_fragment_executor.cpp | 6 +- be/src/runtime/query_context.h | 6 +- be/src/runtime/runtime_state.cpp | 7 ++ be/src/runtime/runtime_state.h | 5 +- be/src/vec/runtime/vdata_stream_mgr.cpp | 11 ++- be/src/vec/runtime/vdata_stream_mgr.h | 2 +- be/src/vec/runtime/vdata_stream_recvr.cpp | 18 +++-- be/src/vec/runtime/vdata_stream_recvr.h | 7 +- be/src/vec/sink/vdata_stream_sender.cpp | 52 +++++++------ be/src/vec/sink/vdata_stream_sender.h | 33 ++++---- gensrc/proto/internal_service.proto | 1 + 26 files changed, 188 insertions(+), 234 deletions(-) delete mode 100644 be/src/pipeline/pipeline_context_report_executor.cpp delete mode 100644 be/src/pipeline/pipeline_context_report_executor.h diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp index 643f6eb23cd1b0..e8632d47d9d151 100644 --- a/be/src/common/config.cpp +++ b/be/src/common/config.cpp @@ -225,6 +225,8 @@ DEFINE_mBool(compress_rowbatches, "true"); DEFINE_mBool(rowbatch_align_tuple_offset, "false"); // interval between profile reports; in seconds DEFINE_mInt32(status_report_interval, "5"); +// The pipeline task has a high concurrency, therefore reducing its report frequency +DEFINE_mInt32(pipeline_status_report_interval, "10"); // if true, each disk will have a separate thread pool for scanner DEFINE_Bool(doris_enable_scanner_thread_pool_per_disk, "true"); // the timeout of a work thread to wait the blocking priority queue to get a task diff --git a/be/src/common/config.h b/be/src/common/config.h index 349a8a93a91192..11891691871a1f 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -271,6 +271,7 @@ DECLARE_mBool(compress_rowbatches); DECLARE_mBool(rowbatch_align_tuple_offset); // interval between profile reports; in seconds DECLARE_mInt32(status_report_interval); +DECLARE_mInt32(pipeline_status_report_interval); // if true, each disk will have a separate thread pool for scanner 
DECLARE_Bool(doris_enable_scanner_thread_pool_per_disk); // the timeout of a work thread to wait the blocking priority queue to get a task diff --git a/be/src/exec/exec_node.cpp b/be/src/exec/exec_node.cpp index c21ae2e5f579b6..f3870c63463e56 100644 --- a/be/src/exec/exec_node.cpp +++ b/be/src/exec/exec_node.cpp @@ -198,7 +198,8 @@ Status ExecNode::close(RuntimeState* state) { << " already closed"; return Status::OK(); } - LOG(INFO) << "fragment_instance_id=" << print_id(state->fragment_instance_id()) << " closed"; + LOG(INFO) << "fragment_instance_id=" << print_id(state->fragment_instance_id()) << ", " + << " id=" << _id << " type=" << print_plan_node_type(_type) << " closed"; _is_closed = true; Status result; diff --git a/be/src/pipeline/exec/exchange_sink_buffer.cpp b/be/src/pipeline/exec/exchange_sink_buffer.cpp index f49fea2c037d55..5877ae4de6c67d 100644 --- a/be/src/pipeline/exec/exchange_sink_buffer.cpp +++ b/be/src/pipeline/exec/exchange_sink_buffer.cpp @@ -225,6 +225,9 @@ Status ExchangeSinkBuffer::_send_rpc(InstanceLoId id) { if (request.block) { brpc_request->set_allocated_block(request.block.get()); } + if (!request.exec_status.ok()) { + request.exec_status.to_protobuf(brpc_request->mutable_exec_status()); // should release??? + } auto* closure = request.channel->get_closure(id, request.eos, nullptr); _instance_to_rpc_ctx[id]._closure = closure; diff --git a/be/src/pipeline/exec/exchange_sink_buffer.h b/be/src/pipeline/exec/exchange_sink_buffer.h index 6086b36c221860..d5e530af5c0091 100644 --- a/be/src/pipeline/exec/exchange_sink_buffer.h +++ b/be/src/pipeline/exec/exchange_sink_buffer.h @@ -94,6 +94,7 @@ struct TransmitInfo { vectorized::PipChannel* channel; std::unique_ptr block; bool eos; + Status exec_status; }; template diff --git a/be/src/pipeline/exec/exchange_sink_operator.cpp b/be/src/pipeline/exec/exchange_sink_operator.cpp index 1b00293258ef64..2958529e993548 100644 --- a/be/src/pipeline/exec/exchange_sink_operator.cpp +++ b/be/src/pipeline/exec/exchange_sink_operator.cpp @@ -278,7 +278,8 @@ template void ExchangeSinkOperatorX::_handle_eof_channel(RuntimeState* state, ChannelPtrType channel, Status st) { channel->set_receiver_eof(st); - channel->close(state); + Status ok = Status::OK(); + channel->close(state, ok); } Status ExchangeSinkOperatorX::sink(RuntimeState* state, vectorized::Block* block, @@ -337,8 +338,8 @@ Status ExchangeSinkOperatorX::sink(RuntimeState* state, vectorized::Block* block status = channel->send_local_block(&cur_block); } else { SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get()); - status = channel->send_block(block_holder, &sent, - source_state == SourceState::FINISHED); + status = channel->send_broadcast_block( + block_holder, &sent, source_state == SourceState::FINISHED); } HANDLE_CHANNEL_STATUS(state, channel, status); } @@ -365,8 +366,8 @@ Status ExchangeSinkOperatorX::sink(RuntimeState* state, vectorized::Block* block SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get()); RETURN_IF_ERROR(local_state._serializer.serialize_block( block, current_channel->ch_cur_pb_block())); - auto status = current_channel->send_block(current_channel->ch_cur_pb_block(), - source_state == SourceState::FINISHED); + auto status = current_channel->send_remote_block( + current_channel->ch_cur_pb_block(), source_state == SourceState::FINISHED); HANDLE_CHANNEL_STATUS(state, current_channel, status); current_channel->ch_roll_pb_block(); } @@ -520,8 +521,9 @@ Status ExchangeSinkOperatorX::try_close(RuntimeState* state, Status exec_status) 
CREATE_SINK_LOCAL_STATE_RETURN_IF_ERROR(local_state); local_state._serializer.reset_block(); Status final_st = Status::OK(); + auto exec_status = state->query_status(); for (int i = 0; i < local_state.channels.size(); ++i) { - Status st = local_state.channels[i]->close(state); + Status st = local_state.channels[i]->close(state, exec_status); if (!st.ok() && final_st.ok()) { final_st = st; } diff --git a/be/src/pipeline/pipeline_context_report_executor.cpp b/be/src/pipeline/pipeline_context_report_executor.cpp deleted file mode 100644 index 151ce87eac1ccc..00000000000000 --- a/be/src/pipeline/pipeline_context_report_executor.cpp +++ /dev/null @@ -1,78 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -#include "pipeline_context_report_executor.h" - -namespace doris::pipeline { - -PipelineContextReportExecutor::PipelineContextReportExecutor() { - auto st = ThreadPoolBuilder("FragmentInstanceReportThreadPool") - .set_min_threads(48) - .set_max_threads(512) - .set_max_queue_size(102400) - .build(&_report_thread_pool); - CHECK(st.ok()) << st.to_string(); -} - -Status PipelineContextReportExecutor::submit_report_task(doris::pipeline::ReportTask&& report_task, - bool uniq) { - std::unique_lock w_lock(_report_task_lock); - if (_closed) { - return Status::Aborted("PipelineContextReportExecutor already closed"); - } - auto instance_id = report_task.instance_id; - if (uniq && _report_tasks.count(instance_id) > 0) { - return Status::AlreadyExist("Reporting"); - } - _report_tasks[instance_id].push_back(report_task); - auto st = _report_thread_pool->submit_func([this, instance_id, report_task]() { - report_task.work_func(); - std::unique_lock w_lock(_report_task_lock); - _report_tasks[report_task.instance_id].pop_front(); - if (_report_tasks[report_task.instance_id].empty()) { - _report_tasks.erase(report_task.instance_id); - } - }); - if (!st.ok()) { - _report_tasks[report_task.instance_id].pop_back(); - if (_report_tasks[report_task.instance_id].empty()) { - _report_tasks.erase(report_task.instance_id); - } - return st; - } else { - return Status::OK(); - } -} - -bool PipelineContextReportExecutor::is_reporting(TUniqueId instance_id) { - std::shared_lock r_lock(_report_task_lock); - return _report_tasks.count(instance_id) > 0; -} - -void PipelineContextReportExecutor::close() { - { - std::unique_lock w_lock(_report_task_lock); - if (_closed) { - return; - } else { - _closed = true; - } - } - _report_thread_pool->shutdown(); -} - -} // namespace doris::pipeline diff --git a/be/src/pipeline/pipeline_context_report_executor.h b/be/src/pipeline/pipeline_context_report_executor.h deleted file mode 100644 index 08a182b533bce9..00000000000000 --- a/be/src/pipeline/pipeline_context_report_executor.h +++ /dev/null @@ -1,46 +0,0 @@ -// Licensed to the 
Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. -#pragma once - -#include - -#include "pipeline/pipeline_fragment_context.h" -#include "pipeline_x/pipeline_x_fragment_context.h" - -namespace doris::pipeline { -using WorkFunction = std::function; - -struct ReportTask { - TUniqueId instance_id; - WorkFunction work_func; - std::shared_ptr pipeline_fragment_ctx; -}; - -class PipelineContextReportExecutor { -public: - PipelineContextReportExecutor(); - Status submit_report_task(ReportTask&&, bool uniq = false); - bool is_reporting(TUniqueId instance_id); - void close(); - -private: - std::unique_ptr _report_thread_pool; - std::unordered_map> _report_tasks; // 任务 - std::shared_mutex _report_task_lock; - bool _closed = false; -}; -} // namespace doris::pipeline diff --git a/be/src/pipeline/pipeline_fragment_context.cpp b/be/src/pipeline/pipeline_fragment_context.cpp index 02144751d67155..1b7f996408bb81 100644 --- a/be/src/pipeline/pipeline_fragment_context.cpp +++ b/be/src/pipeline/pipeline_fragment_context.cpp @@ -166,7 +166,7 @@ void PipelineFragmentContext::cancel(const PPlanFragmentCancelReason& reason, << " is canceled, cancel message: " << msg; } else { - _set_is_report_on_cancel(false); + _set_is_report_on_cancel(false); // TODO bug llj fix this not projected by lock } _runtime_state->set_process_status(_query_ctx->exec_status()); @@ -178,7 +178,9 @@ void PipelineFragmentContext::cancel(const PPlanFragmentCancelReason& reason, } // must close stream_mgr to avoid dead lock in Exchange Node - _exec_env->vstream_mgr()->cancel(_fragment_instance_id); + // TODO bug llj fix this other instance will not cancel + Status cancel_status = Status::Cancelled(msg); + _exec_env->vstream_mgr()->cancel(_fragment_instance_id, cancel_status); // Cancel the result queue manager used by spark doris connector // TODO pipeline incomp // _exec_env->result_queue_mgr()->update_queue_status(id, Status::Aborted(msg)); @@ -362,34 +364,55 @@ Status PipelineFragmentContext::_build_pipeline_tasks( } void PipelineFragmentContext::_init_next_report_time() { - if (_is_report_success && config::status_report_interval > 0) { + auto interval_s = config::pipeline_status_report_interval; + if (_is_report_success && interval_s > 0 && _query_ctx->timeout_second > interval_s) { std::vector ins_ids; instance_ids(ins_ids); - VLOG_FILE << "report_profile(): instance_id=" + VLOG_FILE << "enable period report: instance_id=" << fmt::format("{}", fmt::join(ins_ids, ", ")); - uint64_t report_fragment_offset = - (uint64_t)(rand() % config::status_report_interval) * NANOS_PER_SEC; + uint64_t report_fragment_offset = (uint64_t)(rand() % interval_s) * NANOS_PER_SEC; // We don't want to wait longer than it takes to run the entire fragment. 
- _next_report_time = MonotonicNanos() + report_fragment_offset; + _previous_report_time = + MonotonicNanos() + report_fragment_offset - (uint64_t)(interval_s)*NANOS_PER_SEC; + _disable_period_report = false; } } +void PipelineFragmentContext::refresh_next_report_time() { + auto disable = _disable_period_report.load(std::memory_order_acq_rel); + DCHECK(disable == true); + _previous_report_time = MonotonicNanos(); + _disable_period_report.compare_exchange_strong(disable, false); +} + void PipelineFragmentContext::trigger_report_if_necessary() { - if (_next_report_time == 0) { + if (!_is_report_success) { return; } - auto interval_s = config::status_report_interval; + auto disable = _disable_period_report.load(std::memory_order_acq_rel); + if (disable) { + return; + } + int32_t interval_s = config::pipeline_status_report_interval; if (interval_s <= 0) { LOG(WARNING) << "config::status_report_interval is equal to or less than zero, do not trigger " "report."; } - auto now = MonotonicNanos(); - if (now > _next_report_time) { - _next_report_time = now + (uint64_t)(interval_s)*NANOS_PER_SEC; + uint64_t next_report_time = _previous_report_time.load(std::memory_order_acq_rel) + + (uint64_t)(interval_s)*NANOS_PER_SEC; + if (MonotonicNanos() > next_report_time) { + if (!_disable_period_report.compare_exchange_strong(disable, true, + std::memory_order_acq_rel)) { + return; + } if (VLOG_FILE_IS_ON) { + std::vector ins_ids; + instance_ids(ins_ids); VLOG_FILE << "Reporting " - << "profile for instance " << _runtime_state->fragment_instance_id(); + << "profile for query_id " << print_id(_query_id) + << ", instance ids: " << fmt::format("{}", fmt::join(ins_ids, ", ")); + std::stringstream ss; _runtime_state->runtime_profile()->compute_time_in_profile(); _runtime_state->runtime_profile()->pretty_print(&ss); @@ -399,7 +422,12 @@ void PipelineFragmentContext::trigger_report_if_necessary() { } VLOG_FILE << ss.str(); } - send_report(false); + auto st = send_report(false); + if (!st.ok()) { + disable = true; + _disable_period_report.compare_exchange_strong(disable, false, + std::memory_order_acq_rel); + } } } @@ -819,7 +847,7 @@ void PipelineFragmentContext::close_a_pipeline() { } } -void PipelineFragmentContext::send_report(bool done) { +Status PipelineFragmentContext::send_report(bool done) { Status exec_status = Status::OK(); { std::lock_guard l(_status_lock); @@ -829,7 +857,7 @@ void PipelineFragmentContext::send_report(bool done) { // If plan is done successfully, but _is_report_success is false, // no need to send report. 
if (!_is_report_success && done && exec_status.ok()) { - return; + return Status::NeedSendAgain(""); } // If both _is_report_success and _is_report_on_cancel are false, @@ -837,10 +865,10 @@ void PipelineFragmentContext::send_report(bool done) { // This may happen when the query limit reached and // a internal cancellation being processed if (!_is_report_success && !_is_report_on_cancel) { - return; + return Status::NeedSendAgain(""); } - _report_status_cb( + return _report_status_cb( {false, exec_status, {}, diff --git a/be/src/pipeline/pipeline_fragment_context.h b/be/src/pipeline/pipeline_fragment_context.h index aef06d64cf98eb..80c38880bf332a 100644 --- a/be/src/pipeline/pipeline_fragment_context.h +++ b/be/src/pipeline/pipeline_fragment_context.h @@ -58,8 +58,8 @@ class PipelineFragmentContext : public std::enable_shared_from_this&&)>; + using report_status_callback = std::function&&)>; PipelineFragmentContext(const TUniqueId& query_id, const TUniqueId& instance_id, const int fragment_id, int backend_num, std::shared_ptr query_ctx, ExecEnv* exec_env, @@ -119,7 +119,7 @@ class PipelineFragmentContext : public std::enable_shared_from_this& handler) {} - virtual void send_report(bool); + virtual Status send_report(bool); Status update_status(Status status) { std::lock_guard l(_status_lock); @@ -143,6 +143,7 @@ class PipelineFragmentContext : public std::enable_shared_from_thisquery_id())) + .tag("query_id", print_id(_query_id)) .tag("fragment_id", _fragment_id) .tag("reason", reason) .tag("error message", msg); @@ -131,7 +131,7 @@ void PipelineXFragmentContext::cancel(const PPlanFragmentCancelReason& reason, FOR_EACH_RUNTIME_STATE(LOG(WARNING) << "PipelineXFragmentContext cancel instance: " << print_id(runtime_state->fragment_instance_id());) } else { - _set_is_report_on_cancel(false); + _set_is_report_on_cancel(false); // TODO bug llj } // Get pipe from new load stream manager and send cancel to it or the fragment may hang to wait read from pipe // For stream load the fragment's query_id == load id, it is set in FE. @@ -163,7 +163,7 @@ Status PipelineXFragmentContext::prepare(const doris::TPipelineFragmentParams& r } LOG_INFO("PipelineXFragmentContext::prepare") - .tag("query_id", _query_id) + .tag("query_id", print_id(_query_id)) .tag("fragment_id", _fragment_id) .tag("pthread_id", (uintptr_t)pthread_self()); @@ -801,7 +801,7 @@ void PipelineXFragmentContext::_close_action() { _exec_env->fragment_mgr()->remove_pipeline_context(shared_from_this()); } -void PipelineXFragmentContext::send_report(bool done) { +Status PipelineXFragmentContext::send_report(bool done) { Status exec_status = Status::OK(); { std::lock_guard l(_status_lock); @@ -811,7 +811,7 @@ void PipelineXFragmentContext::send_report(bool done) { // If plan is done successfully, but _is_report_success is false, // no need to send report. 
if (!_is_report_success && done && exec_status.ok()) { - return; + return Status::NeedSendAgain(""); } // If both _is_report_success and _is_report_on_cancel are false, @@ -819,7 +819,7 @@ void PipelineXFragmentContext::send_report(bool done) { // This may happen when the query limit reached and // a internal cancellation being processed if (!_is_report_success && !_is_report_on_cancel) { - return; + return Status::NeedSendAgain(""); } std::vector runtime_states(_runtime_states.size()); diff --git a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h index 2468cdde0d05d5..c548b8cfa3d241 100644 --- a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h +++ b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h @@ -100,7 +100,7 @@ class PipelineXFragmentContext : public PipelineFragmentContext { void cancel(const PPlanFragmentCancelReason& reason = PPlanFragmentCancelReason::INTERNAL_ERROR, const std::string& msg = "") override; - void send_report(bool) override; + Status send_report(bool) override; RuntimeState* get_runtime_state(UniqueId fragment_instance_id) override { std::lock_guard l(_state_map_lock); diff --git a/be/src/pipeline/task_scheduler.cpp b/be/src/pipeline/task_scheduler.cpp index 95f8d322b17234..69c402ffa8ff27 100644 --- a/be/src/pipeline/task_scheduler.cpp +++ b/be/src/pipeline/task_scheduler.cpp @@ -245,7 +245,7 @@ void TaskScheduler::_do_work(size_t index) { // If pipeline is canceled caused by memory limit, we should send report to FE in order // to cancel all pipeline tasks in this query - fragment_ctx->send_report(true); + // fragment_ctx->send_report(true); _try_close_task(task, PipelineTaskState::CANCELED); continue; } @@ -276,7 +276,6 @@ void TaskScheduler::_do_work(size_t index) { // exec failed,cancel all fragment instance fragment_ctx->cancel(PPlanFragmentCancelReason::INTERNAL_ERROR, status.to_string()); - fragment_ctx->send_report(true); _try_close_task(task, PipelineTaskState::CANCELED, status); continue; } diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp index 03413e7fa699f2..9a916daf4e8f80 100644 --- a/be/src/runtime/fragment_mgr.cpp +++ b/be/src/runtime/fragment_mgr.cpp @@ -59,7 +59,6 @@ #include "gutil/strings/substitute.h" #include "io/fs/stream_load_pipe.h" #include "opentelemetry/trace/scope.h" -#include "pipeline/pipeline_context_report_executor.h" #include "pipeline/pipeline_fragment_context.h" #include "runtime/client_cache.h" #include "runtime/descriptors.h" @@ -139,7 +138,12 @@ FragmentMgr::FragmentMgr(ExecEnv* exec_env) [this]() { return _thread_pool->get_queue_size(); }); CHECK(s.ok()) << s.to_string(); - _pipeline_ctx_report_executor = std::make_unique(); + s = ThreadPoolBuilder("FragmentInstanceReportThreadPool") + .set_min_threads(48) + .set_max_threads(512) + .set_max_queue_size(102400) + .build(&_async_report_thread_pool); + CHECK(s.ok()) << s.to_string(); } FragmentMgr::~FragmentMgr() = default; @@ -165,7 +169,7 @@ void FragmentMgr::stop() { } _pipeline_map.clear(); } - _pipeline_ctx_report_executor->close(); + _async_report_thread_pool->shutdown(); } std::string FragmentMgr::to_http_path(const std::string& file_name) { @@ -177,10 +181,13 @@ std::string FragmentMgr::to_http_path(const std::string& file_name) { } Status FragmentMgr::trigger_pipeline_context_report( - const ReportStatusRequest& req, std::shared_ptr&& ctx) { - return _pipeline_ctx_report_executor->submit_report_task( - {req.fragment_instance_id, [this, req]() { 
coordinator_callback(req); }, ctx}, - !req.done); + const ReportStatusRequest req, std::shared_ptr&& ctx) { + return _async_report_thread_pool->submit_func([this, req, ctx]() { + coordinator_callback(req); + if (!req.done) { + ctx->refresh_next_report_time(); + } + }); } // There can only be one of these callbacks in-flight at any moment, because @@ -550,7 +557,7 @@ void FragmentMgr::remove_pipeline_context( f_context->instance_ids(ins_ids); bool all_done = q_context->countdown(ins_ids.size()); for (const auto& ins_id : ins_ids) { - LOG(INFO) << "remove pipeline context " << print_id(ins_id); + VLOG_DEBUG << "remove pipeline context " << print_id(ins_id) << ", all_done:" << all_done; _pipeline_map.erase(ins_id); } if (all_done) { @@ -786,8 +793,9 @@ Status FragmentMgr::exec_plan_fragment(const TPipelineFragmentParams& params, std::shared_ptr context = std::make_shared( query_ctx->query_id(), params.fragment_id, query_ctx, _exec_env, cb, - std::bind(std::mem_fn(&FragmentMgr::trigger_pipeline_context_report), - this, std::placeholders::_1, std::placeholders::_2), + std::bind( + std::mem_fn(&FragmentMgr::trigger_pipeline_context_report), this, + std::placeholders::_1, std::placeholders::_2), params.group_commit); { SCOPED_RAW_TIMER(&duration_ns); @@ -864,7 +872,7 @@ Status FragmentMgr::exec_plan_fragment(const TPipelineFragmentParams& params, std::make_shared( query_ctx->query_id(), fragment_instance_id, params.fragment_id, local_params.backend_num, query_ctx, _exec_env, cb, - std::bind( + std::bind( std::mem_fn(&FragmentMgr::trigger_pipeline_context_report), this, std::placeholders::_1, std::placeholders::_2)); { diff --git a/be/src/runtime/fragment_mgr.h b/be/src/runtime/fragment_mgr.h index 20817fefa4d30e..9d7b482515e6be 100644 --- a/be/src/runtime/fragment_mgr.h +++ b/be/src/runtime/fragment_mgr.h @@ -101,7 +101,7 @@ class FragmentMgr : public RestMonitorIface { const PPlanFragmentCancelReason& reason, const std::unique_lock& state_lock, const std::string& msg = ""); - Status trigger_pipeline_context_report(const ReportStatusRequest&, + Status trigger_pipeline_context_report(const ReportStatusRequest, std::shared_ptr&&); // Pipeline version, cancel a fragment instance. 
@@ -207,7 +207,7 @@ class FragmentMgr : public RestMonitorIface { UIntGauge* timeout_canceled_fragment_count = nullptr; RuntimeFilterMergeController _runtimefilter_controller; - std::unique_ptr _pipeline_ctx_report_executor; + std::unique_ptr _async_report_thread_pool; // used for pipeline context report }; } // namespace doris diff --git a/be/src/runtime/plan_fragment_executor.cpp b/be/src/runtime/plan_fragment_executor.cpp index 59a89b44af3a50..062334ac61b423 100644 --- a/be/src/runtime/plan_fragment_executor.cpp +++ b/be/src/runtime/plan_fragment_executor.cpp @@ -594,7 +594,8 @@ void PlanFragmentExecutor::cancel(const PPlanFragmentCancelReason& reason, const .tag("reason", reason) .tag("error message", msg); if (_runtime_state->is_cancelled()) { - LOG(INFO) << "instance is already cancelled, skip cancel again"; + LOG(INFO) << "instance " << print_id(_runtime_state->fragment_instance_id()) + << " is already cancelled, skip cancel again"; return; } DCHECK(_prepared); @@ -608,7 +609,8 @@ void PlanFragmentExecutor::cancel(const PPlanFragmentCancelReason& reason, const _query_ctx->set_ready_to_execute(true); // must close stream_mgr to avoid dead lock in Exchange Node - _exec_env->vstream_mgr()->cancel(_fragment_instance_id); + Status cancel_status = Status::Cancelled(msg); + _exec_env->vstream_mgr()->cancel(_fragment_instance_id, cancel_status); // Cancel the result queue manager used by spark doris connector _exec_env->result_queue_mgr()->update_queue_status(_fragment_instance_id, Status::Aborted(msg)); #ifndef BE_TEST diff --git a/be/src/runtime/query_context.h b/be/src/runtime/query_context.h index 95541c0e079f80..2791291bf4d8ca 100644 --- a/be/src/runtime/query_context.h +++ b/be/src/runtime/query_context.h @@ -97,7 +97,9 @@ class QueryContext { // Notice. For load fragments, the fragment_num sent by FE has a small probability of 0. // this may be a bug, bug <= 1 in theory it shouldn't cause any problems at this stage. 
- bool countdown(int instance_num) { return fragment_num.fetch_sub(instance_num) <= 1; } + bool countdown(int instance_num) { + return fragment_num.fetch_sub(instance_num) <= instance_num; + } ExecEnv* exec_env() { return _exec_env; } @@ -137,10 +139,10 @@ class QueryContext { if (_is_cancelled) { return false; } + set_exec_status(new_status); _is_cancelled.store(v); set_ready_to_execute(true); - set_exec_status(new_status); return true; } diff --git a/be/src/runtime/runtime_state.cpp b/be/src/runtime/runtime_state.cpp index 1d5fa990bb9811..ec18e83eabd783 100644 --- a/be/src/runtime/runtime_state.cpp +++ b/be/src/runtime/runtime_state.cpp @@ -305,6 +305,13 @@ void RuntimeState::get_unreported_errors(std::vector* new_errors) { } } +Status RuntimeState::query_status() { + auto st = _query_ctx->exec_status(); + RETURN_IF_ERROR(st); + std::lock_guard l(_process_status_lock); + return _process_status; +} + bool RuntimeState::is_cancelled() const { return _is_cancelled.load() || (_query_ctx && _query_ctx->is_cancelled()); } diff --git a/be/src/runtime/runtime_state.h b/be/src/runtime/runtime_state.h index 87f0f45b6aaf18..02c048052a4595 100644 --- a/be/src/runtime/runtime_state.h +++ b/be/src/runtime/runtime_state.h @@ -151,10 +151,7 @@ class RuntimeState { _query_options.enable_common_expr_pushdown; } - Status query_status() { - std::lock_guard l(_process_status_lock); - return _process_status; - } + Status query_status(); // Appends error to the _error_log if there is space bool log_error(const std::string& error); diff --git a/be/src/vec/runtime/vdata_stream_mgr.cpp b/be/src/vec/runtime/vdata_stream_mgr.cpp index ad161828f905a6..230d5fb715f1ff 100644 --- a/be/src/vec/runtime/vdata_stream_mgr.cpp +++ b/be/src/vec/runtime/vdata_stream_mgr.cpp @@ -126,7 +126,9 @@ Status VDataStreamMgr::transmit_block(const PTransmitDataParams* request, } if (eos) { - recvr->remove_sender(request->sender_id(), request->be_number()); + Status exec_status = + request->has_exec_status() ? Status::create(request->exec_status()) : Status::OK(); + recvr->remove_sender(request->sender_id(), request->be_number(), exec_status); } return Status::OK(); } @@ -156,7 +158,8 @@ Status VDataStreamMgr::deregister_recvr(const TUniqueId& fragment_instance_id, P // Notify concurrent add_data() requests that the stream has been terminated. // cancel_stream maybe take a long time, so we handle it out of lock. if (targert_recvr) { - targert_recvr->cancel_stream(); + Status status = Status::OK(); + targert_recvr->cancel_stream(status); return Status::OK(); } else { std::stringstream err; @@ -167,7 +170,7 @@ Status VDataStreamMgr::deregister_recvr(const TUniqueId& fragment_instance_id, P } } -void VDataStreamMgr::cancel(const TUniqueId& fragment_instance_id) { +void VDataStreamMgr::cancel(const TUniqueId& fragment_instance_id, Status& exec_status) { VLOG_QUERY << "cancelling all streams for fragment=" << fragment_instance_id; std::vector> recvrs; { @@ -191,7 +194,7 @@ void VDataStreamMgr::cancel(const TUniqueId& fragment_instance_id) { // cancel_stream maybe take a long time, so we handle it out of lock. 
for (auto& it : recvrs) { - it->cancel_stream(); + it->cancel_stream(exec_status); } } diff --git a/be/src/vec/runtime/vdata_stream_mgr.h b/be/src/vec/runtime/vdata_stream_mgr.h index ca0e7ab4b741e2..8e39c1ebebbee0 100644 --- a/be/src/vec/runtime/vdata_stream_mgr.h +++ b/be/src/vec/runtime/vdata_stream_mgr.h @@ -63,7 +63,7 @@ class VDataStreamMgr { Status transmit_block(const PTransmitDataParams* request, ::google::protobuf::Closure** done); - void cancel(const TUniqueId& fragment_instance_id); + void cancel(const TUniqueId& fragment_instance_id, Status& exec_status); private: std::mutex _lock; diff --git a/be/src/vec/runtime/vdata_stream_recvr.cpp b/be/src/vec/runtime/vdata_stream_recvr.cpp index dd3e1cee29b6a4..59db1e70df4d06 100644 --- a/be/src/vec/runtime/vdata_stream_recvr.cpp +++ b/be/src/vec/runtime/vdata_stream_recvr.cpp @@ -46,7 +46,9 @@ VDataStreamRecvr::SenderQueue::SenderQueue(VDataStreamRecvr* parent_recvr, int n : _recvr(parent_recvr), _is_cancelled(false), _num_remaining_senders(num_senders), - _received_first_batch(false) {} + _received_first_batch(false) { + _cancel_status = Status::OK(); +} VDataStreamRecvr::SenderQueue::~SenderQueue() { // Check pending closures, if it is not empty, should clear it here. but it should not happen. @@ -81,6 +83,7 @@ Status VDataStreamRecvr::SenderQueue::get_batch(Block* block, bool* eos) { Status VDataStreamRecvr::SenderQueue::_inner_get_batch_without_lock(Block* block, bool* eos) { if (_is_cancelled) { + RETURN_IF_ERROR(_cancel_status); return Status::Cancelled("Cancelled"); } @@ -270,13 +273,14 @@ void VDataStreamRecvr::SenderQueue::decrement_senders(int be_number) { } } -void VDataStreamRecvr::SenderQueue::cancel() { +void VDataStreamRecvr::SenderQueue::cancel(Status& cancel_status) { { std::lock_guard l(_lock); if (_is_cancelled) { return; } _is_cancelled = true; + _cancel_status = cancel_status; if (_dependency) { _dependency->set_always_done(); } @@ -444,14 +448,18 @@ Status VDataStreamRecvr::get_next(Block* block, bool* eos) { } } -void VDataStreamRecvr::remove_sender(int sender_id, int be_number) { +void VDataStreamRecvr::remove_sender(int sender_id, int be_number, Status& exec_status) { + if (!exec_status.ok()) { + cancel_stream(exec_status); + return; + } int use_sender_id = _is_merging ? sender_id : 0; _sender_queues[use_sender_id]->decrement_senders(be_number); } -void VDataStreamRecvr::cancel_stream() { +void VDataStreamRecvr::cancel_stream(Status& exec_status) { for (int i = 0; i < _sender_queues.size(); ++i) { - _sender_queues[i]->cancel(); + _sender_queues[i]->cancel(exec_status); } } diff --git a/be/src/vec/runtime/vdata_stream_recvr.h b/be/src/vec/runtime/vdata_stream_recvr.h index d0b009b22fd930..ec05cd3a567497 100644 --- a/be/src/vec/runtime/vdata_stream_recvr.h +++ b/be/src/vec/runtime/vdata_stream_recvr.h @@ -106,9 +106,9 @@ class VDataStreamRecvr { // Indicate that a particular sender is done. Delegated to the appropriate // sender queue. Called from DataStreamMgr. 
- void remove_sender(int sender_id, int be_number); + void remove_sender(int sender_id, int be_number, Status& exec_status); - void cancel_stream(); + void cancel_stream(Status& exec_status); void close(); @@ -209,7 +209,7 @@ class VDataStreamRecvr::SenderQueue { void decrement_senders(int sender_id); - void cancel(); + void cancel(Status& cancel_status); void close(); @@ -233,6 +233,7 @@ class VDataStreamRecvr::SenderQueue { VDataStreamRecvr* _recvr; std::mutex _lock; bool _is_cancelled; + Status _cancel_status; int _num_remaining_senders; std::condition_variable _data_arrival_cv; std::condition_variable _data_removal_cv; diff --git a/be/src/vec/sink/vdata_stream_sender.cpp b/be/src/vec/sink/vdata_stream_sender.cpp index 263a063590757e..9ea0b73cbd0765 100644 --- a/be/src/vec/sink/vdata_stream_sender.cpp +++ b/be/src/vec/sink/vdata_stream_sender.cpp @@ -109,23 +109,23 @@ Status Channel::init(RuntimeState* state) { } template -Status Channel::send_current_block(bool eos) { +Status Channel::send_current_block(bool eos, Status& exec_status) { // FIXME: Now, local exchange will cause the performance problem is in a multi-threaded scenario // so this feature is turned off here by default. We need to re-examine this logic if (is_local()) { - return send_local_block(eos); + return send_local_block(exec_status, eos); } SCOPED_CONSUME_MEM_TRACKER(_parent->mem_tracker()); if (eos) { RETURN_IF_ERROR(_serializer.serialize_block(_ch_cur_pb_block, 1)); } - RETURN_IF_ERROR(send_block(_ch_cur_pb_block, eos)); + RETURN_IF_ERROR(send_remote_block(_ch_cur_pb_block, eos, exec_status)); ch_roll_pb_block(); return Status::OK(); } template -Status Channel::send_local_block(bool eos) { +Status Channel::send_local_block(Status exec_status, bool eos) { if constexpr (!std::is_same_v) { SCOPED_TIMER(_parent->local_send_timer()); } @@ -140,7 +140,7 @@ Status Channel::send_local_block(bool eos) { _local_recvr->add_block(&block, _parent->sender_id(), true); if (eos) { - _local_recvr->remove_sender(_parent->sender_id(), _be_number); + _local_recvr->remove_sender(_parent->sender_id(), _be_number, exec_status); } return Status::OK(); } else { @@ -168,7 +168,7 @@ Status Channel::send_local_block(Block* block) { } template -Status Channel::send_block(PBlock* block, bool eos) { +Status Channel::send_remote_block(PBlock* block, bool eos, Status exec_status) { if constexpr (!std::is_same_v) { SCOPED_TIMER(_parent->brpc_send_timer()); COUNTER_UPDATE(_parent->blocks_sent_counter(), 1); @@ -182,7 +182,7 @@ Status Channel::send_block(PBlock* block, bool eos) { SCOPED_TRACK_MEMORY_TO_UNKNOWN(); _closure->cntl.Reset(); } - VLOG_ROW << "Channel::send_batch() instance_id=" << _fragment_instance_id + VLOG_ROW << "Channel::send_batch() instance_id=" << print_id(_fragment_instance_id) << " dest_node=" << _dest_node_id << " to_host=" << _brpc_dest_addr.hostname << " _packet_seq=" << _packet_seq << " row_desc=" << _row_desc.debug_string(); if (_is_transfer_chain && (_send_query_statistics_with_every_batch || eos)) { @@ -191,6 +191,9 @@ Status Channel::send_block(PBlock* block, bool eos) { } _brpc_request.set_eos(eos); + if (!exec_status.ok()) { + exec_status.to_protobuf(_brpc_request.mutable_exec_status()); // should release??? 
+ } if (block != nullptr) { _brpc_request.set_allocated_block(block); } @@ -228,7 +231,8 @@ Status Channel::add_rows(Block* block, const std::vector& rows, boo RETURN_IF_ERROR( _serializer.next_serialized_block(block, _ch_cur_pb_block, 1, &serialized, eos, &rows)); if (serialized) { - RETURN_IF_ERROR(send_current_block(false)); + Status exec_status = Status::OK(); + RETURN_IF_ERROR(send_current_block(false, exec_status)); } return Status::OK(); @@ -251,29 +255,29 @@ Status Channel::close_wait(RuntimeState* state) { } template -Status Channel::close_internal() { +Status Channel::close_internal(Status& exec_status) { if (!_need_close) { return Status::OK(); } - VLOG_RPC << "Channel::close() instance_id=" << _fragment_instance_id + VLOG_RPC << "Channel::close_internal() instance_id=" << print_id(_fragment_instance_id) << " dest_node=" << _dest_node_id << " #rows= " << ((_serializer.get_block() == nullptr) ? 0 : _serializer.get_block()->rows()) - << " receiver status: " << _receiver_status; + << " receiver status: " << _receiver_status << ", exec_status: " << exec_status; if (is_receiver_eof()) { _serializer.reset_block(); return Status::OK(); } Status status; if (_serializer.get_block() != nullptr && _serializer.get_block()->rows() > 0) { - status = send_current_block(true); + status = send_current_block(true, exec_status); } else { SCOPED_CONSUME_MEM_TRACKER(_parent->mem_tracker()); if (is_local()) { if (_recvr_is_valid()) { - _local_recvr->remove_sender(_parent->sender_id(), _be_number); + _local_recvr->remove_sender(_parent->sender_id(), _be_number, exec_status); } } else { - status = send_block((PBlock*)nullptr, true); + status = send_remote_block((PBlock*)nullptr, true, exec_status); } } // Don't wait for the last packet to finish, left it to close_wait. 
@@ -285,13 +289,13 @@ Status Channel::close_internal() { } template -Status Channel::close(RuntimeState* state) { +Status Channel::close(RuntimeState* state, Status& exec_status) { if (_closed) { return Status::OK(); } _closed = true; - Status st = close_internal(); + Status st = close_internal(exec_status); if (!st.ok()) { state->log_error(st.to_string()); } @@ -497,7 +501,8 @@ template void VDataStreamSender::_handle_eof_channel(RuntimeState* state, ChannelPtrType channel, Status st) { channel->set_receiver_eof(st); - channel->close(state); + Status ok = Status::OK(); + channel->close(state, ok); } Status VDataStreamSender::send(RuntimeState* state, Block* block, bool eos) { @@ -551,7 +556,7 @@ Status VDataStreamSender::send(RuntimeState* state, Block* block, bool eos) { status = channel->send_local_block(&cur_block); } else { SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get()); - status = channel->send_block(block_holder, nullptr, eos); + status = channel->send_broadcast_block(block_holder, nullptr, eos); } HANDLE_CHANNEL_STATUS(state, channel, status); } @@ -578,7 +583,7 @@ Status VDataStreamSender::send(RuntimeState* state, Block* block, bool eos) { status = channel->send_local_block(&cur_block); } else { SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get()); - status = channel->send_block(_cur_pb_block, false); + status = channel->send_remote_block(_cur_pb_block, false); } HANDLE_CHANNEL_STATUS(state, channel, status); } @@ -600,7 +605,8 @@ Status VDataStreamSender::send(RuntimeState* state, Block* block, bool eos) { SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get()); RETURN_IF_ERROR( _serializer.serialize_block(block, current_channel->ch_cur_pb_block())); - auto status = current_channel->send_block(current_channel->ch_cur_pb_block(), eos); + auto status = + current_channel->send_remote_block(current_channel->ch_cur_pb_block(), eos); HANDLE_CHANNEL_STATUS(state, current_channel, status); current_channel->ch_roll_pb_block(); } @@ -682,7 +688,7 @@ Status VDataStreamSender::try_close(RuntimeState* state, Status exec_status) { _serializer.reset_block(); Status final_st = Status::OK(); for (int i = 0; i < _channels.size(); ++i) { - Status st = _channels[i]->close(state); + Status st = _channels[i]->close(state, exec_status); if (!st.ok() && final_st.ok()) { final_st = st; } @@ -711,7 +717,7 @@ Status VDataStreamSender::close(RuntimeState* state, Status exec_status) { status = channel->send_local_block(&block); } else { SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get()); - status = channel->send_block(_cur_pb_block, false); + status = channel->send_remote_block(_cur_pb_block, false); } HANDLE_CHANNEL_STATUS(state, channel, status); } @@ -719,7 +725,7 @@ Status VDataStreamSender::close(RuntimeState* state, Status exec_status) { } } for (int i = 0; i < _channels.size(); ++i) { - Status st = _channels[i]->close(state); + Status st = _channels[i]->close(state, exec_status); if (!st.ok() && final_st.ok()) { final_st = st; } diff --git a/be/src/vec/sink/vdata_stream_sender.h b/be/src/vec/sink/vdata_stream_sender.h index 1698e5e564e3b1..c7067de3d7d0fa 100644 --- a/be/src/vec/sink/vdata_stream_sender.h +++ b/be/src/vec/sink/vdata_stream_sender.h @@ -273,25 +273,26 @@ class Channel { // Returns the status of the most recently finished transmit_data // rpc (or OK if there wasn't one that hasn't been reported yet). 
// if batch is nullptr, send the eof packet - virtual Status send_block(PBlock* block, bool eos = false); + virtual Status send_remote_block(PBlock* block, bool eos = false, + Status exec_status = Status::OK()); - virtual Status send_block(BroadcastPBlockHolder* block, [[maybe_unused]] bool* sent, - bool eos = false) { + virtual Status send_broadcast_block(BroadcastPBlockHolder* block, [[maybe_unused]] bool* sent, + bool eos = false) { return Status::InternalError("Send BroadcastPBlockHolder is not allowed!"); } virtual Status add_rows(Block* block, const std::vector& row, bool eos); - virtual Status send_current_block(bool eos); + virtual Status send_current_block(bool eos, Status& exec_status); - Status send_local_block(bool eos = false); + Status send_local_block(Status exec_status, bool eos = false); Status send_local_block(Block* block); // Flush buffered rows and close channel. This function don't wait the response // of close operation, client should call close_wait() to finish channel's close. // We split one close operation into two phases in order to make multiple channels // can run parallel. - Status close(RuntimeState* state); + Status close(RuntimeState* state, Status& exec_status); // Get close wait's response, to finish channel close operation. Status close_wait(RuntimeState* state); @@ -362,7 +363,7 @@ class Channel { // Serialize _batch into _thrift_batch and send via send_batch(). // Returns send_batch() status. Status send_current_batch(bool eos = false); - Status close_internal(); + Status close_internal(Status& exec_status); Parent* _parent; @@ -476,7 +477,8 @@ class PipChannel final : public Channel { // Returns the status of the most recently finished transmit_data // rpc (or OK if there wasn't one that hasn't been reported yet). 
// if batch is nullptr, send the eof packet - Status send_block(PBlock* block, bool eos = false) override { + Status send_remote_block(PBlock* block, bool eos = false, + Status exec_status = Status::OK()) override { COUNTER_UPDATE(Channel::_parent->blocks_sent_counter(), 1); std::unique_ptr pblock_ptr; pblock_ptr.reset(block); @@ -489,13 +491,13 @@ class PipChannel final : public Channel { } } if (eos || block->column_metas_size()) { - RETURN_IF_ERROR(_buffer->add_block({this, std::move(pblock_ptr), eos})); + RETURN_IF_ERROR(_buffer->add_block({this, std::move(pblock_ptr), eos, exec_status})); } return Status::OK(); } - Status send_block(BroadcastPBlockHolder* block, [[maybe_unused]] bool* sent, - bool eos = false) override { + Status send_broadcast_block(BroadcastPBlockHolder* block, [[maybe_unused]] bool* sent, + bool eos = false) override { COUNTER_UPDATE(Channel::_parent->blocks_sent_counter(), 1); if (eos) { if (_eos_send) { @@ -520,19 +522,20 @@ class PipChannel final : public Channel { RETURN_IF_ERROR(Channel::_serializer.next_serialized_block( block, _pblock.get(), 1, &serialized, eos, &rows)); if (serialized) { - RETURN_IF_ERROR(send_current_block(eos)); + Status exec_status = Status::OK(); + RETURN_IF_ERROR(send_current_block(eos, exec_status)); } return Status::OK(); } // send _mutable_block - Status send_current_block(bool eos) override { + Status send_current_block(bool eos, Status& exec_status) override { if (Channel::is_local()) { - return Channel::send_local_block(eos); + return Channel::send_local_block(exec_status, eos); } SCOPED_CONSUME_MEM_TRACKER(Channel::_parent->mem_tracker()); - RETURN_IF_ERROR(send_block(_pblock.release(), eos)); + RETURN_IF_ERROR(send_remote_block(_pblock.release(), eos, exec_status)); return Status::OK(); } diff --git a/gensrc/proto/internal_service.proto b/gensrc/proto/internal_service.proto index 878544e74b0bd4..c757315f0d072a 100644 --- a/gensrc/proto/internal_service.proto +++ b/gensrc/proto/internal_service.proto @@ -49,6 +49,7 @@ message PTransmitDataParams { // transfer the RowBatch to the Controller Attachment optional bool transfer_by_attachment = 10 [default = false]; optional PUniqueId query_id = 11; + optional PStatus exec_status = 12; }; message PTransmitDataResult { From bc00bc807a3ed55f0bf61e5a5b3f71b092971961 Mon Sep 17 00:00:00 2001 From: liulijia Date: Mon, 25 Sep 2023 20:51:18 +0800 Subject: [PATCH 4/8] fix pipeline x --- be/src/pipeline/exec/result_sink_operator.cpp | 3 ++- be/src/vec/runtime/vdata_stream_recvr.cpp | 6 +++++- 2 files changed, 7 insertions(+), 2 deletions(-) diff --git a/be/src/pipeline/exec/result_sink_operator.cpp b/be/src/pipeline/exec/result_sink_operator.cpp index 424d75b1d9a71a..2c645d4c07a7f5 100644 --- a/be/src/pipeline/exec/result_sink_operator.cpp +++ b/be/src/pipeline/exec/result_sink_operator.cpp @@ -181,7 +181,8 @@ Status ResultSinkLocalState::close(RuntimeState* state, Status exec_status) { COUNTER_UPDATE(profile()->total_time_counter(), _cancel_dependency->write_watcher_elapse_time()); SCOPED_TIMER(profile()->total_time_counter()); - Status final_status = Status::OK(); + Status exec_status = state->query_status(); + Status final_status = exec_status; if (_writer) { // close the writer Status st = _writer->close(); diff --git a/be/src/vec/runtime/vdata_stream_recvr.cpp b/be/src/vec/runtime/vdata_stream_recvr.cpp index 59db1e70df4d06..09bd6262c5d9a1 100644 --- a/be/src/vec/runtime/vdata_stream_recvr.cpp +++ b/be/src/vec/runtime/vdata_stream_recvr.cpp @@ -284,7 +284,8 @@ void 
VDataStreamRecvr::SenderQueue::cancel(Status& cancel_status) { if (_dependency) { _dependency->set_always_done(); } - VLOG_QUERY << "cancelled stream: _fragment_instance_id=" << _recvr->fragment_instance_id() + VLOG_QUERY << "cancelled stream: _fragment_instance_id=" + << print_id(_recvr->fragment_instance_id()) << " node_id=" << _recvr->dest_node_id(); } // Wake up all threads waiting to produce/consume batches. They will all @@ -458,6 +459,9 @@ void VDataStreamRecvr::remove_sender(int sender_id, int be_number, Status& exec_ } void VDataStreamRecvr::cancel_stream(Status& exec_status) { + VLOG_QUERY << "cancel_stream: fragment_instance_id=" << print_id(_fragment_instance_id) + << ", exec_status: " << exec_status; + for (int i = 0; i < _sender_queues.size(); ++i) { _sender_queues[i]->cancel(exec_status); } From f90b9006029095f9d5529d6853ef676f323ee50a Mon Sep 17 00:00:00 2001 From: liulijia Date: Mon, 25 Sep 2023 21:05:00 +0800 Subject: [PATCH 5/8] fix code style --- be/src/pipeline/exec/exchange_sink_buffer.cpp | 3 ++- be/src/vec/sink/vdata_stream_sender.cpp | 3 ++- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/be/src/pipeline/exec/exchange_sink_buffer.cpp b/be/src/pipeline/exec/exchange_sink_buffer.cpp index 5877ae4de6c67d..f1ac7f690d705c 100644 --- a/be/src/pipeline/exec/exchange_sink_buffer.cpp +++ b/be/src/pipeline/exec/exchange_sink_buffer.cpp @@ -226,7 +226,8 @@ Status ExchangeSinkBuffer::_send_rpc(InstanceLoId id) { brpc_request->set_allocated_block(request.block.get()); } if (!request.exec_status.ok()) { - request.exec_status.to_protobuf(brpc_request->mutable_exec_status()); // should release??? + // should release exec_status of brpc_request? + request.exec_status.to_protobuf(brpc_request->mutable_exec_status()); } auto* closure = request.channel->get_closure(id, request.eos, nullptr); diff --git a/be/src/vec/sink/vdata_stream_sender.cpp b/be/src/vec/sink/vdata_stream_sender.cpp index 9ea0b73cbd0765..e5d6025c92840d 100644 --- a/be/src/vec/sink/vdata_stream_sender.cpp +++ b/be/src/vec/sink/vdata_stream_sender.cpp @@ -192,7 +192,8 @@ Status Channel::send_remote_block(PBlock* block, bool eos, Status exec_s _brpc_request.set_eos(eos); if (!exec_status.ok()) { - exec_status.to_protobuf(_brpc_request.mutable_exec_status()); // should release??? 
+ exec_status.to_protobuf(_brpc_request.mutable_exec_status()); } if (block != nullptr) { _brpc_request.set_allocated_block(block); From 0bf7de3cca5c017c86090d8cfe9d8f0e740c168d Mon Sep 17 00:00:00 2001 From: Liulijia Date: Tue, 26 Sep 2023 04:32:36 +0800 Subject: [PATCH 6/8] fix compile --- be/src/pipeline/exec/exchange_sink_operator.cpp | 2 +- be/src/pipeline/exec/result_file_sink_operator.cpp | 6 +++--- be/src/pipeline/exec/result_sink_operator.cpp | 1 - be/src/pipeline/task_scheduler.cpp | 7 +++++-- 4 files changed, 9 insertions(+), 7 deletions(-) diff --git a/be/src/pipeline/exec/exchange_sink_operator.cpp b/be/src/pipeline/exec/exchange_sink_operator.cpp index 2958529e993548..3027b5f8d67b2f 100644 --- a/be/src/pipeline/exec/exchange_sink_operator.cpp +++ b/be/src/pipeline/exec/exchange_sink_operator.cpp @@ -521,7 +521,7 @@ Status ExchangeSinkOperatorX::try_close(RuntimeState* state, Status exec_status) CREATE_SINK_LOCAL_STATE_RETURN_IF_ERROR(local_state); local_state._serializer.reset_block(); Status final_st = Status::OK(); - auto exec_status = state->query_status(); + Status final_status = exec_status; for (int i = 0; i < local_state.channels.size(); ++i) { Status st = local_state.channels[i]->close(state, exec_status); if (!st.ok() && final_st.ok()) { diff --git a/be/src/pipeline/exec/result_file_sink_operator.cpp b/be/src/pipeline/exec/result_file_sink_operator.cpp index 26f8fed7319a1f..955a5a8a02b6ee 100644 --- a/be/src/pipeline/exec/result_file_sink_operator.cpp +++ b/be/src/pipeline/exec/result_file_sink_operator.cpp @@ -234,8 +234,8 @@ Status ResultFileSinkLocalState::close(RuntimeState* state, Status exec_status) status = channel->send_local_block(&cur_block); } else { SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get()); - status = - channel->send_block(_block_holder.get(), nullptr, true); + status = channel->send_broadcast_block(_block_holder.get(), + nullptr, true); } HANDLE_CHANNEL_STATUS(state, channel, status); } @@ -256,7 +256,7 @@ template void ResultFileSinkLocalState::_handle_eof_channel(RuntimeState* state, ChannelPtrType channel, Status st) { channel->set_receiver_eof(st); - channel->close(state); + channel->close(state, st); } Status ResultFileSinkOperatorX::sink(RuntimeState* state, vectorized::Block* in_block, diff --git a/be/src/pipeline/exec/result_sink_operator.cpp b/be/src/pipeline/exec/result_sink_operator.cpp index 2c645d4c07a7f5..bba54559fefe18 100644 --- a/be/src/pipeline/exec/result_sink_operator.cpp +++ b/be/src/pipeline/exec/result_sink_operator.cpp @@ -181,7 +181,6 @@ Status ResultSinkLocalState::close(RuntimeState* state, Status exec_status) { COUNTER_UPDATE(profile()->total_time_counter(), _cancel_dependency->write_watcher_elapse_time()); SCOPED_TIMER(profile()->total_time_counter()); - Status exec_status = state->query_status(); Status final_status = exec_status; if (_writer) { // close the writer diff --git a/be/src/pipeline/task_scheduler.cpp b/be/src/pipeline/task_scheduler.cpp index 69c402ffa8ff27..c7fd235db2cc7d 100644 --- a/be/src/pipeline/task_scheduler.cpp +++ b/be/src/pipeline/task_scheduler.cpp @@ -231,8 +231,10 @@ void TaskScheduler::_do_work(size_t index) { auto check_state = task->get_state(); if (check_state == PipelineTaskState::PENDING_FINISH) { DCHECK(!task->is_pending_finish()) << "must not pending close " << task->debug_string(); + Status exec_status = fragment_ctx->get_query_context()->exec_status(); _try_close_task(task, - canceled ? PipelineTaskState::CANCELED : PipelineTaskState::FINISHED); + canceled ? 
PipelineTaskState::CANCELED : PipelineTaskState::FINISHED, + exec_status); continue; } DCHECK(check_state != PipelineTaskState::FINISHED && @@ -246,7 +248,8 @@ void TaskScheduler::_do_work(size_t index) { // If pipeline is canceled caused by memory limit, we should send report to FE in order // to cancel all pipeline tasks in this query // fragment_ctx->send_report(true); - _try_close_task(task, PipelineTaskState::CANCELED); + Status cancel_status = fragment_ctx->get_query_context()->exec_status(); + _try_close_task(task, PipelineTaskState::CANCELED, cancel_status); continue; } From 4b0b2466a6a4289e56a384c58b9171c60059ccd0 Mon Sep 17 00:00:00 2001 From: liulijia Date: Tue, 26 Sep 2023 14:02:34 +0800 Subject: [PATCH 7/8] fix memory_order --- be/src/pipeline/pipeline_fragment_context.cpp | 8 ++++---- .../pipeline/pipeline_x/pipeline_x_fragment_context.cpp | 2 +- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/be/src/pipeline/pipeline_fragment_context.cpp b/be/src/pipeline/pipeline_fragment_context.cpp index 1b7f996408bb81..c9e0ec0298d46c 100644 --- a/be/src/pipeline/pipeline_fragment_context.cpp +++ b/be/src/pipeline/pipeline_fragment_context.cpp @@ -379,9 +379,9 @@ void PipelineFragmentContext::_init_next_report_time() { } void PipelineFragmentContext::refresh_next_report_time() { - auto disable = _disable_period_report.load(std::memory_order_acq_rel); + auto disable = _disable_period_report.load(std::memory_order_acquire); DCHECK(disable == true); - _previous_report_time = MonotonicNanos(); + _previous_report_time.store(MonotonicNanos(), std::memory_order_release); _disable_period_report.compare_exchange_strong(disable, false); } @@ -389,7 +389,7 @@ void PipelineFragmentContext::trigger_report_if_necessary() { if (!_is_report_success) { return; } - auto disable = _disable_period_report.load(std::memory_order_acq_rel); + auto disable = _disable_period_report.load(std::memory_order_acquire); if (disable) { return; } @@ -399,7 +399,7 @@ void PipelineFragmentContext::trigger_report_if_necessary() { << "config::status_report_interval is equal to or less than zero, do not trigger " "report."; } - uint64_t next_report_time = _previous_report_time.load(std::memory_order_acq_rel) + + uint64_t next_report_time = _previous_report_time.load(std::memory_order_acquire) + (uint64_t)(interval_s)*NANOS_PER_SEC; if (MonotonicNanos() > next_report_time) { if (!_disable_period_report.compare_exchange_strong(disable, true, diff --git a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp index e9e519b1d8a746..c3685b0d7c74fa 100644 --- a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp +++ b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp @@ -827,7 +827,7 @@ Status PipelineXFragmentContext::send_report(bool done) { runtime_states[i] = _runtime_states[i].get(); } - _report_status_cb( + return _report_status_cb( {true, exec_status, runtime_states, nullptr, nullptr, done || !exec_status.ok(), _query_ctx->coord_addr, _query_id, _fragment_id, TUniqueId(), _backend_num, _runtime_state.get(), From 3839fc6e7788d659ee599e8664214badfe3baf94 Mon Sep 17 00:00:00 2001 From: liulijia Date: Wed, 27 Sep 2023 20:44:07 +0800 Subject: [PATCH 8/8] fix cr --- be/src/pipeline/exec/exchange_sink_buffer.cpp | 1 - be/src/pipeline/exec/exchange_sink_operator.cpp | 4 ++-- be/src/pipeline/exec/result_file_sink_operator.cpp | 3 ++- be/src/pipeline/pipeline_fragment_context.cpp | 4 +--- be/src/pipeline/task_scheduler.cpp | 4 ++-- 
be/src/runtime/fragment_mgr.h | 2 -- be/src/runtime/plan_fragment_executor.cpp | 3 +-- be/src/vec/runtime/vdata_stream_mgr.cpp | 5 ++--- be/src/vec/runtime/vdata_stream_mgr.h | 2 +- be/src/vec/runtime/vdata_stream_recvr.cpp | 6 +++--- be/src/vec/runtime/vdata_stream_recvr.h | 6 +++--- be/src/vec/sink/vdata_stream_sender.cpp | 14 ++++++-------- be/src/vec/sink/vdata_stream_sender.h | 8 ++++---- 13 files changed, 27 insertions(+), 35 deletions(-) diff --git a/be/src/pipeline/exec/exchange_sink_buffer.cpp b/be/src/pipeline/exec/exchange_sink_buffer.cpp index f1ac7f690d705c..85e37ee4a9b54b 100644 --- a/be/src/pipeline/exec/exchange_sink_buffer.cpp +++ b/be/src/pipeline/exec/exchange_sink_buffer.cpp @@ -226,7 +226,6 @@ Status ExchangeSinkBuffer::_send_rpc(InstanceLoId id) { brpc_request->set_allocated_block(request.block.get()); } if (!request.exec_status.ok()) { - // should release exec_status of brpc_request? request.exec_status.to_protobuf(brpc_request->mutable_exec_status()); } auto* closure = request.channel->get_closure(id, request.eos, nullptr); diff --git a/be/src/pipeline/exec/exchange_sink_operator.cpp b/be/src/pipeline/exec/exchange_sink_operator.cpp index 3027b5f8d67b2f..5d339a7318cf52 100644 --- a/be/src/pipeline/exec/exchange_sink_operator.cpp +++ b/be/src/pipeline/exec/exchange_sink_operator.cpp @@ -278,8 +278,8 @@ template void ExchangeSinkOperatorX::_handle_eof_channel(RuntimeState* state, ChannelPtrType channel, Status st) { channel->set_receiver_eof(st); - Status ok = Status::OK(); - channel->close(state, ok); + // The channel will not send an RPC to the downstream on eof, so close the channel with an OK status. + channel->close(state, Status::OK()); } Status ExchangeSinkOperatorX::sink(RuntimeState* state, vectorized::Block* block, diff --git a/be/src/pipeline/exec/result_file_sink_operator.cpp b/be/src/pipeline/exec/result_file_sink_operator.cpp index 955a5a8a02b6ee..2e375d6908733a 100644 --- a/be/src/pipeline/exec/result_file_sink_operator.cpp +++ b/be/src/pipeline/exec/result_file_sink_operator.cpp @@ -256,7 +256,8 @@ template void ResultFileSinkLocalState::_handle_eof_channel(RuntimeState* state, ChannelPtrType channel, Status st) { channel->set_receiver_eof(st); - channel->close(state, st); + // The channel will not send an RPC to the downstream on eof, so close the channel with an OK status. 
+ channel->close(state, Status::OK()); } Status ResultFileSinkOperatorX::sink(RuntimeState* state, vectorized::Block* in_block, diff --git a/be/src/pipeline/pipeline_fragment_context.cpp b/be/src/pipeline/pipeline_fragment_context.cpp index c9e0ec0298d46c..7d4b091cda398d 100644 --- a/be/src/pipeline/pipeline_fragment_context.cpp +++ b/be/src/pipeline/pipeline_fragment_context.cpp @@ -179,8 +179,7 @@ void PipelineFragmentContext::cancel(const PPlanFragmentCancelReason& reason, // must close stream_mgr to avoid dead lock in Exchange Node // TODO bug llj fix this other instance will not cancel - Status cancel_status = Status::Cancelled(msg); - _exec_env->vstream_mgr()->cancel(_fragment_instance_id, cancel_status); + _exec_env->vstream_mgr()->cancel(_fragment_instance_id, Status::Cancelled(msg)); // Cancel the result queue manager used by spark doris connector // TODO pipeline incomp // _exec_env->result_queue_mgr()->update_queue_status(id, Status::Aborted(msg)); @@ -724,7 +723,6 @@ void PipelineFragmentContext::close_sink() { } void PipelineFragmentContext::close_if_prepare_failed() { - LOG(WARNING) << "close PipelineFragmentContext because prepare failed"; if (_tasks.empty()) { if (_root_plan) { _root_plan->close(_runtime_state.get()); diff --git a/be/src/pipeline/task_scheduler.cpp b/be/src/pipeline/task_scheduler.cpp index c7fd235db2cc7d..30079126c2daa0 100644 --- a/be/src/pipeline/task_scheduler.cpp +++ b/be/src/pipeline/task_scheduler.cpp @@ -245,8 +245,8 @@ void TaskScheduler::_do_work(size_t index) { // may change from pending FINISH,should called cancel // also may change form BLOCK, other task called cancel - // If pipeline is canceled caused by memory limit, we should send report to FE in order - // to cancel all pipeline tasks in this query + // If the pipeline is canceled, it will report after the pipeline is closed, and errors will + // propagate downstream through the exchange, so there is no need to send_report here. 
// fragment_ctx->send_report(true); Status cancel_status = fragment_ctx->get_query_context()->exec_status(); _try_close_task(task, PipelineTaskState::CANCELED, cancel_status); diff --git a/be/src/runtime/fragment_mgr.h b/be/src/runtime/fragment_mgr.h index 9d7b482515e6be..395b9b546cf23f 100644 --- a/be/src/runtime/fragment_mgr.h +++ b/be/src/runtime/fragment_mgr.h @@ -48,8 +48,6 @@ namespace doris { namespace pipeline { class PipelineFragmentContext; class PipelineXFragmentContext; -struct ReportTask; -class PipelineContextReportExecutor; } // namespace pipeline class QueryContext; class ExecEnv; diff --git a/be/src/runtime/plan_fragment_executor.cpp b/be/src/runtime/plan_fragment_executor.cpp index 062334ac61b423..4cec558fc5fb1f 100644 --- a/be/src/runtime/plan_fragment_executor.cpp +++ b/be/src/runtime/plan_fragment_executor.cpp @@ -609,8 +609,7 @@ void PlanFragmentExecutor::cancel(const PPlanFragmentCancelReason& reason, const _query_ctx->set_ready_to_execute(true); // must close stream_mgr to avoid dead lock in Exchange Node - Status cancel_status = Status::Cancelled(msg); - _exec_env->vstream_mgr()->cancel(_fragment_instance_id, cancel_status); + _exec_env->vstream_mgr()->cancel(_fragment_instance_id, Status::Cancelled(msg)); // Cancel the result queue manager used by spark doris connector _exec_env->result_queue_mgr()->update_queue_status(_fragment_instance_id, Status::Aborted(msg)); #ifndef BE_TEST diff --git a/be/src/vec/runtime/vdata_stream_mgr.cpp b/be/src/vec/runtime/vdata_stream_mgr.cpp index 230d5fb715f1ff..70bdb1baff1542 100644 --- a/be/src/vec/runtime/vdata_stream_mgr.cpp +++ b/be/src/vec/runtime/vdata_stream_mgr.cpp @@ -158,8 +158,7 @@ Status VDataStreamMgr::deregister_recvr(const TUniqueId& fragment_instance_id, P // Notify concurrent add_data() requests that the stream has been terminated. // cancel_stream maybe take a long time, so we handle it out of lock. 
if (targert_recvr) { - Status status = Status::OK(); - targert_recvr->cancel_stream(status); + targert_recvr->cancel_stream(Status::OK()); return Status::OK(); } else { std::stringstream err; @@ -170,7 +169,7 @@ Status VDataStreamMgr::deregister_recvr(const TUniqueId& fragment_instance_id, P } } -void VDataStreamMgr::cancel(const TUniqueId& fragment_instance_id, Status& exec_status) { +void VDataStreamMgr::cancel(const TUniqueId& fragment_instance_id, Status exec_status) { VLOG_QUERY << "cancelling all streams for fragment=" << fragment_instance_id; std::vector> recvrs; { diff --git a/be/src/vec/runtime/vdata_stream_mgr.h b/be/src/vec/runtime/vdata_stream_mgr.h index 8e39c1ebebbee0..d809ff96fbd6e9 100644 --- a/be/src/vec/runtime/vdata_stream_mgr.h +++ b/be/src/vec/runtime/vdata_stream_mgr.h @@ -63,7 +63,7 @@ class VDataStreamMgr { Status transmit_block(const PTransmitDataParams* request, ::google::protobuf::Closure** done); - void cancel(const TUniqueId& fragment_instance_id, Status& exec_status); + void cancel(const TUniqueId& fragment_instance_id, Status exec_status); private: std::mutex _lock; diff --git a/be/src/vec/runtime/vdata_stream_recvr.cpp b/be/src/vec/runtime/vdata_stream_recvr.cpp index 09bd6262c5d9a1..d5abb50e0dcf01 100644 --- a/be/src/vec/runtime/vdata_stream_recvr.cpp +++ b/be/src/vec/runtime/vdata_stream_recvr.cpp @@ -273,7 +273,7 @@ void VDataStreamRecvr::SenderQueue::decrement_senders(int be_number) { } } -void VDataStreamRecvr::SenderQueue::cancel(Status& cancel_status) { +void VDataStreamRecvr::SenderQueue::cancel(Status cancel_status) { { std::lock_guard l(_lock); if (_is_cancelled) { @@ -449,7 +449,7 @@ Status VDataStreamRecvr::get_next(Block* block, bool* eos) { } } -void VDataStreamRecvr::remove_sender(int sender_id, int be_number, Status& exec_status) { +void VDataStreamRecvr::remove_sender(int sender_id, int be_number, Status exec_status) { if (!exec_status.ok()) { cancel_stream(exec_status); return; @@ -458,7 +458,7 @@ void VDataStreamRecvr::remove_sender(int sender_id, int be_number, Status& exec_ _sender_queues[use_sender_id]->decrement_senders(be_number); } -void VDataStreamRecvr::cancel_stream(Status& exec_status) { +void VDataStreamRecvr::cancel_stream(Status exec_status) { VLOG_QUERY << "cancel_stream: fragment_instance_id=" << print_id(_fragment_instance_id) << exec_status; diff --git a/be/src/vec/runtime/vdata_stream_recvr.h b/be/src/vec/runtime/vdata_stream_recvr.h index ec05cd3a567497..d8ab873f87aa50 100644 --- a/be/src/vec/runtime/vdata_stream_recvr.h +++ b/be/src/vec/runtime/vdata_stream_recvr.h @@ -106,9 +106,9 @@ class VDataStreamRecvr { // Indicate that a particular sender is done. Delegated to the appropriate // sender queue. Called from DataStreamMgr. 
- void remove_sender(int sender_id, int be_number, Status& exec_status); + void remove_sender(int sender_id, int be_number, Status exec_status); - void cancel_stream(Status& exec_status); + void cancel_stream(Status exec_status); void close(); @@ -209,7 +209,7 @@ class VDataStreamRecvr::SenderQueue { void decrement_senders(int sender_id); - void cancel(Status& cancel_status); + void cancel(Status cancel_status); void close(); diff --git a/be/src/vec/sink/vdata_stream_sender.cpp b/be/src/vec/sink/vdata_stream_sender.cpp index e5d6025c92840d..61d95538c94c21 100644 --- a/be/src/vec/sink/vdata_stream_sender.cpp +++ b/be/src/vec/sink/vdata_stream_sender.cpp @@ -109,7 +109,7 @@ Status Channel::init(RuntimeState* state) { } template -Status Channel::send_current_block(bool eos, Status& exec_status) { +Status Channel::send_current_block(bool eos, Status exec_status) { // FIXME: Now, local exchange will cause the performance problem is in a multi-threaded scenario // so this feature is turned off here by default. We need to re-examine this logic if (is_local()) { @@ -192,7 +192,6 @@ Status Channel::send_remote_block(PBlock* block, bool eos, Status exec_s _brpc_request.set_eos(eos); if (!exec_status.ok()) { - // should release exec_status of brpc_request? exec_status.to_protobuf(_brpc_request.mutable_exec_status()); } if (block != nullptr) { @@ -232,8 +231,7 @@ Status Channel::add_rows(Block* block, const std::vector& rows, boo RETURN_IF_ERROR( _serializer.next_serialized_block(block, _ch_cur_pb_block, 1, &serialized, eos, &rows)); if (serialized) { - Status exec_status = Status::OK(); - RETURN_IF_ERROR(send_current_block(false, exec_status)); + RETURN_IF_ERROR(send_current_block(false, Status::OK())); } return Status::OK(); @@ -256,7 +254,7 @@ Status Channel::close_wait(RuntimeState* state) { } template -Status Channel::close_internal(Status& exec_status) { +Status Channel::close_internal(Status exec_status) { if (!_need_close) { return Status::OK(); } @@ -290,7 +288,7 @@ Status Channel::close_internal(Status& exec_status) { } template -Status Channel::close(RuntimeState* state, Status& exec_status) { +Status Channel::close(RuntimeState* state, Status exec_status) { if (_closed) { return Status::OK(); } @@ -502,8 +500,8 @@ template void VDataStreamSender::_handle_eof_channel(RuntimeState* state, ChannelPtrType channel, Status st) { channel->set_receiver_eof(st); - Status ok = Status::OK(); - channel->close(state, ok); + // The channel will not send an RPC to the downstream on eof, so close the channel with an OK status. + channel->close(state, Status::OK()); } Status VDataStreamSender::send(RuntimeState* state, Block* block, bool eos) { diff --git a/be/src/vec/sink/vdata_stream_sender.h b/be/src/vec/sink/vdata_stream_sender.h index c7067de3d7d0fa..dfade831eab7a8 100644 --- a/be/src/vec/sink/vdata_stream_sender.h +++ b/be/src/vec/sink/vdata_stream_sender.h @@ -283,7 +283,7 @@ class Channel { virtual Status add_rows(Block* block, const std::vector& row, bool eos); - virtual Status send_current_block(bool eos, Status& exec_status); + virtual Status send_current_block(bool eos, Status exec_status); Status send_local_block(Status exec_status, bool eos = false); @@ -292,7 +292,7 @@ class Channel { // of close operation, client should call close_wait() to finish channel's close. // We split one close operation into two phases in order to make multiple channels // can run parallel. 
- Status close(RuntimeState* state, Status& exec_status); + Status close(RuntimeState* state, Status exec_status); // Get close wait's response, to finish channel close operation. Status close_wait(RuntimeState* state); @@ -363,7 +363,7 @@ class Channel { // Serialize _batch into _thrift_batch and send via send_batch(). // Returns send_batch() status. Status send_current_batch(bool eos = false); - Status close_internal(Status& exec_status); + Status close_internal(Status exec_status); Parent* _parent; @@ -530,7 +530,7 @@ class PipChannel final : public Channel { } // send _mutable_block - Status send_current_block(bool eos, Status& exec_status) override { + Status send_current_block(bool eos, Status exec_status) override { if (Channel::is_local()) { return Channel::send_local_block(exec_status, eos); }