From 2af0a9fd1551685cee995bca72dbb9cfac3143b8 Mon Sep 17 00:00:00 2001
From: morningman
Date: Tue, 16 Jul 2019 22:40:02 +0800
Subject: [PATCH 1/5] Remove query status report from BE when query is cancelled normally

When the query result reaches its limit, the Coordinator in the FE sends a cancel request to the BE to cancel the query. When being cancelled, the BE reports the query status to the FE for debugging purposes. But this report is not necessary and generates too many logs. So I add a CancelReason to distinguish a 'normal' cancellation from an 'internal error' cancellation. If the query is cancelled 'normally', no status is reported from the BE. A query is cancelled 'normally' when it reaches its limit or when the user cancels it actively; otherwise it is cancelled due to an internal error, which still requires a report from the BE. (A minimal sketch of the final behaviour is appended after the last patch in this series.)
---
 be/src/runtime/fragment_mgr.cpp | 11 ++--
 be/src/runtime/fragment_mgr.h | 6 +-
 be/src/runtime/plan_fragment_executor.cpp | 5 ++
 be/src/runtime/plan_fragment_executor.h | 8 +++
 be/src/service/internal_service.cpp | 8 ++-
 .../java/org/apache/doris/qe/Coordinator.java | 61 +++++++++++--------
 .../org/apache/doris/qe/QeProcessorImpl.java | 3 +-
 .../apache/doris/rpc/BackendServiceProxy.java | 3 +-
 gensrc/proto/internal_service.proto | 6 ++
 9 files changed, 78 insertions(+), 33 deletions(-)

diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp index d4f6759de01f8f..d8472072a9e6ff 100644 --- a/be/src/runtime/fragment_mgr.cpp +++ b/be/src/runtime/fragment_mgr.cpp @@ -80,7 +80,7 @@ class FragmentExecState { Status execute(); - Status cancel(); + Status cancel(const PCancelPlanFragmentRequest::CancelReason& reason); TUniqueId fragment_instance_id() const { return _fragment_instance_id; @@ -209,9 +209,12 @@ Status FragmentExecState::execute() { return Status::OK(); } -Status FragmentExecState::cancel() { +Status FragmentExecState::cancel(const PCancelPlanFragmentRequest::CancelReason& reason) { std::lock_guard l(_status_lock); RETURN_IF_ERROR(_exec_status); + if (reason != PCancelPlanFragmentRequest::INTERNAL_ERROR) { + _executor.set_is_report_on_cancel(false); + } _executor.cancel(); return Status::OK(); } @@ -462,7 +465,7 @@ Status FragmentMgr::exec_plan_fragment( return Status::OK(); } -Status FragmentMgr::cancel(const TUniqueId& id) { +Status FragmentMgr::cancel(const TUniqueId& id, const PCancelPlanFragmentRequest::CancelReason& reason) { std::shared_ptr exec_state; { std::lock_guard lock(_lock); @@ -473,7 +476,7 @@ Status FragmentMgr::cancel(const TUniqueId& id) { } exec_state = iter->second; } - exec_state->cancel(); + exec_state->cancel(reason); return Status::OK(); } diff --git a/be/src/runtime/fragment_mgr.h b/be/src/runtime/fragment_mgr.h index b5fc075c4570b5..65fa2fae4f6586 100644 --- a/be/src/runtime/fragment_mgr.h +++ b/be/src/runtime/fragment_mgr.h @@ -54,7 +54,11 @@ class FragmentMgr : public RestMonitorIface { // TODO(zc): report this is over Status exec_plan_fragment(const TExecPlanFragmentParams& params, FinishCallback cb); - Status cancel(const TUniqueId& fragment_id); + Status cancel(const TUniqueId& fragment_id) { + return cancel(fragment_id, PCancelPlanFragmentRequest::INTERNAL_ERROR); + } + + Status cancel(const TUniqueId& fragment_id, const PCancelPlanFragmentRequest::CancelReason& reason); void cancel_worker(); diff --git a/be/src/runtime/plan_fragment_executor.cpp b/be/src/runtime/plan_fragment_executor.cpp index 51d4664336085d..beda13dafd4967 100644 --- a/be/src/runtime/plan_fragment_executor.cpp +++
b/be/src/runtime/plan_fragment_executor.cpp @@ -55,6 +55,7 @@ PlanFragmentExecutor::PlanFragmentExecutor( _closed(false), _has_thread_token(false), _is_report_success(true), + _is_report_on_cancel(true), _collect_query_statistics_with_every_batch(false) { } @@ -433,6 +434,10 @@ void PlanFragmentExecutor::send_report(bool done) { return; } + if (!_is_report_success && !_is_report_on_cancel) { + return; + } + // This will send a report even if we are cancelled. If the query completed correctly // but fragments still need to be cancelled (e.g. limit reached), the coordinator will // be waiting for a final report and profile. diff --git a/be/src/runtime/plan_fragment_executor.h b/be/src/runtime/plan_fragment_executor.h index cc5c51d4b250ae..ff089cb2a3956a 100644 --- a/be/src/runtime/plan_fragment_executor.h +++ b/be/src/runtime/plan_fragment_executor.h @@ -141,6 +141,10 @@ class PlanFragmentExecutor { _stop_report_thread_cv.notify_one(); } + void set_is_report_on_cancel(bool val) { + _is_report_on_cancel = val; + } + private: ExecEnv* _exec_env; // not owned ExecNode* _plan; // lives in _runtime_state->obj_pool() @@ -176,6 +180,10 @@ class PlanFragmentExecutor { bool _is_report_success; + // If this is set to false, and '_is_report_success' is false as well, + // this executor will not report status to FE on being cancelled. + bool _is_report_on_cancel; + // Overall execution status. Either ok() or set to the first error status that // was encountered. Status _status; diff --git a/be/src/service/internal_service.cpp b/be/src/service/internal_service.cpp index 6b6069f28fd2c1..03a1d6da5068ae 100644 --- a/be/src/service/internal_service.cpp +++ b/be/src/service/internal_service.cpp @@ -154,8 +154,14 @@ void PInternalServiceImpl::cancel_plan_fragment( TUniqueId tid; tid.__set_hi(request->finst_id().hi()); tid.__set_lo(request->finst_id().lo()); + LOG(INFO) << "cancel fragment, fragment_instance_id=" << print_id(tid); - auto st = _exec_env->fragment_mgr()->cancel(tid); + Status st; if (request->has_cancel_reason()) { + st = _exec_env->fragment_mgr()->cancel(tid, request->cancel_reason()); + } else { + st = _exec_env->fragment_mgr()->cancel(tid); + } if (!st.ok()) { LOG(WARNING) << "cancel plan fragment failed, errmsg=" << st.get_error_msg(); } diff --git a/fe/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/src/main/java/org/apache/doris/qe/Coordinator.java index 8d289b038d75ee..c47a736422894d 100644 --- a/fe/src/main/java/org/apache/doris/qe/Coordinator.java +++ b/fe/src/main/java/org/apache/doris/qe/Coordinator.java @@ -104,6 +104,15 @@ public class Coordinator { private static final Logger LOG = LogManager.getLogger(Coordinator.class); + + // The reason of cancellation of this query. + // This is used for telling BE whether it need to report query status when being cancelled.
+ public enum CancelReason { + LIMIT_REACH, // cancel the query because reaching limit, no need to report + USER_CANCEL, // query cancelled by client, no need to report + INTERNAL_ERROR // internal error + } + private static final DateFormat DATE_FORMAT = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); private static String localIP = FrontendOptions.getLocalHostAddress(); @@ -331,21 +340,22 @@ private void unlock() { } private void traceInstance() { - // TODO(zc): add a switch to close this function - StringBuilder sb = new StringBuilder(); - int idx = 0; - sb.append("query id=").append(DebugUtil.printId(queryId)).append(","); - sb.append("fragment=["); - for (Map.Entry entry : fragmentExecParamsMap.entrySet()) { - if (idx++ != 0) { - sb.append(","); + if (LOG.isDebugEnabled()) { + // TODO(zc): add a switch to close this function + StringBuilder sb = new StringBuilder(); + int idx = 0; + sb.append("query id=").append(DebugUtil.printId(queryId)).append(","); + sb.append("fragment=["); + for (Map.Entry entry : fragmentExecParamsMap.entrySet()) { + if (idx++ != 0) { + sb.append(","); + } + sb.append(entry.getKey()); + entry.getValue().appendTo(sb); } - sb.append(entry.getKey()); - entry.getValue().appendTo(sb); + sb.append("]"); + LOG.debug(sb.toString()); } - sb.append("]"); - - LOG.info(sb.toString()); } // Initiate asynchronous execution of query. Returns as soon as all plan fragments @@ -464,7 +474,7 @@ public void exec() throws Exception { LOG.warn("exec plan fragment failed, errmsg={}, fragmentId={}, backend={}:{}", errMsg, fragment.getFragmentId(), pair.first.address.hostname, pair.first.address.port); - cancelInternal(); + cancelInternal(CancelReason.INTERNAL_ERROR); switch (code) { case TIMEOUT: throw new UserException("query timeout. backend id: " + pair.first.systemBackendId); @@ -570,7 +580,7 @@ private void updateStatus(Status status, TUniqueId instanceId) { queryStatus.setStatus(status); LOG.warn("one instance report fail throw updateStatus(), need cancel. job id: {}, query id: {}, instance id: {}", jobId, DebugUtil.printId(queryId), instanceId != null ? 
DebugUtil.printId(instanceId) : "NaN"); - cancelInternal(); + cancelInternal(CancelReason.INTERNAL_ERROR); } finally { lock.unlock(); } @@ -625,7 +635,7 @@ public RowBatch getNext() throws Exception { boolean hasLimit = numLimitRows > 0; if (!isBlockQuery && instanceIds.size() > 1 && hasLimit && numReceivedRows >= numLimitRows) { LOG.debug("no block query, return num >= limit rows, need cancel"); - cancelInternal(); + cancelInternal(CancelReason.LIMIT_REACH); } } else { numReceivedRows += resultBatch.getBatch().getRowsSize(); @@ -647,17 +657,17 @@ public void cancel() { queryStatus.setStatus(Status.CANCELLED); } LOG.warn("cancel execution of query, this is outside invoke"); - cancelInternal(); + cancelInternal(CancelReason.USER_CANCEL); } finally { unlock(); } } - private void cancelInternal() { + private void cancelInternal(CancelReason cancelReason) { if (null != receiver) { receiver.cancel(); } - cancelRemoteFragmentsAsync(); + cancelRemoteFragmentsAsync(cancelReason); if (profileDoneSignal != null) { // count down to zero to notify all objects waiting for this profileDoneSignal.countDownToZero(); @@ -665,12 +675,13 @@ private void cancelInternal() { } } - private void cancelRemoteFragmentsAsync() { + private void cancelRemoteFragmentsAsync(CancelReason cancelReason) { for (BackendExecState backendExecState : backendExecStates) { TNetworkAddress address = backendExecState.getBackendAddress(); - LOG.info("cancelRemoteFragments initiated={} done={} hasCanceled={} ip={} port={} fragment instance id={}", + LOG.info("cancelRemoteFragments initiated={} done={} hasCanceled={} ip={} port={} fragment instance id={}, reason: {}", backendExecState.initiated, backendExecState.done, backendExecState.hasCanceled, - address.hostname, address.port, DebugUtil.printId(backendExecState.getFragmentInstanceId())); + address.hostname, address.port, DebugUtil.printId(backendExecState.getFragmentInstanceId()), + cancelReason.name()); backendExecState.lock(); try { @@ -688,7 +699,7 @@ private void cancelRemoteFragmentsAsync() { try { BackendServiceProxy.getInstance().cancelPlanFragmentAsync( - brpcAddress, backendExecState.getFragmentInstanceId()); + brpcAddress, backendExecState.getFragmentInstanceId(), cancelReason); } catch (RpcException e) { LOG.warn("cancel plan fragment get a exception, address={}:{}", brpcAddress.getHostname(), brpcAddress.getPort()); @@ -1079,7 +1090,7 @@ private void computeScanRangeAssignmentByScheduler( public void updateFragmentExecStatus(TReportExecStatusParams params) { if (params.backend_num >= backendExecStates.size()) { - LOG.error("unknown backend number: {}, expected less than: {}", + LOG.warn("unknown backend number: {}, expected less than: {}", params.backend_num, backendExecStates.size()); return; } @@ -1114,7 +1125,7 @@ public void updateFragmentExecStatus(TReportExecStatusParams params) { // and returned_all_results_ is true. 
// (UpdateStatus() initiates cancellation, if it hasn't already been initiated) if (!(returnedAllResults && status.isCancelled()) && !status.ok()) { - LOG.warn("One instance report fail, query_id={} instance_id={}", + LOG.warn("one instance report fail, query_id={} instance_id={}", DebugUtil.printId(queryId), DebugUtil.printId(params.getFragment_instance_id())); updateStatus(status, params.getFragment_instance_id()); } diff --git a/fe/src/main/java/org/apache/doris/qe/QeProcessorImpl.java b/fe/src/main/java/org/apache/doris/qe/QeProcessorImpl.java index c7876bf491eb4b..5a08cb97d43127 100644 --- a/fe/src/main/java/org/apache/doris/qe/QeProcessorImpl.java +++ b/fe/src/main/java/org/apache/doris/qe/QeProcessorImpl.java @@ -96,7 +96,8 @@ public Map getQueryStatistics() { @Override public TReportExecStatusResult reportExecStatus(TReportExecStatusParams params) { LOG.info("ReportExecStatus(): fragment_instance_id=" + DebugUtil.printId(params.fragment_instance_id) - + ", query id=" + DebugUtil.printId(params.query_id) + " params=" + params); + + ", query id=" + DebugUtil.printId(params.query_id)); + LOG.debug("params: {}", params); final TReportExecStatusResult result = new TReportExecStatusResult(); final QueryInfo info = coordinatorMap.get(params.query_id); if (info == null) { diff --git a/fe/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java b/fe/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java index e19e35fac0d5e6..c824f9dafc194f 100644 --- a/fe/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java +++ b/fe/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java @@ -26,6 +26,7 @@ import org.apache.doris.proto.PProxyResult; import org.apache.doris.proto.PTriggerProfileReportResult; import org.apache.doris.proto.PUniqueId; +import org.apache.doris.qe.Coordinator.CancelReason; import org.apache.doris.thrift.TExecPlanFragmentParams; import org.apache.doris.thrift.TNetworkAddress; import org.apache.doris.thrift.TUniqueId; @@ -111,7 +112,7 @@ public Future execPlanFragmentAsync( } public Future cancelPlanFragmentAsync( - TNetworkAddress address, TUniqueId finstId) throws RpcException { + TNetworkAddress address, TUniqueId finstId, CancelReason cancelReason) throws RpcException { final PCancelPlanFragmentRequest pRequest = new PCancelPlanFragmentRequest(); PUniqueId uid = new PUniqueId(); uid.hi = finstId.hi; diff --git a/gensrc/proto/internal_service.proto b/gensrc/proto/internal_service.proto index 4b57fb9b8e9f2e..7a3cbcad1e6990 100644 --- a/gensrc/proto/internal_service.proto +++ b/gensrc/proto/internal_service.proto @@ -116,6 +116,12 @@ message PExecPlanFragmentResult { message PCancelPlanFragmentRequest { required PUniqueId finst_id = 1; + enum CancelReason { + LIMIT_REACH = 0; + USER_CANCEL = 1; + INTERNAL_ERROR = 2; + } + optional CancelReason cancel_reason = 2; }; message PCancelPlanFragmentResult { From 6362aa84c24ebb86afd3f29a98069afebf384552 Mon Sep 17 00:00:00 2001 From: morningman Date: Wed, 17 Jul 2019 09:17:38 +0800 Subject: [PATCH 2/5] fix bug --- .../java/org/apache/doris/qe/Coordinator.java | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/fe/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/src/main/java/org/apache/doris/qe/Coordinator.java index c47a736422894d..36e53b646881cb 100644 --- a/fe/src/main/java/org/apache/doris/qe/Coordinator.java +++ b/fe/src/main/java/org/apache/doris/qe/Coordinator.java @@ -468,7 +468,7 @@ public void exec() throws Exception { if (code != TStatusCode.OK) { if (errMsg == null) { - 
errMsg = "exec rpc error. backend id: " + pair.first.systemBackendId; + errMsg = "exec rpc error. backend id: " + pair.first.backendId; } queryStatus.setStatus(errMsg); LOG.warn("exec plan fragment failed, errmsg={}, fragmentId={}, backend={}:{}", @@ -477,10 +477,10 @@ public void exec() throws Exception { cancelInternal(CancelReason.INTERNAL_ERROR); switch (code) { case TIMEOUT: - throw new UserException("query timeout. backend id: " + pair.first.systemBackendId); + throw new UserException("query timeout. backend id: " + pair.first.backendId); case THRIFT_RPC_ERROR: - SimpleScheduler.updateBlacklistBackends(pair.first.systemBackendId); - throw new RpcException("rpc failed. backend id: " + pair.first.systemBackendId); + SimpleScheduler.updateBlacklistBackends(pair.first.backendId); + throw new RpcException("rpc failed. backend id: " + pair.first.backendId); default: throw new UserException(errMsg); } @@ -1210,7 +1210,7 @@ public class BackendExecState { private int profileFragmentId; RuntimeProfile profile; TNetworkAddress address; - Long systemBackendId; + Long backendId; public int profileFragmentId() { return profileFragmentId; @@ -1236,7 +1236,7 @@ public int getInstanceId() { return instanceId; } - public PlanFragmentId getfragmentId() { + public PlanFragmentId getFragmentId() { return fragmentId; } @@ -1249,7 +1249,7 @@ public BackendExecState(PlanFragmentId fragmentId, int instanceId, int profileFr this.initiated = false; this.done = false; this.address = fragmentExecParamsMap.get(fragmentId).instanceExecParams.get(instanceId).host; - this.systemBackendId = addressToBackendID.get(address); + this.backendId = addressToBackendID.get(address); String name = "Instance " + DebugUtil.printId(fragmentExecParamsMap.get(fragmentId) .instanceExecParams.get(instanceId).instanceId) + " (host=" + address + ")"; @@ -1276,7 +1276,7 @@ public Future execRemoteFragmentAsync() throws TExcepti try { return BackendServiceProxy.getInstance().execPlanFragmentAsync(brpcAddress, rpcParams); } catch (RpcException e) { - SimpleScheduler.updateBlacklistBackends(systemBackendId); + SimpleScheduler.updateBlacklistBackends(backendId); throw e; } } @@ -1437,7 +1437,7 @@ public List getFragmentInstanceInfos() for (int index = 0; index < fragments.size(); index++) { for (Map.Entry entry: backendExecStateMap.entrySet()) { final BackendExecState backendExecState = entry.getValue(); - if (fragments.get(index).getFragmentId() != backendExecState.getfragmentId()) { + if (fragments.get(index).getFragmentId() != backendExecState.getFragmentId()) { continue; } final QueryStatisticsItem.FragmentInstanceInfo info From e6c5c9b02e65f9fff53789d37a2d48ba02b11ac5 Mon Sep 17 00:00:00 2001 From: morningman Date: Wed, 17 Jul 2019 10:25:00 +0800 Subject: [PATCH 3/5] move CancelReason to PCancelReason --- be/src/runtime/fragment_mgr.cpp | 8 ++++---- be/src/service/internal_service.cpp | 3 ++- gensrc/proto/internal_service.proto | 14 ++++++++------ 3 files changed, 14 insertions(+), 11 deletions(-) diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp index d8472072a9e6ff..15a1aafa51cfa1 100644 --- a/be/src/runtime/fragment_mgr.cpp +++ b/be/src/runtime/fragment_mgr.cpp @@ -80,7 +80,7 @@ class FragmentExecState { Status execute(); - Status cancel(const PCancelPlanFragmentRequest::CancelReason& reason); + Status cancel(const PCancelReason& reason); TUniqueId fragment_instance_id() const { return _fragment_instance_id; @@ -209,10 +209,10 @@ Status FragmentExecState::execute() { return Status::OK(); } -Status 
FragmentExecState::cancel(const PCancelPlanFragmentRequest::CancelReason& reason) { +Status FragmentExecState::cancel(const PCancelReason& reason) { std::lock_guard l(_status_lock); RETURN_IF_ERROR(_exec_status); - if (reason != PCancelPlanFragmentRequest::INTERNAL_ERROR) { + if (reason == PCancelReason::LIMIT_REACH) { _executor.set_is_report_on_cancel(false); } _executor.cancel(); @@ -465,7 +465,7 @@ Status FragmentMgr::exec_plan_fragment( return Status::OK(); } -Status FragmentMgr::cancel(const TUniqueId& id, const PCancelPlanFragmentRequest::CancelReason& reason) { +Status FragmentMgr::cancel(const TUniqueId& id, const PCancelReason& reason) { std::shared_ptr exec_state; { std::lock_guard lock(_lock); diff --git a/be/src/service/internal_service.cpp b/be/src/service/internal_service.cpp index 03a1d6da5068ae..687d7935f29bb8 100644 --- a/be/src/service/internal_service.cpp +++ b/be/src/service/internal_service.cpp @@ -155,11 +155,12 @@ void PInternalServiceImpl::cancel_plan_fragment( tid.__set_hi(request->finst_id().hi()); tid.__set_lo(request->finst_id().lo()); - LOG(INFO) << "cancel fragment, fragment_instance_id=" << print_id(tid); Status st; if (request->has_cancel_reason()) { + LOG(INFO) << "cancel fragment, fragment_instance_id=" << print_id(tid) << ", reason: " << request->cancel_reason(); st = _exec_env->fragment_mgr()->cancel(tid, request->cancel_reason()); } else { + LOG(INFO) << "cancel fragment, fragment_instance_id=" << print_id(tid); st = _exec_env->fragment_mgr()->cancel(tid); } if (!st.ok()) { LOG(WARNING) << "cancel plan fragment failed, errmsg=" << st.get_error_msg(); } diff --git a/gensrc/proto/internal_service.proto b/gensrc/proto/internal_service.proto index 7a3cbcad1e6990..99383525de87e3 100644 --- a/gensrc/proto/internal_service.proto +++ b/gensrc/proto/internal_service.proto @@ -114,14 +114,16 @@ message PExecPlanFragmentResult { required PStatus status = 1; }; +enum PCancelReason { + // 0 is reserved + LIMIT_REACH = 1; + USER_CANCEL = 2; + INTERNAL_ERROR = 3; +}; + message PCancelPlanFragmentRequest { required PUniqueId finst_id = 1; - enum CancelReason { - LIMIT_REACH = 0; - USER_CANCEL = 1; - INTERNAL_ERROR = 2; - } - optional CancelReason cancel_reason = 2; + optional PCancelReason cancel_reason = 2; }; message PCancelPlanFragmentResult { From 73ffba527a844a77d3cbbbe8dc13bb2520bc5d0c Mon Sep 17 00:00:00 2001 From: morningman Date: Wed, 17 Jul 2019 10:37:25 +0800 Subject: [PATCH 4/5] fix fe code --- be/src/runtime/fragment_mgr.h | 4 ++-- .../java/org/apache/doris/qe/Coordinator.java | 23 +++++++------------ .../apache/doris/rpc/BackendServiceProxy.java | 5 ++-- 3 files changed, 13 insertions(+), 19 deletions(-) diff --git a/be/src/runtime/fragment_mgr.h b/be/src/runtime/fragment_mgr.h index 65fa2fae4f6586..282774b58ccd70 100644 --- a/be/src/runtime/fragment_mgr.h +++ b/be/src/runtime/fragment_mgr.h @@ -55,10 +55,10 @@ class FragmentMgr : public RestMonitorIface { Status exec_plan_fragment(const TExecPlanFragmentParams& params, FinishCallback cb); Status cancel(const TUniqueId& fragment_id) { - return cancel(fragment_id, PCancelPlanFragmentRequest::INTERNAL_ERROR); + return cancel(fragment_id, PCancelReason::INTERNAL_ERROR); } - Status cancel(const TUniqueId& fragment_id, const PCancelPlanFragmentRequest::CancelReason& reason); + Status cancel(const TUniqueId& fragment_id, const PCancelReason& reason); void cancel_worker(); diff --git a/fe/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/src/main/java/org/apache/doris/qe/Coordinator.java index 36e53b646881cb..29ca14ed4aa203 100644 ---
a/fe/src/main/java/org/apache/doris/qe/Coordinator.java +++ b/fe/src/main/java/org/apache/doris/qe/Coordinator.java @@ -44,6 +44,7 @@ import org.apache.doris.planner.ResultSink; import org.apache.doris.planner.ScanNode; import org.apache.doris.planner.UnionNode; +import org.apache.doris.proto.PCancelReason; import org.apache.doris.proto.PExecPlanFragmentResult; import org.apache.doris.rpc.BackendServiceProxy; import org.apache.doris.rpc.RpcException; @@ -105,14 +106,6 @@ public class Coordinator { private static final Logger LOG = LogManager.getLogger(Coordinator.class); - // The reason of cancellation of this query. - // This is used for telling BE whether it need to report query status when being cancelled. - public enum CancelReason { - LIMIT_REACH, // cancel the query because reaching limit, no need to report - USER_CANCEL, // query cancelled by client, no need to report - INTERNAL_ERROR // internal error - } - private static final DateFormat DATE_FORMAT = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); private static String localIP = FrontendOptions.getLocalHostAddress(); @@ -474,7 +467,7 @@ public void exec() throws Exception { LOG.warn("exec plan fragment failed, errmsg={}, fragmentId={}, backend={}:{}", errMsg, fragment.getFragmentId(), pair.first.address.hostname, pair.first.address.port); - cancelInternal(CancelReason.INTERNAL_ERROR); + cancelInternal(PCancelReason.INTERNAL_ERROR); switch (code) { case TIMEOUT: throw new UserException("query timeout. backend id: " + pair.first.backendId); @@ -580,7 +573,7 @@ private void updateStatus(Status status, TUniqueId instanceId) { queryStatus.setStatus(status); LOG.warn("one instance report fail throw updateStatus(), need cancel. job id: {}, query id: {}, instance id: {}", jobId, DebugUtil.printId(queryId), instanceId != null ? 
DebugUtil.printId(instanceId) : "NaN"); - cancelInternal(CancelReason.INTERNAL_ERROR); + cancelInternal(PCancelReason.INTERNAL_ERROR); } finally { lock.unlock(); } @@ -635,7 +628,7 @@ public RowBatch getNext() throws Exception { boolean hasLimit = numLimitRows > 0; if (!isBlockQuery && instanceIds.size() > 1 && hasLimit && numReceivedRows >= numLimitRows) { LOG.debug("no block query, return num >= limit rows, need cancel"); - cancelInternal(CancelReason.LIMIT_REACH); + cancelInternal(PCancelReason.LIMIT_REACH); } } else { numReceivedRows += resultBatch.getBatch().getRowsSize(); @@ -657,13 +650,13 @@ public void cancel() { queryStatus.setStatus(Status.CANCELLED); } LOG.warn("cancel execution of query, this is outside invoke"); - cancelInternal(CancelReason.USER_CANCEL); + cancelInternal(PCancelReason.USER_CANCEL); } finally { unlock(); } } - private void cancelInternal(CancelReason cancelReason) { + private void cancelInternal(PCancelReason cancelReason) { if (null != receiver) { receiver.cancel(); } @@ -675,10 +668,10 @@ private void cancelInternal(CancelReason cancelReason) { } } - private void cancelRemoteFragmentsAsync(CancelReason cancelReason) { + private void cancelRemoteFragmentsAsync(PCancelReason cancelReason) { for (BackendExecState backendExecState : backendExecStates) { TNetworkAddress address = backendExecState.getBackendAddress(); - LOG.info("cancelRemoteFragments initiated={} done={} hasCanceled={} ip={} port={} fragment instance id={}, reason: {}", + LOG.debug("cancelRemoteFragments initiated={} done={} hasCanceled={} ip={} port={} fragment instance id={}, reason: {}", backendExecState.initiated, backendExecState.done, backendExecState.hasCanceled, address.hostname, address.port, DebugUtil.printId(backendExecState.getFragmentInstanceId()), cancelReason.name()); diff --git a/fe/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java b/fe/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java index c824f9dafc194f..a4e5b542f57e0f 100644 --- a/fe/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java +++ b/fe/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java @@ -20,13 +20,13 @@ import org.apache.doris.common.Config; import org.apache.doris.proto.PCancelPlanFragmentRequest; import org.apache.doris.proto.PCancelPlanFragmentResult; +import org.apache.doris.proto.PCancelReason; import org.apache.doris.proto.PExecPlanFragmentResult; import org.apache.doris.proto.PFetchDataResult; import org.apache.doris.proto.PProxyRequest; import org.apache.doris.proto.PProxyResult; import org.apache.doris.proto.PTriggerProfileReportResult; import org.apache.doris.proto.PUniqueId; -import org.apache.doris.qe.Coordinator.CancelReason; import org.apache.doris.thrift.TExecPlanFragmentParams; import org.apache.doris.thrift.TNetworkAddress; import org.apache.doris.thrift.TUniqueId; @@ -112,12 +112,13 @@ public Future execPlanFragmentAsync( } public Future cancelPlanFragmentAsync( - TNetworkAddress address, TUniqueId finstId, CancelReason cancelReason) throws RpcException { + TNetworkAddress address, TUniqueId finstId, PCancelReason cancelReason) throws RpcException { final PCancelPlanFragmentRequest pRequest = new PCancelPlanFragmentRequest(); PUniqueId uid = new PUniqueId(); uid.hi = finstId.hi; uid.lo = finstId.lo; pRequest.finst_id = uid; + pRequest.cancel_reason = cancelReason; try { final PBackendService service = getProxy(address); return service.cancelPlanFragmentAsync(pRequest); From 6641e677b962a5c6ff0e0f6fa60bb187caafade6 Mon Sep 17 00:00:00 2001 From: morningman Date: 
Wed, 17 Jul 2019 14:13:57 +0800 Subject: [PATCH 5/5] rename PCancelReason to PPlanFragmentCancelReason --- be/src/runtime/fragment_mgr.cpp | 8 ++++---- be/src/runtime/fragment_mgr.h | 4 ++-- .../main/java/org/apache/doris/qe/Coordinator.java | 14 +++++++------- .../org/apache/doris/rpc/BackendServiceProxy.java | 4 ++-- gensrc/proto/internal_service.proto | 4 ++-- 5 files changed, 17 insertions(+), 17 deletions(-) diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp index 15a1aafa51cfa1..b7d873571636dd 100644 --- a/be/src/runtime/fragment_mgr.cpp +++ b/be/src/runtime/fragment_mgr.cpp @@ -80,7 +80,7 @@ class FragmentExecState { Status execute(); - Status cancel(const PCancelReason& reason); + Status cancel(const PPlanFragmentCancelReason& reason); TUniqueId fragment_instance_id() const { return _fragment_instance_id; @@ -209,10 +209,10 @@ Status FragmentExecState::execute() { return Status::OK(); } -Status FragmentExecState::cancel(const PCancelReason& reason) { +Status FragmentExecState::cancel(const PPlanFragmentCancelReason& reason) { std::lock_guard l(_status_lock); RETURN_IF_ERROR(_exec_status); - if (reason == PCancelReason::LIMIT_REACH) { + if (reason == PPlanFragmentCancelReason::LIMIT_REACH) { _executor.set_is_report_on_cancel(false); } _executor.cancel(); @@ -465,7 +465,7 @@ Status FragmentMgr::exec_plan_fragment( return Status::OK(); } -Status FragmentMgr::cancel(const TUniqueId& id, const PCancelReason& reason) { +Status FragmentMgr::cancel(const TUniqueId& id, const PPlanFragmentCancelReason& reason) { std::shared_ptr exec_state; { std::lock_guard lock(_lock); diff --git a/be/src/runtime/fragment_mgr.h b/be/src/runtime/fragment_mgr.h index 282774b58ccd70..77dcf8247c219c 100644 --- a/be/src/runtime/fragment_mgr.h +++ b/be/src/runtime/fragment_mgr.h @@ -55,10 +55,10 @@ class FragmentMgr : public RestMonitorIface { Status exec_plan_fragment(const TExecPlanFragmentParams& params, FinishCallback cb); Status cancel(const TUniqueId& fragment_id) { - return cancel(fragment_id, PCancelReason::INTERNAL_ERROR); + return cancel(fragment_id, PPlanFragmentCancelReason::INTERNAL_ERROR); } - Status cancel(const TUniqueId& fragment_id, const PCancelReason& reason); + Status cancel(const TUniqueId& fragment_id, const PPlanFragmentCancelReason& reason); void cancel_worker(); diff --git a/fe/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/src/main/java/org/apache/doris/qe/Coordinator.java index 29ca14ed4aa203..434ba924d52940 100644 --- a/fe/src/main/java/org/apache/doris/qe/Coordinator.java +++ b/fe/src/main/java/org/apache/doris/qe/Coordinator.java @@ -44,7 +44,7 @@ import org.apache.doris.planner.ResultSink; import org.apache.doris.planner.ScanNode; import org.apache.doris.planner.UnionNode; -import org.apache.doris.proto.PCancelReason; +import org.apache.doris.proto.PPlanFragmentCancelReason; import org.apache.doris.proto.PExecPlanFragmentResult; import org.apache.doris.rpc.BackendServiceProxy; import org.apache.doris.rpc.RpcException; @@ -467,7 +467,7 @@ public void exec() throws Exception { LOG.warn("exec plan fragment failed, errmsg={}, fragmentId={}, backend={}:{}", errMsg, fragment.getFragmentId(), pair.first.address.hostname, pair.first.address.port); - cancelInternal(PCancelReason.INTERNAL_ERROR); + cancelInternal(PPlanFragmentCancelReason.INTERNAL_ERROR); switch (code) { case TIMEOUT: throw new UserException("query timeout. 
backend id: " + pair.first.backendId); @@ -573,7 +573,7 @@ private void updateStatus(Status status, TUniqueId instanceId) { queryStatus.setStatus(status); LOG.warn("one instance report fail throw updateStatus(), need cancel. job id: {}, query id: {}, instance id: {}", jobId, DebugUtil.printId(queryId), instanceId != null ? DebugUtil.printId(instanceId) : "NaN"); - cancelInternal(PCancelReason.INTERNAL_ERROR); + cancelInternal(PPlanFragmentCancelReason.INTERNAL_ERROR); } finally { lock.unlock(); } @@ -628,7 +628,7 @@ public RowBatch getNext() throws Exception { boolean hasLimit = numLimitRows > 0; if (!isBlockQuery && instanceIds.size() > 1 && hasLimit && numReceivedRows >= numLimitRows) { LOG.debug("no block query, return num >= limit rows, need cancel"); - cancelInternal(PCancelReason.LIMIT_REACH); + cancelInternal(PPlanFragmentCancelReason.LIMIT_REACH); } } else { numReceivedRows += resultBatch.getBatch().getRowsSize(); @@ -650,13 +650,13 @@ public void cancel() { queryStatus.setStatus(Status.CANCELLED); } LOG.warn("cancel execution of query, this is outside invoke"); - cancelInternal(PCancelReason.USER_CANCEL); + cancelInternal(PPlanFragmentCancelReason.USER_CANCEL); } finally { unlock(); } } - private void cancelInternal(PCancelReason cancelReason) { + private void cancelInternal(PPlanFragmentCancelReason cancelReason) { if (null != receiver) { receiver.cancel(); } @@ -668,7 +668,7 @@ private void cancelInternal(PCancelReason cancelReason) { } } - private void cancelRemoteFragmentsAsync(PCancelReason cancelReason) { + private void cancelRemoteFragmentsAsync(PPlanFragmentCancelReason cancelReason) { for (BackendExecState backendExecState : backendExecStates) { TNetworkAddress address = backendExecState.getBackendAddress(); LOG.debug("cancelRemoteFragments initiated={} done={} hasCanceled={} ip={} port={} fragment instance id={}, reason: {}", diff --git a/fe/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java b/fe/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java index a4e5b542f57e0f..18dc80e18bcc67 100644 --- a/fe/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java +++ b/fe/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java @@ -20,7 +20,7 @@ import org.apache.doris.common.Config; import org.apache.doris.proto.PCancelPlanFragmentRequest; import org.apache.doris.proto.PCancelPlanFragmentResult; -import org.apache.doris.proto.PCancelReason; +import org.apache.doris.proto.PPlanFragmentCancelReason; import org.apache.doris.proto.PExecPlanFragmentResult; import org.apache.doris.proto.PFetchDataResult; import org.apache.doris.proto.PProxyRequest; @@ -112,7 +112,7 @@ public Future execPlanFragmentAsync( } public Future cancelPlanFragmentAsync( - TNetworkAddress address, TUniqueId finstId, PCancelReason cancelReason) throws RpcException { + TNetworkAddress address, TUniqueId finstId, PPlanFragmentCancelReason cancelReason) throws RpcException { final PCancelPlanFragmentRequest pRequest = new PCancelPlanFragmentRequest(); PUniqueId uid = new PUniqueId(); uid.hi = finstId.hi; diff --git a/gensrc/proto/internal_service.proto b/gensrc/proto/internal_service.proto index 99383525de87e3..be36bc82189142 100644 --- a/gensrc/proto/internal_service.proto +++ b/gensrc/proto/internal_service.proto @@ -114,7 +114,7 @@ message PExecPlanFragmentResult { required PStatus status = 1; }; -enum PCancelReason { +enum PPlanFragmentCancelReason { // 0 is reserved LIMIT_REACH = 1; USER_CANCEL = 2; @@ -123,7 +123,7 @@ enum PCancelReason { message PCancelPlanFragmentRequest { required 
PUniqueId finst_id = 1; - optional PCancelReason cancel_reason = 2; + optional PPlanFragmentCancelReason cancel_reason = 2; }; message PCancelPlanFragmentResult {
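
The minimal sketch referenced in the PATCH 1 cover text. This is not Doris code: the enum and struct below are stand-ins for PPlanFragmentCancelReason, FragmentExecState and PlanFragmentExecutor, and the flags mirror _is_report_success and _is_report_on_cancel. It models the behaviour the series converges on: after PATCH 3 only a LIMIT_REACH cancellation turns the cancel-time report off, while USER_CANCEL and INTERNAL_ERROR (the default used when no reason is sent) still report back to the FE.

#include <iostream>

// Stand-in for the PPlanFragmentCancelReason enum added to internal_service.proto.
enum class CancelReason { LIMIT_REACH, USER_CANCEL, INTERNAL_ERROR };

// Collapses FragmentExecState and PlanFragmentExecutor into one tiny model.
struct ExecutorSketch {
    bool is_report_success = false;   // periodic progress reporting disabled in this example
    bool is_report_on_cancel = true;  // mirrors _is_report_on_cancel, default true

    // Mirrors FragmentExecState::cancel() after PATCH 3: only a limit-reached
    // cancellation suppresses the report on cancel.
    void cancel(CancelReason reason) {
        if (reason == CancelReason::LIMIT_REACH) {
            is_report_on_cancel = false;
        }
        // ... the real code would now cancel the running executor ...
    }

    // Mirrors the early return added to PlanFragmentExecutor::send_report():
    // the report is skipped only when both flags are false.
    bool would_send_report() const {
        return is_report_success || is_report_on_cancel;
    }
};

int main() {
    const char* names[] = {"LIMIT_REACH", "USER_CANCEL", "INTERNAL_ERROR"};
    CancelReason reasons[] = {CancelReason::LIMIT_REACH, CancelReason::USER_CANCEL,
                              CancelReason::INTERNAL_ERROR};
    for (int i = 0; i < 3; ++i) {
        ExecutorSketch e;
        e.cancel(reasons[i]);
        std::cout << names[i] << " -> report on cancel: "
                  << (e.would_send_report() ? "yes" : "no") << std::endl;
    }
    return 0;
}

Expected output: only the LIMIT_REACH line prints "no", matching the check on PPlanFragmentCancelReason::LIMIT_REACH that PATCH 3 introduces and PATCH 5 only renames.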