Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 2 additions & 4 deletions be/src/pipeline/pipeline_fragment_context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -161,10 +161,8 @@ void PipelineFragmentContext::cancel(const PPlanFragmentCancelReason& reason,
// We need a more detail discussion.
if (_query_ctx->cancel(true, msg, Status::Cancelled(msg))) {
if (reason != PPlanFragmentCancelReason::LIMIT_REACH) {
LOG(WARNING) << "PipelineFragmentContext "
<< PrintInstanceStandardInfo(_query_id, _fragment_instance_id)
<< " is canceled, cancel message: " << msg;

LOG_WARNING("Instance {} cancelled, reason {}", print_id(_fragment_instance_id),
msg.substr(0, 50));
} else {
_set_is_report_on_cancel(false); // TODO bug llj fix this not projected by lock
}
Expand Down
7 changes: 3 additions & 4 deletions be/src/runtime/fragment_mgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -402,10 +402,9 @@ void FragmentMgr::coordinator_callback(const ReportStatusRequest& req) {
VLOG_DEBUG << "reportExecStatus params is "
<< apache::thrift::ThriftDebugString(params).c_str();
if (!exec_status.ok()) {
LOG(WARNING) << "report error status: " << exec_status.msg()
<< " to coordinator: " << req.coord_addr
<< ", query id: " << print_id(req.query_id)
<< ", instance id: " << print_id(req.fragment_instance_id);
LOG_WARNING("Query {} instance {} report error status to coor {}:{}, error status: {}",
print_id(req.query_id), print_id(req.fragment_instance_id),
req.coord_addr.hostname, req.coord_addr.port, exec_status.to_string_no_stack());
}
try {
try {
Expand Down
14 changes: 4 additions & 10 deletions be/src/vec/runtime/vdata_stream_recvr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -472,24 +472,18 @@ Status VDataStreamRecvr::get_next(Block* block, bool* eos) {
}
}

void VDataStreamRecvr::remove_sender(int sender_id, int be_number, Status exec_status) {
void VDataStreamRecvr::remove_sender(int sender_id, int be_number, Status exec_status,
QueryStatisticsPtr statistics) {
if (!exec_status.ok()) {
cancel_stream(exec_status);
return;
}
int use_sender_id = _is_merging ? sender_id : 0;
_sender_queues[use_sender_id]->decrement_senders(be_number);
}

void VDataStreamRecvr::remove_sender(int sender_id, int be_number, QueryStatisticsPtr statistics,
Status exec_status) {
if (!exec_status.ok()) {
cancel_stream(exec_status);
return;
if (statistics != nullptr) {
_sub_plan_query_statistics_recvr->insert(statistics, sender_id);
}
int use_sender_id = _is_merging ? sender_id : 0;
_sender_queues[use_sender_id]->decrement_senders(be_number);
_sub_plan_query_statistics_recvr->insert(statistics, sender_id);
}

void VDataStreamRecvr::cancel_stream(Status exec_status) {
Expand Down
6 changes: 2 additions & 4 deletions be/src/vec/runtime/vdata_stream_recvr.h
Original file line number Diff line number Diff line change
Expand Up @@ -110,10 +110,8 @@ class VDataStreamRecvr {

// Indicate that a particular sender is done. Delegated to the appropriate
// sender queue. Called from DataStreamMgr.
void remove_sender(int sender_id, int be_number, Status exec_status);

void remove_sender(int sender_id, int be_number, QueryStatisticsPtr statistics,
Status exec_status);
void remove_sender(int sender_id, int be_number, Status exec_status,
QueryStatisticsPtr statistics = nullptr);

void cancel_stream(Status exec_status);

Expand Down
8 changes: 4 additions & 4 deletions be/src/vec/sink/vdata_stream_sender.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -154,8 +154,8 @@ Status Channel<Parent>::send_local_block(Status exec_status, bool eos) {
if (eos) {
/// TODO: Supported on pipelineX, we can hold QueryStatistics on the fragment instead of on instances.
if constexpr (std::is_same_v<VDataStreamSender, Parent>) {
_local_recvr->remove_sender(_parent->sender_id(), _be_number,
_parent->query_statisticsPtr(), exec_status);
_local_recvr->remove_sender(_parent->sender_id(), _be_number, exec_status,
_parent->query_statisticsPtr());
} else {
_local_recvr->remove_sender(_parent->sender_id(), _be_number, exec_status);
}
Expand Down Expand Up @@ -290,8 +290,8 @@ Status Channel<Parent>::close_internal(Status exec_status) {
if (is_local()) {
if (_recvr_is_valid()) {
if constexpr (std::is_same_v<VDataStreamSender, Parent>) {
_local_recvr->remove_sender(_parent->sender_id(), _be_number,
_parent->query_statisticsPtr(), exec_status);
_local_recvr->remove_sender(_parent->sender_id(), _be_number, exec_status,
_parent->query_statisticsPtr());
} else {
_local_recvr->remove_sender(_parent->sender_id(), _be_number, exec_status);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,9 +140,7 @@ public Map<String, Integer> getInstancesNumPerUser() {
public void unregisterQuery(TUniqueId queryId) {
QueryInfo queryInfo = coordinatorMap.remove(queryId);
if (queryInfo != null) {
if (LOG.isDebugEnabled()) {
LOG.debug("Deregister query id {}", DebugUtil.printId(queryId));
}
LOG.info("Deregister query id {}", DebugUtil.printId(queryId));

if (queryInfo.getConnectContext() != null
&& !Strings.isNullOrEmpty(queryInfo.getConnectContext().getQualifiedUser())
Expand All @@ -152,7 +150,7 @@ public void unregisterQuery(TUniqueId queryId) {
String user = queryInfo.getConnectContext().getQualifiedUser();
AtomicInteger instancesNum = userToInstancesCount.get(user);
if (instancesNum == null) {
LOG.warn("WTF?? query {} in queryToInstancesNum but not in userToInstancesCount",
LOG.warn("Query {} in queryToInstancesNum but not in userToInstancesCount",
DebugUtil.printId(queryId)
);
} else {
Expand All @@ -161,9 +159,7 @@ public void unregisterQuery(TUniqueId queryId) {
}
}
} else {
if (LOG.isDebugEnabled()) {
LOG.debug("not found query {} when unregisterQuery", DebugUtil.printId(queryId));
}
LOG.warn("Query {} not found when unregister.", DebugUtil.printId(queryId));
}

// commit hive tranaction if needed
Expand Down