From 6f2971a83276da1534a87e02353699d6e7029f5d Mon Sep 17 00:00:00 2001 From: morningman Date: Fri, 20 May 2022 17:07:47 +0800 Subject: [PATCH 1/8] pass --- be/src/runtime/fragment_mgr.cpp | 132 ++++--- be/src/runtime/fragment_mgr.h | 2 + be/src/runtime/query_fragments_ctx.h | 21 ++ be/src/service/internal_service.cpp | 44 ++- be/src/service/internal_service.h | 7 +- .../apache/doris/catalog/TabletStatMgr.java | 2 +- .../apache/doris/clone/BeLoadRebalancer.java | 10 +- .../doris/clone/ClusterLoadStatistic.java | 2 +- .../apache/doris/clone/DiskRebalancer.java | 10 +- .../org/apache/doris/clone/TabletChecker.java | 2 +- .../apache/doris/clone/TabletScheduler.java | 4 +- .../java/org/apache/doris/common/Config.java | 9 - .../routineload/RoutineLoadScheduler.java | 4 +- .../apache/doris/master/ReportHandler.java | 2 +- .../java/org/apache/doris/qe/Coordinator.java | 342 +++++++++++------- .../doris/qe/InsertStreamTxnExecutor.java | 7 +- .../org/apache/doris/qe/ResultReceiver.java | 4 +- .../doris/rpc/BackendServiceClient.java | 8 +- .../apache/doris/rpc/BackendServiceProxy.java | 49 ++- .../org/apache/doris/rpc/RpcException.java | 5 + .../load/sync/canal/CanalSyncDataTest.java | 16 +- gensrc/proto/internal_service.proto | 11 + gensrc/thrift/PaloInternalService.thrift | 11 +- 23 files changed, 466 insertions(+), 238 deletions(-) diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp index c553744b5d52b3..0edc04b6992ccd 100644 --- a/be/src/runtime/fragment_mgr.cpp +++ b/be/src/runtime/fragment_mgr.cpp @@ -140,6 +140,8 @@ class FragmentExecState { void set_pipe(std::shared_ptr pipe) { _pipe = pipe; } std::shared_ptr get_pipe() const { return _pipe; } + void set_need_wait_execution_trigger() { _need_wait_execution_trigger = true; } + private: void coordinator_callback(const Status& status, RuntimeProfile* profile, bool done); @@ -171,6 +173,9 @@ class FragmentExecState { std::shared_ptr _merge_controller_handler; // The pipe for data transfering, such as insert. std::shared_ptr _pipe; + + // If set the true, this plan fragment will be executed only after FE send execution start rpc. + bool _need_wait_execution_trigger = false; }; FragmentExecState::FragmentExecState(const TUniqueId& query_id, @@ -225,6 +230,11 @@ Status FragmentExecState::prepare(const TExecPlanFragmentParams& params) { } Status FragmentExecState::execute() { + if (_need_wait_execution_trigger) { + // if _need_wait_execution_trigger is true, which means this instance + // is prepared but need to wait for the signal to do the rest execution. + _fragments_ctx->wait_for_start(); + } int64_t duration_ns = 0; { SCOPED_RAW_TIMER(&duration_ns); @@ -527,6 +537,22 @@ Status FragmentMgr::exec_plan_fragment(const TExecPlanFragmentParams& params) { } } +Status FragmentMgr::start_query_execution(const PExecPlanFragmentStartRequest* request) { + std::lock_guard lock(_lock); + TUniqueId query_id; + query_id.__set_hi(request->query_id().hi()); + query_id.__set_lo(request->query_id().lo()); + auto search = _fragments_ctx_map.find(query_id); + if (search == _fragments_ctx_map.end()) { + return Status::InternalError( + strings::Substitute("Failed to get query fragments context. Query may be " + "timeout or be cancelled. 
host: ", + BackendOptions::get_localhost())); + } + search->second->set_ready_to_execute(); + return Status::OK(); +} + void FragmentMgr::set_pipe(const TUniqueId& fragment_instance_id, std::shared_ptr pipe) { { @@ -562,65 +588,63 @@ Status FragmentMgr::exec_plan_fragment(const TExecPlanFragmentParams& params, Fi } std::shared_ptr exec_state; - if (!params.__isset.is_simplified_param) { - // This is an old version params, all @Common components is set in TExecPlanFragmentParams. - exec_state.reset(new FragmentExecState(params.params.query_id, - params.params.fragment_instance_id, - params.backend_num, _exec_env, params.coord)); + std::shared_ptr fragments_ctx; + if (params.is_simplified_param) { + // Get common components from _fragments_ctx_map + std::lock_guard lock(_lock); + auto search = _fragments_ctx_map.find(params.params.query_id); + if (search == _fragments_ctx_map.end()) { + return Status::InternalError( + strings::Substitute("Failed to get query fragments context. Query may be " + "timeout or be cancelled. host: ", + BackendOptions::get_localhost())); + } + fragments_ctx = search->second; } else { - std::shared_ptr fragments_ctx; - if (params.is_simplified_param) { - // Get common components from _fragments_ctx_map - std::lock_guard lock(_lock); - auto search = _fragments_ctx_map.find(params.params.query_id); - if (search == _fragments_ctx_map.end()) { - return Status::InternalError( - strings::Substitute("Failed to get query fragments context. Query may be " - "timeout or be cancelled. host: ", - BackendOptions::get_localhost())); - } - fragments_ctx = search->second; - } else { - // This may be a first fragment request of the query. - // Create the query fragments context. - fragments_ctx.reset(new QueryFragmentsCtx(params.fragment_num_on_host, _exec_env)); - fragments_ctx->query_id = params.params.query_id; - RETURN_IF_ERROR(DescriptorTbl::create(&(fragments_ctx->obj_pool), params.desc_tbl, - &(fragments_ctx->desc_tbl))); - fragments_ctx->coord_addr = params.coord; - fragments_ctx->query_globals = params.query_globals; - - if (params.__isset.resource_info) { - fragments_ctx->user = params.resource_info.user; - fragments_ctx->group = params.resource_info.group; - fragments_ctx->set_rsc_info = true; - } + // This may be a first fragment request of the query. + // Create the query fragments context. + fragments_ctx.reset(new QueryFragmentsCtx(params.fragment_num_on_host, _exec_env)); + fragments_ctx->query_id = params.params.query_id; + RETURN_IF_ERROR(DescriptorTbl::create(&(fragments_ctx->obj_pool), params.desc_tbl, + &(fragments_ctx->desc_tbl))); + fragments_ctx->coord_addr = params.coord; + fragments_ctx->query_globals = params.query_globals; - if (params.__isset.query_options) { - fragments_ctx->timeout_second = params.query_options.query_timeout; - if (params.query_options.__isset.resource_limit) { - fragments_ctx->set_thread_token(params.query_options.resource_limit.cpu_limit); - } + if (params.__isset.resource_info) { + fragments_ctx->user = params.resource_info.user; + fragments_ctx->group = params.resource_info.group; + fragments_ctx->set_rsc_info = true; + } + + if (params.__isset.query_options) { + fragments_ctx->timeout_second = params.query_options.query_timeout; + if (params.query_options.__isset.resource_limit) { + fragments_ctx->set_thread_token(params.query_options.resource_limit.cpu_limit); } + } - { - // Find _fragments_ctx_map again, in case some other request has already - // create the query fragments context. 
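Note: the sketch below is an editorial illustration, not part of the patch. The BE-side code above is C++; this is a minimal Java rendering of the same per-query "prepare now, start later" gate that `QueryFragmentsCtx::set_ready_to_execute()` / `wait_for_start()` and `FragmentMgr::start_query_execution()` implement. All class and method names here are hypothetical.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;

// Illustrative only: one gate per query. Every prepared fragment instance of the
// query blocks on the gate until the coordinator's "start execution" RPC (or the
// timeout/cancel path) releases it, mirroring wait_for_start()/set_ready_to_execute().
class QueryStartGate {
    private final CountDownLatch startSignal = new CountDownLatch(1);

    void awaitStart() throws InterruptedException {
        startSignal.await();     // analogous to FragmentExecState::execute() waiting
    }

    void signalStart() {
        startSignal.countDown(); // analogous to exec_plan_fragment_start or timeout cancel
    }
}

// Illustrative registry keyed by query id, analogous to FragmentMgr::_fragments_ctx_map.
class QueryStartGateRegistry {
    private final ConcurrentHashMap<String, QueryStartGate> gates = new ConcurrentHashMap<>();

    QueryStartGate registerIfAbsent(String queryId) {
        return gates.computeIfAbsent(queryId, id -> new QueryStartGate());
    }

    void startQuery(String queryId) {
        QueryStartGate gate = gates.get(queryId);
        if (gate != null) {
            gate.signalStart();
        }
    }
}
```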
- std::lock_guard lock(_lock); - auto search = _fragments_ctx_map.find(params.params.query_id); - if (search == _fragments_ctx_map.end()) { - _fragments_ctx_map.insert( - std::make_pair(fragments_ctx->query_id, fragments_ctx)); - } else { - // Already has a query fragmentscontext, use it - fragments_ctx = search->second; - } + { + // Find _fragments_ctx_map again, in case some other request has already + // create the query fragments context. + std::lock_guard lock(_lock); + auto search = _fragments_ctx_map.find(params.params.query_id); + if (search == _fragments_ctx_map.end()) { + _fragments_ctx_map.insert( + std::make_pair(fragments_ctx->query_id, fragments_ctx)); + } else { + // Already has a query fragmentscontext, use it + fragments_ctx = search->second; } } + } - exec_state.reset(new FragmentExecState(fragments_ctx->query_id, - params.params.fragment_instance_id, - params.backend_num, _exec_env, fragments_ctx)); + exec_state.reset(new FragmentExecState(fragments_ctx->query_id, + params.params.fragment_instance_id, + params.backend_num, _exec_env, fragments_ctx)); + if (params.__isset.need_wait_execution_trigger && params.need_wait_execution_trigger) { + // set need_wait_execution_trigger means this instance will not actually being executed + // until the execPlanFragmentStart RPC trigger to start it. + exec_state->set_need_wait_execution_trigger(); } std::shared_ptr handler; @@ -672,6 +696,7 @@ void FragmentMgr::cancel_worker() { LOG(INFO) << "FragmentMgr cancel worker start working."; do { std::vector to_cancel; + std::vector to_cancel_queries; DateTimeValue now = DateTimeValue::local_time(); { std::lock_guard lock(_lock); @@ -682,6 +707,9 @@ void FragmentMgr::cancel_worker() { } for (auto it = _fragments_ctx_map.begin(); it != _fragments_ctx_map.end();) { if (it->second->is_timeout(now)) { + // The execution logic of the instance needs to be notified. + // The execution logic of the instance will eventually cancel the execution plan. + it->second->set_ready_to_execute(); it = _fragments_ctx_map.erase(it); } else { ++it; diff --git a/be/src/runtime/fragment_mgr.h b/be/src/runtime/fragment_mgr.h index 3e5185c94a9e42..8db7efcc7471b0 100644 --- a/be/src/runtime/fragment_mgr.h +++ b/be/src/runtime/fragment_mgr.h @@ -65,6 +65,8 @@ class FragmentMgr : public RestMonitorIface { // TODO(zc): report this is over Status exec_plan_fragment(const TExecPlanFragmentParams& params, FinishCallback cb); + Status start_query_execution(const PExecPlanFragmentStartRequest* request); + Status cancel(const TUniqueId& fragment_id) { return cancel(fragment_id, PPlanFragmentCancelReason::INTERNAL_ERROR); } diff --git a/be/src/runtime/query_fragments_ctx.h b/be/src/runtime/query_fragments_ctx.h index f8fa5fe10a118d..2a4f2f3e1aea2f 100644 --- a/be/src/runtime/query_fragments_ctx.h +++ b/be/src/runtime/query_fragments_ctx.h @@ -62,6 +62,21 @@ class QueryFragmentsCtx { ThreadPoolToken* get_token() { return _thread_token.get(); } + void set_ready_to_execute() { + { + std::lock_guard l(_start_lock); + _ready_to_execute = true; + } + _start_cond.notify_all(); + } + + void wait_for_start() { + std::unique_lock l(_start_lock); + while (!_ready_to_execute.load()) { + _start_cond.wait(l); + } + } + public: TUniqueId query_id; DescriptorTbl* desc_tbl; @@ -92,6 +107,12 @@ class QueryFragmentsCtx { // So that we can control the max thread that a query can be used to execute. // If this token is not set, the scanner will be executed in "_scan_thread_pool" in exec env. 
std::unique_ptr _thread_token; + + std::mutex _start_lock; + std::condition_variable _start_cond; + // Only valid when _need_wait_execution_trigger is set to true in FragmentExecState. + // And all fragments of this query will start execution when this is set to true. + std::atomic _ready_to_execute { false }; }; } // namespace doris diff --git a/be/src/service/internal_service.cpp b/be/src/service/internal_service.cpp index bfdd1f31ba3eb3..1193ecab96e687 100644 --- a/be/src/service/internal_service.cpp +++ b/be/src/service/internal_service.cpp @@ -111,13 +111,24 @@ void PInternalServiceImpl::exec_plan_fragment(google::protobuf::RpcController* c brpc::ClosureGuard closure_guard(done); auto st = Status::OK(); bool compact = request->has_compact() ? request->compact() : false; - st = _exec_plan_fragment(request->request(), compact); + PFragmentRequestVersion version = request->has_version() ? request->version() : PFragmentRequestVersion::VERSION_1; + st = _exec_plan_fragment(request->request(), version, compact); if (!st.ok()) { LOG(WARNING) << "exec plan fragment failed, errmsg=" << st.get_error_msg(); } st.to_protobuf(response->mutable_status()); } +void PInternalServiceImpl::exec_plan_fragment_start(google::protobuf::RpcController* controller, + const PExecPlanFragmentStartRequest* request, + PExecPlanFragmentResult* result, + google::protobuf::Closure* done) { + SCOPED_SWITCH_BTHREAD(); + brpc::ClosureGuard closure_guard(done); + auto st = _exec_env->fragment_mgr()->start_query_execution(request); + st.to_protobuf(result->mutable_status()); +} + void PInternalServiceImpl::tablet_writer_add_block(google::protobuf::RpcController* cntl_base, const PTabletWriterAddBlockRequest* request, PTabletWriterAddBlockResult* response, @@ -201,14 +212,31 @@ void PInternalServiceImpl::tablet_writer_cancel(google::protobuf::RpcController* } } -Status PInternalServiceImpl::_exec_plan_fragment(const std::string& ser_request, bool compact) { - TExecPlanFragmentParams t_request; - { - const uint8_t* buf = (const uint8_t*)ser_request.data(); - uint32_t len = ser_request.size(); - RETURN_IF_ERROR(deserialize_thrift_msg(buf, &len, compact, &t_request)); +Status PInternalServiceImpl::_exec_plan_fragment(const std::string& ser_request, PFragmentRequestVersion version, bool compact) { + if (version == PFragmentRequestVersion::VERSION_1) { + // VERSION_1 should be removed in v1.2 + TExecPlanFragmentParams t_request; + { + const uint8_t* buf = (const uint8_t*)ser_request.data(); + uint32_t len = ser_request.size(); + RETURN_IF_ERROR(deserialize_thrift_msg(buf, &len, compact, &t_request)); + } + return _exec_env->fragment_mgr()->exec_plan_fragment(t_request); + } else if (version == PFragmentRequestVersion::VERSION_2) { + TExecPlanFragmentParamsList t_request; + { + const uint8_t* buf = (const uint8_t*)ser_request.data(); + uint32_t len = ser_request.size(); + RETURN_IF_ERROR(deserialize_thrift_msg(buf, &len, compact, &t_request)); + } + + for (const TExecPlanFragmentParams& params : t_request.paramsList) { + RETURN_IF_ERROR(_exec_env->fragment_mgr()->exec_plan_fragment(params)); + } + return Status::OK(); + } else { + return Status::InternalError("invalid version"); } - return _exec_env->fragment_mgr()->exec_plan_fragment(t_request); } void PInternalServiceImpl::cancel_plan_fragment(google::protobuf::RpcController* cntl_base, diff --git a/be/src/service/internal_service.h b/be/src/service/internal_service.h index 18a1667f56dc33..6f33978097c552 100644 --- a/be/src/service/internal_service.h +++ 
b/be/src/service/internal_service.h @@ -45,6 +45,11 @@ class PInternalServiceImpl : public PBackendService { PExecPlanFragmentResult* result, google::protobuf::Closure* done) override; + void exec_plan_fragment_start(google::protobuf::RpcController* controller, + const PExecPlanFragmentStartRequest* request, + PExecPlanFragmentResult* result, + google::protobuf::Closure* done) override; + void cancel_plan_fragment(google::protobuf::RpcController* controller, const PCancelPlanFragmentRequest* request, PCancelPlanFragmentResult* result, @@ -121,7 +126,7 @@ class PInternalServiceImpl : public PBackendService { PHandShakeResponse* response, google::protobuf::Closure* done) override; private: - Status _exec_plan_fragment(const std::string& s_request, bool compact); + Status _exec_plan_fragment(const std::string& s_request, PFragmentRequestVersion version, bool compact); Status _fold_constant_expr(const std::string& ser_request, PConstantExprResult* response); diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletStatMgr.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletStatMgr.java index f05abec8523714..d7a46cb564d50c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletStatMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletStatMgr.java @@ -71,7 +71,7 @@ protected void runAfterCatalogReady() { } } }); - LOG.info("finished to get tablet stat of all backends. cost: {} ms", + LOG.debug("finished to get tablet stat of all backends. cost: {} ms", (System.currentTimeMillis() - start)); // after update replica in all backends, update index row num diff --git a/fe/fe-core/src/main/java/org/apache/doris/clone/BeLoadRebalancer.java b/fe/fe-core/src/main/java/org/apache/doris/clone/BeLoadRebalancer.java index 50f9e820076071..89a827cd871112 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/clone/BeLoadRebalancer.java +++ b/fe/fe-core/src/main/java/org/apache/doris/clone/BeLoadRebalancer.java @@ -81,7 +81,7 @@ protected List selectAlternativeTabletsForCluster( clusterStat.getBackendStatisticByClass(lowBEs, midBEs, highBEs, medium); if (lowBEs.isEmpty() && highBEs.isEmpty()) { - LOG.info("cluster is balance: {} with medium: {}. skip", clusterName, medium); + LOG.debug("cluster is balance: {} with medium: {}. skip", clusterName, medium); return alternativeTablets; } @@ -185,9 +185,11 @@ protected List selectAlternativeTabletsForCluster( } } // end for high backends - LOG.info("select alternative tablets for cluster: {}, medium: {}, num: {}, detail: {}", - clusterName, medium, alternativeTablets.size(), - alternativeTablets.stream().mapToLong(TabletSchedCtx::getTabletId).toArray()); + if (!alternativeTablets.isEmpty()) { + LOG.info("select alternative tablets for cluster: {}, medium: {}, num: {}, detail: {}", + clusterName, medium, alternativeTablets.size(), + alternativeTablets.stream().mapToLong(TabletSchedCtx::getTabletId).toArray()); + } return alternativeTablets; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/clone/ClusterLoadStatistic.java b/fe/fe-core/src/main/java/org/apache/doris/clone/ClusterLoadStatistic.java index a6fc86d54c7f7c..34d6536c24952b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/clone/ClusterLoadStatistic.java +++ b/fe/fe-core/src/main/java/org/apache/doris/clone/ClusterLoadStatistic.java @@ -182,7 +182,7 @@ private void classifyBackendByLoad(TStorageMedium medium) { } } - LOG.info("classify backend by load. medium: {} avg load score: {}. low/mid/high: {}/{}/{}", + LOG.debug("classify backend by load. 
medium: {} avg load score: {}. low/mid/high: {}/{}/{}", medium, avgLoadScore, lowCounter, midCounter, highCounter); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/clone/DiskRebalancer.java b/fe/fe-core/src/main/java/org/apache/doris/clone/DiskRebalancer.java index 152f47a5e8e737..80bac26717fb25 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/clone/DiskRebalancer.java +++ b/fe/fe-core/src/main/java/org/apache/doris/clone/DiskRebalancer.java @@ -136,7 +136,7 @@ protected List selectAlternativeTabletsForCluster( // first we should check if mid backends is available. // if all mid backends is not available, we should not start balance if (midBEs.stream().noneMatch(BackendLoadStatistic::isAvailable)) { - LOG.info("all mid load backends is dead: {} with medium: {}. skip", + LOG.debug("all mid load backends is dead: {} with medium: {}. skip", lowBEs.stream().mapToLong(BackendLoadStatistic::getBeId).toArray(), medium); return alternativeTablets; } @@ -231,9 +231,11 @@ protected List selectAlternativeTabletsForCluster( // remove balanced BEs from prio backends prioBackends.keySet().removeIf(id -> !unbalancedBEs.contains(id)); - LOG.info("select alternative tablets for cluster: {}, medium: {}, num: {}, detail: {}", - clusterName, medium, alternativeTablets.size(), - alternativeTablets.stream().mapToLong(TabletSchedCtx::getTabletId).toArray()); + if (!alternativeTablets.isEmpty()) { + LOG.info("select alternative tablets for cluster: {}, medium: {}, num: {}, detail: {}", + clusterName, medium, alternativeTablets.size(), + alternativeTablets.stream().mapToLong(TabletSchedCtx::getTabletId).toArray()); + } return alternativeTablets; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/clone/TabletChecker.java b/fe/fe-core/src/main/java/org/apache/doris/clone/TabletChecker.java index 121a8de8b5acea..b56d9bc9a516b7 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/clone/TabletChecker.java +++ b/fe/fe-core/src/main/java/org/apache/doris/clone/TabletChecker.java @@ -210,7 +210,7 @@ protected void runAfterCatalogReady() { removePriosIfNecessary(); stat.counterTabletCheckRound.incrementAndGet(); - LOG.info(stat.incrementalBrief()); + LOG.debug(stat.incrementalBrief()); } private static class CheckerCounter { diff --git a/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java b/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java index 6b84a432ed286e..a6c4501c6f118b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java +++ b/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java @@ -338,7 +338,7 @@ private void updateClusterLoadStatistic() { infoService, invertedIndex); clusterLoadStatistic.init(); newStatisticMap.put(clusterName, tag, clusterLoadStatistic); - LOG.info("update cluster {} load statistic:\n{}", clusterName, clusterLoadStatistic.getBrief()); + LOG.debug("update cluster {} load statistic:\n{}", clusterName, clusterLoadStatistic.getBrief()); } } @@ -368,7 +368,7 @@ private synchronized void adjustPriorities() { pendingTablets.add(tabletCtx); } - LOG.info("adjust priority for all tablets. changed: {}, total: {}", changedNum, size); + LOG.debug("adjust priority for all tablets. 
changed: {}, total: {}", changedNum, size); } /** diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/Config.java b/fe/fe-core/src/main/java/org/apache/doris/common/Config.java index cff7f2b1544455..e45f384f87b704 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/Config.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/Config.java @@ -1046,15 +1046,6 @@ public class Config extends ConfigBase { @ConfField(mutable = true) public static boolean enable_local_replica_selection_fallback = false; - - /** - * The timeout of executing async remote fragment. - * In normal case, the async remote fragment will be executed in a short time. If system are under high load - * condition,try to set this timeout longer. - */ - @ConfField(mutable = true) - public static long remote_fragment_exec_timeout_ms = 5000; // 5 sec - /** * The number of query retries. * A query may retry if we encounter RPC exception and no result has been sent to user. diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadScheduler.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadScheduler.java index 34b6e2f50376c2..e6662427ba09b3 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadScheduler.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadScheduler.java @@ -70,7 +70,9 @@ private void process() throws UserException { LOG.warn("failed to get need schedule routine jobs", e); } - LOG.info("there are {} job need schedule", routineLoadJobList.size()); + if (!routineLoadJobList.isEmpty()) { + LOG.info("there are {} job need schedule", routineLoadJobList.size()); + } for (RoutineLoadJob routineLoadJob : routineLoadJobList) { RoutineLoadJob.JobState errorJobState = null; UserException userException = null; diff --git a/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java b/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java index cdca5251fafb9a..79ea3c2c23a879 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java +++ b/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java @@ -334,7 +334,7 @@ private static void tabletReport(long backendId, Map backendTable } private static void taskReport(long backendId, Map> runningTasks) { - LOG.info("begin to handle task report from backend {}", backendId); + LOG.debug("begin to handle task report from backend {}", backendId); long start = System.currentTimeMillis(); if (LOG.isDebugEnabled()) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java index 566d1ab28cacb6..9d63b8a98b67d3 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java @@ -57,7 +57,10 @@ import org.apache.doris.planner.SetOperationNode; import org.apache.doris.planner.UnionNode; import org.apache.doris.proto.InternalService; +import org.apache.doris.proto.InternalService.PExecPlanFragmentResult; +import org.apache.doris.proto.InternalService.PExecPlanFragmentStartRequest; import org.apache.doris.proto.Types; +import org.apache.doris.proto.Types.PUniqueId; import org.apache.doris.qe.QueryStatisticsItem.FragmentInstanceInfo; import org.apache.doris.rpc.BackendServiceProxy; import org.apache.doris.rpc.RpcException; @@ -70,6 +73,7 @@ import org.apache.doris.thrift.TErrorTabletInfo; import org.apache.doris.thrift.TEsScanRange; import 
org.apache.doris.thrift.TExecPlanFragmentParams; +import org.apache.doris.thrift.TExecPlanFragmentParamsList; import org.apache.doris.thrift.TLoadErrorHubInfo; import org.apache.doris.thrift.TNetworkAddress; import org.apache.doris.thrift.TPaloScanRange; @@ -102,6 +106,7 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.apache.thrift.TException; +import org.jetbrains.annotations.NotNull; import java.text.DateFormat; import java.text.SimpleDateFormat; @@ -211,6 +216,10 @@ public class Coordinator { // parallel execute private final TUniqueId nextInstanceId; + // a timestamp represent the absolute timeout + // eg, System.currentTimeMillis() + query_timeout * 1000 + private long timeoutDeadline; + // Runtime filter merge instance address and ID public TNetworkAddress runtimeFilterMergeAddr; public TUniqueId runtimeFilterMergeInstanceId; @@ -222,6 +231,7 @@ public class Coordinator { // Runtime filter ID to the builder instance number public Map ridToBuilderNum = Maps.newHashMap(); + // Used for query/insert public Coordinator(ConnectContext context, Analyzer analyzer, Planner planner) { this.isBlockQuery = planner.isBlockQuery(); @@ -489,15 +499,14 @@ public void exec() throws Exception { PlanFragmentId topId = fragments.get(0).getFragmentId(); FragmentExecParams topParams = fragmentExecParamsMap.get(topId); DataSink topDataSink = topParams.fragment.getSink(); + this.timeoutDeadline = System.currentTimeMillis() + queryOptions.query_timeout * 1000; if (topDataSink instanceof ResultSink || topDataSink instanceof ResultFileSink) { TNetworkAddress execBeAddr = topParams.instanceExecParams.get(0).host; - receiver = new ResultReceiver( - topParams.instanceExecParams.get(0).instanceId, - addressToBackendID.get(execBeAddr), - toBrpcHost(execBeAddr), - queryOptions.query_timeout * 1000); + receiver = new ResultReceiver(topParams.instanceExecParams.get(0).instanceId, + addressToBackendID.get(execBeAddr), toBrpcHost(execBeAddr), this.timeoutDeadline); if (LOG.isDebugEnabled()) { - LOG.debug("dispatch query job: {} to {}", DebugUtil.printId(queryId), topParams.instanceExecParams.get(0).host); + LOG.debug("dispatch query job: {} to {}", DebugUtil.printId(queryId), + topParams.instanceExecParams.get(0).host); } if (topDataSink instanceof ResultFileSink @@ -530,6 +539,31 @@ public void exec() throws Exception { sendFragment(); } + /** + * The logic for sending query plan fragments is as follows: + * First, plan fragments are dependent. According to the order in "fragments" list, + * it must be ensured that on the BE side, the next fragment instance can be executed + * only after the previous fragment instance is ready, + *
+ * In the previous logic, we will send fragment instances in sequence through RPC, + * and will wait for the RPC of the previous fragment instance to return successfully + * before sending the next one. But for some complex queries, this may lead to too many RPCs. + *
+ * The optimized logic is as follows: + * 1. If the number of fragment instance is <= 2, the original logic is still used + * to complete the sending of fragments through at most 2 RPCs. + * 2. If the number of fragment instance is >= 3, first group all fragments by BE, + * and send all fragment instances to the corresponding BE node through the FIRST rpc, + * but these fragment instances will only perform the preparation phase but will not be actually executed. + * After that, the execution logic of all fragment instances is started through the SECOND RPC. + *
+ * After optimization, a query on a BE node will only send two RPCs at most. + * Thereby reducing the "send fragment timeout" error caused by too many RPCs and BE unable to process in time. + * + * @throws TException + * @throws RpcException + * @throws UserException + */ private void sendFragment() throws TException, RpcException, UserException { lock(); try { @@ -539,23 +573,21 @@ private void sendFragment() throws TException, RpcException, UserException { hostCounter.add(fi.host); } } - // Execute all instances from up to bottom - // NOTICE: We must ensure that these fragments are executed sequentially, - // otherwise the data dependency between the fragments will be destroyed. + int backendIdx = 0; int profileFragmentId = 0; long memoryLimit = queryOptions.getMemLimit(); + Map beToExecStates = Maps.newHashMap(); + boolean needTrigger = fragments.size() > 2; for (PlanFragment fragment : fragments) { FragmentExecParams params = fragmentExecParamsMap.get(fragment.getFragmentId()); - // set up exec states + // 1. set up exec states int instanceNum = params.instanceExecParams.size(); Preconditions.checkState(instanceNum > 0); List tParams = params.toThrift(backendIdx); - List>> futures = - Lists.newArrayList(); - // update memory limit for colocate join + // 2. update memory limit for colocate join if (colocateFragmentIds.contains(fragment.getFragmentId().asInt())) { int rate = Math.min(Config.query_colocate_join_memory_limit_penalty_factor, instanceNum); long newMemory = memoryLimit / rate; @@ -574,15 +606,16 @@ private void sendFragment() throws TException, RpcException, UserException { needCheckBackendState = true; } + // 3. group BackendExecState by BE. So that we can use one RPC to send all fragment instances of a BE. int instanceId = 0; for (TExecPlanFragmentParams tParam : tParams) { - BackendExecState execState = new BackendExecState(fragment.getFragmentId(), instanceId++, - profileFragmentId, tParam, this.addressToBackendID); - execState.unsetFields(); + BackendExecState execState = + new BackendExecState(fragment.getFragmentId(), instanceId++, profileFragmentId, tParam, this.addressToBackendID); // Each tParam will set the total number of Fragments that need to be executed on the same BE, // and the BE will determine whether all Fragments have been executed based on this information. 
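Note: the following is an editorial sketch, not code from the patch, of the two-phase dispatch that the javadoc above describes: group all fragment instances by backend, send one "prepare" RPC per backend, then release them with one "start" RPC per backend. `FragmentRpc`, `dispatch`, and the string-typed ids are placeholders.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class TwoPhaseDispatchSketch {
    // Placeholder for the two RPCs the coordinator actually issues via BackendServiceProxy.
    interface FragmentRpc {
        void sendPrepare(String backend, List<String> instanceParams); // exec_plan_fragment (VERSION_2)
        void sendStart(String backend, String queryId);                // exec_plan_fragment_start
    }

    void dispatch(String queryId, Map<String, String> instanceToBackend, FragmentRpc rpc) {
        // Phase 1: group instances by backend so each backend receives a single prepare RPC
        // carrying every fragment instance it has to run.
        Map<String, List<String>> byBackend = new HashMap<>();
        for (Map.Entry<String, String> e : instanceToBackend.entrySet()) {
            byBackend.computeIfAbsent(e.getValue(), b -> new ArrayList<>()).add(e.getKey());
        }
        for (Map.Entry<String, List<String>> e : byBackend.entrySet()) {
            rpc.sendPrepare(e.getKey(), e.getValue());
        }
        // Phase 2: one start RPC per backend releases everything prepared above,
        // so each backend sees at most two RPCs regardless of how many instances it runs.
        for (String backend : byBackend.keySet()) {
            rpc.sendStart(backend, queryId);
        }
    }
}
```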
tParam.setFragmentNumOnHost(hostCounter.count(execState.address)); tParam.setBackendId(execState.backend.getId()); + tParam.setNeedWaitExecutionTrigger(needTrigger); backendExecStates.add(execState); if (needCheckBackendState) { @@ -592,69 +625,32 @@ private void sendFragment() throws TException, RpcException, UserException { fragment.getFragmentId().asInt(), jobId); } } - futures.add(Pair.create(execState, execState.execRemoteFragmentAsync())); - backendIdx++; - } - - for (Pair> pair : futures) { - TStatusCode code; - String errMsg = null; - Exception exception = null; - try { - InternalService.PExecPlanFragmentResult result = pair.second.get(Config.remote_fragment_exec_timeout_ms, - TimeUnit.MILLISECONDS); - code = TStatusCode.findByValue(result.getStatus().getStatusCode()); - if (!result.getStatus().getErrorMsgsList().isEmpty()) { - errMsg = result.getStatus().getErrorMsgsList().get(0); - } - } catch (ExecutionException e) { - LOG.warn("catch a execute exception", e); - exception = e; - code = TStatusCode.THRIFT_RPC_ERROR; - BackendServiceProxy.getInstance().removeProxy(pair.first.brpcAddress); - } catch (InterruptedException e) { - LOG.warn("catch a interrupt exception", e); - exception = e; - code = TStatusCode.INTERNAL_ERROR; - } catch (TimeoutException e) { - LOG.warn("catch a timeout exception", e); - exception = e; - code = TStatusCode.TIMEOUT; - BackendServiceProxy.getInstance().removeProxy(pair.first.brpcAddress); + BackendExecStates states = beToExecStates.get(execState.backend.getId()); + if (states == null) { + states = new BackendExecStates(execState.backend.getId(), execState.brpcAddress); + beToExecStates.putIfAbsent(execState.backend.getId(), states); } + states.addState(execState); + } + profileFragmentId += 1; + } // end for fragments - if (code != TStatusCode.OK) { - if (exception != null) { - errMsg = exception.getMessage(); - } - - if (errMsg == null) { - errMsg = "exec rpc error. backend id: " + pair.first.backend.getId(); - } - queryStatus.setStatus(errMsg); - LOG.warn("exec plan fragment failed, errmsg={}, code: {}, fragmentId={}, backend={}:{}", - errMsg, code, fragment.getFragmentId(), - pair.first.address.hostname, pair.first.address.port); - cancelInternal(Types.PPlanFragmentCancelReason.INTERNAL_ERROR); - switch (code) { - case TIMEOUT: - throw new RpcException(pair.first.backend.getHost(), "send fragment timeout. backend id: " - + pair.first.backend.getId() + " fragment: " - + DebugUtil.printId(pair.first.rpcParams.params.fragment_instance_id)); - case THRIFT_RPC_ERROR: - SimpleScheduler.addToBlacklist(pair.first.backend.getId(), errMsg); - throw new RpcException(pair.first.backend.getHost(), "rpc failed"); - default: - throw new UserException(errMsg); - } - } + // 4. send and wait fragments rpc + List>> futures = Lists.newArrayList(); + for (BackendExecStates states : beToExecStates.values()) { + states.unsetFields(); + futures.add(Pair.create(states, states.execRemoteFragmentsAsync())); + } + waitRpc(futures, this.timeoutDeadline - System.currentTimeMillis(), "send fragments"); - // succeed to send the plan fragment, update the "alreadySentBackendIds" - alreadySentBackendIds.add(pair.first.backend.getId()); + if (needTrigger) { + // 5. 
send and wait execution start rpc + futures.clear(); + for (BackendExecStates states : beToExecStates.values()) { + futures.add(Pair.create(states, states.execPlanFragmentStartAsync())); } - - profileFragmentId += 1; + waitRpc(futures, this.timeoutDeadline - System.currentTimeMillis(), "send execution start"); } attachInstanceProfileToFragmentProfile(); @@ -663,6 +659,63 @@ private void sendFragment() throws TException, RpcException, UserException { } } + private void waitRpc(List>> futures, long timeoutMs, + String operation) throws RpcException, UserException { + if (timeoutMs <= 0) { + throw new UserException("timeout before waiting for " + operation + " RPC. Elapse(sec): " + ( + (System.currentTimeMillis() - timeoutDeadline) / 1000 + queryOptions.query_timeout)); + } + + for (Pair> pair : futures) { + TStatusCode code; + String errMsg = null; + Exception exception = null; + try { + PExecPlanFragmentResult result = pair.second.get(timeoutMs, TimeUnit.MILLISECONDS); + code = TStatusCode.findByValue(result.getStatus().getStatusCode()); + if (code != TStatusCode.OK) { + if (!result.getStatus().getErrorMsgsList().isEmpty()) { + errMsg = result.getStatus().getErrorMsgsList().get(0); + } else { + errMsg = operation + " failed. backend id: " + pair.first.beId; + } + } + } catch (ExecutionException e) { + exception = e; + code = TStatusCode.THRIFT_RPC_ERROR; + BackendServiceProxy.getInstance().removeProxy(pair.first.brpcAddr); + } catch (InterruptedException e) { + exception = e; + code = TStatusCode.INTERNAL_ERROR; + } catch (TimeoutException e) { + exception = e; + errMsg = "timeout when waiting for " + operation + " RPC. Elapse(sec): " + + ((System.currentTimeMillis() - timeoutDeadline) / 1000 + queryOptions.query_timeout); + code = TStatusCode.TIMEOUT; + } + + if (code != TStatusCode.OK) { + if (exception != null && errMsg == null) { + errMsg = operation + " failed. 
" + exception.getMessage(); + } + queryStatus.setStatus(errMsg); + cancelInternal(Types.PPlanFragmentCancelReason.INTERNAL_ERROR); + switch (code) { + case TIMEOUT: + throw new RpcException(pair.first.brpcAddr.hostname, errMsg, exception); + case THRIFT_RPC_ERROR: + SimpleScheduler.addToBlacklist(pair.first.beId, errMsg); + throw new RpcException(pair.first.brpcAddr.hostname, errMsg, exception); + default: + throw new UserException(errMsg, exception); + } + } + + // succeed to send the plan fragment, update the "alreadySentBackendIds" + alreadySentBackendIds.add(pair.first.beId); + } + } + public List getExportFiles() { return exportFiles; } @@ -1896,7 +1949,6 @@ private void computeInstanceParam(PlanFragmentId fragmentId, int parallelExecIns public class BackendExecState { TExecPlanFragmentParams rpcParams; PlanFragmentId fragmentId; - int instanceId; boolean initiated; volatile boolean done; boolean hasCanceled; @@ -1906,18 +1958,20 @@ public class BackendExecState { TNetworkAddress address; Backend backend; long lastMissingHeartbeatTime = -1; + TUniqueId instanceId; public BackendExecState(PlanFragmentId fragmentId, int instanceId, int profileFragmentId, TExecPlanFragmentParams rpcParams, Map addressToBackendID) { this.profileFragmentId = profileFragmentId; this.fragmentId = fragmentId; - this.instanceId = instanceId; this.rpcParams = rpcParams; this.initiated = false; this.done = false; FInstanceExecParam fi = fragmentExecParamsMap.get(fragmentId).instanceExecParams.get(instanceId); + this.instanceId = fi.instanceId; this.address = fi.host; this.backend = idToBackend.get(addressToBackendID.get(address)); + this.brpcAddress = new TNetworkAddress(backend.getHost(), backend.getBrpcPort()); String name = "Instance " + DebugUtil.printId(fi.instanceId) + " (host=" + address + ")"; this.profile = new RuntimeProfile(name); @@ -2016,62 +2070,107 @@ public boolean isBackendStateHealthy() { return true; } - public Future execRemoteFragmentAsync() throws TException, RpcException { - try { - brpcAddress = new TNetworkAddress(backend.getHost(), backend.getBrpcPort()); - } catch (Exception e) { - throw new TException(e.getMessage()); + public FragmentInstanceInfo buildFragmentInstanceInfo() { + return new QueryStatisticsItem.FragmentInstanceInfo.Builder().instanceId(fragmentInstanceId()) + .fragmentId(String.valueOf(fragmentId)).address(this.address).build(); + } + + private TUniqueId fragmentInstanceId() { + return this.rpcParams.params.getFragmentInstanceId(); + } + } + + /** + * A set of BackendExecState for same Backend + */ + public class BackendExecStates { + long beId; + TNetworkAddress brpcAddr; + List states = Lists.newArrayList(); + + public BackendExecStates(long beId, TNetworkAddress brpcAddr) { + this.beId = beId; + this.brpcAddr = brpcAddr; + } + + public void addState(BackendExecState state) { + this.states.add(state); + } + + /** + * The BackendExecState in states are all send to the same BE. + * So only the first BackendExecState need to carry some common fields, such as DescriptorTbl, + * the other BackendExecState does not need those fields. Unset them to reduce size. 
+ */ + public void unsetFields() { + boolean first = true; + for (BackendExecState state : states) { + if (first) { + first = false; + continue; + } + state.unsetFields(); } - this.initiated = true; + } + + public Future execRemoteFragmentsAsync() throws TException { try { - return BackendServiceProxy.getInstance().execPlanFragmentAsync(brpcAddress, rpcParams); + TExecPlanFragmentParamsList paramsList = new TExecPlanFragmentParamsList(); + for (BackendExecState state : states) { + paramsList.addToParamsList(state.rpcParams); + } + return BackendServiceProxy.getInstance().execPlanFragmentsAsync(brpcAddr, paramsList); } catch (RpcException e) { // DO NOT throw exception here, return a complete future with error code, // so that the following logic will cancel the fragment. - return new Future() { - @Override - public boolean cancel(boolean mayInterruptIfRunning) { - return false; - } + return futureWithException(e); + } + } - @Override - public boolean isCancelled() { - return false; - } + public Future execPlanFragmentStartAsync() throws TException { + try { + PExecPlanFragmentStartRequest.Builder builder = PExecPlanFragmentStartRequest.newBuilder(); + PUniqueId qid = PUniqueId.newBuilder().setHi(queryId.hi).setLo(queryId.lo).build(); + builder.setQueryId(qid); + return BackendServiceProxy.getInstance().execPlanFragmentStartAsync(brpcAddr, builder.build()); + } catch (RpcException e) { + // DO NOT throw exception here, return a complete future with error code, + // so that the following logic will cancel the fragment. + return futureWithException(e); + } + } - @Override - public boolean isDone() { - return true; - } + @NotNull + private Future futureWithException(RpcException e) { + return new Future() { + @Override + public boolean cancel(boolean mayInterruptIfRunning) { + return false; + } - @Override - public InternalService.PExecPlanFragmentResult get() { - InternalService.PExecPlanFragmentResult result = InternalService.PExecPlanFragmentResult - .newBuilder() - .setStatus(Types.PStatus.newBuilder() - .addErrorMsgs(e.getMessage()) - .setStatusCode(TStatusCode.THRIFT_RPC_ERROR.getValue()) - .build()) - .build(); - return result; - } + @Override + public boolean isCancelled() { + return false; + } - @Override - public InternalService.PExecPlanFragmentResult get(long timeout, TimeUnit unit) { - return get(); - } - }; - } - } + @Override + public boolean isDone() { + return true; + } - public FragmentInstanceInfo buildFragmentInstanceInfo() { - return new QueryStatisticsItem.FragmentInstanceInfo.Builder() - .instanceId(fragmentInstanceId()).fragmentId(String.valueOf(fragmentId)).address(this.address) - .build(); - } + @Override + public PExecPlanFragmentResult get() { + PExecPlanFragmentResult result = PExecPlanFragmentResult.newBuilder().setStatus( + Types.PStatus.newBuilder().addErrorMsgs(e.getMessage()) + .setStatusCode(TStatusCode.THRIFT_RPC_ERROR.getValue()).build()).build(); + return result; + } - private TUniqueId fragmentInstanceId() { - return this.rpcParams.params.getFragmentInstanceId(); + @Override + public PExecPlanFragmentResult get(long timeout, TimeUnit unit) { + return get(); + } + }; } } @@ -2287,3 +2386,4 @@ public FRuntimeFilterTargetParam(TUniqueId id, TNetworkAddress host) { } } } + diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/InsertStreamTxnExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/InsertStreamTxnExecutor.java index ea0cb6cfda0330..5160769de1722c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/InsertStreamTxnExecutor.java +++ 
b/fe/fe-core/src/main/java/org/apache/doris/qe/InsertStreamTxnExecutor.java @@ -30,6 +30,7 @@ import org.apache.doris.task.StreamLoadTask; import org.apache.doris.thrift.TBrokerRangeDesc; import org.apache.doris.thrift.TExecPlanFragmentParams; +import org.apache.doris.thrift.TExecPlanFragmentParamsList; import org.apache.doris.thrift.TFileFormatType; import org.apache.doris.thrift.TNetworkAddress; import org.apache.doris.thrift.TScanRangeParams; @@ -85,8 +86,10 @@ public void beginTransaction(TStreamLoadPutRequest request) throws UserException txnEntry.setBackend(backend); TNetworkAddress address = new TNetworkAddress(backend.getHost(), backend.getBrpcPort()); try { - Future future = BackendServiceProxy.getInstance().execPlanFragmentAsync( - address, tRequest); + TExecPlanFragmentParamsList paramsList = new TExecPlanFragmentParamsList(); + paramsList.addToParamsList(tRequest); + Future future = + BackendServiceProxy.getInstance().execPlanFragmentsAsync(address, paramsList); InternalService.PExecPlanFragmentResult result = future.get(5, TimeUnit.SECONDS); TStatusCode code = TStatusCode.findByValue(result.getStatus().getStatusCode()); if (code != TStatusCode.OK) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ResultReceiver.java b/fe/fe-core/src/main/java/org/apache/doris/qe/ResultReceiver.java index 5168475a7f1b9e..04b4850cf588d5 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/ResultReceiver.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ResultReceiver.java @@ -48,11 +48,11 @@ public class ResultReceiver { private Long backendId; private Thread currentThread; - public ResultReceiver(TUniqueId tid, Long backendId, TNetworkAddress address, int timeoutMs) { + public ResultReceiver(TUniqueId tid, Long backendId, TNetworkAddress address, long timeoutTs) { this.finstId = Types.PUniqueId.newBuilder().setHi(tid.hi).setLo(tid.lo).build(); this.backendId = backendId; this.address = address; - this.timeoutTs = System.currentTimeMillis() + timeoutMs; + this.timeoutTs = timeoutTs; } public RowBatch getNext(Status status) throws TException { diff --git a/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceClient.java b/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceClient.java index eae4c19531b451..3742f79e6c8442 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceClient.java +++ b/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceClient.java @@ -43,8 +43,7 @@ public BackendServiceClient(TNetworkAddress address) { this.address = address; channel = NettyChannelBuilder.forAddress(address.getHostname(), address.getPort()) .flowControlWindow(Config.grpc_max_message_size_bytes) - .maxInboundMessageSize(Config.grpc_max_message_size_bytes) - .enableRetry().maxRetryAttempts(MAX_RETRY_NUM) + .maxInboundMessageSize(Config.grpc_max_message_size_bytes).enableRetry().maxRetryAttempts(MAX_RETRY_NUM) .usePlaintext().build(); stub = PBackendServiceGrpc.newFutureStub(channel); blockingStub = PBackendServiceGrpc.newBlockingStub(channel); @@ -55,6 +54,11 @@ public Future execPlanFragmentAsync( return stub.execPlanFragment(request); } + public Future execPlanFragmentStartAsync( + InternalService.PExecPlanFragmentStartRequest request) { + return stub.execPlanFragmentStart(request); + } + public Future cancelPlanFragmentAsync( InternalService.PCancelPlanFragmentRequest request) { return stub.cancelPlanFragment(request); diff --git a/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java 
b/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java index f1ad320dcefe43..e37a3afa58b165 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java +++ b/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java @@ -19,8 +19,9 @@ import org.apache.doris.common.Config; import org.apache.doris.proto.InternalService; +import org.apache.doris.proto.InternalService.PExecPlanFragmentStartRequest; import org.apache.doris.proto.Types; -import org.apache.doris.thrift.TExecPlanFragmentParams; +import org.apache.doris.thrift.TExecPlanFragmentParamsList; import org.apache.doris.thrift.TFoldConstantParams; import org.apache.doris.thrift.TNetworkAddress; import org.apache.doris.thrift.TUniqueId; @@ -92,43 +93,55 @@ private BackendServiceClient getProxy(TNetworkAddress address) { } } - public Future execPlanFragmentAsync( - TNetworkAddress address, TExecPlanFragmentParams tRequest) + public Future execPlanFragmentsAsync( + TNetworkAddress address, TExecPlanFragmentParamsList paramsList) throws TException, RpcException { - InternalService.PExecPlanFragmentRequest.Builder builder = InternalService.PExecPlanFragmentRequest.newBuilder(); + InternalService.PExecPlanFragmentRequest.Builder builder = + InternalService.PExecPlanFragmentRequest.newBuilder(); if (Config.use_compact_thrift_rpc) { - builder.setRequest(ByteString.copyFrom(new TSerializer(new TCompactProtocol.Factory()).serialize(tRequest))); + builder.setRequest( + ByteString.copyFrom(new TSerializer(new TCompactProtocol.Factory()).serialize(paramsList))); builder.setCompact(true); } else { - builder.setRequest(ByteString.copyFrom(new TSerializer().serialize(tRequest))).build(); + builder.setRequest(ByteString.copyFrom(new TSerializer().serialize(paramsList))).build(); builder.setCompact(false); } + // VERSION 2 means we send TExecPlanFragmentParamsList, not single TExecPlanFragmentParams + builder.setVersion(InternalService.PFragmentRequestVersion.VERSION_2); final InternalService.PExecPlanFragmentRequest pRequest = builder.build(); try { final BackendServiceClient client = getProxy(address); return client.execPlanFragmentAsync(pRequest); } catch (Throwable e) { - LOG.warn("Execute plan fragment catch a exception, address={}:{}", - address.getHostname(), address.getPort(), e); + LOG.warn("Execute plan fragment catch a exception, address={}:{}", address.getHostname(), address.getPort(), + e); throw new RpcException(address.hostname, e.getMessage()); } } - public Future cancelPlanFragmentAsync( - TNetworkAddress address, TUniqueId finstId, Types.PPlanFragmentCancelReason cancelReason) - throws RpcException { - final InternalService.PCancelPlanFragmentRequest pRequest = InternalService.PCancelPlanFragmentRequest - .newBuilder() - .setFinstId( - Types.PUniqueId.newBuilder().setHi(finstId.hi).setLo(finstId.lo).build()) - .setCancelReason(cancelReason).build(); + public Future execPlanFragmentStartAsync(TNetworkAddress address, + PExecPlanFragmentStartRequest request) throws TException, RpcException { + try { + final BackendServiceClient client = getProxy(address); + return client.execPlanFragmentStartAsync(request); + } catch (Exception e) { + throw new RpcException(address.hostname, e.getMessage(), e); + } + } + + public Future cancelPlanFragmentAsync(TNetworkAddress address, + TUniqueId finstId, Types.PPlanFragmentCancelReason cancelReason) throws RpcException { + final InternalService.PCancelPlanFragmentRequest pRequest = + InternalService.PCancelPlanFragmentRequest.newBuilder() + 
.setFinstId(Types.PUniqueId.newBuilder().setHi(finstId.hi).setLo(finstId.lo).build()) + .setCancelReason(cancelReason).build(); try { final BackendServiceClient client = getProxy(address); return client.cancelPlanFragmentAsync(pRequest); } catch (Throwable e) { - LOG.warn("Cancel plan fragment catch a exception, address={}:{}", - address.getHostname(), address.getPort(), e); + LOG.warn("Cancel plan fragment catch a exception, address={}:{}", address.getHostname(), address.getPort(), + e); throw new RpcException(address.hostname, e.getMessage()); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/rpc/RpcException.java b/fe/fe-core/src/main/java/org/apache/doris/rpc/RpcException.java index c5d30a3a834554..7e3e1cded7e20a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/rpc/RpcException.java +++ b/fe/fe-core/src/main/java/org/apache/doris/rpc/RpcException.java @@ -28,6 +28,11 @@ public RpcException(String host, String message) { this.host = host; } + public RpcException(String host, String message, Exception e) { + super(message, e); + this.host = host; + } + @Override public String getMessage() { if (Strings.isNullOrEmpty(host)) { diff --git a/fe/fe-core/src/test/java/org/apache/doris/load/sync/canal/CanalSyncDataTest.java b/fe/fe-core/src/test/java/org/apache/doris/load/sync/canal/CanalSyncDataTest.java index 727644a6a315bf..509507908780a9 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/load/sync/canal/CanalSyncDataTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/load/sync/canal/CanalSyncDataTest.java @@ -31,6 +31,7 @@ import org.apache.doris.system.SystemInfoService; import org.apache.doris.task.StreamLoadTask; import org.apache.doris.thrift.TExecPlanFragmentParams; +import org.apache.doris.thrift.TExecPlanFragmentParamsList; import org.apache.doris.thrift.TNetworkAddress; import org.apache.doris.thrift.TPlanFragmentExecParams; import org.apache.doris.thrift.TStorageMedium; @@ -46,10 +47,7 @@ import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; import com.google.common.collect.Maps; -import mockit.Expectations; -import mockit.Mock; -import mockit.MockUp; -import mockit.Mocked; + import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.junit.Assert; @@ -62,6 +60,10 @@ import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.ReentrantLock; +import mockit.Expectations; +import mockit.Mock; +import mockit.MockUp; +import mockit.Mocked; public class CanalSyncDataTest { private static final Logger LOG = LogManager.getLogger(CanalSyncDataTest.class); @@ -252,7 +254,7 @@ public void testNormal(@Mocked GlobalTransactionMgr transactionMgr, minTimes = 0; result = 105L; - backendServiceProxy.execPlanFragmentAsync((TNetworkAddress) any, (TExecPlanFragmentParams) any); + backendServiceProxy.execPlanFragmentsAsync((TNetworkAddress) any, (TExecPlanFragmentParamsList) any); minTimes = 0; result = execFuture; @@ -323,7 +325,7 @@ public void testExecFragmentFail(@Mocked GlobalTransactionMgr transactionMgr, minTimes = 0; result = 105L; - backendServiceProxy.execPlanFragmentAsync((TNetworkAddress) any, (TExecPlanFragmentParams) any); + backendServiceProxy.execPlanFragmentsAsync((TNetworkAddress) any, (TExecPlanFragmentParamsList) any); minTimes = 0; result = execFuture; @@ -389,7 +391,7 @@ public void testCommitTxnFail(@Mocked GlobalTransactionMgr transactionMgr, minTimes = 0; result = 105L; - backendServiceProxy.execPlanFragmentAsync((TNetworkAddress) any, 
(TExecPlanFragmentParams) any); + backendServiceProxy.execPlanFragmentsAsync((TNetworkAddress) any, (TExecPlanFragmentParamsList) any); minTimes = 0; result = execFuture; diff --git a/gensrc/proto/internal_service.proto b/gensrc/proto/internal_service.proto index ae2d1874124df6..bc3014d6503a29 100644 --- a/gensrc/proto/internal_service.proto +++ b/gensrc/proto/internal_service.proto @@ -163,11 +163,21 @@ message PTabletWriterCancelRequest { message PTabletWriterCancelResult { }; +enum PFragmentRequestVersion { + VERSION_1 = 1; // only one TExecPlanFragmentParams in request + VERSION_2 = 2; // multi TExecPlanFragmentParams in request +}; + message PExecPlanFragmentRequest { optional bytes request = 1; optional bool compact = 2; + optional PFragmentRequestVersion version = 3 [default = VERSION_2]; }; +message PExecPlanFragmentStartRequest { + optional PUniqueId query_id = 1; +} + message PExecPlanFragmentResult { required PStatus status = 1; }; @@ -459,6 +469,7 @@ message PResetRPCChannelResponse { service PBackendService { rpc transmit_data(PTransmitDataParams) returns (PTransmitDataResult); rpc exec_plan_fragment(PExecPlanFragmentRequest) returns (PExecPlanFragmentResult); + rpc exec_plan_fragment_start(PExecPlanFragmentStartRequest) returns (PExecPlanFragmentResult); rpc cancel_plan_fragment(PCancelPlanFragmentRequest) returns (PCancelPlanFragmentResult); rpc fetch_data(PFetchDataRequest) returns (PFetchDataResult); rpc tablet_writer_open(PTabletWriterOpenRequest) returns (PTabletWriterOpenResult); diff --git a/gensrc/thrift/PaloInternalService.thrift b/gensrc/thrift/PaloInternalService.thrift index 4787513baff845..d79d9efbe2d358 100644 --- a/gensrc/thrift/PaloInternalService.thrift +++ b/gensrc/thrift/PaloInternalService.thrift @@ -340,10 +340,19 @@ struct TExecPlanFragmentParams { // If true, all @Common components is unset and should be got from BE's cache // If this field is unset or it set to false, all @Common components is set. - 16: optional bool is_simplified_param + 16: optional bool is_simplified_param = false; 17: optional TTxnParams txn_conf 18: optional i64 backend_id 19: optional TGlobalDict global_dict // scan node could use the global dict to encode the string value to an integer + + // If it is true, after this fragment is prepared on the BE side, + // it will wait for the FE to send the "start execution" command before it is actually executed. + // Otherwise, the fragment will start executing directly on the BE side. + 20: optional bool need_wait_execution_trigger; +} + +struct TExecPlanFragmentParamsList { + 1: optional list paramsList; } struct TExecPlanFragmentResult { From 969000fec317577997a1296a2bc27df61df6037c Mon Sep 17 00:00:00 2001 From: morningman Date: Sun, 29 May 2022 13:58:12 +0800 Subject: [PATCH 2/8] code format --- be/src/runtime/fragment_mgr.cpp | 11 +++++------ be/src/runtime/query_fragments_ctx.h | 2 +- be/src/service/internal_service.cpp | 14 ++++++++------ be/src/service/internal_service.h | 9 +++++---- 4 files changed, 19 insertions(+), 17 deletions(-) diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp index 0edc04b6992ccd..686ece037e54b5 100644 --- a/be/src/runtime/fragment_mgr.cpp +++ b/be/src/runtime/fragment_mgr.cpp @@ -546,8 +546,8 @@ Status FragmentMgr::start_query_execution(const PExecPlanFragmentStartRequest* r if (search == _fragments_ctx_map.end()) { return Status::InternalError( strings::Substitute("Failed to get query fragments context. Query may be " - "timeout or be cancelled. 
host: ", - BackendOptions::get_localhost())); + "timeout or be cancelled. host: ", + BackendOptions::get_localhost())); } search->second->set_ready_to_execute(); return Status::OK(); @@ -629,8 +629,7 @@ Status FragmentMgr::exec_plan_fragment(const TExecPlanFragmentParams& params, Fi std::lock_guard lock(_lock); auto search = _fragments_ctx_map.find(params.params.query_id); if (search == _fragments_ctx_map.end()) { - _fragments_ctx_map.insert( - std::make_pair(fragments_ctx->query_id, fragments_ctx)); + _fragments_ctx_map.insert(std::make_pair(fragments_ctx->query_id, fragments_ctx)); } else { // Already has a query fragmentscontext, use it fragments_ctx = search->second; @@ -639,8 +638,8 @@ Status FragmentMgr::exec_plan_fragment(const TExecPlanFragmentParams& params, Fi } exec_state.reset(new FragmentExecState(fragments_ctx->query_id, - params.params.fragment_instance_id, - params.backend_num, _exec_env, fragments_ctx)); + params.params.fragment_instance_id, params.backend_num, + _exec_env, fragments_ctx)); if (params.__isset.need_wait_execution_trigger && params.need_wait_execution_trigger) { // set need_wait_execution_trigger means this instance will not actually being executed // until the execPlanFragmentStart RPC trigger to start it. diff --git a/be/src/runtime/query_fragments_ctx.h b/be/src/runtime/query_fragments_ctx.h index 2a4f2f3e1aea2f..0c91f1b305af06 100644 --- a/be/src/runtime/query_fragments_ctx.h +++ b/be/src/runtime/query_fragments_ctx.h @@ -112,7 +112,7 @@ class QueryFragmentsCtx { std::condition_variable _start_cond; // Only valid when _need_wait_execution_trigger is set to true in FragmentExecState. // And all fragments of this query will start execution when this is set to true. - std::atomic _ready_to_execute { false }; + std::atomic _ready_to_execute {false}; }; } // namespace doris diff --git a/be/src/service/internal_service.cpp b/be/src/service/internal_service.cpp index 1193ecab96e687..8ad2ad1056775f 100644 --- a/be/src/service/internal_service.cpp +++ b/be/src/service/internal_service.cpp @@ -111,7 +111,8 @@ void PInternalServiceImpl::exec_plan_fragment(google::protobuf::RpcController* c brpc::ClosureGuard closure_guard(done); auto st = Status::OK(); bool compact = request->has_compact() ? request->compact() : false; - PFragmentRequestVersion version = request->has_version() ? request->version() : PFragmentRequestVersion::VERSION_1; + PFragmentRequestVersion version = + request->has_version() ? 
request->version() : PFragmentRequestVersion::VERSION_1; st = _exec_plan_fragment(request->request(), version, compact); if (!st.ok()) { LOG(WARNING) << "exec plan fragment failed, errmsg=" << st.get_error_msg(); @@ -120,9 +121,9 @@ void PInternalServiceImpl::exec_plan_fragment(google::protobuf::RpcController* c } void PInternalServiceImpl::exec_plan_fragment_start(google::protobuf::RpcController* controller, - const PExecPlanFragmentStartRequest* request, - PExecPlanFragmentResult* result, - google::protobuf::Closure* done) { + const PExecPlanFragmentStartRequest* request, + PExecPlanFragmentResult* result, + google::protobuf::Closure* done) { SCOPED_SWITCH_BTHREAD(); brpc::ClosureGuard closure_guard(done); auto st = _exec_env->fragment_mgr()->start_query_execution(request); @@ -212,7 +213,8 @@ void PInternalServiceImpl::tablet_writer_cancel(google::protobuf::RpcController* } } -Status PInternalServiceImpl::_exec_plan_fragment(const std::string& ser_request, PFragmentRequestVersion version, bool compact) { +Status PInternalServiceImpl::_exec_plan_fragment(const std::string& ser_request, + PFragmentRequestVersion version, bool compact) { if (version == PFragmentRequestVersion::VERSION_1) { // VERSION_1 should be removed in v1.2 TExecPlanFragmentParams t_request; @@ -232,7 +234,7 @@ Status PInternalServiceImpl::_exec_plan_fragment(const std::string& ser_request, for (const TExecPlanFragmentParams& params : t_request.paramsList) { RETURN_IF_ERROR(_exec_env->fragment_mgr()->exec_plan_fragment(params)); - } + } return Status::OK(); } else { return Status::InternalError("invalid version"); diff --git a/be/src/service/internal_service.h b/be/src/service/internal_service.h index 6f33978097c552..c2560633d474ce 100644 --- a/be/src/service/internal_service.h +++ b/be/src/service/internal_service.h @@ -46,9 +46,9 @@ class PInternalServiceImpl : public PBackendService { google::protobuf::Closure* done) override; void exec_plan_fragment_start(google::protobuf::RpcController* controller, - const PExecPlanFragmentStartRequest* request, - PExecPlanFragmentResult* result, - google::protobuf::Closure* done) override; + const PExecPlanFragmentStartRequest* request, + PExecPlanFragmentResult* result, + google::protobuf::Closure* done) override; void cancel_plan_fragment(google::protobuf::RpcController* controller, const PCancelPlanFragmentRequest* request, @@ -126,7 +126,8 @@ class PInternalServiceImpl : public PBackendService { PHandShakeResponse* response, google::protobuf::Closure* done) override; private: - Status _exec_plan_fragment(const std::string& s_request, PFragmentRequestVersion version, bool compact); + Status _exec_plan_fragment(const std::string& s_request, PFragmentRequestVersion version, + bool compact); Status _fold_constant_expr(const std::string& ser_request, PConstantExprResult* response); From 9f4a1e0a59fa81b24a3901aed14ef96de0df6032 Mon Sep 17 00:00:00 2001 From: morningman Date: Sun, 29 May 2022 14:02:49 +0800 Subject: [PATCH 3/8] add log --- .../apache/doris/load/sync/canal/CanalSyncDataTest.java | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/fe/fe-core/src/test/java/org/apache/doris/load/sync/canal/CanalSyncDataTest.java b/fe/fe-core/src/test/java/org/apache/doris/load/sync/canal/CanalSyncDataTest.java index 509507908780a9..f3ad8c2d329997 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/load/sync/canal/CanalSyncDataTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/load/sync/canal/CanalSyncDataTest.java @@ -47,7 +47,10 @@ import 
com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; import com.google.common.collect.Maps; - +import mockit.Expectations; +import mockit.Mock; +import mockit.MockUp; +import mockit.Mocked; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.junit.Assert; @@ -60,10 +63,6 @@ import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.ReentrantLock; -import mockit.Expectations; -import mockit.Mock; -import mockit.MockUp; -import mockit.Mocked; public class CanalSyncDataTest { private static final Logger LOG = LogManager.getLogger(CanalSyncDataTest.class); From 593560d2220be94e5fdd21625306e3a5b945c1b7 Mon Sep 17 00:00:00 2001 From: morningman Date: Sun, 29 May 2022 14:04:51 +0800 Subject: [PATCH 4/8] add log --- gensrc/thrift/PaloInternalService.thrift | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/gensrc/thrift/PaloInternalService.thrift b/gensrc/thrift/PaloInternalService.thrift index d79d9efbe2d358..aa1576b95a3419 100644 --- a/gensrc/thrift/PaloInternalService.thrift +++ b/gensrc/thrift/PaloInternalService.thrift @@ -348,7 +348,7 @@ struct TExecPlanFragmentParams { // If it is true, after this fragment is prepared on the BE side, // it will wait for the FE to send the "start execution" command before it is actually executed. // Otherwise, the fragment will start executing directly on the BE side. - 20: optional bool need_wait_execution_trigger; + 20: optional bool need_wait_execution_trigger = false; } struct TExecPlanFragmentParamsList { From b6d5d68663e369c782f48add2c70a2111208ecc0 Mon Sep 17 00:00:00 2001 From: morningman Date: Sun, 29 May 2022 15:55:45 +0800 Subject: [PATCH 5/8] fix ut --- .../doris/utframe/MockedBackendFactory.java | 15 +++++++++++++-- 1 file changed, 13 insertions(+), 2 deletions(-) diff --git a/fe/fe-core/src/test/java/org/apache/doris/utframe/MockedBackendFactory.java b/fe/fe-core/src/test/java/org/apache/doris/utframe/MockedBackendFactory.java index efab82c9aa1f40..9578560876e614 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/utframe/MockedBackendFactory.java +++ b/fe/fe-core/src/test/java/org/apache/doris/utframe/MockedBackendFactory.java @@ -336,7 +336,8 @@ public void transmitData(InternalService.PTransmitDataParams request, } @Override - public void execPlanFragment(InternalService.PExecPlanFragmentRequest request, StreamObserver responseObserver) { + public void execPlanFragment(InternalService.PExecPlanFragmentRequest request, + StreamObserver responseObserver) { System.out.println("get exec_plan_fragment request"); responseObserver.onNext(InternalService.PExecPlanFragmentResult.newBuilder() .setStatus(Types.PStatus.newBuilder().setStatusCode(0)).build()); @@ -344,7 +345,17 @@ public void execPlanFragment(InternalService.PExecPlanFragmentRequest request, S } @Override - public void cancelPlanFragment(InternalService.PCancelPlanFragmentRequest request, StreamObserver responseObserver) { + public void execPlanFragmentStart(InternalService.PExecPlanFragmentStartRequest request, + StreamObserver responseObserver) { + System.out.println("get exec_plan_fragment_start request"); + responseObserver.onNext(InternalService.PExecPlanFragmentResult.newBuilder() + .setStatus(Types.PStatus.newBuilder().setStatusCode(0)).build()); + responseObserver.onCompleted(); + } + + @Override + public void cancelPlanFragment(InternalService.PCancelPlanFragmentRequest request, + StreamObserver responseObserver) { System.out.println("get 
cancel_plan_fragment request"); responseObserver.onNext(InternalService.PCancelPlanFragmentResult.newBuilder() .setStatus(Types.PStatus.newBuilder().setStatusCode(0)).build()); From 2dd009d8cc52abfd223da57fd4e51d8d34932c6a Mon Sep 17 00:00:00 2001 From: morningman Date: Wed, 1 Jun 2022 11:49:35 +0800 Subject: [PATCH 6/8] 1 --- .../java/org/apache/doris/qe/Coordinator.java | 18 ++++++++++++------ .../doris/qe/InsertStreamTxnExecutor.java | 2 +- .../apache/doris/rpc/BackendServiceClient.java | 5 +++++ .../apache/doris/rpc/BackendServiceProxy.java | 11 +++++++---- .../load/sync/canal/CanalSyncDataTest.java | 12 ++++++++---- gensrc/proto/internal_service.proto | 3 +++ 6 files changed, 36 insertions(+), 15 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java index 9d63b8a98b67d3..e3e414273476f7 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java @@ -578,7 +578,9 @@ private void sendFragment() throws TException, RpcException, UserException { int profileFragmentId = 0; long memoryLimit = queryOptions.getMemLimit(); Map beToExecStates = Maps.newHashMap(); - boolean needTrigger = fragments.size() > 2; + // If #fragments >=3, use twoPhaseExecution with exec_plan_fragments_prepare and exec_plan_fragments_start, + // else use exec_plan_fragments directly. + boolean twoPhaseExecution = fragments.size() >= 3; for (PlanFragment fragment : fragments) { FragmentExecParams params = fragmentExecParamsMap.get(fragment.getFragmentId()); @@ -615,7 +617,7 @@ private void sendFragment() throws TException, RpcException, UserException { // and the BE will determine whether all Fragments have been executed based on this information. tParam.setFragmentNumOnHost(hostCounter.count(execState.address)); tParam.setBackendId(execState.backend.getId()); - tParam.setNeedWaitExecutionTrigger(needTrigger); + tParam.setNeedWaitExecutionTrigger(twoPhaseExecution); backendExecStates.add(execState); if (needCheckBackendState) { @@ -628,7 +630,8 @@ private void sendFragment() throws TException, RpcException, UserException { BackendExecStates states = beToExecStates.get(execState.backend.getId()); if (states == null) { - states = new BackendExecStates(execState.backend.getId(), execState.brpcAddress); + states = new BackendExecStates(execState.backend.getId(), execState.brpcAddress, + twoPhaseExecution); beToExecStates.putIfAbsent(execState.backend.getId(), states); } states.addState(execState); @@ -644,7 +647,7 @@ private void sendFragment() throws TException, RpcException, UserException { } waitRpc(futures, this.timeoutDeadline - System.currentTimeMillis(), "send fragments"); - if (needTrigger) { + if (twoPhaseExecution) { // 5. 
send and wait execution start rpc futures.clear(); for (BackendExecStates states : beToExecStates.values()) { @@ -2087,10 +2090,12 @@ public class BackendExecStates { long beId; TNetworkAddress brpcAddr; List states = Lists.newArrayList(); + boolean twoPhaseExecution = false; - public BackendExecStates(long beId, TNetworkAddress brpcAddr) { + public BackendExecStates(long beId, TNetworkAddress brpcAddr, boolean twoPhaseExecution) { this.beId = beId; this.brpcAddr = brpcAddr; + this.twoPhaseExecution = twoPhaseExecution; } public void addState(BackendExecState state) { @@ -2119,7 +2124,8 @@ public Future execRemoteFragmentsAsync( for (BackendExecState state : states) { paramsList.addToParamsList(state.rpcParams); } - return BackendServiceProxy.getInstance().execPlanFragmentsAsync(brpcAddr, paramsList); + return BackendServiceProxy.getInstance() + .execPlanFragmentsAsync(brpcAddr, paramsList, twoPhaseExecution); } catch (RpcException e) { // DO NOT throw exception here, return a complete future with error code, // so that the following logic will cancel the fragment. diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/InsertStreamTxnExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/InsertStreamTxnExecutor.java index 5160769de1722c..41689a9b6b7389 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/InsertStreamTxnExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/InsertStreamTxnExecutor.java @@ -89,7 +89,7 @@ public void beginTransaction(TStreamLoadPutRequest request) throws UserException TExecPlanFragmentParamsList paramsList = new TExecPlanFragmentParamsList(); paramsList.addToParamsList(tRequest); Future future = - BackendServiceProxy.getInstance().execPlanFragmentsAsync(address, paramsList); + BackendServiceProxy.getInstance().execPlanFragmentsAsync(address, paramsList, false); InternalService.PExecPlanFragmentResult result = future.get(5, TimeUnit.SECONDS); TStatusCode code = TStatusCode.findByValue(result.getStatus().getStatusCode()); if (code != TStatusCode.OK) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceClient.java b/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceClient.java index 3742f79e6c8442..77101e76f0fe2d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceClient.java +++ b/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceClient.java @@ -54,6 +54,11 @@ public Future execPlanFragmentAsync( return stub.execPlanFragment(request); } + public Future execPlanFragmentPrepareAsync( + InternalService.PExecPlanFragmentRequest request) { + return stub.execPlanFragmentPrepare(request); + } + public Future execPlanFragmentStartAsync( InternalService.PExecPlanFragmentStartRequest request) { return stub.execPlanFragmentStart(request); diff --git a/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java b/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java index e37a3afa58b165..96206416d668fd 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java +++ b/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java @@ -93,9 +93,8 @@ private BackendServiceClient getProxy(TNetworkAddress address) { } } - public Future execPlanFragmentsAsync( - TNetworkAddress address, TExecPlanFragmentParamsList paramsList) - throws TException, RpcException { + public Future execPlanFragmentsAsync(TNetworkAddress address, + TExecPlanFragmentParamsList paramsList, boolean twoPhaseExecution) throws TException, RpcException { 
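        // twoPhaseExecution picks which BE endpoint receives the serialized TExecPlanFragmentParamsList:
        // true  -> exec_plan_fragment_prepare; fragments shipped with need_wait_execution_trigger=true
        //          stay parked on the BE until the coordinator later sends exec_plan_fragment_start.
        // false -> exec_plan_fragment; the fragments begin executing as soon as they are prepared.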
InternalService.PExecPlanFragmentRequest.Builder builder = InternalService.PExecPlanFragmentRequest.newBuilder(); if (Config.use_compact_thrift_rpc) { @@ -112,7 +111,11 @@ public Future execPlanFragmentsAsync( final InternalService.PExecPlanFragmentRequest pRequest = builder.build(); try { final BackendServiceClient client = getProxy(address); - return client.execPlanFragmentAsync(pRequest); + if (twoPhaseExecution) { + return client.execPlanFragmentPrepareAsync(pRequest); + } else { + return client.execPlanFragmentAsync(pRequest); + } } catch (Throwable e) { LOG.warn("Execute plan fragment catch a exception, address={}:{}", address.getHostname(), address.getPort(), e); diff --git a/fe/fe-core/src/test/java/org/apache/doris/load/sync/canal/CanalSyncDataTest.java b/fe/fe-core/src/test/java/org/apache/doris/load/sync/canal/CanalSyncDataTest.java index f3ad8c2d329997..6864d1a016e051 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/load/sync/canal/CanalSyncDataTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/load/sync/canal/CanalSyncDataTest.java @@ -253,7 +253,8 @@ public void testNormal(@Mocked GlobalTransactionMgr transactionMgr, minTimes = 0; result = 105L; - backendServiceProxy.execPlanFragmentsAsync((TNetworkAddress) any, (TExecPlanFragmentParamsList) any); + backendServiceProxy.execPlanFragmentsAsync((TNetworkAddress) any, (TExecPlanFragmentParamsList) any, + anyBoolean); minTimes = 0; result = execFuture; @@ -261,7 +262,8 @@ public void testNormal(@Mocked GlobalTransactionMgr transactionMgr, minTimes = 0; result = commitFuture; - backendServiceProxy.sendData((TNetworkAddress) any, (Types.PUniqueId) any, (List) any); + backendServiceProxy.sendData((TNetworkAddress) any, (Types.PUniqueId) any, + (List) any); minTimes = 0; result = sendDataFuture; @@ -324,7 +326,8 @@ public void testExecFragmentFail(@Mocked GlobalTransactionMgr transactionMgr, minTimes = 0; result = 105L; - backendServiceProxy.execPlanFragmentsAsync((TNetworkAddress) any, (TExecPlanFragmentParamsList) any); + backendServiceProxy.execPlanFragmentsAsync((TNetworkAddress) any, (TExecPlanFragmentParamsList) any, + anyBoolean); minTimes = 0; result = execFuture; @@ -390,7 +393,8 @@ public void testCommitTxnFail(@Mocked GlobalTransactionMgr transactionMgr, minTimes = 0; result = 105L; - backendServiceProxy.execPlanFragmentsAsync((TNetworkAddress) any, (TExecPlanFragmentParamsList) any); + backendServiceProxy.execPlanFragmentsAsync((TNetworkAddress) any, (TExecPlanFragmentParamsList) any, + anyBoolean); minTimes = 0; result = execFuture; diff --git a/gensrc/proto/internal_service.proto b/gensrc/proto/internal_service.proto index bc3014d6503a29..3cccbb9d5d615f 100644 --- a/gensrc/proto/internal_service.proto +++ b/gensrc/proto/internal_service.proto @@ -468,7 +468,10 @@ message PResetRPCChannelResponse { service PBackendService { rpc transmit_data(PTransmitDataParams) returns (PTransmitDataResult); + // If #fragments of a query is < 3, use exec_plan_fragment directly. 
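In coordinator terms, the two call patterns that these service comments describe look roughly like the sketch below. It is illustrative only: the TwoPhaseSendSketch wrapper and its send() method are invented for the example, and it assumes BackendServiceProxy also exposes an execPlanFragmentStartAsync() helper for the start RPC (only the BackendServiceClient stub for that RPC appears earlier in the series).

    import java.util.concurrent.Future;

    import org.apache.doris.proto.InternalService;
    import org.apache.doris.proto.Types;
    import org.apache.doris.rpc.BackendServiceProxy;
    import org.apache.doris.thrift.TExecPlanFragmentParamsList;
    import org.apache.doris.thrift.TNetworkAddress;
    import org.apache.doris.thrift.TUniqueId;

    class TwoPhaseSendSketch {
        // Sends all fragment params destined for one BE, then (for queries with 3 or more
        // fragments) the start signal for the whole query.
        Future<InternalService.PExecPlanFragmentResult> send(
                TNetworkAddress brpcAddr, TExecPlanFragmentParamsList paramsList,
                TUniqueId queryId, boolean twoPhaseExecution) throws Exception {
            BackendServiceProxy proxy = BackendServiceProxy.getInstance();
            // Phase 1 (or the only phase): ship the params. With twoPhaseExecution the BE only
            // prepares the fragments and waits for the start signal.
            Future<InternalService.PExecPlanFragmentResult> prepared =
                    proxy.execPlanFragmentsAsync(brpcAddr, paramsList, twoPhaseExecution);
            if (!twoPhaseExecution) {
                return prepared;
            }
            prepared.get(); // block until this BE reports the fragments as prepared
            // Phase 2: ask the BE to start every prepared fragment of this query.
            Types.PUniqueId pQueryId = Types.PUniqueId.newBuilder()
                    .setHi(queryId.getHi()).setLo(queryId.getLo()).build();
            InternalService.PExecPlanFragmentStartRequest startReq =
                    InternalService.PExecPlanFragmentStartRequest.newBuilder()
                            .setQueryId(pQueryId).build();
            return proxy.execPlanFragmentStartAsync(brpcAddr, startReq); // assumed helper, see note above
        }
    }

The prepared.get() before the start RPC mirrors what the coordinator does: it waits on the prepare futures (waitRpc) for all backends before triggering execution with the start request.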
+ // If #fragments of a query is >=3, use exec_plan_fragment_prepare + exec_plan_fragment_start rpc exec_plan_fragment(PExecPlanFragmentRequest) returns (PExecPlanFragmentResult); + rpc exec_plan_fragment_prepare(PExecPlanFragmentRequest) returns (PExecPlanFragmentResult); rpc exec_plan_fragment_start(PExecPlanFragmentStartRequest) returns (PExecPlanFragmentResult); rpc cancel_plan_fragment(PCancelPlanFragmentRequest) returns (PCancelPlanFragmentResult); rpc fetch_data(PFetchDataRequest) returns (PFetchDataResult); From a98a82e68f02efb6ae8718cb9e113fcf245262d1 Mon Sep 17 00:00:00 2001 From: morningman Date: Wed, 1 Jun 2022 23:42:14 +0800 Subject: [PATCH 7/8] fix ut --- .../org/apache/doris/utframe/MockedBackendFactory.java | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/fe/fe-core/src/test/java/org/apache/doris/utframe/MockedBackendFactory.java b/fe/fe-core/src/test/java/org/apache/doris/utframe/MockedBackendFactory.java index 9578560876e614..d93bce32374c55 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/utframe/MockedBackendFactory.java +++ b/fe/fe-core/src/test/java/org/apache/doris/utframe/MockedBackendFactory.java @@ -344,6 +344,15 @@ public void execPlanFragment(InternalService.PExecPlanFragmentRequest request, responseObserver.onCompleted(); } + @Override + public void execPlanFragmentPrepare(InternalService.PExecPlanFragmentRequest request, + StreamObserver responseObserver) { + System.out.println("get exec_plan_fragment_prepare request"); + responseObserver.onNext(InternalService.PExecPlanFragmentResult.newBuilder() + .setStatus(Types.PStatus.newBuilder().setStatusCode(0)).build()); + responseObserver.onCompleted(); + } + @Override public void execPlanFragmentStart(InternalService.PExecPlanFragmentStartRequest request, StreamObserver responseObserver) { From d606a3e7a7038b187016f50a83044df3575ddace Mon Sep 17 00:00:00 2001 From: morningman Date: Fri, 3 Jun 2022 10:10:18 +0800 Subject: [PATCH 8/8] add missing service --- be/src/service/internal_service.cpp | 7 +++++++ be/src/service/internal_service.h | 5 +++++ 2 files changed, 12 insertions(+) diff --git a/be/src/service/internal_service.cpp b/be/src/service/internal_service.cpp index 8ad2ad1056775f..82c0f2ba1ceb3e 100644 --- a/be/src/service/internal_service.cpp +++ b/be/src/service/internal_service.cpp @@ -120,6 +120,13 @@ void PInternalServiceImpl::exec_plan_fragment(google::protobuf::RpcController* c st.to_protobuf(response->mutable_status()); } +void PInternalServiceImpl::exec_plan_fragment_prepare(google::protobuf::RpcController* cntl_base, + const PExecPlanFragmentRequest* request, + PExecPlanFragmentResult* response, + google::protobuf::Closure* done) { + exec_plan_fragment(cntl_base, request, response, done); +} + void PInternalServiceImpl::exec_plan_fragment_start(google::protobuf::RpcController* controller, const PExecPlanFragmentStartRequest* request, PExecPlanFragmentResult* result, diff --git a/be/src/service/internal_service.h b/be/src/service/internal_service.h index c2560633d474ce..32fcead91bb5c1 100644 --- a/be/src/service/internal_service.h +++ b/be/src/service/internal_service.h @@ -45,6 +45,11 @@ class PInternalServiceImpl : public PBackendService { PExecPlanFragmentResult* result, google::protobuf::Closure* done) override; + void exec_plan_fragment_prepare(google::protobuf::RpcController* controller, + const PExecPlanFragmentRequest* request, + PExecPlanFragmentResult* result, + google::protobuf::Closure* done) override; + void exec_plan_fragment_start(google::protobuf::RpcController* 
controller, const PExecPlanFragmentStartRequest* request, PExecPlanFragmentResult* result,