2 changes: 2 additions & 0 deletions be/src/common/config.cpp
@@ -225,6 +225,8 @@ DEFINE_mBool(compress_rowbatches, "true");
DEFINE_mBool(rowbatch_align_tuple_offset, "false");
// interval between profile reports; in seconds
DEFINE_mInt32(status_report_interval, "5");
// Pipeline tasks have high concurrency, so reduce their report frequency
DEFINE_mInt32(pipeline_status_report_interval, "10");
// if true, each disk will have a separate thread pool for scanner
DEFINE_Bool(doris_enable_scanner_thread_pool_per_disk, "true");
// the timeout of a work thread to wait the blocking priority queue to get a task
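The new, coarser interval is consumed by `_init_next_report_time` further down in this diff; periodic reporting is only armed when profile reporting was requested, the interval is positive, and the query timeout exceeds the interval (otherwise the single final report suffices). A minimal sketch of that guard; the parameterization is simplified, only the two config variables come from the diff:

```cpp
#include <cstdint>

namespace config {
// Mirrors the two knobs in this diff (values are the defaults shown above).
int32_t status_report_interval = 5;
int32_t pipeline_status_report_interval = 10;
} // namespace config

// Distilled from _init_next_report_time: periodic reports are armed only when
// they were requested, the interval is usable, and the query will live long
// enough to need more than the single final report.
bool should_enable_periodic_report(bool is_report_success, int32_t query_timeout_s) {
    int32_t interval_s = config::pipeline_status_report_interval;
    return is_report_success && interval_s > 0 && query_timeout_s > interval_s;
}
```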
1 change: 1 addition & 0 deletions be/src/common/config.h
@@ -271,6 +271,7 @@ DECLARE_mBool(compress_rowbatches);
DECLARE_mBool(rowbatch_align_tuple_offset);
// interval between profile reports; in seconds
DECLARE_mInt32(status_report_interval);
DECLARE_mInt32(pipeline_status_report_interval);
// if true, each disk will have a separate thread pool for scanner
DECLARE_Bool(doris_enable_scanner_thread_pool_per_disk);
// the timeout of a work thread to wait the blocking priority queue to get a task
3 changes: 2 additions & 1 deletion be/src/exec/exec_node.cpp
@@ -198,7 +198,8 @@ Status ExecNode::close(RuntimeState* state) {
<< " already closed";
return Status::OK();
}
LOG(INFO) << "fragment_instance_id=" << print_id(state->fragment_instance_id()) << " closed";
LOG(INFO) << "fragment_instance_id=" << print_id(state->fragment_instance_id()) << ", "
<< " id=" << _id << " type=" << print_plan_node_type(_type) << " closed";
_is_closed = true;

Status result;
3 changes: 3 additions & 0 deletions be/src/pipeline/exec/exchange_sink_buffer.cpp
@@ -225,6 +225,9 @@ Status ExchangeSinkBuffer<Parent>::_send_rpc(InstanceLoId id) {
if (request.block) {
brpc_request->set_allocated_block(request.block.get());
}
if (!request.exec_status.ok()) {
request.exec_status.to_protobuf(brpc_request->mutable_exec_status());
}
auto* closure = request.channel->get_closure(id, request.eos, nullptr);

_instance_to_rpc_ctx[id]._closure = closure;
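The buffer now carries the fragment's final status on the transmit RPC: `TransmitInfo` gains an `exec_status` field (see the header diff below), and `_send_rpc` serializes it into the request only when it is non-OK, letting the receiver distinguish a failed sender from a clean end-of-stream. A compilable sketch of that conditional attachment; the struct shapes are simplified stand-ins, only `to_protobuf` and `mutable_exec_status` appear in the diff:

```cpp
#include <string>

// Simplified stand-ins for the real Doris/protobuf types (assumptions).
struct PStatus {
    int code = 0;
    std::string msg;
};

struct Status {
    int code = 0; // 0 == OK
    std::string msg;
    bool ok() const { return code == 0; }
    void to_protobuf(PStatus* dst) const {
        dst->code = code;
        dst->msg = msg;
    }
};

struct PTransmitDataParams {
    PStatus exec_status;
    PStatus* mutable_exec_status() { return &exec_status; }
};

// Core idea from the diff: attach the fragment's status to the transmit RPC
// only when it is non-OK, keeping the common success path unchanged.
void attach_exec_status(const Status& exec_status, PTransmitDataParams* req) {
    if (!exec_status.ok()) {
        exec_status.to_protobuf(req->mutable_exec_status());
    }
}
```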
1 change: 1 addition & 0 deletions be/src/pipeline/exec/exchange_sink_buffer.h
@@ -94,6 +94,7 @@ struct TransmitInfo {
vectorized::PipChannel<Parent>* channel;
std::unique_ptr<PBlock> block;
bool eos;
Status exec_status;
};

template <typename Parent>
14 changes: 8 additions & 6 deletions be/src/pipeline/exec/exchange_sink_operator.cpp
@@ -278,7 +278,8 @@ template <typename ChannelPtrType>
void ExchangeSinkOperatorX::_handle_eof_channel(RuntimeState* state, ChannelPtrType channel,
Status st) {
channel->set_receiver_eof(st);
channel->close(state);
// Channel will not send RPC to the downstream when eof, so close the channel with OK status.
channel->close(state, Status::OK());
}

Status ExchangeSinkOperatorX::sink(RuntimeState* state, vectorized::Block* block,
@@ -337,8 +338,8 @@ Status ExchangeSinkOperatorX::sink(RuntimeState* state, vectorized::Block* block
status = channel->send_local_block(&cur_block);
} else {
SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
status = channel->send_block(block_holder, &sent,
source_state == SourceState::FINISHED);
status = channel->send_broadcast_block(
block_holder, &sent, source_state == SourceState::FINISHED);
}
HANDLE_CHANNEL_STATUS(state, channel, status);
}
@@ -365,8 +366,8 @@ Status ExchangeSinkOperatorX::sink(RuntimeState* state, vectorized::Block* block
SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
RETURN_IF_ERROR(local_state._serializer.serialize_block(
block, current_channel->ch_cur_pb_block()));
auto status = current_channel->send_block(current_channel->ch_cur_pb_block(),
source_state == SourceState::FINISHED);
auto status = current_channel->send_remote_block(
current_channel->ch_cur_pb_block(), source_state == SourceState::FINISHED);
HANDLE_CHANNEL_STATUS(state, current_channel, status);
current_channel->ch_roll_pb_block();
}
@@ -520,8 +521,9 @@ Status ExchangeSinkOperatorX::try_close(RuntimeState* state, Status exec_status)
CREATE_SINK_LOCAL_STATE_RETURN_IF_ERROR(local_state);
local_state._serializer.reset_block();
Status final_st = Status::OK();
Status final_status = exec_status;
for (int i = 0; i < local_state.channels.size(); ++i) {
Status st = local_state.channels[i]->close(state);
Status st = local_state.channels[i]->close(state, exec_status);
if (!st.ok() && final_st.ok()) {
final_st = st;
}
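Two things happen in this file: the overloaded `send_block` is split into the explicitly named `send_broadcast_block` / `send_remote_block`, and `close(state, exec_status)` threads the query's final status to every channel, while channels that already saw receiver EOF close with `Status::OK()` since no further RPC will be sent. A compact sketch of the status-merging close loop under assumed, simplified signatures:

```cpp
#include <vector>

struct Status {
    int code = 0;
    bool ok() const { return code == 0; }
    static Status OK() { return {}; }
};
struct RuntimeState {};

struct Channel {
    bool receiver_eof = false;
    // Assumed shape of the new API: the exec status rides along on close.
    Status close(RuntimeState* /*state*/, const Status& exec_status) {
        // An eof channel sends no RPC, so it always closes cleanly;
        // otherwise the final status is forwarded downstream.
        return receiver_eof ? Status::OK() : exec_status;
    }
};

// First-error-wins merge across all channels, mirroring try_close in the diff.
Status close_all(RuntimeState* state, std::vector<Channel>& channels,
                 const Status& exec_status) {
    Status final_st = Status::OK();
    for (auto& ch : channels) {
        Status st = ch.close(state, exec_status);
        if (!st.ok() && final_st.ok()) {
            final_st = st; // keep the first failure, ignore later ones
        }
    }
    return final_st;
}
```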
7 changes: 4 additions & 3 deletions be/src/pipeline/exec/result_file_sink_operator.cpp
@@ -234,8 +234,8 @@ Status ResultFileSinkLocalState::close(RuntimeState* state, Status exec_status)
status = channel->send_local_block(&cur_block);
} else {
SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
status =
channel->send_block(_block_holder.get(), nullptr, true);
status = channel->send_broadcast_block(_block_holder.get(),
nullptr, true);
}
HANDLE_CHANNEL_STATUS(state, channel, status);
}
@@ -256,7 +256,7 @@ template <typename ChannelPtrType>
void ResultFileSinkLocalState::_handle_eof_channel(RuntimeState* state, ChannelPtrType channel,
Status st) {
channel->set_receiver_eof(st);
channel->close(state);
// Channel will not send RPC to the downstream when eof, so close the channel with OK status.
channel->close(state, Status::OK());
}

Status ResultFileSinkOperatorX::sink(RuntimeState* state, vectorized::Block* in_block,
2 changes: 1 addition & 1 deletion be/src/pipeline/exec/result_sink_operator.cpp
@@ -181,7 +181,7 @@ Status ResultSinkLocalState::close(RuntimeState* state, Status exec_status) {
COUNTER_UPDATE(profile()->total_time_counter(),
_cancel_dependency->write_watcher_elapse_time());
SCOPED_TIMER(profile()->total_time_counter());
Status final_status = Status::OK();
Status final_status = exec_status;
if (_writer) {
// close the writer
Status st = _writer->close();
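This one-line change is a correctness fix: `close()` previously started from `Status::OK()` and could report success to the FE even when the fragment had already failed; seeding `final_status` with `exec_status` preserves the upstream error. A simplified sketch of the resulting precedence:

```cpp
struct Status {
    int code = 0;
    bool ok() const { return code == 0; }
    static Status OK() { return {}; }
};

// Sketch of the precedence the fix establishes: the fragment's exec status
// seeds the result, and a writer-close failure only adds information when
// everything before it succeeded.
Status merge_close_status(const Status& exec_status, const Status& writer_close_st) {
    Status final_status = exec_status; // was Status::OK() before the fix
    if (final_status.ok() && !writer_close_st.ok()) {
        final_status = writer_close_st;
    }
    return final_status;
}
```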
144 changes: 81 additions & 63 deletions be/src/pipeline/pipeline_fragment_context.cpp
@@ -28,6 +28,9 @@
#include <pthread.h>
#include <stdlib.h>
// IWYU pragma: no_include <bits/chrono.h>
#include <fmt/format.h>
#include <fmt/ranges.h>

#include <chrono> // IWYU pragma: keep
#include <map>
#include <ostream>
@@ -125,14 +128,12 @@ PipelineFragmentContext::PipelineFragmentContext(
_exec_env(exec_env),
_query_ctx(std::move(query_ctx)),
_call_back(call_back),
_report_thread_active(false),
_report_status_cb(report_status_cb),
_is_report_on_cancel(true),
_report_status_cb(report_status_cb),
_group_commit(group_commit) {
if (_query_ctx->get_task_group()) {
_task_group_entity = _query_ctx->get_task_group()->task_entity();
}
_report_thread_future = _report_thread_promise.get_future();
_fragment_watcher.start();
}

@@ -146,16 +147,29 @@ PipelineFragmentContext::~PipelineFragmentContext() {
} else {
_call_back(_runtime_state.get(), &st);
}
DCHECK(!_report_thread_active);
}

void PipelineFragmentContext::cancel(const PPlanFragmentCancelReason& reason,
const std::string& msg) {
LOG_INFO("PipelineFragmentContext::cancel")
.tag("query_id", print_id(_query_ctx->query_id()))
.tag("fragment_id", _fragment_id)
.tag("instance_id", print_id(_runtime_state->fragment_instance_id()))
.tag("reason", PPlanFragmentCancelReason_Name(reason))
.tag("message", msg);

if (_query_ctx->cancel(true, msg, Status::Cancelled(msg))) {
LOG(WARNING) << "PipelineFragmentContext "
<< PrintInstanceStandardInfo(_query_id, _fragment_id, _fragment_instance_id)
<< " is canceled, cancel message: " << msg;
if (reason != PPlanFragmentCancelReason::LIMIT_REACH) {
LOG(WARNING) << "PipelineFragmentContext "
<< PrintInstanceStandardInfo(_query_id, _fragment_id,
_fragment_instance_id)
<< " is canceled, cancel message: " << msg;

} else {
_set_is_report_on_cancel(false); // TODO bug llj: fix this, not protected by a lock
}

_runtime_state->set_process_status(_query_ctx->exec_status());
// Get the pipe from the new load stream manager and send cancel to it, or the fragment may hang waiting to read from the pipe
// For stream load the fragment's query_id == load id, it is set in FE.
auto stream_load_ctx = _exec_env->new_load_stream_mgr()->get(_query_id);
@@ -164,7 +178,8 @@ void PipelineFragmentContext::cancel(const PPlanFragmentCancelReason& reason,
}

// must close stream_mgr to avoid dead lock in Exchange Node
_exec_env->vstream_mgr()->cancel(_fragment_instance_id);
// TODO bug llj: fix this, other instances will not be cancelled
_exec_env->vstream_mgr()->cancel(_fragment_instance_id, Status::Cancelled(msg));
// Cancel the result queue manager used by spark doris connector
// TODO pipeline incomp
// _exec_env->result_queue_mgr()->update_queue_status(id, Status::Aborted(msg));
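The `LIMIT_REACH` branch is the notable addition to `cancel()`: hitting a LIMIT is a benign, internally triggered cancel, so it is not logged as a warning and the cancel report to the FE is suppressed via `_set_is_report_on_cancel(false)`. A distilled sketch of that branching; the enum and policy wrapper are illustrative stand-ins, not the real types:

```cpp
#include <string>

enum class CancelReason { LIMIT_REACH, TIMEOUT, USER_CANCEL, INTERNAL_ERROR };

struct FragmentCancelPolicy {
    bool report_on_cancel = true;

    // Distilled from PipelineFragmentContext::cancel: a LIMIT-reached cancel
    // is expected behavior, so it is not logged as a warning and no cancel
    // report is pushed to the FE; every other reason is logged and reported.
    void on_cancel(CancelReason reason, const std::string& /*msg*/) {
        if (reason == CancelReason::LIMIT_REACH) {
            report_on_cancel = false; // benign: the query produced enough rows
        } else {
            // The real code emits LOG(WARNING) with the cancel message here.
        }
    }
};
```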
@@ -199,6 +214,7 @@ Status PipelineFragmentContext::prepare(const doris::TPipelineFragmentParams& re

LOG_INFO("PipelineFragmentContext::prepare")
.tag("query_id", print_id(_query_id))
.tag("fragment_id", _fragment_id)
.tag("instance_id", print_id(local_params.fragment_instance_id))
.tag("backend_num", local_params.backend_num)
.tag("pthread_id", (uintptr_t)pthread_self());
@@ -311,6 +327,8 @@ Status PipelineFragmentContext::prepare(const doris::TPipelineFragmentParams& re
_runtime_state->runtime_profile()->add_child(_root_plan->runtime_profile(), true, nullptr);
_runtime_state->runtime_profile()->add_child(_runtime_profile.get(), true, nullptr);

_init_next_report_time();

_prepared = true;
return Status::OK();
}
@@ -344,54 +362,56 @@ Status PipelineFragmentContext::_build_pipeline_tasks(
return Status::OK();
}

void PipelineFragmentContext::_stop_report_thread() {
if (!_report_thread_active) {
return;
void PipelineFragmentContext::_init_next_report_time() {
auto interval_s = config::pipeline_status_report_interval;
if (_is_report_success && interval_s > 0 && _query_ctx->timeout_second > interval_s) {
std::vector<string> ins_ids;
[clang-tidy] warning: variable 'ins_ids' is not initialized [cppcoreguidelines-init-variables]
instance_ids(ins_ids);
VLOG_FILE << "enable period report: instance_id="
<< fmt::format("{}", fmt::join(ins_ids, ", "));
uint64_t report_fragment_offset = (uint64_t)(rand() % interval_s) * NANOS_PER_SEC;
// We don't want to wait longer than it takes to run the entire fragment.
_previous_report_time =
MonotonicNanos() + report_fragment_offset - (uint64_t)(interval_s)*NANOS_PER_SEC;
_disable_period_report = false;
}
}
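The seeding arithmetic in `_init_next_report_time` is subtle: a report becomes due once `MonotonicNanos() > _previous_report_time + interval`, so seeding `_previous_report_time` with `now + offset - interval` puts the first deadline at `now + offset`, jittering fragments by a uniform 0..interval seconds so the FE does not receive every report at once. A worked sketch of just that computation (the caller guards `interval_s > 0`, as above):

```cpp
#include <cstdint>
#include <cstdlib>

constexpr uint64_t NANOS_PER_SEC = 1000ULL * 1000 * 1000;

// First report becomes due once now > previous + interval. Seeding previous
// with (now + offset - interval) therefore makes the first deadline land at
// now + offset, where offset is uniform in [0, interval) seconds -- the
// jitter from the diff. Subsequent reports revert to the plain cadence.
uint64_t seed_previous_report_time(uint64_t now_ns, int32_t interval_s) {
    uint64_t offset_ns = (uint64_t)(rand() % interval_s) * NANOS_PER_SEC;
    return now_ns + offset_ns - (uint64_t)interval_s * NANOS_PER_SEC;
}
```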

_report_thread_active = false;

_stop_report_thread_cv.notify_one();
// Wait infinitely to ensure that the report task is finished and this variable
// is not used in report thread.
_report_thread_future.wait();
void PipelineFragmentContext::refresh_next_report_time() {
auto disable = _disable_period_report.load(std::memory_order_acquire);
DCHECK(disable == true);
_previous_report_time.store(MonotonicNanos(), std::memory_order_release);
_disable_period_report.compare_exchange_strong(disable, false);
}

void PipelineFragmentContext::report_profile() {
SCOPED_ATTACH_TASK(_runtime_state.get());
VLOG_FILE << "report_profile(): instance_id=" << _runtime_state->fragment_instance_id();

_report_thread_active = true;

std::unique_lock<std::mutex> l(_report_thread_lock);
// tell Open() that we started
_report_thread_started_cv.notify_one();

// Jitter the reporting time of remote fragments by a random amount between
// 0 and the report_interval. This way, the coordinator doesn't get all the
// updates at once so its better for contention as well as smoother progress
// reporting.
int report_fragment_offset = rand() % config::status_report_interval;
// We don't want to wait longer than it takes to run the entire fragment.
_stop_report_thread_cv.wait_for(l, std::chrono::seconds(report_fragment_offset));
while (_report_thread_active) {
if (config::status_report_interval > 0) {
// wait_for can return because the timeout occurred or the condition variable
// was signaled. We can't rely on its return value to distinguish between the
// two cases (e.g. there is a race here where the wait timed out but before grabbing
// the lock, the condition variable was signaled). Instead, we will use an external
// flag, _report_thread_active, to coordinate this.
_stop_report_thread_cv.wait_for(l,
std::chrono::seconds(config::status_report_interval));
} else {
LOG(WARNING) << "config::status_report_interval is equal to or less than zero, exiting "
"reporting thread.";
break;
void PipelineFragmentContext::trigger_report_if_necessary() {
if (!_is_report_success) {
return;
}
auto disable = _disable_period_report.load(std::memory_order_acquire);
if (disable) {
return;
}
int32_t interval_s = config::pipeline_status_report_interval;
if (interval_s <= 0) {
LOG(WARNING)
<< "config::status_report_interval is equal to or less than zero, do not trigger "
"report.";
}
uint64_t next_report_time = _previous_report_time.load(std::memory_order_acquire) +
(uint64_t)(interval_s)*NANOS_PER_SEC;
[clang-tidy] warning: variable 'next_report_time' is not initialized [cppcoreguidelines-init-variables]
if (MonotonicNanos() > next_report_time) {
if (!_disable_period_report.compare_exchange_strong(disable, true,
std::memory_order_acq_rel)) {
return;
}

if (VLOG_FILE_IS_ON) {
VLOG_FILE << "Reporting " << (!_report_thread_active ? "final " : " ")
<< "profile for instance " << _runtime_state->fragment_instance_id();
std::vector<string> ins_ids;
[clang-tidy] warning: variable 'ins_ids' is not initialized [cppcoreguidelines-init-variables]
instance_ids(ins_ids);
VLOG_FILE << "Reporting "
<< "profile for query_id " << print_id(_query_id)
<< ", instance ids: " << fmt::format("{}", fmt::join(ins_ids, ", "));

std::stringstream ss;
_runtime_state->runtime_profile()->compute_time_in_profile();
_runtime_state->runtime_profile()->pretty_print(&ss);
@@ -401,15 +421,13 @@ void PipelineFragmentContext::report_profile() {
}
VLOG_FILE << ss.str();
}

if (!_report_thread_active) {
break;
auto st = send_report(false);
if (!st.ok()) {
disable = true;
_disable_period_report.compare_exchange_strong(disable, false,
std::memory_order_acq_rel);
}

send_report(false);
}

VLOG_FILE << "exiting reporting thread: instance_id=" << _runtime_state->fragment_instance_id();
}
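With the dedicated report thread removed, reporting piggybacks on task execution: `_previous_report_time` (atomic) carries the deadline and `_disable_period_report` acts as an in-flight flag claimed by compare-and-swap, so at most one task per period sends a report, and a failed send re-arms the flag. A self-contained sketch of this lock-free gate under those assumed semantics:

```cpp
#include <atomic>
#include <cstdint>

// Lock-free periodic gate: many worker threads call maybe_fire(); at most one
// wins per period, and it stays "disabled" until refresh()/rearm() re-arms it.
class PeriodicGate {
public:
    PeriodicGate(uint64_t now_ns, uint64_t interval_ns)
            : _interval_ns(interval_ns), _prev_ns(now_ns), _disabled(false) {}

    // Returns true iff this caller won the right to report for this period.
    bool maybe_fire(uint64_t now_ns) {
        bool disabled = _disabled.load(std::memory_order_acquire);
        if (disabled) return false;
        if (now_ns <= _prev_ns.load(std::memory_order_acquire) + _interval_ns) {
            return false;
        }
        // CAS false -> true: only one thread per period succeeds here.
        return _disabled.compare_exchange_strong(disabled, true,
                                                 std::memory_order_acq_rel);
    }

    // After the report RPC is queued: reset the deadline and reopen the gate
    // (mirrors refresh_next_report_time in the diff).
    void refresh(uint64_t now_ns) {
        _prev_ns.store(now_ns, std::memory_order_release);
        _disabled.store(false, std::memory_order_release);
    }

    // Submitting the report failed: reopen the gate so the next task retries.
    void rearm() { _disabled.store(false, std::memory_order_release); }

private:
    const uint64_t _interval_ns;
    std::atomic<uint64_t> _prev_ns;
    std::atomic<bool> _disabled;
};
```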

// TODO: use virtual function to abstract this
@@ -815,7 +833,6 @@ Status PipelineFragmentContext::_create_sink(int sender_id, const TDataSink& thr
void PipelineFragmentContext::_close_action() {
_runtime_profile->total_time_counter()->update(_fragment_watcher.elapsed_time());
send_report(true);
_stop_report_thread();
// all submitted tasks done
_exec_env->fragment_mgr()->remove_pipeline_context(shared_from_this());
}
@@ -828,7 +845,7 @@ void PipelineFragmentContext::close_a_pipeline() {
}
}

void PipelineFragmentContext::send_report(bool done) {
Status PipelineFragmentContext::send_report(bool done) {
Status exec_status = Status::OK();
{
std::lock_guard<std::mutex> l(_status_lock);
@@ -838,18 +855,18 @@ void PipelineFragmentContext::send_report(bool done) {
// If plan is done successfully, but _is_report_success is false,
// no need to send report.
if (!_is_report_success && done && exec_status.ok()) {
return;
return Status::NeedSendAgain("");
}

// If both _is_report_success and _is_report_on_cancel are false,
// no report is needed no matter whether the query succeeded or failed.
// This may happen when the query limit is reached and
// an internal cancellation is being processed.
if (!_is_report_success && !_is_report_on_cancel) {
return;
return Status::NeedSendAgain("");
}

_report_status_cb(
return _report_status_cb(
{false,
exec_status,
{},
Expand All @@ -864,7 +881,8 @@ void PipelineFragmentContext::send_report(bool done) {
_runtime_state.get(),
std::bind(&PipelineFragmentContext::update_status, this, std::placeholders::_1),
std::bind(&PipelineFragmentContext::cancel, this, std::placeholders::_1,
std::placeholders::_2)});
std::placeholders::_2)},
shared_from_this());
}
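`send_report` now returns a `Status` instead of `void`: the skip paths return `Status::NeedSendAgain` so `trigger_report_if_necessary` can re-arm the periodic gate when nothing was queued. The skip conditions distill to the following predicate; this is a simplified restatement with the member flags passed as parameters:

```cpp
// Decision logic distilled from send_report: when is a report actually
// pushed to the FE? (_is_report_success / _is_report_on_cancel become
// plain parameters here.)
bool should_send_report(bool done, bool exec_status_ok,
                        bool is_report_success, bool is_report_on_cancel) {
    // Plan finished cleanly but profile reporting was never requested:
    // the FE learns completion through the normal result path instead.
    if (!is_report_success && done && exec_status_ok) return false;
    // Neither success reporting nor cancel reporting is wanted, e.g. an
    // internal cancellation after a LIMIT was reached.
    if (!is_report_success && !is_report_on_cancel) return false;
    return true; // otherwise report; skips surface as NeedSendAgain upstream
}
```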

} // namespace doris::pipeline