11 changes: 7 additions & 4 deletions be/src/runtime/fragment_mgr.cpp
@@ -80,7 +80,7 @@ class FragmentExecState {

Status execute();

Status cancel();
Status cancel(const PPlanFragmentCancelReason& reason);

TUniqueId fragment_instance_id() const {
return _fragment_instance_id;
@@ -209,9 +209,12 @@ Status FragmentExecState::execute() {
return Status::OK();
}

Status FragmentExecState::cancel() {
Status FragmentExecState::cancel(const PPlanFragmentCancelReason& reason) {
std::lock_guard<std::mutex> l(_status_lock);
RETURN_IF_ERROR(_exec_status);
if (reason == PPlanFragmentCancelReason::LIMIT_REACH) {
_executor.set_is_report_on_cancel(false);
}
_executor.cancel();
return Status::OK();
}
@@ -462,7 +465,7 @@ Status FragmentMgr::exec_plan_fragment(
return Status::OK();
}

Status FragmentMgr::cancel(const TUniqueId& id) {
Status FragmentMgr::cancel(const TUniqueId& id, const PPlanFragmentCancelReason& reason) {
std::shared_ptr<FragmentExecState> exec_state;
{
std::lock_guard<std::mutex> lock(_lock);
@@ -473,7 +476,7 @@ Status FragmentMgr::cancel(const TUniqueId& id) {
}
exec_state = iter->second;
}
exec_state->cancel();
exec_state->cancel(reason);

return Status::OK();
}
6 changes: 5 additions & 1 deletion be/src/runtime/fragment_mgr.h
@@ -54,7 +54,11 @@ class FragmentMgr : public RestMonitorIface {
// TODO(zc): report this is over
Status exec_plan_fragment(const TExecPlanFragmentParams& params, FinishCallback cb);

Status cancel(const TUniqueId& fragment_id);
Status cancel(const TUniqueId& fragment_id) {
return cancel(fragment_id, PPlanFragmentCancelReason::INTERNAL_ERROR);
}

Status cancel(const TUniqueId& fragment_id, const PPlanFragmentCancelReason& reason);

void cancel_worker();

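Usage note on the two overloads above: existing call sites keep the single-argument form, which now forwards INTERNAL_ERROR, while new call sites may pass an explicit reason. A minimal sketch (not part of the diff; the wrapper function and in-scope variables are assumed for illustration):

// Sketch only: how callers use FragmentMgr::cancel after this change.
void cancel_examples(FragmentMgr* fragment_mgr, const TUniqueId& id) {
    // Old call sites are untouched; the inline overload above forwards
    // PPlanFragmentCancelReason::INTERNAL_ERROR, so their behaviour is unchanged.
    fragment_mgr->cancel(id);

    // New call sites pass an explicit reason. LIMIT_REACH additionally makes
    // FragmentExecState::cancel() clear the executor's report-on-cancel flag,
    // so no final report is sent to the FE for this expected cancellation.
    fragment_mgr->cancel(id, PPlanFragmentCancelReason::LIMIT_REACH);
}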
5 changes: 5 additions & 0 deletions be/src/runtime/plan_fragment_executor.cpp
@@ -55,6 +55,7 @@ PlanFragmentExecutor::PlanFragmentExecutor(
_closed(false),
_has_thread_token(false),
_is_report_success(true),
_is_report_on_cancel(true),
_collect_query_statistics_with_every_batch(false) {
}

@@ -433,6 +434,10 @@ void PlanFragmentExecutor::send_report(bool done) {
return;
}

if (!_is_report_success && !_is_report_on_cancel) {
return;
}

// This will send a report even if we are cancelled. If the query completed correctly
// but fragments still need to be cancelled (e.g. limit reached), the coordinator will
// be waiting for a final report and profile.
8 changes: 8 additions & 0 deletions be/src/runtime/plan_fragment_executor.h
@@ -141,6 +141,10 @@ class PlanFragmentExecutor {
_stop_report_thread_cv.notify_one();
}

void set_is_report_on_cancel(bool val) {
_is_report_on_cancel = val;
}

private:
ExecEnv* _exec_env; // not owned
ExecNode* _plan; // lives in _runtime_state->obj_pool()
@@ -176,6 +180,10 @@ class PlanFragmentExecutor {

bool _is_report_success;

// If this is set to false, and '_is_report_success' is false as well,
// this executor will not report status to the FE when it is cancelled.
bool _is_report_on_cancel;

// Overall execution status. Either ok() or set to the first error status that
// was encountered.
Status _status;
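The flag pair above drives the early return added to send_report(): the final report is skipped only when periodic reporting was never requested (_is_report_success is false) and the cancel reason was LIMIT_REACH (which cleared _is_report_on_cancel). A minimal sketch of that predicate, with the members passed in as parameters for illustration:

// Sketch only: the decision send_report() makes after this change.
// is_report_success   - the FE asked for periodic status/profile reports.
// is_report_on_cancel - stays true unless the cancel reason was LIMIT_REACH.
bool should_send_report(bool is_report_success, bool is_report_on_cancel) {
    // Drop the report only when nobody asked for periodic reports and the
    // coordinator is not waiting for a final one (the limit was already reached).
    return is_report_success || is_report_on_cancel;
}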
11 changes: 9 additions & 2 deletions be/src/service/internal_service.cpp
@@ -154,8 +154,15 @@ void PInternalServiceImpl<T>::cancel_plan_fragment(
TUniqueId tid;
tid.__set_hi(request->finst_id().hi());
tid.__set_lo(request->finst_id().lo());
LOG(INFO) << "cancel framgent, fragment_instance_id=" << print_id(tid);
auto st = _exec_env->fragment_mgr()->cancel(tid);

Status st;
if (request->has_cancel_reason()) {
LOG(INFO) << "cancel framgent, fragment_instance_id=" << print_id(tid) << ", reason: " << request->cancel_reason();
st = _exec_env->fragment_mgr()->cancel(tid, request->cancel_reason());
} else {
LOG(INFO) << "cancel framgent, fragment_instance_id=" << print_id(tid);
st = _exec_env->fragment_mgr()->cancel(tid);
}
if (!st.ok()) {
LOG(WARNING) << "cancel plan fragment failed, errmsg=" << st.get_error_msg();
}
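Because cancel_reason is an optional field (see internal_service.proto below), an upgraded BE still interoperates with an older FE that never sets it: the handler above falls back to the single-argument FragmentMgr::cancel, which the header maps to INTERNAL_ERROR. A condensed sketch of that branching, with the RPC plumbing omitted and the helper name hypothetical:

// Sketch only: reason dispatch inside cancel_plan_fragment (simplified).
void dispatch_cancel(FragmentMgr* fragment_mgr, const TUniqueId& tid,
                     const PCancelPlanFragmentRequest* request) {
    if (request->has_cancel_reason()) {
        // Newer FE: propagate the reason; LIMIT_REACH suppresses the final report.
        fragment_mgr->cancel(tid, request->cancel_reason());
    } else {
        // Older FE: the field is absent on the wire; defaults to INTERNAL_ERROR.
        fragment_mgr->cancel(tid);
    }
}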
72 changes: 38 additions & 34 deletions fe/src/main/java/org/apache/doris/qe/Coordinator.java
@@ -44,6 +44,7 @@
import org.apache.doris.planner.ResultSink;
import org.apache.doris.planner.ScanNode;
import org.apache.doris.planner.UnionNode;
import org.apache.doris.proto.PPlanFragmentCancelReason;
import org.apache.doris.proto.PExecPlanFragmentResult;
import org.apache.doris.rpc.BackendServiceProxy;
import org.apache.doris.rpc.RpcException;
@@ -104,6 +105,7 @@

public class Coordinator {
private static final Logger LOG = LogManager.getLogger(Coordinator.class);

private static final DateFormat DATE_FORMAT = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

private static String localIP = FrontendOptions.getLocalHostAddress();
@@ -331,21 +333,22 @@ private void unlock() {
}

private void traceInstance() {
// TODO(zc): add a switch to close this function
StringBuilder sb = new StringBuilder();
int idx = 0;
sb.append("query id=").append(DebugUtil.printId(queryId)).append(",");
sb.append("fragment=[");
for (Map.Entry<PlanFragmentId, FragmentExecParams> entry : fragmentExecParamsMap.entrySet()) {
if (idx++ != 0) {
sb.append(",");
if (LOG.isDebugEnabled()) {
// TODO(zc): add a switch to close this function
StringBuilder sb = new StringBuilder();
int idx = 0;
sb.append("query id=").append(DebugUtil.printId(queryId)).append(",");
sb.append("fragment=[");
for (Map.Entry<PlanFragmentId, FragmentExecParams> entry : fragmentExecParamsMap.entrySet()) {
if (idx++ != 0) {
sb.append(",");
}
sb.append(entry.getKey());
entry.getValue().appendTo(sb);
}
sb.append(entry.getKey());
entry.getValue().appendTo(sb);
sb.append("]");
LOG.debug(sb.toString());
}
sb.append("]");

LOG.info(sb.toString());
}

// Initiate asynchronous execution of query. Returns as soon as all plan fragments
@@ -458,19 +461,19 @@ public void exec() throws Exception {

if (code != TStatusCode.OK) {
if (errMsg == null) {
errMsg = "exec rpc error. backend id: " + pair.first.systemBackendId;
errMsg = "exec rpc error. backend id: " + pair.first.backendId;
}
queryStatus.setStatus(errMsg);
LOG.warn("exec plan fragment failed, errmsg={}, fragmentId={}, backend={}:{}",
errMsg, fragment.getFragmentId(),
pair.first.address.hostname, pair.first.address.port);
cancelInternal();
cancelInternal(PPlanFragmentCancelReason.INTERNAL_ERROR);
switch (code) {
case TIMEOUT:
throw new UserException("query timeout. backend id: " + pair.first.systemBackendId);
throw new UserException("query timeout. backend id: " + pair.first.backendId);
case THRIFT_RPC_ERROR:
SimpleScheduler.updateBlacklistBackends(pair.first.systemBackendId);
throw new RpcException("rpc failed. backend id: " + pair.first.systemBackendId);
SimpleScheduler.updateBlacklistBackends(pair.first.backendId);
throw new RpcException("rpc failed. backend id: " + pair.first.backendId);
default:
throw new UserException(errMsg);
}
@@ -570,7 +573,7 @@ private void updateStatus(Status status, TUniqueId instanceId) {
queryStatus.setStatus(status);
LOG.warn("one instance report fail throw updateStatus(), need cancel. job id: {}, query id: {}, instance id: {}",
jobId, DebugUtil.printId(queryId), instanceId != null ? DebugUtil.printId(instanceId) : "NaN");
cancelInternal();
cancelInternal(PPlanFragmentCancelReason.INTERNAL_ERROR);
} finally {
lock.unlock();
}
@@ -625,7 +628,7 @@ public RowBatch getNext() throws Exception {
boolean hasLimit = numLimitRows > 0;
if (!isBlockQuery && instanceIds.size() > 1 && hasLimit && numReceivedRows >= numLimitRows) {
LOG.debug("no block query, return num >= limit rows, need cancel");
cancelInternal();
cancelInternal(PPlanFragmentCancelReason.LIMIT_REACH);
}
} else {
numReceivedRows += resultBatch.getBatch().getRowsSize();
@@ -647,30 +650,31 @@ public void cancel() {
queryStatus.setStatus(Status.CANCELLED);
}
LOG.warn("cancel execution of query, this is outside invoke");
cancelInternal();
cancelInternal(PPlanFragmentCancelReason.USER_CANCEL);
} finally {
unlock();
}
}

private void cancelInternal() {
private void cancelInternal(PPlanFragmentCancelReason cancelReason) {
if (null != receiver) {
receiver.cancel();
}
cancelRemoteFragmentsAsync();
cancelRemoteFragmentsAsync(cancelReason);
if (profileDoneSignal != null) {
// count down to zero to notify all objects waiting for this
profileDoneSignal.countDownToZero();
LOG.info("unfinished instance: {}", profileDoneSignal.getLeftMarks());
}
}

private void cancelRemoteFragmentsAsync() {
private void cancelRemoteFragmentsAsync(PPlanFragmentCancelReason cancelReason) {
for (BackendExecState backendExecState : backendExecStates) {
TNetworkAddress address = backendExecState.getBackendAddress();
LOG.info("cancelRemoteFragments initiated={} done={} hasCanceled={} ip={} port={} fragment instance id={}",
LOG.debug("cancelRemoteFragments initiated={} done={} hasCanceled={} ip={} port={} fragment instance id={}, reason: {}",
backendExecState.initiated, backendExecState.done, backendExecState.hasCanceled,
address.hostname, address.port, DebugUtil.printId(backendExecState.getFragmentInstanceId()));
address.hostname, address.port, DebugUtil.printId(backendExecState.getFragmentInstanceId()),
cancelReason.name());

backendExecState.lock();
try {
@@ -688,7 +692,7 @@ private void cancelRemoteFragmentsAsync() {

try {
BackendServiceProxy.getInstance().cancelPlanFragmentAsync(
brpcAddress, backendExecState.getFragmentInstanceId());
brpcAddress, backendExecState.getFragmentInstanceId(), cancelReason);
} catch (RpcException e) {
LOG.warn("cancel plan fragment get a exception, address={}:{}",
brpcAddress.getHostname(), brpcAddress.getPort());
@@ -1079,7 +1083,7 @@ private void computeScanRangeAssignmentByScheduler(

public void updateFragmentExecStatus(TReportExecStatusParams params) {
if (params.backend_num >= backendExecStates.size()) {
LOG.error("unknown backend number: {}, expected less than: {}",
LOG.warn("unknown backend number: {}, expected less than: {}",
params.backend_num, backendExecStates.size());
return;
}
@@ -1114,7 +1118,7 @@ public void updateFragmentExecStatus(TReportExecStatusParams params) {
// and returned_all_results_ is true.
// (UpdateStatus() initiates cancellation, if it hasn't already been initiated)
if (!(returnedAllResults && status.isCancelled()) && !status.ok()) {
LOG.warn("One instance report fail, query_id={} instance_id={}",
LOG.warn("one instance report fail, query_id={} instance_id={}",
DebugUtil.printId(queryId), DebugUtil.printId(params.getFragment_instance_id()));
updateStatus(status, params.getFragment_instance_id());
}
@@ -1199,7 +1203,7 @@ public class BackendExecState {
private int profileFragmentId;
RuntimeProfile profile;
TNetworkAddress address;
Long systemBackendId;
Long backendId;

public int profileFragmentId() {
return profileFragmentId;
@@ -1225,7 +1229,7 @@ public int getInstanceId() {
return instanceId;
}

public PlanFragmentId getfragmentId() {
public PlanFragmentId getFragmentId() {
return fragmentId;
}

@@ -1238,7 +1242,7 @@ public BackendExecState(PlanFragmentId fragmentId, int instanceId, int profileFr
this.initiated = false;
this.done = false;
this.address = fragmentExecParamsMap.get(fragmentId).instanceExecParams.get(instanceId).host;
this.systemBackendId = addressToBackendID.get(address);
this.backendId = addressToBackendID.get(address);

String name = "Instance " + DebugUtil.printId(fragmentExecParamsMap.get(fragmentId)
.instanceExecParams.get(instanceId).instanceId) + " (host=" + address + ")";
@@ -1265,7 +1269,7 @@ public Future<PExecPlanFragmentResult> execRemoteFragmentAsync() throws TExcepti
try {
return BackendServiceProxy.getInstance().execPlanFragmentAsync(brpcAddress, rpcParams);
} catch (RpcException e) {
SimpleScheduler.updateBlacklistBackends(systemBackendId);
SimpleScheduler.updateBlacklistBackends(backendId);
throw e;
}
}
@@ -1426,7 +1430,7 @@ public List<QueryStatisticsItem.FragmentInstanceInfo> getFragmentInstanceInfos()
for (int index = 0; index < fragments.size(); index++) {
for (Map.Entry<TUniqueId, BackendExecState> entry: backendExecStateMap.entrySet()) {
final BackendExecState backendExecState = entry.getValue();
if (fragments.get(index).getFragmentId() != backendExecState.getfragmentId()) {
if (fragments.get(index).getFragmentId() != backendExecState.getFragmentId()) {
continue;
}
final QueryStatisticsItem.FragmentInstanceInfo info
3 changes: 2 additions & 1 deletion fe/src/main/java/org/apache/doris/qe/QeProcessorImpl.java
@@ -96,7 +96,8 @@ public Map<String, QueryStatisticsItem> getQueryStatistics() {
@Override
public TReportExecStatusResult reportExecStatus(TReportExecStatusParams params) {
LOG.info("ReportExecStatus(): fragment_instance_id=" + DebugUtil.printId(params.fragment_instance_id)
+ ", query id=" + DebugUtil.printId(params.query_id) + " params=" + params);
+ ", query id=" + DebugUtil.printId(params.query_id));
LOG.debug("params: {}", params);
final TReportExecStatusResult result = new TReportExecStatusResult();
final QueryInfo info = coordinatorMap.get(params.query_id);
if (info == null) {
fe/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java
@@ -20,6 +20,7 @@
import org.apache.doris.common.Config;
import org.apache.doris.proto.PCancelPlanFragmentRequest;
import org.apache.doris.proto.PCancelPlanFragmentResult;
import org.apache.doris.proto.PPlanFragmentCancelReason;
import org.apache.doris.proto.PExecPlanFragmentResult;
import org.apache.doris.proto.PFetchDataResult;
import org.apache.doris.proto.PProxyRequest;
@@ -111,12 +112,13 @@ public Future<PExecPlanFragmentResult> execPlanFragmentAsync(
}

public Future<PCancelPlanFragmentResult> cancelPlanFragmentAsync(
TNetworkAddress address, TUniqueId finstId) throws RpcException {
TNetworkAddress address, TUniqueId finstId, PPlanFragmentCancelReason cancelReason) throws RpcException {
final PCancelPlanFragmentRequest pRequest = new PCancelPlanFragmentRequest();
PUniqueId uid = new PUniqueId();
uid.hi = finstId.hi;
uid.lo = finstId.lo;
pRequest.finst_id = uid;
pRequest.cancel_reason = cancelReason;
try {
final PBackendService service = getProxy(address);
return service.cancelPlanFragmentAsync(pRequest);
8 changes: 8 additions & 0 deletions gensrc/proto/internal_service.proto
@@ -114,8 +114,16 @@ message PExecPlanFragmentResult {
required PStatus status = 1;
};

enum PPlanFragmentCancelReason {
// 0 is reserved
LIMIT_REACH = 1;
USER_CANCEL = 2;
INTERNAL_ERROR = 3;
};

message PCancelPlanFragmentRequest {
required PUniqueId finst_id = 1;
optional PPlanFragmentCancelReason cancel_reason = 2;
};

message PCancelPlanFragmentResult {
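For completeness, a sketch of how a C++ client could fill the extended request, assuming the usual protobuf-generated accessors for these messages (the generated header path and setter names are assumptions, not verified against the build):

// Sketch only: building a cancel request with an explicit reason.
#include "gen_cpp/internal_service.pb.h"  // assumed location of the generated header

PCancelPlanFragmentRequest make_cancel_request(int64_t hi, int64_t lo) {
    PCancelPlanFragmentRequest request;
    request.mutable_finst_id()->set_hi(hi);
    request.mutable_finst_id()->set_lo(lo);
    // Optional on the wire: an older FE simply omits it, and the BE then falls
    // back to the single-argument cancel path (INTERNAL_ERROR).
    request.set_cancel_reason(PPlanFragmentCancelReason::LIMIT_REACH);
    return request;
}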