diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp
index d4f6759de01f8f..b7d873571636dd 100644
--- a/be/src/runtime/fragment_mgr.cpp
+++ b/be/src/runtime/fragment_mgr.cpp
@@ -80,7 +80,7 @@ class FragmentExecState {
     Status execute();
 
-    Status cancel();
+    Status cancel(const PPlanFragmentCancelReason& reason);
 
     TUniqueId fragment_instance_id() const {
         return _fragment_instance_id;
@@ -209,9 +209,12 @@ Status FragmentExecState::execute() {
     return Status::OK();
 }
 
-Status FragmentExecState::cancel() {
+Status FragmentExecState::cancel(const PPlanFragmentCancelReason& reason) {
     std::lock_guard<std::mutex> l(_status_lock);
     RETURN_IF_ERROR(_exec_status);
+    if (reason == PPlanFragmentCancelReason::LIMIT_REACH) {
+        _executor.set_is_report_on_cancel(false);
+    }
     _executor.cancel();
     return Status::OK();
 }
@@ -462,7 +465,7 @@ Status FragmentMgr::exec_plan_fragment(
     return Status::OK();
 }
 
-Status FragmentMgr::cancel(const TUniqueId& id) {
+Status FragmentMgr::cancel(const TUniqueId& id, const PPlanFragmentCancelReason& reason) {
     std::shared_ptr<FragmentExecState> exec_state;
     {
         std::lock_guard<std::mutex> lock(_lock);
@@ -473,7 +476,7 @@
         }
         exec_state = iter->second;
     }
-    exec_state->cancel();
+    exec_state->cancel(reason);
 
     return Status::OK();
 }
diff --git a/be/src/runtime/fragment_mgr.h b/be/src/runtime/fragment_mgr.h
index b5fc075c4570b5..77dcf8247c219c 100644
--- a/be/src/runtime/fragment_mgr.h
+++ b/be/src/runtime/fragment_mgr.h
@@ -54,7 +54,11 @@ class FragmentMgr : public RestMonitorIface {
     // TODO(zc): report this is over
     Status exec_plan_fragment(const TExecPlanFragmentParams& params, FinishCallback cb);
 
-    Status cancel(const TUniqueId& fragment_id);
+    Status cancel(const TUniqueId& fragment_id) {
+        return cancel(fragment_id, PPlanFragmentCancelReason::INTERNAL_ERROR);
+    }
+
+    Status cancel(const TUniqueId& fragment_id, const PPlanFragmentCancelReason& reason);
 
     void cancel_worker();
diff --git a/be/src/runtime/plan_fragment_executor.cpp b/be/src/runtime/plan_fragment_executor.cpp
index 51d4664336085d..beda13dafd4967 100644
--- a/be/src/runtime/plan_fragment_executor.cpp
+++ b/be/src/runtime/plan_fragment_executor.cpp
@@ -55,6 +55,7 @@ PlanFragmentExecutor::PlanFragmentExecutor(
         _closed(false),
         _has_thread_token(false),
         _is_report_success(true),
+        _is_report_on_cancel(true),
         _collect_query_statistics_with_every_batch(false) {
 }
@@ -433,6 +434,10 @@ void PlanFragmentExecutor::send_report(bool done) {
         return;
     }
 
+    if (!_is_report_success && !_is_report_on_cancel) {
+        return;
+    }
+
     // This will send a report even if we are cancelled. If the query completed correctly
     // but fragments still need to be cancelled (e.g. limit reached), the coordinator will
     // be waiting for a final report and profile.
diff --git a/be/src/runtime/plan_fragment_executor.h b/be/src/runtime/plan_fragment_executor.h
index cc5c51d4b250ae..ff089cb2a3956a 100644
--- a/be/src/runtime/plan_fragment_executor.h
+++ b/be/src/runtime/plan_fragment_executor.h
@@ -141,6 +141,10 @@ class PlanFragmentExecutor {
         _stop_report_thread_cv.notify_one();
     }
 
+    void set_is_report_on_cancel(bool val) {
+        _is_report_on_cancel = val;
+    }
+
 private:
     ExecEnv* _exec_env;  // not owned
     ExecNode* _plan;  // lives in _runtime_state->obj_pool()
@@ -176,6 +180,10 @@ class PlanFragmentExecutor {
 
     bool _is_report_success;
 
+    // If this is set to false, and '_is_report_success' is false as well,
+    // this executor will not report status to FE on being cancelled.
+    bool _is_report_on_cancel;
+
     // Overall execution status. Either ok() or set to the first error status that
     // was encountered.
     Status _status;
diff --git a/be/src/service/internal_service.cpp b/be/src/service/internal_service.cpp
index 6b6069f28fd2c1..687d7935f29bb8 100644
--- a/be/src/service/internal_service.cpp
+++ b/be/src/service/internal_service.cpp
@@ -154,8 +154,15 @@ void PInternalServiceImpl::cancel_plan_fragment(
     TUniqueId tid;
     tid.__set_hi(request->finst_id().hi());
     tid.__set_lo(request->finst_id().lo());
-    LOG(INFO) << "cancel framgent, fragment_instance_id=" << print_id(tid);
-    auto st = _exec_env->fragment_mgr()->cancel(tid);
+
+    Status st;
+    if (request->has_cancel_reason()) {
+        LOG(INFO) << "cancel fragment, fragment_instance_id=" << print_id(tid) << ", reason: " << request->cancel_reason();
+        st = _exec_env->fragment_mgr()->cancel(tid, request->cancel_reason());
+    } else {
+        LOG(INFO) << "cancel fragment, fragment_instance_id=" << print_id(tid);
+        st = _exec_env->fragment_mgr()->cancel(tid);
+    }
     if (!st.ok()) {
         LOG(WARNING) << "cancel plan fragment failed, errmsg=" << st.get_error_msg();
     }
diff --git a/fe/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/src/main/java/org/apache/doris/qe/Coordinator.java
index 8d289b038d75ee..434ba924d52940 100644
--- a/fe/src/main/java/org/apache/doris/qe/Coordinator.java
+++ b/fe/src/main/java/org/apache/doris/qe/Coordinator.java
@@ -44,6 +44,7 @@
 import org.apache.doris.planner.ResultSink;
 import org.apache.doris.planner.ScanNode;
 import org.apache.doris.planner.UnionNode;
+import org.apache.doris.proto.PPlanFragmentCancelReason;
 import org.apache.doris.proto.PExecPlanFragmentResult;
 import org.apache.doris.rpc.BackendServiceProxy;
 import org.apache.doris.rpc.RpcException;
@@ -104,6 +105,7 @@ public class Coordinator {
     private static final Logger LOG = LogManager.getLogger(Coordinator.class);
+    private static final DateFormat DATE_FORMAT = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
 
     private static String localIP = FrontendOptions.getLocalHostAddress();
@@ -331,21 +333,22 @@ private void unlock() {
     }
 
     private void traceInstance() {
-        // TODO(zc): add a switch to close this function
-        StringBuilder sb = new StringBuilder();
-        int idx = 0;
-        sb.append("query id=").append(DebugUtil.printId(queryId)).append(",");
-        sb.append("fragment=[");
-        for (Map.Entry<PlanFragmentId, FragmentExecParams> entry : fragmentExecParamsMap.entrySet()) {
-            if (idx++ != 0) {
-                sb.append(",");
+        if (LOG.isDebugEnabled()) {
+            // TODO(zc): add a switch to close this function
+            StringBuilder sb = new StringBuilder();
+            int idx = 0;
+            sb.append("query id=").append(DebugUtil.printId(queryId)).append(",");
+            sb.append("fragment=[");
+            for (Map.Entry<PlanFragmentId, FragmentExecParams> entry : fragmentExecParamsMap.entrySet()) {
+                if (idx++ != 0) {
+                    sb.append(",");
+                }
+                sb.append(entry.getKey());
+                entry.getValue().appendTo(sb);
             }
-            sb.append(entry.getKey());
-            entry.getValue().appendTo(sb);
+            sb.append("]");
+            LOG.debug(sb.toString());
         }
-        sb.append("]");
-
-        LOG.info(sb.toString());
     }
 
     // Initiate asynchronous execution of query. Returns as soon as all plan fragments
@@ -458,19 +461,19 @@ public void exec() throws Exception {
             if (code != TStatusCode.OK) {
                 if (errMsg == null) {
-                    errMsg = "exec rpc error. backend id: " + pair.first.systemBackendId;
+                    errMsg = "exec rpc error. backend id: " + pair.first.backendId;
                 }
                 queryStatus.setStatus(errMsg);
                 LOG.warn("exec plan fragment failed, errmsg={}, fragmentId={}, backend={}:{}",
                         errMsg, fragment.getFragmentId(),
                         pair.first.address.hostname, pair.first.address.port);
-                cancelInternal();
+                cancelInternal(PPlanFragmentCancelReason.INTERNAL_ERROR);
                 switch (code) {
                     case TIMEOUT:
-                        throw new UserException("query timeout. backend id: " + pair.first.systemBackendId);
+                        throw new UserException("query timeout. backend id: " + pair.first.backendId);
                     case THRIFT_RPC_ERROR:
-                        SimpleScheduler.updateBlacklistBackends(pair.first.systemBackendId);
-                        throw new RpcException("rpc failed. backend id: " + pair.first.systemBackendId);
+                        SimpleScheduler.updateBlacklistBackends(pair.first.backendId);
+                        throw new RpcException("rpc failed. backend id: " + pair.first.backendId);
                     default:
                         throw new UserException(errMsg);
@@ -570,7 +573,7 @@ private void updateStatus(Status status, TUniqueId instanceId) {
             queryStatus.setStatus(status);
             LOG.warn("one instance report fail throw updateStatus(), need cancel. job id: {}, query id: {}, instance id: {}",
                     jobId, DebugUtil.printId(queryId), instanceId != null ? DebugUtil.printId(instanceId) : "NaN");
-            cancelInternal();
+            cancelInternal(PPlanFragmentCancelReason.INTERNAL_ERROR);
         } finally {
             lock.unlock();
         }
@@ -625,7 +628,7 @@ public RowBatch getNext() throws Exception {
                 boolean hasLimit = numLimitRows > 0;
                 if (!isBlockQuery && instanceIds.size() > 1 && hasLimit && numReceivedRows >= numLimitRows) {
                     LOG.debug("no block query, return num >= limit rows, need cancel");
-                    cancelInternal();
+                    cancelInternal(PPlanFragmentCancelReason.LIMIT_REACH);
                 }
             } else {
                 numReceivedRows += resultBatch.getBatch().getRowsSize();
@@ -647,17 +650,17 @@ public void cancel() {
                 queryStatus.setStatus(Status.CANCELLED);
             }
             LOG.warn("cancel execution of query, this is outside invoke");
-            cancelInternal();
+            cancelInternal(PPlanFragmentCancelReason.USER_CANCEL);
         } finally {
             unlock();
         }
     }
 
-    private void cancelInternal() {
+    private void cancelInternal(PPlanFragmentCancelReason cancelReason) {
         if (null != receiver) {
             receiver.cancel();
         }
-        cancelRemoteFragmentsAsync();
+        cancelRemoteFragmentsAsync(cancelReason);
         if (profileDoneSignal != null) {
             // count down to zero to notify all objects waiting for this
             profileDoneSignal.countDownToZero();
@@ -665,12 +668,13 @@
         }
     }
 
-    private void cancelRemoteFragmentsAsync() {
+    private void cancelRemoteFragmentsAsync(PPlanFragmentCancelReason cancelReason) {
         for (BackendExecState backendExecState : backendExecStates) {
             TNetworkAddress address = backendExecState.getBackendAddress();
-            LOG.info("cancelRemoteFragments initiated={} done={} hasCanceled={} ip={} port={} fragment instance id={}",
+            LOG.debug("cancelRemoteFragments initiated={} done={} hasCanceled={} ip={} port={} fragment instance id={}, reason: {}",
                     backendExecState.initiated, backendExecState.done, backendExecState.hasCanceled,
-                    address.hostname, address.port, DebugUtil.printId(backendExecState.getFragmentInstanceId()));
+                    address.hostname, address.port, DebugUtil.printId(backendExecState.getFragmentInstanceId()),
+                    cancelReason.name());
 
             backendExecState.lock();
             try {
@@ -688,7 +692,7 @@ private void cancelRemoteFragmentsAsync() {
                 try {
                     BackendServiceProxy.getInstance().cancelPlanFragmentAsync(
-                            brpcAddress, backendExecState.getFragmentInstanceId());
+                            brpcAddress, backendExecState.getFragmentInstanceId(), cancelReason);
                 } catch (RpcException e) {
                     LOG.warn("cancel plan fragment get a exception, address={}:{}",
                             brpcAddress.getHostname(), brpcAddress.getPort());
@@ -1079,7 +1083,7 @@ private void computeScanRangeAssignmentByScheduler(
 
     public void updateFragmentExecStatus(TReportExecStatusParams params) {
         if (params.backend_num >= backendExecStates.size()) {
-            LOG.error("unknown backend number: {}, expected less than: {}",
+            LOG.warn("unknown backend number: {}, expected less than: {}",
                     params.backend_num, backendExecStates.size());
             return;
         }
@@ -1114,7 +1118,7 @@ public void updateFragmentExecStatus(TReportExecStatusParams params) {
         // and returned_all_results_ is true.
         // (UpdateStatus() initiates cancellation, if it hasn't already been initiated)
         if (!(returnedAllResults && status.isCancelled()) && !status.ok()) {
-            LOG.warn("One instance report fail, query_id={} instance_id={}",
+            LOG.warn("one instance report fail, query_id={} instance_id={}",
                     DebugUtil.printId(queryId), DebugUtil.printId(params.getFragment_instance_id()));
             updateStatus(status, params.getFragment_instance_id());
         }
@@ -1199,7 +1203,7 @@ public class BackendExecState {
         private int profileFragmentId;
         RuntimeProfile profile;
         TNetworkAddress address;
-        Long systemBackendId;
+        Long backendId;
 
         public int profileFragmentId() {
             return profileFragmentId;
@@ -1225,7 +1229,7 @@ public int getInstanceId() {
             return instanceId;
         }
 
-        public PlanFragmentId getfragmentId() {
+        public PlanFragmentId getFragmentId() {
             return fragmentId;
         }
@@ -1238,7 +1242,7 @@ public BackendExecState(PlanFragmentId fragmentId, int instanceId, int profileFr
             this.initiated = false;
             this.done = false;
             this.address = fragmentExecParamsMap.get(fragmentId).instanceExecParams.get(instanceId).host;
-            this.systemBackendId = addressToBackendID.get(address);
+            this.backendId = addressToBackendID.get(address);
 
             String name = "Instance " + DebugUtil.printId(fragmentExecParamsMap.get(fragmentId)
                     .instanceExecParams.get(instanceId).instanceId) + " (host=" + address + ")";
@@ -1265,7 +1269,7 @@ public Future execRemoteFragmentAsync() throws TExcepti
            try {
                return BackendServiceProxy.getInstance().execPlanFragmentAsync(brpcAddress, rpcParams);
            } catch (RpcException e) {
-                SimpleScheduler.updateBlacklistBackends(systemBackendId);
+                SimpleScheduler.updateBlacklistBackends(backendId);
                throw e;
            }
        }
@@ -1426,7 +1430,7 @@ public List getFragmentInstanceInfos()
        for (int index = 0; index < fragments.size(); index++) {
            for (Map.Entry entry: backendExecStateMap.entrySet()) {
                final BackendExecState backendExecState = entry.getValue();
-                if (fragments.get(index).getFragmentId() != backendExecState.getfragmentId()) {
+                if (fragments.get(index).getFragmentId() != backendExecState.getFragmentId()) {
                    continue;
                }
                final QueryStatisticsItem.FragmentInstanceInfo info
diff --git a/fe/src/main/java/org/apache/doris/qe/QeProcessorImpl.java b/fe/src/main/java/org/apache/doris/qe/QeProcessorImpl.java
index c7876bf491eb4b..5a08cb97d43127 100644
--- a/fe/src/main/java/org/apache/doris/qe/QeProcessorImpl.java
+++ b/fe/src/main/java/org/apache/doris/qe/QeProcessorImpl.java
@@ -96,7 +96,8 @@ public Map getQueryStatistics() {
     @Override
     public TReportExecStatusResult reportExecStatus(TReportExecStatusParams params) {
         LOG.info("ReportExecStatus(): fragment_instance_id=" + DebugUtil.printId(params.fragment_instance_id)
-                + ", query id=" + DebugUtil.printId(params.query_id) + " params=" + params);
+                + ", query id=" + DebugUtil.printId(params.query_id));
+        LOG.debug("params: {}", params);
         final TReportExecStatusResult result = new TReportExecStatusResult();
         final QueryInfo info = coordinatorMap.get(params.query_id);
         if (info == null) {
diff --git a/fe/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java b/fe/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java
index e19e35fac0d5e6..18dc80e18bcc67 100644
--- a/fe/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java
+++ b/fe/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java
@@ -20,6 +20,7 @@
 import org.apache.doris.common.Config;
 import org.apache.doris.proto.PCancelPlanFragmentRequest;
 import org.apache.doris.proto.PCancelPlanFragmentResult;
+import org.apache.doris.proto.PPlanFragmentCancelReason;
 import org.apache.doris.proto.PExecPlanFragmentResult;
 import org.apache.doris.proto.PFetchDataResult;
 import org.apache.doris.proto.PProxyRequest;
@@ -111,12 +112,13 @@ public Future execPlanFragmentAsync(
     }
 
     public Future<PCancelPlanFragmentResult> cancelPlanFragmentAsync(
-            TNetworkAddress address, TUniqueId finstId) throws RpcException {
+            TNetworkAddress address, TUniqueId finstId, PPlanFragmentCancelReason cancelReason) throws RpcException {
         final PCancelPlanFragmentRequest pRequest = new PCancelPlanFragmentRequest();
         PUniqueId uid = new PUniqueId();
         uid.hi = finstId.hi;
         uid.lo = finstId.lo;
         pRequest.finst_id = uid;
+        pRequest.cancel_reason = cancelReason;
         try {
             final PBackendService service = getProxy(address);
             return service.cancelPlanFragmentAsync(pRequest);
diff --git a/gensrc/proto/internal_service.proto b/gensrc/proto/internal_service.proto
index 4b57fb9b8e9f2e..be36bc82189142 100644
--- a/gensrc/proto/internal_service.proto
+++ b/gensrc/proto/internal_service.proto
@@ -114,8 +114,16 @@ message PExecPlanFragmentResult {
     required PStatus status = 1;
 };
 
+enum PPlanFragmentCancelReason {
+    // 0 is reserved
+    LIMIT_REACH = 1;
+    USER_CANCEL = 2;
+    INTERNAL_ERROR = 3;
+};
+
 message PCancelPlanFragmentRequest {
     required PUniqueId finst_id = 1;
+    optional PPlanFragmentCancelReason cancel_reason = 2;
 };
 
 message PCancelPlanFragmentResult {
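The diff above threads a cancel reason from the FE Coordinator, through the brpc PCancelPlanFragmentRequest, down to the BE PlanFragmentExecutor, which uses it to suppress the final status report when a fragment is cancelled only because the query's LIMIT was already satisfied (LIMIT_REACH). The standalone sketch below is not taken from the Doris sources; Executor, CancelReason, and the report callback are invented stand-ins that only model the reporting gate introduced by the patch: set_is_report_on_cancel(false) on a LIMIT_REACH cancel, plus the early return in send_report() when both _is_report_success and _is_report_on_cancel are false.

// Minimal sketch (assumed scaffolding, not Doris code): models how a
// LIMIT_REACH cancel suppresses the final report while other cancel
// reasons still report back to the coordinator.
#include <functional>
#include <iostream>
#include <mutex>

enum class CancelReason { LIMIT_REACH = 1, USER_CANCEL = 2, INTERNAL_ERROR = 3 };

class Executor {
public:
    explicit Executor(std::function<void()> report_cb)
            : _report_cb(std::move(report_cb)) {}

    // Mirrors PlanFragmentExecutor::set_is_report_on_cancel(bool).
    void set_is_report_on_cancel(bool val) { _is_report_on_cancel = val; }

    void cancel(CancelReason reason) {
        std::lock_guard<std::mutex> l(_status_lock);
        // On LIMIT_REACH the coordinator already has all the rows it needs,
        // so the final report would only be noise.
        if (reason == CancelReason::LIMIT_REACH) {
            set_is_report_on_cancel(false);
        }
        send_report();
    }

    void send_report() {
        // Same gate as the patch: skip only when periodic reporting is off
        // AND reporting-on-cancel has been suppressed.
        if (!_is_report_success && !_is_report_on_cancel) {
            return;
        }
        _report_cb();
    }

private:
    std::mutex _status_lock;
    bool _is_report_success = false;   // periodic profile reporting disabled
    bool _is_report_on_cancel = true;  // default: still report when cancelled
    std::function<void()> _report_cb;
};

int main() {
    Executor limit_reach([] { std::cout << "report sent\n"; });
    limit_reach.cancel(CancelReason::LIMIT_REACH);  // prints nothing

    Executor user_cancel([] { std::cout << "report sent\n"; });
    user_cancel.cancel(CancelReason::USER_CANCEL);  // prints "report sent"
    return 0;
}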