Merged
16 changes: 2 additions & 14 deletions be/src/runtime/data_spliter.cpp
@@ -308,22 +308,10 @@ Status DataSpliter::close(RuntimeState* state, Status close_status) {
     }
     Expr::close(_partition_expr_ctxs, state);
     for (auto& iter : _rollup_map) {
-        Status status = iter.second->close(state);
-        if (UNLIKELY(is_ok && !status.ok())) {
-            LOG(WARNING) << "close rollup_map error"
-                << " err_msg=" << status.get_error_msg();
-            is_ok = false;
-            err_status = status;
-        }
+        iter.second->close(state);
     }
     for (auto iter : _partition_infos) {
-        Status status = iter->close(state);
-        if (UNLIKELY(is_ok && !status.ok())) {
-            LOG(WARNING) << "close partition_info error"
-                << " err_msg=" << status.get_error_msg();
-            is_ok = false;
-            err_status = status;
-        }
+        iter->close(state);
     }

     _expr_mem_tracker.reset();
45 changes: 32 additions & 13 deletions be/src/runtime/data_stream_sender.cpp
@@ -33,6 +33,7 @@
 #include "runtime/client_cache.h"
 #include "runtime/dpp_sink_internal.h"
 #include "runtime/mem_tracker.h"
+#include "service/backend_options.h"
 #include "util/debug_util.h"
 #include "util/network_util.h"
 #include "util/thrift_client.h"
@@ -114,10 +115,10 @@ class DataStreamSender::Channel {
     // of close operation, client should call close_wait() to finish channel's close.
     // We split one close operation into two phases in order to make multiple channels
     // can run parallel.
-    void close(RuntimeState* state);
+    Status close(RuntimeState* state);

     // Get close wait's response, to finish channel close operation.
-    void close_wait(RuntimeState* state);
+    Status close_wait(RuntimeState* state);

     int64_t num_data_bytes_sent() const {
         return _num_data_bytes_sent;
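The comment above describes a two-phase protocol: close() fires the channel's final async RPC, and close_wait() later joins it, so the per-channel round-trips overlap instead of running one after another. DataStreamSender::close() further down combines this with first-error-wins aggregation. A minimal sketch of the pattern, using hypothetical Status and Channel stand-ins rather than the real Doris classes:

#include <vector>

// Hypothetical stand-ins for illustration only; the real classes live
// in be/src/runtime/data_stream_sender.cpp.
struct Status {
    bool ok() const { return _ok; }
    bool _ok = true;
};

struct Channel {
    Status close()      { /* phase 1: fire the async close RPC */ return {}; }
    Status close_wait() { /* phase 2: join the in-flight RPC */ return {}; }
};

// Issue every close() before any close_wait(): the N round-trips
// overlap instead of running serially, and the first failure wins.
Status close_all(std::vector<Channel>& channels) {
    Status final_st;
    for (auto& ch : channels) {
        Status st = ch.close();
        if (!st.ok() && final_st.ok()) final_st = st;
    }
    for (auto& ch : channels) {
        Status st = ch.close_wait();
        if (!st.ok() && final_st.ok()) final_st = st;
    }
    return final_st;
}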
@@ -137,9 +138,11 @@ class DataStreamSender::Channel {
         auto cntl = &_closure->cntl;
         brpc::Join(cntl->call_id());
         if (cntl->Failed()) {
-            LOG(WARNING) << "failed to send brpc batch, error=" << berror(cntl->ErrorCode())
-                << ", error_text=" << cntl->ErrorText();
-            return Status::ThriftRpcError("failed to send batch");
+            std::stringstream ss;
+            ss << "failed to send brpc batch, error=" << berror(cntl->ErrorCode())
+               << ", error_text=" << cntl->ErrorText() << ", client: " << BackendOptions::get_localhost();
+            LOG(WARNING) << ss.str();
+            return Status::ThriftRpcError(ss.str());
         }
         return Status::OK();
     }
@@ -296,16 +299,25 @@ Status DataStreamSender::Channel::close_internal() {
     return Status::OK();
 }

-void DataStreamSender::Channel::close(RuntimeState* state) {
-    state->log_error(close_internal().get_error_msg());
+Status DataStreamSender::Channel::close(RuntimeState* state) {
+    Status st = close_internal();
+    if (!st.ok()) {
+        state->log_error(st.get_error_msg());
+    }
+    return st;
 }

-void DataStreamSender::Channel::close_wait(RuntimeState* state) {
+Status DataStreamSender::Channel::close_wait(RuntimeState* state) {
     if (_need_close) {
-        state->log_error(_wait_last_brpc().get_error_msg());
+        Status st = _wait_last_brpc();
+        if (!st.ok()) {
+            state->log_error(st.get_error_msg());
+        }
         _need_close = false;
+        return st;
     }
     _batch.reset();
+    return Status::OK();
 }

 DataStreamSender::DataStreamSender(
@@ -606,19 +618,26 @@ Status DataStreamSender::compute_range_part_code(
 Status DataStreamSender::close(RuntimeState* state, Status exec_status) {
     // TODO: only close channels that didn't have any errors
     // make all channels close parallel
+    Status final_st = Status::OK();
     for (int i = 0; i < _channels.size(); ++i) {
-        _channels[i]->close(state);
+        Status st = _channels[i]->close(state);
+        if (!st.ok() && final_st.ok()) {
+            final_st = st;
+        }
     }
     // wait all channels to finish
     for (int i = 0; i < _channels.size(); ++i) {
-        _channels[i]->close_wait(state);
+        Status st = _channels[i]->close_wait(state);
+        if (!st.ok() && final_st.ok()) {
+            final_st = st;
+        }
     }
     for (auto iter : _partition_infos) {
-        RETURN_IF_ERROR(iter->close(state));
+        iter->close(state);
     }
     Expr::close(_partition_expr_ctxs, state);

-    return Status::OK();
+    return final_st;
 }

 template<typename T>
6 changes: 2 additions & 4 deletions be/src/runtime/dpp_sink_internal.cpp
@@ -82,10 +82,9 @@ Status RollupSchema::open(RuntimeState* state) {
     return Status::OK();
 }

-Status RollupSchema::close(RuntimeState* state) {
+void RollupSchema::close(RuntimeState* state) {
     Expr::close(_key_ctxs, state);
     Expr::close(_value_ctxs, state);
-    return Status::OK();
 }

 Status PartRangeKey::from_thrift(
@@ -237,11 +236,10 @@ Status PartitionInfo::open(RuntimeState* state) {
     return Status::OK();
 }

-Status PartitionInfo::close(RuntimeState* state) {
+void PartitionInfo::close(RuntimeState* state) {
     if (_distributed_expr_ctxs.size() > 0) {
         Expr::close(_distributed_expr_ctxs, state);
     }
-    return Status::OK();
 }

 }
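Since RollupSchema::close() and PartitionInfo::close() only forward to Expr::close(), which has no failure path, the Status return was dropped; that is what lets the call sites in data_spliter.cpp and data_stream_sender.cpp above delete their error handling around these calls. A tiny sketch (hypothetical types, not the Doris classes) of why an infallible cleanup is better typed as void:

#include <vector>

// Hypothetical shapes for illustration only.
struct ExprContext {
    void close() { /* release expr resources; cannot fail */ }
};

struct PartitionInfoSketch {
    std::vector<ExprContext*> distributed_expr_ctxs;
    // Forwards to infallible closes, so there is no Status to fabricate
    // and no dead error-checking forced onto callers.
    void close() {
        for (auto* ctx : distributed_expr_ctxs) {
            ctx->close();
        }
    }
};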
4 changes: 2 additions & 2 deletions be/src/runtime/dpp_sink_internal.h
@@ -53,7 +53,7 @@ class RollupSchema {

     Status open(RuntimeState* state);

-    Status close(RuntimeState* state);
+    void close(RuntimeState* state);

     const std::string& keys_type() const {
         return _keys_type;
@@ -263,7 +263,7 @@ class PartitionInfo {

     Status open(RuntimeState* state);

-    Status close(RuntimeState* state);
+    void close(RuntimeState* state);

     int64_t id() const {
         return _id;
4 changes: 3 additions & 1 deletion be/src/runtime/routine_load/routine_load_task_executor.cpp
@@ -107,7 +107,9 @@ Status RoutineLoadTaskExecutor::submit_task(const TRoutineLoadTask& task) {

     // thread pool's queue size > 0 means there are tasks waiting to be executed, so no more tasks should be submitted.
     if (_thread_pool.get_queue_size() > 0) {
-        LOG(INFO) << "too many tasks in thread pool. reject task: " << UniqueId(task.id);
+        LOG(INFO) << "too many tasks in thread pool. reject task: " << UniqueId(task.id)
+                  << ", job id: " << task.job_id
+                  << ", queue size: " << _thread_pool.get_queue_size();
         return Status::TooManyTasks(UniqueId(task.id).to_string());
     }

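The rejection above is plain admission control: a non-empty queue means every worker is already busy and at least one task is waiting, so a new submission could only sit in line. A hedged sketch of the check, with a hypothetical minimal pool interface rather than the real Doris thread pool API:

#include <cstddef>
#include <functional>
#include <queue>

// Hypothetical minimal pool for illustration only.
class PoolSketch {
public:
    size_t get_queue_size() const { return _queue.size(); }  // waiting, not running
    void offer(std::function<void()> task) { _queue.push(std::move(task)); }
private:
    std::queue<std::function<void()>> _queue;
};

enum class Submit { kAccepted, kTooManyTasks };

// Admission control: if anything is already queued, all workers are busy,
// so reject the new task and let the caller log and back off.
Submit submit_task(PoolSketch& pool, std::function<void()> task) {
    if (pool.get_queue_size() > 0) {
        return Submit::kTooManyTasks;
    }
    pool.offer(std::move(task));
    return Submit::kAccepted;
}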
2 changes: 1 addition & 1 deletion be/src/util/batch_process_thread_pool.hpp
@@ -129,7 +129,7 @@ class BatchProcessThreadPool {
                 // the first task should blocking, or the tasks queue is empty
                 has_task = _work_queue.blocking_get(&task);
             } else {
-                // the 2rd, 3rd... task shoudl non blocking get
+                // the 2rd, 3rd... task should non blocking get
                 has_task = _work_queue.non_blocking_get(&task);
                 if (!has_task) {
                     break;
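The loop around this typo fix implements a common batch-draining idiom: block for the first task so an idle worker sleeps instead of spinning, then pull any further queued tasks with non-blocking gets until the queue empties (or, presumably, a batch limit is hit). A self-contained sketch of that idiom with hypothetical names, not the pool's actual queue type:

#include <condition_variable>
#include <deque>
#include <mutex>
#include <vector>

template <typename T>
class BlockingQueueSketch {
public:
    void put(T item) {
        { std::lock_guard<std::mutex> l(_mu); _q.push_back(std::move(item)); }
        _cv.notify_one();
    }
    // Sleeps until an item arrives (no shutdown handling in this sketch).
    bool blocking_get(T* out) {
        std::unique_lock<std::mutex> l(_mu);
        _cv.wait(l, [&] { return !_q.empty(); });
        *out = std::move(_q.front());
        _q.pop_front();
        return true;
    }
    // Returns false immediately if nothing is queued.
    bool non_blocking_get(T* out) {
        std::lock_guard<std::mutex> l(_mu);
        if (_q.empty()) return false;
        *out = std::move(_q.front());
        _q.pop_front();
        return true;
    }
private:
    std::mutex _mu;
    std::condition_variable _cv;
    std::deque<T> _q;
};

// Collect a batch: one blocking get, then opportunistic drains.
template <typename T>
std::vector<T> take_batch(BlockingQueueSketch<T>& q, size_t max_batch) {
    std::vector<T> batch;
    T task;
    q.blocking_get(&task);            // first task: wait for work
    batch.push_back(std::move(task));
    while (batch.size() < max_batch && q.non_blocking_get(&task)) {
        batch.push_back(std::move(task));  // 2nd, 3rd...: don't block
    }
    return batch;
}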
20 changes: 11 additions & 9 deletions fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
@@ -493,6 +493,7 @@ public void exec() throws Exception {
         for (Pair<BackendExecState, Future<PExecPlanFragmentResult>> pair : futures) {
             TStatusCode code = TStatusCode.INTERNAL_ERROR;
             String errMsg = null;
+            Exception exception = null;
             try {
                 PExecPlanFragmentResult result = pair.second.get(Config.remote_fragment_exec_timeout_ms,
                         TimeUnit.MILLISECONDS);
@@ -502,16 +503,23 @@
                 }
             } catch (ExecutionException e) {
                 LOG.warn("catch a execute exception", e);
+                exception = e;
                 code = TStatusCode.THRIFT_RPC_ERROR;
             } catch (InterruptedException e) {
                 LOG.warn("catch a interrupt exception", e);
+                exception = e;
                 code = TStatusCode.INTERNAL_ERROR;
             } catch (TimeoutException e) {
                 LOG.warn("catch a timeout exception", e);
+                exception = e;
                 code = TStatusCode.TIMEOUT;
             }

             if (code != TStatusCode.OK) {
+                if (exception != null) {
+                    errMsg = exception.getMessage();
+                }
+
                 if (errMsg == null) {
                     errMsg = "exec rpc error. backend id: " + pair.first.backend.getId();
                 }
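The change above stashes the caught exception so that its message, rather than a generic string, becomes errMsg for the status-code switch below. The shape is: wait on each fragment's future with a deadline, classify the outcome, and preserve the root cause for logging and blacklisting. A C++ rendering of that shape (hypothetical Code enum and std::future; the real code is Java with Thrift types):

#include <chrono>
#include <future>
#include <stdexcept>
#include <string>

enum class Code { kOk, kInternalError, kThriftRpcError, kTimeout };

struct ExecResult {
    Code code = Code::kInternalError;
    std::string err_msg;
};

// Wait with a deadline, classify the failure, and keep the underlying
// exception message so the caller can log and blacklist with a cause.
ExecResult wait_fragment(std::future<Code>& fut, std::chrono::milliseconds timeout) {
    ExecResult res;
    if (fut.wait_for(timeout) == std::future_status::timeout) {
        res.code = Code::kTimeout;
        res.err_msg = "exec rpc timeout";
        return res;
    }
    try {
        res.code = fut.get();                // rethrows if the task threw
        if (res.code == Code::kOk) res.err_msg.clear();
    } catch (const std::exception& e) {
        res.code = Code::kThriftRpcError;    // RPC-level failure
        res.err_msg = e.what();              // preserve the root cause
    }
    return res;
}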
@@ -524,7 +532,7 @@ public void exec() throws Exception {
                     case TIMEOUT:
                         throw new UserException("query timeout. backend id: " + pair.first.backend.getId());
                     case THRIFT_RPC_ERROR:
-                        SimpleScheduler.addToBlacklist(pair.first.backend.getId());
+                        SimpleScheduler.addToBlacklist(pair.first.backend.getId(), errMsg);
                         throw new RpcException(pair.first.backend.getHost(), "rpc failed");
                     default:
                         throw new UserException(errMsg);
@@ -668,7 +676,7 @@ public RowBatch getNext() throws Exception {
                 copyStatus.rewriteErrorMsg();
             }
             if (copyStatus.isRpcError()) {
-                throw new RpcException("unknown", copyStatus.getErrorMsg());
+                throw new RpcException(null, copyStatus.getErrorMsg());
             } else {
                 String errMsg = copyStatus.getErrorMsg();
                 LOG.warn("query failed: {}", errMsg);
@@ -1205,9 +1213,6 @@ private void getExecHostPortForFragmentIDAndBucketSeq(TScanRangeLocations seqLoc
         int randomLocation = new Random().nextInt(seqLocation.locations.size());
         Reference<Long> backendIdRef = new Reference<Long>();
         TNetworkAddress execHostPort = SimpleScheduler.getHost(seqLocation.locations.get(randomLocation).backend_id, seqLocation.locations, this.idToBackend, backendIdRef);
-        if (execHostPort == null) {
-            throw new UserException("there is no scanNode Backend");
-        }
         this.addressToBackendID.put(execHostPort, backendIdRef.getRef());
         this.fragmentIdToSeqToAddressMap.get(fragmentId).put(bucketSeq, execHostPort);
     }
@@ -1236,9 +1241,6 @@ private void computeScanRangeAssignmentByScheduler(
         Reference<Long> backendIdRef = new Reference<Long>();
         TNetworkAddress execHostPort = SimpleScheduler.getHost(minLocation.backend_id,
                 scanRangeLocations.getLocations(), this.idToBackend, backendIdRef);
-        if (execHostPort == null) {
-            throw new UserException("there is no scanNode Backend");
-        }
         this.addressToBackendID.put(execHostPort, backendIdRef.getRef());

         Map<Integer, List<TScanRangeParams>> scanRanges = findOrInsert(assignment, execHostPort,
@@ -1482,7 +1484,7 @@ public synchronized boolean cancelFragmentInstance(PPlanFragmentCancelReason can
             } catch (RpcException e) {
                 LOG.warn("cancel plan fragment get a exception, address={}:{}", brpcAddress.getHostname(),
                         brpcAddress.getPort());
-                SimpleScheduler.addToBlacklist(addressToBackendID.get(brpcAddress));
+                SimpleScheduler.addToBlacklist(addressToBackendID.get(brpcAddress), e.getMessage());
             }

             this.hasCanceled = true;
(additional changed file; file name not captured in this view)
@@ -116,15 +116,15 @@ public RowBatch getNext(Status status) throws TException {
         } catch (RpcException e) {
             LOG.warn("fetch result rpc exception, finstId={}", finstId, e);
             status.setRpcStatus(e.getMessage());
-            SimpleScheduler.addToBlacklist(backendId);
+            SimpleScheduler.addToBlacklist(backendId, e.getMessage());
         } catch (ExecutionException e) {
             LOG.warn("fetch result execution exception, finstId={}", finstId, e);
             if (e.getMessage().contains("time out")) {
                 // if timeout, we set error code to TIMEOUT, and it will not retry querying.
                 status.setStatus(new Status(TStatusCode.TIMEOUT, e.getMessage()));
             } else {
                 status.setRpcStatus(e.getMessage());
-                SimpleScheduler.addToBlacklist(backendId);
+                SimpleScheduler.addToBlacklist(backendId, e.getMessage());
             }
         } catch (TimeoutException e) {
             LOG.warn("fetch result timeout, finstId={}", finstId, e);