From 087700c9b5cd3a86e99e666c17674eab382b7440 Mon Sep 17 00:00:00 2001 From: yangzhg Date: Mon, 15 Feb 2021 20:46:52 +0800 Subject: [PATCH 1/2] remove jprotobuf and use grpc client to connect brpc service --- be/src/runtime/buffer_control_block.cpp | 2 + be/src/service/internal_service.cpp | 11 +- be/src/service/internal_service.h | 2 +- fe/fe-core/pom.xml | 107 ++++---- .../java/org/apache/doris/common/Status.java | 8 +- .../common/proc/CurrentQueryInfoProvider.java | 38 +-- .../apache/doris/common/util/DebugUtil.java | 6 +- .../apache/doris/common/util/KafkaUtil.java | 50 ++-- .../org/apache/doris/qe/ConnectProcessor.java | 14 +- .../java/org/apache/doris/qe/Coordinator.java | 88 ++++--- .../org/apache/doris/qe/ResultReceiver.java | 37 ++- .../java/org/apache/doris/qe/RowBatch.java | 2 +- .../org/apache/doris/qe/StmtExecutor.java | 56 +++-- .../java/org/apache/doris/qe/cache/Cache.java | 3 +- .../apache/doris/qe/cache/CacheAnalyzer.java | 17 +- .../apache/doris/qe/cache/CacheBeProxy.java | 72 ++---- .../doris/qe/cache/CacheCoordinator.java | 20 +- .../org/apache/doris/qe/cache/CacheProxy.java | 228 +----------------- .../apache/doris/qe/cache/PartitionCache.java | 48 ++-- .../doris/qe/cache/RowBatchBuilder.java | 45 +++- .../org/apache/doris/qe/cache/SqlCache.java | 32 ++- .../doris/rpc/BackendServiceClient.java | 76 ++++++ .../apache/doris/rpc/BackendServiceProxy.java | 164 ++++--------- .../org/apache/doris/rpc/PBackendService.java | 67 ----- .../doris/rpc/PExecPlanFragmentRequest.java | 24 -- .../apache/doris/rpc/PFetchDataRequest.java | 33 --- .../rpc/PTriggerProfileReportRequest.java | 42 ---- .../rpc/ThriftClientAttachmentHandler.java | 33 --- .../apache/doris/planner/QueryPlanTest.java | 5 +- .../apache/doris/qe/ConnectProcessorTest.java | 9 +- .../apache/doris/qe/PartitionCacheTest.java | 11 +- .../apache/doris/utframe/MockedBackend.java | 26 +- .../doris/utframe/MockedBackendFactory.java | 140 ++++++----- fe/pom.xml | 54 ++--- gensrc/proto/internal_service.proto | 3 + 35 files changed, 597 insertions(+), 976 deletions(-) create mode 100644 fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceClient.java delete mode 100644 fe/fe-core/src/main/java/org/apache/doris/rpc/PBackendService.java delete mode 100644 fe/fe-core/src/main/java/org/apache/doris/rpc/PExecPlanFragmentRequest.java delete mode 100644 fe/fe-core/src/main/java/org/apache/doris/rpc/PFetchDataRequest.java delete mode 100644 fe/fe-core/src/main/java/org/apache/doris/rpc/PTriggerProfileReportRequest.java delete mode 100644 fe/fe-core/src/main/java/org/apache/doris/rpc/ThriftClientAttachmentHandler.java diff --git a/be/src/runtime/buffer_control_block.cpp b/be/src/runtime/buffer_control_block.cpp index 9d21999b2deb5b..64b2e02bc93d79 100644 --- a/be/src/runtime/buffer_control_block.cpp +++ b/be/src/runtime/buffer_control_block.cpp @@ -50,7 +50,9 @@ void GetResultBatchCtx::on_data(TFetchDataResult* t_result, int64_t packet_seq, ThriftSerializer ser(false, 4096); auto st = ser.serialize(&t_result->result_batch, &len, &buf); if (st.ok()) { + // TODO(yangzhengguo) this is kept only for compatibility with old versions; it should be removed in release 0.15 cntl->response_attachment().append(buf, len); + result->set_row_batch(std::string(buf, buf + len)); result->set_packet_seq(packet_seq); result->set_eos(eos); } else { diff --git a/be/src/service/internal_service.cpp b/be/src/service/internal_service.cpp index 51480fbb038116..84e93988c6fc99 100644 --- a/be/src/service/internal_service.cpp +++ 
b/be/src/service/internal_service.cpp @@ -77,7 +77,13 @@ void PInternalServiceImpl<T>::exec_plan_fragment(google::protobuf::RpcController google::protobuf::Closure* done) { brpc::ClosureGuard closure_guard(done); brpc::Controller* cntl = static_cast<brpc::Controller*>(cntl_base); - auto st = _exec_plan_fragment(cntl); + auto st = Status::OK(); + if (request->has_request()) { + st = _exec_plan_fragment(request->request()); + } else { + // TODO(yangzhengguo) this is kept only for compatibility with old versions; it should be removed in release 0.15 + st = _exec_plan_fragment(cntl->request_attachment().to_string()); + } if (!st.ok()) { LOG(WARNING) << "exec plan fragment failed, errmsg=" << st.get_error_msg(); } @@ -129,8 +135,7 @@ void PInternalServiceImpl<T>::tablet_writer_cancel(google::protobuf::RpcControll } template <typename T> -Status PInternalServiceImpl<T>::_exec_plan_fragment(brpc::Controller* cntl) { - auto ser_request = cntl->request_attachment().to_string(); +Status PInternalServiceImpl<T>::_exec_plan_fragment(const std::string& ser_request) { TExecPlanFragmentParams t_request; { const uint8_t* buf = (const uint8_t*)ser_request.data(); diff --git a/be/src/service/internal_service.h b/be/src/service/internal_service.h index 1010f147fbd8a4..91196aea6486ed 100644 --- a/be/src/service/internal_service.h +++ b/be/src/service/internal_service.h @@ -89,7 +89,7 @@ class PInternalServiceImpl : public T { PCacheResponse* response, google::protobuf::Closure* done) override; private: - Status _exec_plan_fragment(brpc::Controller* cntl); + Status _exec_plan_fragment(const std::string& ser_request); private: ExecEnv* _exec_env; diff --git a/fe/fe-core/pom.xml b/fe/fe-core/pom.xml index ac7e6f841b198e..75ea786d014b64 100644 --- a/fe/fe-core/pom.xml +++ b/fe/fe-core/pom.xml @@ -229,36 +229,11 @@ under the License. joda-time - - - com.baidu - jprotobuf - jar-with-dependencies - - - - - com.baidu - jprotobuf-rpc-common - - commons-io commons-io - - - com.baidu - jprotobuf-rpc-core - - - servlet-api - javax.servlet - - - - org.json @@ -306,12 +281,6 @@ under the License. mysql-connector-java - - - io.netty - netty-all - - org.objenesis @@ -607,6 +576,20 @@ under the License. tree-printer + + io.grpc + grpc-netty + + + + io.grpc + grpc-protobuf + + + + io.grpc + grpc-stub + @@ -674,28 +657,45 @@ under the License. - - + + - org.codehaus.mojo - exec-maven-plugin - 3.0.0 + com.github.os72 + protoc-jar-maven-plugin + 3.11.1 - make-dir generate-sources - exec + run - mkdir - - -p - ${basedir}/target/generated-sources/proto - - ${skip.plugin} + + ${doris.thirdparty}/installed/bin/protoc + ${protobuf.version} + + ${doris.home}/gensrc/proto + + + + java + + + grpc-java + io.grpc:protoc-gen-grpc-java:${grpc.version} + + + + + + + + org.codehaus.mojo + exec-maven-plugin + 3.0.0 + gensrc generate-sources exec ${skip.plugin} - - gen_proto - generate-sources - - - exec - - - ${java.home}/bin/java - - -jar - ${settings.localRepository}/com/baidu/jprotobuf/${jprotobuf.version}/jprotobuf-${jprotobuf.version}-jar-with-dependencies.jar - --java_out=${basedir}/target/generated-sources/proto - ${doris.home}/gensrc/proto/internal_service.proto - - ${skip.plugin} - - @@ -748,7 +730,6 @@ under the License. 
${basedir}/target/generated-sources/build/ - ${basedir}/target/generated-sources/proto/ diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/Status.java b/fe/fe-core/src/main/java/org/apache/doris/common/Status.java index 2fad775475370c..7d6b7c609c4a38 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/Status.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/Status.java @@ -17,7 +17,7 @@ package org.apache.doris.common; -import org.apache.doris.proto.PStatus; +import org.apache.doris.proto.Status.PStatus; import org.apache.doris.thrift.TStatus; import org.apache.doris.thrift.TStatusCode; @@ -81,9 +81,9 @@ public void setStatus(String msg) { } public void setPstatus(PStatus status) { - this.errorCode = TStatusCode.findByValue(status.status_code); - if (status.error_msgs != null && !status.error_msgs.isEmpty()) { - this.errorMsg = status.error_msgs.get(0); + this.errorCode = TStatusCode.findByValue(status.getStatusCode()); + if (!status.getErrorMsgsList().isEmpty()) { + this.errorMsg = status.getErrorMsgs(0); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/proc/CurrentQueryInfoProvider.java b/fe/fe-core/src/main/java/org/apache/doris/common/proc/CurrentQueryInfoProvider.java index 46add4da5d03e7..0dd2c8ebf1e8e2 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/proc/CurrentQueryInfoProvider.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/proc/CurrentQueryInfoProvider.java @@ -23,11 +23,10 @@ import org.apache.doris.common.util.Counter; import org.apache.doris.common.util.DebugUtil; import org.apache.doris.common.util.RuntimeProfile; -import org.apache.doris.proto.PTriggerProfileReportResult; -import org.apache.doris.proto.PUniqueId; +import org.apache.doris.proto.InternalService; +import org.apache.doris.proto.Types; import org.apache.doris.qe.QueryStatisticsItem; import org.apache.doris.rpc.BackendServiceProxy; -import org.apache.doris.rpc.PTriggerProfileReportRequest; import org.apache.doris.rpc.RpcException; import org.apache.doris.system.Backend; import org.apache.doris.thrift.TNetworkAddress; @@ -203,9 +202,10 @@ private void triggerProfileReport(Collection<QueryStatisticsItem> items, boolean } // specified query instance which will report. if (!allQuery) { - final PUniqueId pUId = new PUniqueId(); - pUId.hi = instanceInfo.getInstanceId().hi; - pUId.lo = instanceInfo.getInstanceId().lo; + final Types.PUniqueId pUId = Types.PUniqueId.newBuilder() + .setHi(instanceInfo.getInstanceId().hi) + .setLo(instanceInfo.getInstanceId().lo) + .build(); request.addInstanceId(pUId); } } @@ -213,13 +213,13 @@ private void triggerProfileReport(Collection<QueryStatisticsItem> items, boolean recvResponse(sendRequest(requests)); } - private List<Pair<Request, Future<PTriggerProfileReportResult>>> sendRequest( + private List<Pair<Request, Future<InternalService.PTriggerProfileReportResult>>> sendRequest( Map<TNetworkAddress, Request> requests) throws AnalysisException { - final List<Pair<Request, Future<PTriggerProfileReportResult>>> futures = Lists.newArrayList(); + final List<Pair<Request, Future<InternalService.PTriggerProfileReportResult>>> futures = Lists.newArrayList(); for (TNetworkAddress address : requests.keySet()) { final Request request = requests.get(address); - final PTriggerProfileReportRequest pbRequest = - new PTriggerProfileReportRequest(request.getInstanceIds()); + final InternalService.PTriggerProfileReportRequest pbRequest = InternalService.PTriggerProfileReportRequest + .newBuilder().addAllInstanceIds(request.getInstanceIds()).build(); try { futures.add(Pair.create(request, BackendServiceProxy.getInstance().
triggerProfileReportAsync(address, pbRequest))); @@ -230,18 +230,18 @@ private List<Pair<Request, Future<PTriggerProfileReportResult>>> sendRequest( return futures; } - private void recvResponse(List<Pair<Request, Future<PTriggerProfileReportResult>>> futures) + private void recvResponse(List<Pair<Request, Future<InternalService.PTriggerProfileReportResult>>> futures) throws AnalysisException { final String reasonPrefix = "Fail to receive result."; - for (Pair<Request, Future<PTriggerProfileReportResult>> pair : futures) { + for (Pair<Request, Future<InternalService.PTriggerProfileReportResult>> pair : futures) { try { - final PTriggerProfileReportResult result + final InternalService.PTriggerProfileReportResult result = pair.second.get(2, TimeUnit.SECONDS); - final TStatusCode code = TStatusCode.findByValue(result.status.status_code); + final TStatusCode code = TStatusCode.findByValue(result.getStatus().getStatusCode()); if (code != TStatusCode.OK) { String errMsg = ""; - if (result.status.error_msgs != null && !result.status.error_msgs.isEmpty()) { - errMsg = result.status.error_msgs.get(0); + if (!result.getStatus().getErrorMsgsList().isEmpty()) { + errMsg = result.getStatus().getErrorMsgs(0); } throw new AnalysisException(reasonPrefix + " backend:" + pair.first.getAddress() + " reason:" + errMsg); @@ -347,7 +347,7 @@ public long getScanBytes() { private static class Request { private final TNetworkAddress address; - private final List<PUniqueId> instanceIds; + private final List<Types.PUniqueId> instanceIds; public Request(TNetworkAddress address) { this.address = address; @@ -358,11 +358,11 @@ public TNetworkAddress getAddress() { return address; } - public List<PUniqueId> getInstanceIds() { + public List<Types.PUniqueId> getInstanceIds() { return instanceIds; } - public void addInstanceId(PUniqueId instanceId) { + public void addInstanceId(Types.PUniqueId instanceId) { this.instanceIds.add(instanceId); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/util/DebugUtil.java b/fe/fe-core/src/main/java/org/apache/doris/common/util/DebugUtil.java index 012a5ee3e2b13e..c8eca0b25badf0 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/util/DebugUtil.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/DebugUtil.java @@ -18,7 +18,7 @@ package org.apache.doris.common.util; import org.apache.doris.common.Pair; -import org.apache.doris.proto.PUniqueId; +import org.apache.doris.proto.Types; import org.apache.doris.thrift.TUniqueId; import java.io.PrintWriter; @@ -135,9 +135,9 @@ public static String printId(final UUID id) { return builder.toString(); } - public static String printId(final PUniqueId id) { + public static String printId(final Types.PUniqueId id) { StringBuilder builder = new StringBuilder(); - builder.append(Long.toHexString(id.hi)).append("-").append(Long.toHexString(id.lo)); + builder.append(Long.toHexString(id.getHi())).append("-").append(Long.toHexString(id.getLo())); return builder.toString(); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/util/KafkaUtil.java b/fe/fe-core/src/main/java/org/apache/doris/common/util/KafkaUtil.java index 6f75f8e5f4695f..4ecb520c490b93 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/util/KafkaUtil.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/KafkaUtil.java @@ -21,19 +21,13 @@ import org.apache.doris.common.ClientPool; import org.apache.doris.common.LoadException; import org.apache.doris.common.UserException; -import org.apache.doris.proto.PKafkaLoadInfo; -import org.apache.doris.proto.PKafkaMetaProxyRequest; -import org.apache.doris.proto.PProxyRequest; -import org.apache.doris.proto.PProxyResult; -import org.apache.doris.proto.PStringPair; +import org.apache.doris.proto.InternalService; import org.apache.doris.rpc.BackendServiceProxy; import org.apache.doris.system.Backend; 
import org.apache.doris.thrift.BackendService; import org.apache.doris.thrift.TNetworkAddress; import org.apache.doris.thrift.TStatusCode; -import com.google.common.collect.Lists; - import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -42,6 +36,7 @@ import java.util.Map; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; public class KafkaUtil { private static final Logger LOG = LogManager.getLogger(KafkaUtil.class); @@ -61,31 +56,30 @@ public static List<Integer> getAllKafkaPartitions(String brokerList, String topi address = new TNetworkAddress(be.getHost(), be.getBrpcPort()); // create request - PKafkaLoadInfo kafkaLoadInfo = new PKafkaLoadInfo(); - kafkaLoadInfo.brokers = brokerList; - kafkaLoadInfo.topic = topic; - for (Map.Entry<String, String> entry : convertedCustomProperties.entrySet()) { - PStringPair pair = new PStringPair(); - pair.key = entry.getKey(); - pair.val = entry.getValue(); - if (kafkaLoadInfo.properties == null) { - kafkaLoadInfo.properties = Lists.newArrayList(); - } - kafkaLoadInfo.properties.add(pair); - } - PKafkaMetaProxyRequest kafkaRequest = new PKafkaMetaProxyRequest(); - kafkaRequest.kafka_info = kafkaLoadInfo; - PProxyRequest request = new PProxyRequest(); - request.kafka_meta_request = kafkaRequest; + InternalService.PProxyRequest request = InternalService.PProxyRequest.newBuilder().setKafkaMetaRequest( + InternalService.PKafkaMetaProxyRequest.newBuilder() + .setKafkaInfo(InternalService.PKafkaLoadInfo.newBuilder() + .setBrokers(brokerList) + .setTopic(topic) + .addAllProperties( + convertedCustomProperties.entrySet().stream().map( + e -> InternalService.PStringPair.newBuilder() + .setKey(e.getKey()) + .setVal(e.getValue()) + .build() + ).collect(Collectors.toList()) + ) + ) + ).build(); // get info - Future<PProxyResult> future = BackendServiceProxy.getInstance().getInfo(address, request); - PProxyResult result = future.get(5, TimeUnit.SECONDS); - TStatusCode code = TStatusCode.findByValue(result.status.status_code); + Future<InternalService.PProxyResult> future = BackendServiceProxy.getInstance().getInfo(address, request); + InternalService.PProxyResult result = future.get(5, TimeUnit.SECONDS); + TStatusCode code = TStatusCode.findByValue(result.getStatus().getStatusCode()); if (code != TStatusCode.OK) { - throw new UserException("failed to get kafka partition info: " + result.status.error_msgs); + throw new UserException("failed to get kafka partition info: " + result.getStatus().getErrorMsgsList()); } else { - return result.kafka_meta_result.partition_ids; + return result.getKafkaMetaResult().getPartitionIdsList(); } } catch (Exception e) { LOG.warn("failed to get partitions.", e); diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java index 29349239e15399..8a30f1e375981a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java @@ -44,7 +44,7 @@ import org.apache.doris.mysql.MysqlSerializer; import org.apache.doris.mysql.MysqlServerStatusFlag; import org.apache.doris.plugin.AuditEvent.EventType; -import org.apache.doris.proto.PQueryStatistics; +import org.apache.doris.proto.Data; import org.apache.doris.service.FrontendOptions; import org.apache.doris.thrift.TMasterOpRequest; import org.apache.doris.thrift.TMasterOpResult; @@ -107,16 +107,16 @@ private void handlePing() { ctx.getState().setOk(); } - private void auditAfterExec(String 
origStmt, StatementBase parsedStmt, PQueryStatistics statistics) { + private void auditAfterExec(String origStmt, StatementBase parsedStmt, Data.PQueryStatistics statistics) { // slow query long endTime = System.currentTimeMillis(); long elapseMs = endTime - ctx.getStartTime(); ctx.getAuditEventBuilder().setEventType(EventType.AFTER_QUERY) .setState(ctx.getState().toString()).setQueryTime(elapseMs) - .setScanBytes(statistics == null ? 0 : statistics.scan_bytes) - .setScanRows(statistics == null ? 0 : statistics.scan_rows) - .setCpuTimeMs(statistics == null ? 0 : statistics.cpu_ms) + .setScanBytes(statistics == null ? 0 : statistics.getScanBytes()) + .setScanRows(statistics == null ? 0 : statistics.getScanRows()) + .setCpuTimeMs(statistics == null ? 0 : statistics.getCpuMs()) .setReturnRows(ctx.getReturnRows()) .setStmtId(ctx.getStmtId()) .setQueryId(ctx.queryId() == null ? "NaN" : DebugUtil.printId(ctx.queryId())); @@ -181,7 +181,7 @@ private void handleQuery() { // execute this query. StatementBase parsedStmt = null; - List> auditInfoList = Lists.newArrayList(); + List> auditInfoList = Lists.newArrayList(); boolean alreadyAddedToAuditInfoList = false; try { List stmts = analyze(originStmt); @@ -232,7 +232,7 @@ private void handleQuery() { // audit after exec if (!auditInfoList.isEmpty()) { - for (Pair audit : auditInfoList) { + for (Pair audit : auditInfoList) { auditAfterExec(originStmt.replace("\n", " "), audit.first, audit.second); } } else { diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java index 1081ebc268b406..7d628ba095c503 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java @@ -50,9 +50,7 @@ import org.apache.doris.planner.ScanNode; import org.apache.doris.planner.SetOperationNode; import org.apache.doris.planner.UnionNode; -import org.apache.doris.proto.PExecPlanFragmentResult; -import org.apache.doris.proto.PPlanFragmentCancelReason; -import org.apache.doris.proto.PStatus; +import org.apache.doris.proto.InternalService; import org.apache.doris.qe.QueryStatisticsItem.FragmentInstanceInfo; import org.apache.doris.rpc.BackendServiceProxy; import org.apache.doris.rpc.RpcException; @@ -318,7 +316,7 @@ private void prepare() { for (PlanFragment fragment : fragments) { fragmentExecParamsMap.put(fragment.getFragmentId(), new FragmentExecParams(fragment)); } - + // set inputFragments for (PlanFragment fragment : fragments) { if (!(fragment.getSink() instanceof DataStreamSink)) { @@ -335,7 +333,7 @@ private void prepare() { queryProfile = new RuntimeProfile("Execution Profile " + DebugUtil.printId(queryId)); fragmentProfile = new ArrayList(); - for (int i = 0; i < fragmentSize; i ++) { + for (int i = 0; i < fragmentSize; i++) { fragmentProfile.add(new RuntimeProfile("Fragment " + i)); queryProfile.addChild(fragmentProfile.get(i)); } @@ -511,16 +509,16 @@ private void sendFragment() throws TException, RpcException, UserException { backendIdx++; } - for (Pair> pair : futures) { + for (Pair> pair : futures) { TStatusCode code; String errMsg = null; Exception exception = null; try { - PExecPlanFragmentResult result = pair.second.get(Config.remote_fragment_exec_timeout_ms, + InternalService.PExecPlanFragmentResult result = pair.second.get(Config.remote_fragment_exec_timeout_ms, TimeUnit.MILLISECONDS); - code = TStatusCode.findByValue(result.status.status_code); - if (result.status.error_msgs != null && 
!result.status.error_msgs.isEmpty()) { - errMsg = result.status.error_msgs.get(0); + code = TStatusCode.findByValue(result.getStatus().getStatusCode()); + if (!result.getStatus().getErrorMsgsList().isEmpty()) { + errMsg = result.getStatus().getErrorMsgsList().get(0); } } catch (ExecutionException e) { LOG.warn("catch a execute exception", e); @@ -548,7 +546,7 @@ private void sendFragment() throws TException, RpcException, UserException { LOG.warn("exec plan fragment failed, errmsg={}, code: {}, fragmentId={}, backend={}:{}", errMsg, code, fragment.getFragmentId(), pair.first.address.hostname, pair.first.address.port); - cancelInternal(PPlanFragmentCancelReason.INTERNAL_ERROR); + cancelInternal(InternalService.PPlanFragmentCancelReason.INTERNAL_ERROR); switch (code) { case TIMEOUT: throw new UserException("query timeout. backend id: " + pair.first.backend.getId()); @@ -669,7 +667,7 @@ private void updateStatus(Status status, TUniqueId instanceId) { queryStatus.setStatus(status); LOG.warn("one instance report fail throw updateStatus(), need cancel. job id: {}, query id: {}, instance id: {}", jobId, DebugUtil.printId(queryId), instanceId != null ? DebugUtil.printId(instanceId) : "NaN"); - cancelInternal(PPlanFragmentCancelReason.INTERNAL_ERROR); + cancelInternal(InternalService.PPlanFragmentCancelReason.INTERNAL_ERROR); } finally { lock.unlock(); } @@ -720,11 +718,11 @@ public RowBatch getNext() throws Exception { this.returnedAllResults = true; // if this query is a block query do not cancel. - Long numLimitRows = fragments.get(0).getPlanRoot().getLimit(); + Long numLimitRows = fragments.get(0).getPlanRoot().getLimit(); boolean hasLimit = numLimitRows > 0; if (!isBlockQuery && instanceIds.size() > 1 && hasLimit && numReceivedRows >= numLimitRows) { LOG.debug("no block query, return num >= limit rows, need cancel"); - cancelInternal(PPlanFragmentCancelReason.LIMIT_REACH); + cancelInternal(InternalService.PPlanFragmentCancelReason.LIMIT_REACH); } } else { numReceivedRows += resultBatch.getBatch().getRowsSize(); @@ -746,13 +744,13 @@ public void cancel() { queryStatus.setStatus(Status.CANCELLED); } LOG.warn("cancel execution of query, this is outside invoke"); - cancelInternal(PPlanFragmentCancelReason.USER_CANCEL); + cancelInternal(InternalService.PPlanFragmentCancelReason.USER_CANCEL); } finally { unlock(); } } - private void cancelInternal(PPlanFragmentCancelReason cancelReason) { + private void cancelInternal(InternalService.PPlanFragmentCancelReason cancelReason) { if (null != receiver) { receiver.cancel(); } @@ -760,11 +758,11 @@ private void cancelInternal(PPlanFragmentCancelReason cancelReason) { if (profileDoneSignal != null) { // count down to zero to notify all objects waiting for this profileDoneSignal.countDownToZero(new Status()); - LOG.info("unfinished instance: {}", profileDoneSignal.getLeftMarks().stream().map(e->DebugUtil.printId(e.getKey())).toArray()); + LOG.info("unfinished instance: {}", profileDoneSignal.getLeftMarks().stream().map(e -> DebugUtil.printId(e.getKey())).toArray()); } } - private void cancelRemoteFragmentsAsync(PPlanFragmentCancelReason cancelReason) { + private void cancelRemoteFragmentsAsync(InternalService.PPlanFragmentCancelReason cancelReason) { for (BackendExecState backendExecState : backendExecStates) { backendExecState.cancelFragmentInstance(cancelReason); } @@ -971,7 +969,7 @@ private void computeFragmentHosts() throws Exception { throw new UserException("there is no scanNode Backend"); } this.addressToBackendID.put(execHostport, 
backendIdRef.getRef()); - FInstanceExecParam instanceParam = new FInstanceExecParam(null, execHostport, + FInstanceExecParam instanceParam = new FInstanceExecParam(null, execHostport, 0, params); params.instanceExecParams.add(instanceParam); continue; @@ -1018,7 +1016,7 @@ private void computeFragmentHosts() throws Exception { // random select some instance // get distinct host, when parallel_fragment_exec_instance_num > 1, single host may execute several instances Set hostSet = Sets.newHashSet(); - for (FInstanceExecParam execParams: fragmentExecParamsMap.get(inputFragmentId).instanceExecParams) { + for (FInstanceExecParam execParams : fragmentExecParamsMap.get(inputFragmentId).instanceExecParams) { hostSet.add(execParams.host); } List hosts = Lists.newArrayList(hostSet); @@ -1028,7 +1026,7 @@ private void computeFragmentHosts() throws Exception { params.instanceExecParams.add(instanceParam); } } else { - for (FInstanceExecParam execParams: fragmentExecParamsMap.get(inputFragmentId).instanceExecParams) { + for (FInstanceExecParam execParams : fragmentExecParamsMap.get(inputFragmentId).instanceExecParams) { FInstanceExecParam instanceParam = new FInstanceExecParam(null, execParams.host, 0, params); params.instanceExecParams.add(instanceParam); } @@ -1129,7 +1127,7 @@ private boolean isColocateJoin(PlanNode node) { return false; } - + // Returns the id of the leftmost node of any of the gives types in 'plan_root', // or INVALID_PLAN_NODE_ID if no such node present. private Pair findLeftmostNode(PlanNode plan) { @@ -1273,14 +1271,14 @@ private void computeScanRangeAssignmentByColocate( } Map bucketSeqToAddress = fragmentIdToSeqToAddressMap.get(scanNode.getFragmentId()); HashMap assignedBytesPerHost = Maps.newHashMap(); - for(Integer bucketSeq: scanNode.bucketSeq2locations.keySet()) { + for (Integer bucketSeq : scanNode.bucketSeq2locations.keySet()) { //fill scanRangeParamsList List locations = scanNode.bucketSeq2locations.get(bucketSeq); if (!bucketSeqToAddress.containsKey(bucketSeq)) { getExecHostPortForFragmentIDAndBucketSeq(locations.get(0), scanNode.getFragmentId(), bucketSeq, assignedBytesPerHost); } - for(TScanRangeLocations location: locations) { + for (TScanRangeLocations location : locations) { Map> scanRanges = findOrInsert(bucketSeqToScanRange, bucketSeq, new HashMap>()); @@ -1438,7 +1436,7 @@ public void endProfile() { * return true otherwise. * NOTICE: return true does not mean that coordinator executed success, * the caller should check queryStatus for result. - * + * * We divide the entire waiting process into multiple rounds, * with a maximum of 30 seconds per round. And after each round of waiting, * check the status of the BE. 
If the BE status is abnormal, the wait is ended @@ -1590,7 +1588,7 @@ private void getExecHostPortForFragmentIDAndBucketSeq(TScanRangeLocations seqLoc //buckendIdToBucketCountMap does not contain the new backend, insert into it if (!buckendIdToBucketCountMap.containsKey(backendIdRef.getRef())) { buckendIdToBucketCountMap.put(backendIdRef.getRef(), 1); - } else { //buckendIdToBucketCountMap contains the new backend, update it + } else { //buckendIdToBucketCountMap contains the new backend, update it buckendIdToBucketCountMap.put(backendIdRef.getRef(), buckendIdToBucketCountMap.get(backendIdRef.getRef()) + 1); } } else { //the backend with buckendId is alive, update buckendIdToBucketCountMap directly @@ -1612,14 +1610,14 @@ private void computeScanRangeAssignmentByBucket( Map bucketSeqToAddress = fragmentIdToSeqToAddressMap.get(scanNode.getFragmentId()); BucketSeqToScanRange bucketSeqToScanRange = fragmentIdBucketSeqToScanRangeMap.get(scanNode.getFragmentId()); - for (Integer bucketSeq: scanNode.bucketSeq2locations.keySet()) { + for (Integer bucketSeq : scanNode.bucketSeq2locations.keySet()) { //fill scanRangeParamsList List locations = scanNode.bucketSeq2locations.get(bucketSeq); if (!bucketSeqToAddress.containsKey(bucketSeq)) { getExecHostPortForFragmentIDAndBucketSeq(locations.get(0), scanNode.getFragmentId(), bucketSeq, idToBackend, addressToBackendID); } - for(TScanRangeLocations location: locations) { + for (TScanRangeLocations location : locations) { Map> scanRanges = findOrInsert(bucketSeqToScanRange, bucketSeq, new HashMap>()); @@ -1715,7 +1713,7 @@ public class BackendExecState { TNetworkAddress address; Backend backend; long lastMissingHeartbeatTime = -1; - + public BackendExecState(PlanFragmentId fragmentId, int instanceId, int profileFragmentId, TExecPlanFragmentParams rpcParams, Map addressToBackendID) { this.profileFragmentId = profileFragmentId; @@ -1773,7 +1771,7 @@ public synchronized void printProfile(StringBuilder builder) { // cancel the fragment instance. // return true if cancel success. Otherwise, return false - public synchronized boolean cancelFragmentInstance(PPlanFragmentCancelReason cancelReason) { + public synchronized boolean cancelFragmentInstance(InternalService.PPlanFragmentCancelReason cancelReason) { if (LOG.isDebugEnabled()) { LOG.debug("cancelRemoteFragments initiated={} done={} hasCanceled={} backend: {}, fragment instance id={}, reason: {}", this.initiated, this.done, this.hasCanceled, backend.getId(), @@ -1826,7 +1824,7 @@ public boolean isBackendStateHealthy() { return true; } - public Future execRemoteFragmentAsync() throws TException, RpcException { + public Future execRemoteFragmentAsync() throws TException, RpcException { TNetworkAddress brpcAddress = null; try { brpcAddress = new TNetworkAddress(backend.getHost(), backend.getBrpcPort()); @@ -1839,7 +1837,7 @@ public Future execRemoteFragmentAsync() throws TExcepti } catch (RpcException e) { // DO NOT throw exception here, return a complete future with error code, // so that the following logic will cancel the fragment. 
- return new Future() { + return new Future() { @Override public boolean cancel(boolean mayInterruptIfRunning) { return false; @@ -1856,19 +1854,19 @@ public boolean isDone() { } @Override - public PExecPlanFragmentResult get() { - PExecPlanFragmentResult result = new PExecPlanFragmentResult(); - PStatus pStatus = new PStatus(); - pStatus.error_msgs = Lists.newArrayList(); - pStatus.error_msgs.add(e.getMessage()); - // use THRIFT_RPC_ERROR so that this BE will be added to the blacklist later. - pStatus.status_code = TStatusCode.THRIFT_RPC_ERROR.getValue(); - result.status = pStatus; + public InternalService.PExecPlanFragmentResult get() { + InternalService.PExecPlanFragmentResult result = InternalService.PExecPlanFragmentResult + .newBuilder() + .setStatus(org.apache.doris.proto.Status.PStatus.newBuilder() + .addErrorMsgs(e.getMessage()) + .setStatusCode(TStatusCode.THRIFT_RPC_ERROR.getValue()) + .build()) + .build(); return result; } @Override - public PExecPlanFragmentResult get(long timeout, TimeUnit unit) { + public InternalService.PExecPlanFragmentResult get(long timeout, TimeUnit unit) { return get(); } }; @@ -1891,8 +1889,8 @@ private TUniqueId fragmentInstanceId() { // used to assemble TPlanFragmentExecParas protected class FragmentExecParams { public PlanFragment fragment; - public List destinations = Lists.newArrayList(); - public Map perExchNumSenders = Maps.newHashMap(); + public List destinations = Lists.newArrayList(); + public Map perExchNumSenders = Maps.newHashMap(); public List inputFragments = Lists.newArrayList(); public List instanceExecParams = Lists.newArrayList(); @@ -1964,8 +1962,8 @@ public void appendScanRange(StringBuilder sb, List params) { TEsScanRange esScanRange = range.getScanRange().getEsScanRange(); if (esScanRange != null) { sb.append("{ index=").append(esScanRange.getIndex()) - .append(", shardid=").append(esScanRange.getShardId()) - .append("}"); + .append(", shardid=").append(esScanRange.getShardId()) + .append("}"); } } sb.append("]"); diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ResultReceiver.java b/fe/fe-core/src/main/java/org/apache/doris/qe/ResultReceiver.java index 916654307039d2..c1ff7325348f33 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/ResultReceiver.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ResultReceiver.java @@ -18,10 +18,9 @@ package org.apache.doris.qe; import org.apache.doris.common.Status; -import org.apache.doris.proto.PFetchDataResult; -import org.apache.doris.proto.PUniqueId; +import org.apache.doris.proto.InternalService; +import org.apache.doris.proto.Types; import org.apache.doris.rpc.BackendServiceProxy; -import org.apache.doris.rpc.PFetchDataRequest; import org.apache.doris.rpc.RpcException; import org.apache.doris.thrift.TNetworkAddress; import org.apache.doris.thrift.TResultBatch; @@ -45,14 +44,12 @@ public class ResultReceiver { private long packetIdx = 0; private long timeoutTs = 0; private TNetworkAddress address; - private PUniqueId finstId; + private Types.PUniqueId finstId; private Long backendId; private Thread currentThread; public ResultReceiver(TUniqueId tid, Long backendId, TNetworkAddress address, int timeoutMs) { - this.finstId = new PUniqueId(); - this.finstId.hi = tid.hi; - this.finstId.lo = tid.lo; + this.finstId = Types.PUniqueId.newBuilder().setHi(tid.hi).setLo(tid.lo).build(); this.backendId = backendId; this.address = address; this.timeoutTs = System.currentTimeMillis() + timeoutMs; @@ -65,11 +62,13 @@ public RowBatch getNext(Status status) throws TException { 
final RowBatch rowBatch = new RowBatch(); try { while (!isDone && !isCancel) { - PFetchDataRequest request = new PFetchDataRequest(finstId); + InternalService.PFetchDataRequest request = InternalService.PFetchDataRequest.newBuilder() + .setFinstId(finstId) + .build(); currentThread = Thread.currentThread(); - Future<PFetchDataResult> future = BackendServiceProxy.getInstance().fetchDataAsync(address, request); - PFetchDataResult pResult = null; + Future<InternalService.PFetchDataResult> future = BackendServiceProxy.getInstance().fetchDataAsync(address, request); + InternalService.PFetchDataResult pResult = null; while (pResult == null) { long currentTs = System.currentTimeMillis(); if (currentTs >= timeoutTs) { @@ -86,30 +85,30 @@ public RowBatch getNext(Status status) throws TException { } } } - TStatusCode code = TStatusCode.findByValue(pResult.status.status_code); + TStatusCode code = TStatusCode.findByValue(pResult.getStatus().getStatusCode()); if (code != TStatusCode.OK) { - status.setPstatus(pResult.status); + status.setPstatus(pResult.getStatus()); return null; } - rowBatch.setQueryStatistics(pResult.query_statistics); + rowBatch.setQueryStatistics(pResult.getQueryStatistics()); - if (packetIdx != pResult.packet_seq) { - LOG.warn("receive packet failed, expect={}, receive={}", packetIdx, pResult.packet_seq); + if (packetIdx != pResult.getPacketSeq()) { + LOG.warn("receive packet failed, expect={}, receive={}", packetIdx, pResult.getPacketSeq()); status.setRpcStatus("receive error packet"); return null; } packetIdx++; - isDone = pResult.eos; + isDone = pResult.getEos(); - byte[] serialResult = request.getSerializedResult(); - if (serialResult != null && serialResult.length > 0) { + if (pResult.hasRowBatch() && pResult.getRowBatch().size() > 0) { + byte[] serialResult = pResult.getRowBatch().toByteArray(); TResultBatch resultBatch = new TResultBatch(); TDeserializer deserializer = new TDeserializer(); deserializer.deserialize(resultBatch, serialResult); rowBatch.setBatch(resultBatch); - rowBatch.setEos(pResult.eos); + rowBatch.setEos(pResult.getEos()); return rowBatch; } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/RowBatch.java b/fe/fe-core/src/main/java/org/apache/doris/qe/RowBatch.java index b6dfce96ecead0..087babb5374618 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/RowBatch.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/RowBatch.java @@ -17,7 +17,7 @@ package org.apache.doris.qe; -import org.apache.doris.proto.PQueryStatistics; +import org.apache.doris.proto.Data.PQueryStatistics; import org.apache.doris.thrift.TResultBatch; public final class RowBatch { diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java index 4a361ec327efc3..8fc3c94ba4131b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java @@ -70,19 +70,19 @@ import org.apache.doris.mysql.MysqlSerializer; import org.apache.doris.mysql.privilege.PrivPredicate; import org.apache.doris.planner.Planner; -import org.apache.doris.proto.PQueryStatistics; +import org.apache.doris.proto.Data; +import org.apache.doris.proto.InternalService; import org.apache.doris.qe.QueryState.MysqlStateType; import org.apache.doris.qe.cache.Cache; import org.apache.doris.qe.cache.CacheAnalyzer; import org.apache.doris.qe.cache.CacheAnalyzer.CacheMode; -import org.apache.doris.qe.cache.CacheBeProxy; -import org.apache.doris.qe.cache.CacheProxy; import org.apache.doris.rewrite.ExprRewriter; 
import org.apache.doris.rewrite.mvrewrite.MVSelectFailedException; import org.apache.doris.rpc.RpcException; import org.apache.doris.task.LoadEtlTask; import org.apache.doris.thrift.TQueryOptions; import org.apache.doris.thrift.TQueryType; +import org.apache.doris.thrift.TResultBatch; import org.apache.doris.thrift.TUniqueId; import org.apache.doris.transaction.TabletCommitInfo; import org.apache.doris.transaction.TransactionCommitFailedException; @@ -95,6 +95,7 @@ import com.google.common.base.Strings; import com.google.common.collect.Lists; import com.google.common.collect.Maps; +import com.google.protobuf.ByteString; import java.io.IOException; import java.io.StringReader; @@ -127,7 +128,7 @@ public class StmtExecutor { private Planner planner; private boolean isProxy; private ShowResultSet proxyResultSet = null; - private PQueryStatistics statisticsForAuditLog; + private Data.PQueryStatistics.Builder statisticsForAuditLog; private boolean isCached; private QueryPlannerProfile plannerProfile = new QueryPlannerProfile(); @@ -626,13 +627,20 @@ private void handleSetStmt() { // return true if the meta fields has been sent, otherwise, return false. // the meta fields must be sent right before the first batch of data(or eos flag). // so if it has data(or eos is true), this method must return true. - private boolean sendCachedValues(MysqlChannel channel, List<CacheBeProxy.CacheValue> cacheValues, + private boolean sendCachedValues(MysqlChannel channel, List<InternalService.PCacheValue> cacheValues, SelectStmt selectStmt, boolean isSendFields, boolean isEos) throws Exception { RowBatch batch = null; boolean isSend = isSendFields; - for (CacheBeProxy.CacheValue value : cacheValues) { - batch = value.getRowBatch(); + for (InternalService.PCacheValue value : cacheValues) { + TResultBatch resultBatch = new TResultBatch(); + for (ByteString one : value.getRowsList()) { + resultBatch.addToRows(ByteBuffer.wrap(one.toByteArray())); + } + resultBatch.setPacketSeq(1); + resultBatch.setIsCompressed(false); + batch = new RowBatch(); + batch.setBatch(resultBatch); + batch.setEos(true); if (!isSend) { // send meta fields before sending first data batch. 
sendFields(selectStmt.getColLabels(), exprToType(selectStmt.getResultExprs())); @@ -646,7 +654,8 @@ private boolean sendCachedValues(MysqlChannel channel, List<InternalService.PCacheValue> private List<PrimitiveType> exprToType(List<Expr> exprs) { return exprs.stream().map(e -> e.getType().getPrimitiveType()).collect(Collectors.toList()); } } - diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/cache/Cache.java b/fe/fe-core/src/main/java/org/apache/doris/qe/cache/Cache.java index c2a054e54749b0..17111792c8c038 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/cache/Cache.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/cache/Cache.java @@ -20,6 +20,7 @@ import org.apache.doris.analysis.SelectStmt; import org.apache.doris.common.Config; import org.apache.doris.common.Status; +import org.apache.doris.proto.InternalService; import org.apache.doris.qe.RowBatch; import org.apache.doris.thrift.TUniqueId; import org.apache.logging.log4j.LogManager; @@ -50,7 +51,7 @@ protected Cache(TUniqueId queryId, SelectStmt selectStmt) { hitRange = HitRange.None; } - public abstract CacheProxy.FetchCacheResult getCacheData(Status status); + public abstract InternalService.PFetchCacheResult getCacheData(Status status); public HitRange getHitRange() { return hitRange; diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/cache/CacheAnalyzer.java b/fe/fe-core/src/main/java/org/apache/doris/qe/cache/CacheAnalyzer.java index ddd272542ce96f..096e5ca5ff3670 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/cache/CacheAnalyzer.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/cache/CacheAnalyzer.java @@ -40,6 +40,7 @@ import org.apache.doris.planner.OlapScanNode; import org.apache.doris.planner.Planner; import org.apache.doris.planner.ScanNode; +import org.apache.doris.proto.InternalService; import org.apache.doris.qe.ConnectContext; import org.apache.doris.qe.RowBatch; import org.apache.doris.thrift.TUniqueId; @@ -267,8 +268,7 @@ private CacheMode innerCheckCacheMode(long now) { return CacheMode.Partition; } - public CacheBeProxy.FetchCacheResult getCacheData() { - CacheProxy.FetchCacheResult cacheResult = null; + public InternalService.PFetchCacheResult getCacheData() { cacheMode = innerCheckCacheMode(0); if (cacheMode == CacheMode.NoNeed) { return null; } @@ -277,13 +277,18 @@ public CacheBeProxy.FetchCacheResult getCacheData() { return null; } Status status = new Status(); - cacheResult = cache.getCacheData(status); - + InternalService.PFetchCacheResult cacheResult = cache.getCacheData(status); + int rowCount = 0; + int dataSize = 0; + if (cacheResult != null) { + for (InternalService.PCacheValue value : cacheResult.getValuesList()) { + rowCount += value.getRowsCount(); + dataSize += value.getDataSize(); + } + } if (status.ok() && cacheResult != null) { LOG.debug("hit cache, mode {}, queryid {}, all count {}, value count {}, row count {}, data size {}", cacheMode, DebugUtil.printId(queryId), - cacheResult.all_count, cacheResult.value_count, - cacheResult.row_count, cacheResult.data_size); + cacheResult.getAllCount(), cacheResult.getValuesCount(), + rowCount, dataSize); } else { LOG.debug("miss cache, mode {}, queryid {}, code {}, msg {}", cacheMode, DebugUtil.printId(queryId), status.getErrorCode(), status.getErrorMsg()); diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/cache/CacheBeProxy.java b/fe/fe-core/src/main/java/org/apache/doris/qe/cache/CacheBeProxy.java index 064c86b3b5679c..d7e5eda3334675 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/cache/CacheBeProxy.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/cache/CacheBeProxy.java @@ 
-18,14 +18,8 @@ package org.apache.doris.qe.cache; import org.apache.doris.common.Status; -import org.apache.doris.proto.PCacheResponse; -import org.apache.doris.proto.PCacheStatus; -import org.apache.doris.proto.PClearCacheRequest; -import org.apache.doris.proto.PClearType; -import org.apache.doris.proto.PFetchCacheRequest; -import org.apache.doris.proto.PFetchCacheResult; -import org.apache.doris.proto.PUniqueId; -import org.apache.doris.proto.PUpdateCacheRequest; +import org.apache.doris.proto.InternalService; +import org.apache.doris.proto.Types; import org.apache.doris.qe.SimpleScheduler; import org.apache.doris.rpc.BackendServiceProxy; import org.apache.doris.rpc.RpcException; @@ -48,8 +42,8 @@ public class CacheBeProxy extends CacheProxy { private static final Logger LOG = LogManager.getLogger(CacheBeProxy.class); - public void updateCache(UpdateCacheRequest request, int timeoutMs, Status status) { - PUniqueId sqlKey = request.sql_key; + public void updateCache(InternalService.PUpdateCacheRequest request, int timeoutMs, Status status) { + Types.PUniqueId sqlKey = request.getSqlKey(); Backend backend = CacheCoordinator.getInstance().findBackend(sqlKey); if (backend == null) { LOG.warn("update cache can't find backend, sqlKey {}", sqlKey); @@ -57,13 +51,13 @@ public void updateCache(UpdateCacheRequest request, int timeoutMs, Status status } TNetworkAddress address = new TNetworkAddress(backend.getHost(), backend.getBrpcPort()); try { - PUpdateCacheRequest updateRequest = request.getRpcRequest(); - Future future = BackendServiceProxy.getInstance().updateCache(address, updateRequest); - PCacheResponse response = future.get(timeoutMs, TimeUnit.MICROSECONDS); - if (response.status == PCacheStatus.CACHE_OK) { + Future future = BackendServiceProxy.getInstance() + .updateCache(address, request); + InternalService.PCacheResponse response = future.get(timeoutMs, TimeUnit.MICROSECONDS); + if (response.getStatus() == InternalService.PCacheStatus.CACHE_OK) { status.setStatus(new Status(TStatusCode.OK, "CACHE_OK")); } else { - status.setStatus(response.status.toString()); + status.setStatus(response.getStatus().toString()); } } catch (Exception e) { LOG.warn("update cache exception, sqlKey {}", sqlKey, e); @@ -72,35 +66,18 @@ public void updateCache(UpdateCacheRequest request, int timeoutMs, Status status } } - public FetchCacheResult fetchCache(FetchCacheRequest request, int timeoutMs, Status status) { - PUniqueId sqlKey = request.sql_key; + public InternalService.PFetchCacheResult fetchCache(InternalService.PFetchCacheRequest request, + int timeoutMs, Status status) { + Types.PUniqueId sqlKey = request.getSqlKey(); Backend backend = CacheCoordinator.getInstance().findBackend(sqlKey); if (backend == null) { return null; } TNetworkAddress address = new TNetworkAddress(backend.getHost(), backend.getBrpcPort()); - long timeoutTs = System.currentTimeMillis() + timeoutMs; - FetchCacheResult result = null; try { - PFetchCacheRequest fetchRequest = request.getRpcRequest(); - Future future = BackendServiceProxy.getInstance().fetchCache(address, fetchRequest); - PFetchCacheResult fetchResult = null; - while (fetchResult == null) { - long currentTs = System.currentTimeMillis(); - if (currentTs >= timeoutTs) { - throw new TimeoutException("query cache timeout"); - } - fetchResult = future.get(timeoutTs - currentTs, TimeUnit.MILLISECONDS); - if (fetchResult.status == PCacheStatus.CACHE_OK) { - status = new Status(TStatusCode.OK, ""); - result = new FetchCacheResult(); - result.setResult(fetchResult); - 
return result; - } else { - status.setStatus(fetchResult.status.toString()); - return null; - } - } + Future future = BackendServiceProxy.getInstance() + .fetchCache(address, request); + return future.get(timeoutMs, TimeUnit.MILLISECONDS); } catch (RpcException e) { LOG.warn("fetch catch rpc exception, sqlKey {}, backend {}", sqlKey, backend.getId(), e); status.setRpcStatus(e.getMessage()); @@ -114,16 +91,15 @@ public FetchCacheResult fetchCache(FetchCacheRequest request, int timeoutMs, Sta } catch (TimeoutException e) { LOG.warn("fetch result timeout, sqlKey {}, backend {}", sqlKey, backend.getId(), e); status.setStatus("query timeout"); - } finally { } - return result; + return null; } - public void clearCache(PClearCacheRequest request) { + public void clearCache(InternalService.PClearCacheRequest request) { this.clearCache(request, CacheCoordinator.getInstance().getBackendList()); } - public void clearCache(PClearCacheRequest request, List beList) { + public void clearCache(InternalService.PClearCacheRequest request, List beList) { int retry; Status status = new Status(); for (Backend backend : beList) { @@ -143,18 +119,18 @@ public void clearCache(PClearCacheRequest request, List beList) { } } - protected boolean clearCache(PClearCacheRequest request, Backend backend, int timeoutMs, Status status) { + protected boolean clearCache(InternalService.PClearCacheRequest request, Backend backend, int timeoutMs, Status status) { TNetworkAddress address = new TNetworkAddress(backend.getHost(), backend.getBrpcPort()); try { - request.clear_type = PClearType.CLEAR_ALL; + request = request.toBuilder().setClearType(InternalService.PClearType.CLEAR_ALL).build(); LOG.info("clear all backend cache, backendId {}", backend.getId()); - Future future = BackendServiceProxy.getInstance().clearCache(address, request); - PCacheResponse response = future.get(timeoutMs, TimeUnit.MICROSECONDS); - if (response.status == PCacheStatus.CACHE_OK) { + Future future = BackendServiceProxy.getInstance().clearCache(address, request); + InternalService.PCacheResponse response = future.get(timeoutMs, TimeUnit.MICROSECONDS); + if (response.getStatus() == InternalService.PCacheStatus.CACHE_OK) { status.setStatus(new Status(TStatusCode.OK, "CACHE_OK")); return true; } else { - status.setStatus(response.status.toString()); + status.setStatus(response.getStatus().toString()); return false; } } catch (Exception e) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/cache/CacheCoordinator.java b/fe/fe-core/src/main/java/org/apache/doris/qe/cache/CacheCoordinator.java index 97a73012a9c9ba..dbbb40313917a4 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/cache/CacheCoordinator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/cache/CacheCoordinator.java @@ -18,7 +18,7 @@ package org.apache.doris.qe.cache; import org.apache.doris.catalog.Catalog; -import org.apache.doris.proto.PUniqueId; +import org.apache.doris.proto.Types; import org.apache.doris.qe.SimpleScheduler; import org.apache.doris.system.Backend; @@ -68,13 +68,13 @@ protected CacheCoordinator() { * @param sqlKey 128 bit's sql md5 * @return Backend */ - public Backend findBackend(PUniqueId sqlKey) { + public Backend findBackend(Types.PUniqueId sqlKey) { resetBackend(); Backend virtualNode = null; try { belock.lock(); - SortedMap headMap = virtualNodes.headMap(sqlKey.hi); - SortedMap tailMap = virtualNodes.tailMap(sqlKey.hi); + SortedMap headMap = virtualNodes.headMap(sqlKey.getHi()); + SortedMap tailMap = virtualNodes.tailMap(sqlKey.getHi()); int 
retryTimes = 0; while (true) { if (tailMap == null || tailMap.size() == 0) { @@ -131,9 +131,9 @@ private void clearBackend(ImmutableMap idToBackend) { if (!idToBackend.containsKey(bid)) { for (int i = 0; i < VIRTUAL_NODES; i++) { String nodeName = String.valueOf(bid) + "::" + String.valueOf(i); - PUniqueId nodeId = CacheBeProxy.getMd5(nodeName); - virtualNodes.remove(nodeId.hi); - LOG.debug("remove backend id {}, virtual node name {} hashcode {}", bid, nodeName, nodeId.hi); + Types.PUniqueId nodeId = CacheBeProxy.getMd5(nodeName); + virtualNodes.remove(nodeId.getHi()); + LOG.debug("remove backend id {}, virtual node name {} hashcode {}", bid, nodeName, nodeId.getHi()); } itr.remove(); } @@ -147,9 +147,9 @@ public void addBackend(Backend backend) { realNodes.put(backend.getId(), backend); for (int i = 0; i < VIRTUAL_NODES; i++) { String nodeName = String.valueOf(backend.getId()) + "::" + String.valueOf(i); - PUniqueId nodeId = CacheBeProxy.getMd5(nodeName); - virtualNodes.put(nodeId.hi, backend); - LOG.debug("add backend id {}, virtual node name {} hashcode {}", backend.getId(), nodeName, nodeId.hi); + Types.PUniqueId nodeId = CacheBeProxy.getMd5(nodeName); + virtualNodes.put(nodeId.getHi(), backend); + LOG.debug("add backend id {}, virtual node name {} hashcode {}", backend.getId(), nodeName, nodeId.getHi()); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/cache/CacheProxy.java b/fe/fe-core/src/main/java/org/apache/doris/qe/cache/CacheProxy.java index a750737469fc9d..f9664d2664694e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/cache/CacheProxy.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/cache/CacheProxy.java @@ -18,25 +18,14 @@ package org.apache.doris.qe.cache; import org.apache.doris.common.Status; -import org.apache.doris.common.util.DebugUtil; -import org.apache.doris.proto.PCacheParam; -import org.apache.doris.proto.PCacheValue; -import org.apache.doris.proto.PClearCacheRequest; -import org.apache.doris.proto.PFetchCacheRequest; -import org.apache.doris.proto.PFetchCacheResult; -import org.apache.doris.proto.PUniqueId; -import org.apache.doris.proto.PUpdateCacheRequest; -import org.apache.doris.qe.RowBatch; -import org.apache.doris.thrift.TResultBatch; +import org.apache.doris.proto.InternalService; +import org.apache.doris.proto.Types; -import com.google.common.collect.Lists; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -import java.nio.ByteBuffer; import java.security.MessageDigest; -import java.util.List; /** * It encapsulates the request and response parameters and methods, @@ -48,203 +37,6 @@ public abstract class CacheProxy { public static int UPDATE_TIMEOUT = 10000; public static int CLEAR_TIMEOUT = 30000; - public static class CacheParam extends PCacheParam { - public CacheParam(PCacheParam param) { - partition_key = param.partition_key; - last_version = param.last_version; - last_version_time = param.last_version_time; - } - - public CacheParam(long partitionKey, long lastVersion, long lastVersionTime) { - partition_key = partitionKey; - last_version = lastVersion; - last_version_time = lastVersionTime; - } - - public PCacheParam getParam() { - PCacheParam param = new PCacheParam(); - param.partition_key = partition_key; - param.last_version = last_version; - param.last_version_time = last_version_time; - return param; - } - - public void debug() { - LOG.info("cache param, part key {}, version {}, time {}", - partition_key, last_version, last_version_time); - } - } - - public static class CacheValue 
extends PCacheValue { - public CacheParam param; - public TResultBatch resultBatch; - - public CacheValue() { - param = null; - rows = Lists.newArrayList(); - data_size = 0; - resultBatch = new TResultBatch(); - } - - public void addRpcResult(PCacheValue value) { - param = new CacheParam(value.param); - data_size += value.data_size; - rows.addAll(value.rows); - } - - public RowBatch getRowBatch() { - for (byte[] one : rows) { - resultBatch.addToRows(ByteBuffer.wrap(one)); - } - RowBatch batch = new RowBatch(); - resultBatch.setPacketSeq(1); - resultBatch.setIsCompressed(false); - batch.setBatch(resultBatch); - batch.setEos(true); - return batch; - } - - public void addUpdateResult(long partitionKey, long lastVersion, long lastVersionTime, List rowList) { - param = new CacheParam(partitionKey, lastVersion, lastVersionTime); - for (byte[] buf : rowList) { - data_size += buf.length; - rows.add(buf); - } - } - - public PCacheValue getRpcValue() { - PCacheValue value = new PCacheValue(); - value.param = param.getParam(); - value.data_size = data_size; - value.rows = rows; - return value; - } - - public void debug() { - LOG.info("cache value, partkey {}, ver:{}, time {}, row_num {}, data_size {}", - param.partition_key, param.last_version, param.last_version_time, - rows.size(), - data_size); - for (int i = 0; i < rows.size(); i++) { - LOG.info("{}:{}", i, rows.get(i)); - } - } - } - - public static class UpdateCacheRequest extends PUpdateCacheRequest { - public int value_count; - public int row_count; - public int data_size; - private List valueList; - - public UpdateCacheRequest(String sqlStr) { - this.sql_key = getMd5(sqlStr); - this.valueList = Lists.newArrayList(); - value_count = 0; - row_count = 0; - data_size = 0; - } - - public void addValue(long partitionKey, long lastVersion, long lastVersionTime, List rowList) { - CacheValue value = new CacheValue(); - value.addUpdateResult(partitionKey, lastVersion, lastVersionTime, rowList); - valueList.add(value); - value_count++; - } - - public PUpdateCacheRequest getRpcRequest() { - value_count = valueList.size(); - PUpdateCacheRequest request = new PUpdateCacheRequest(); - request.values = Lists.newArrayList(); - request.sql_key = sql_key; - for (CacheValue value : valueList) { - request.values.add(value.getRpcValue()); - row_count += value.rows.size(); - data_size = value.data_size; - } - return request; - } - - public void debug() { - LOG.info("update cache request, sql_key {}, value_size {}", DebugUtil.printId(sql_key), - valueList.size()); - for (CacheValue value : valueList) { - value.debug(); - } - } - } - - - public static class FetchCacheRequest extends PFetchCacheRequest { - private List paramList; - - public FetchCacheRequest(String sqlStr) { - this.sql_key = getMd5(sqlStr); - this.paramList = Lists.newArrayList(); - } - - public void addParam(long partitionKey, long lastVersion, long lastVersionTime) { - CacheParam param = new CacheParam(partitionKey, lastVersion, lastVersionTime); - paramList.add(param); - } - - public PFetchCacheRequest getRpcRequest() { - PFetchCacheRequest request = new PFetchCacheRequest(); - request.params = Lists.newArrayList(); - request.sql_key = sql_key; - for (CacheParam param : paramList) { - request.params.add(param.getParam()); - } - return request; - } - - public void debug() { - LOG.info("fetch cache request, sql_key {}, param count {}", DebugUtil.printId(sql_key), paramList.size()); - for (CacheParam param : paramList) { - param.debug(); - } - } - } - - public static class FetchCacheResult extends 
PFetchCacheResult { - public int all_count; - public int value_count; - public int row_count; - public int data_size; - private List valueList; - - public FetchCacheResult() { - valueList = Lists.newArrayList(); - all_count = 0; - value_count = 0; - row_count = 0; - data_size = 0; - } - - public List getValueList() { - return valueList; - } - - public void setResult(PFetchCacheResult rpcResult) { - value_count = rpcResult.values.size(); - for (int i = 0; i < rpcResult.values.size(); i++) { - PCacheValue rpcValue = rpcResult.values.get(i); - CacheValue value = new CacheValue(); - value.addRpcResult(rpcValue); - valueList.add(value); - row_count += value.rows.size(); - data_size += value.data_size; - } - } - - public void debug() { - LOG.info("fetch cache result, value size {}", valueList.size()); - for (CacheValue value : valueList) { - value.debug(); - } - } - } - public enum CacheProxyType { FE, BE, @@ -265,14 +57,15 @@ public static CacheProxy getCacheProxy(CacheProxyType type) { return null; } - public abstract void updateCache(UpdateCacheRequest request, int timeoutMs, Status status); + public abstract void updateCache(InternalService.PUpdateCacheRequest request, int timeoutMs, Status status); - public abstract FetchCacheResult fetchCache(FetchCacheRequest request, int timeoutMs, Status status); + public abstract InternalService.PFetchCacheResult fetchCache(InternalService.PFetchCacheRequest request, + int timeoutMs, Status status); - public abstract void clearCache(PClearCacheRequest clearRequest); + public abstract void clearCache(InternalService.PClearCacheRequest clearRequest); - public static PUniqueId getMd5(String str) { + public static Types.PUniqueId getMd5(String str) { MessageDigest msgDigest; try { //128 bit @@ -281,9 +74,10 @@ public static PUniqueId getMd5(String str) { return null; } final byte[] digest = msgDigest.digest(str.getBytes()); - PUniqueId key = new PUniqueId(); - key.lo = getLongFromByte(digest, 0);//64 bit - key.hi = getLongFromByte(digest, 8);//64 bit + Types.PUniqueId key = Types.PUniqueId.newBuilder() + .setLo(getLongFromByte(digest, 0)) + .setHi(getLongFromByte(digest, 8)) + .build(); return key; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/cache/PartitionCache.java b/fe/fe-core/src/main/java/org/apache/doris/qe/cache/PartitionCache.java index a820eb68941f96..8c705be87b1e68 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/cache/PartitionCache.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/cache/PartitionCache.java @@ -29,6 +29,7 @@ import org.apache.doris.common.Status; import org.apache.doris.common.util.DebugUtil; import org.apache.doris.metric.MetricRepo; +import org.apache.doris.proto.InternalService; import org.apache.doris.qe.RowBatch; import org.apache.doris.thrift.TUniqueId; @@ -38,6 +39,7 @@ import org.apache.logging.log4j.Logger; import java.util.List; +import java.util.stream.Collectors; public class PartitionCache extends Cache { private static final Logger LOG = LogManager.getLogger(PartitionCache.class); @@ -73,33 +75,33 @@ public void setCacheInfo(CacheAnalyzer.CacheTable latestTable, RangePartitionInf this.newRangeList = Lists.newArrayList(); } - public CacheProxy.FetchCacheResult getCacheData(Status status) { - CacheProxy.FetchCacheRequest request; + public InternalService.PFetchCacheResult getCacheData(Status status) { + rewriteSelectStmt(null); - request = new CacheBeProxy.FetchCacheRequest(nokeyStmt.toSql()); range = new PartitionRange(this.partitionPredicate, this.olapTable, this.partitionInfo); if 
(!range.analytics()) { status.setStatus("analytics range error"); return null; } - - for (PartitionRange.PartitionSingle single : range.getPartitionSingleList()) { - request.addParam(single.getCacheKey().realValue(), - single.getPartition().getVisibleVersion(), - single.getPartition().getVisibleVersionTime() - ); - } - - CacheProxy.FetchCacheResult cacheResult = proxy.fetchCache(request, 10000, status); + InternalService.PFetchCacheRequest request = InternalService.PFetchCacheRequest.newBuilder() + .setSqlKey(CacheProxy.getMd5(nokeyStmt.toSql())) + .addAllParams(range.getPartitionSingleList().stream().map( + p -> InternalService.PCacheParam.newBuilder() + .setPartitionKey(p.getCacheKey().realValue()) + .setLastVersion(p.getPartition().getVisibleVersion()) + .setLastVersionTime(p.getPartition().getVisibleVersionTime()) + .build()).collect(Collectors.toList()) + ).build(); + InternalService.PFetchCacheResult cacheResult = proxy.fetchCache(request, 10000, status); if (status.ok() && cacheResult != null) { - cacheResult.all_count = range.getPartitionSingleList().size(); - for (CacheBeProxy.CacheValue value : cacheResult.getValueList()) { - range.setCacheFlag(value.param.partition_key); + for (InternalService.PCacheValue value : cacheResult.getValuesList()) { + range.setCacheFlag(value.getParam().getPartitionKey()); } + cacheResult = cacheResult.toBuilder().setAllCount(range.getPartitionSingleList().size()).build(); MetricRepo.COUNTER_CACHE_HIT_PARTITION.increase(1L); MetricRepo.COUNTER_CACHE_PARTITION_ALL.increase((long) range.getPartitionSingleList().size()); - MetricRepo.COUNTER_CACHE_PARTITION_HIT.increase((long) cacheResult.getValueList().size()); + MetricRepo.COUNTER_CACHE_PARTITION_HIT.increase((long) cacheResult.getValuesList().size()); } range.setTooNewByID(latestTable.latestPartitionId); @@ -125,15 +127,21 @@ public void updateCache() { return; } - CacheBeProxy.UpdateCacheRequest updateRequest = rowBatchBuilder.buildPartitionUpdateRequest(nokeyStmt.toSql()); - if (updateRequest.value_count > 0) { + InternalService.PUpdateCacheRequest updateRequest = rowBatchBuilder.buildPartitionUpdateRequest(nokeyStmt.toSql()); + if (updateRequest.getValuesCount() > 0) { CacheBeProxy proxy = new CacheBeProxy(); Status status = new Status(); proxy.updateCache(updateRequest, CacheProxy.UPDATE_TIMEOUT, status); + int rowCount = 0; + int dataSize = 0; + for (InternalService.PCacheValue value : updateRequest.getValuesList()) { + rowCount += value.getRowsCount(); + dataSize += value.getDataSize(); + } LOG.info("update cache model {}, queryid {}, sqlkey {}, value count {}, row count {}, data size {}", CacheAnalyzer.CacheMode.Partition, DebugUtil.printId(queryId), - DebugUtil.printId(updateRequest.sql_key), - updateRequest.value_count, updateRequest.row_count, updateRequest.data_size); + DebugUtil.printId(updateRequest.getSqlKey()), + updateRequest.getValuesCount(), rowCount, dataSize); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/cache/RowBatchBuilder.java b/fe/fe-core/src/main/java/org/apache/doris/qe/cache/RowBatchBuilder.java index e7f3a3afe31871..244b6fb959eac3 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/cache/RowBatchBuilder.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/cache/RowBatchBuilder.java @@ -17,10 +17,10 @@ package org.apache.doris.qe.cache; -import com.google.common.collect.Lists; import org.apache.doris.analysis.Expr; import org.apache.doris.catalog.Column; import org.apache.doris.catalog.Type; +import org.apache.doris.proto.InternalService; import 
org.apache.doris.qe.RowBatch;
 
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
@@ -30,14 +30,18 @@
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
+import java.util.stream.Collectors;
+
+import com.google.common.collect.Lists;
+import com.google.protobuf.ByteString;
 
 /**
- * According to the query partition range and cache hit, the rowbatch to update the cache is constructed
+ * According to the query partition range and cache hit, the rowbatch to update the cache is constructed
 */
 public class RowBatchBuilder {
     private static final Logger LOG = LogManager.getLogger(RowBatchBuilder.class);
 
-    private CacheBeProxy.UpdateCacheRequest updateRequest;
+    private InternalService.PUpdateCacheRequest updateRequest;
     private CacheAnalyzer.CacheMode cacheMode;
     private int keyIndex;
     private Type keyType;
@@ -63,8 +67,8 @@ public RowBatchBuilder(CacheAnalyzer.CacheMode model) {
     }
 
     public void buildPartitionIndex(ArrayList<Expr> resultExpr,
-            List<String> columnLabel, Column partColumn,
-            List<PartitionRange.PartitionSingle> newSingleList) {
+                                    List<String> columnLabel, Column partColumn,
+                                    List<PartitionRange.PartitionSingle> newSingleList) {
         if (cacheMode != CacheAnalyzer.CacheMode.Partition) {
             return;
         }
@@ -95,11 +99,19 @@ public void copyRowData(RowBatch rowBatch) {
         }
     }
 
-    public CacheBeProxy.UpdateCacheRequest buildSqlUpdateRequest(String sql, long partitionKey, long lastVersion, long lastestTime) {
+    public InternalService.PUpdateCacheRequest buildSqlUpdateRequest(String sql, long partitionKey, long lastVersion, long lastestTime) {
         if (updateRequest == null) {
-            updateRequest = new CacheBeProxy.UpdateCacheRequest(sql);
+            updateRequest = InternalService.PUpdateCacheRequest.newBuilder().setSqlKey(CacheProxy.getMd5(sql)).build();
         }
-        updateRequest.addValue(partitionKey, lastVersion, lastestTime, rowList);
+        updateRequest = updateRequest.toBuilder()
+                .addValues(InternalService.PCacheValue.newBuilder()
+                        .setParam(InternalService.PCacheParam.newBuilder()
+                                .setPartitionKey(partitionKey)
+                                .setLastVersion(lastVersion)
+                                .setLastVersionTime(lastestTime)
+                                .build()).addAllRows(
+                        rowList.stream().map(row -> ByteString.copyFrom(row))
+                                .collect(Collectors.toList()))).build();
         return updateRequest;
     }
@@ -115,7 +127,7 @@ public PartitionRange.PartitionKeyType getKeyFromRow(byte[] row, int index, Type
             if (i == index) {
                 byte[] content = Arrays.copyOfRange(buf.array(), buf.position(), buf.position() + len);
                 String str = new String(content);
-                key.init(type, str.toString());
+                key.init(type, str);
             }
         }
         return key;
@@ -124,9 +136,9 @@ public PartitionRange.PartitionKeyType getKeyFromRow(byte[] row, int index, Type
 
     /**
     * Rowbatch split to Row
     */
-    public CacheBeProxy.UpdateCacheRequest buildPartitionUpdateRequest(String sql) {
+    public InternalService.PUpdateCacheRequest buildPartitionUpdateRequest(String sql) {
         if (updateRequest == null) {
-            updateRequest = new CacheBeProxy.UpdateCacheRequest(sql);
+            updateRequest = InternalService.PUpdateCacheRequest.newBuilder().setSqlKey(CacheProxy.getMd5(sql)).build();
         }
         HashMap<Long, List<byte[]>> partRowMap = new HashMap<>();
         List<byte[]> partitionRowList;
@@ -150,8 +162,15 @@ public CacheBeProxy.UpdateCacheRequest buildPartitionUpdateRequest(String sql) {
             Long key = entry.getKey();
             PartitionRange.PartitionSingle partition = cachePartMap.get(key);
             partitionRowList = entry.getValue();
-            updateRequest.addValue(key, partition.getPartition().getVisibleVersion(),
-                    partition.getPartition().getVisibleVersionTime(), partitionRowList);
+            updateRequest = updateRequest.toBuilder()
+                    .addValues(InternalService.PCacheValue.newBuilder()
+
.setParam(InternalService.PCacheParam.newBuilder() + .setPartitionKey(key) + .setLastVersion(partition.getPartition().getVisibleVersion()) + .setLastVersionTime(partition.getPartition().getVisibleVersionTime()) + .build()).addAllRows( + partitionRowList.stream().map(row -> ByteString.copyFrom(row)) + .collect(Collectors.toList()))).build(); } return updateRequest; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/cache/SqlCache.java b/fe/fe-core/src/main/java/org/apache/doris/qe/cache/SqlCache.java index dd67c6e87e364b..3d11af2e7df159 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/cache/SqlCache.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/cache/SqlCache.java @@ -21,6 +21,7 @@ import org.apache.doris.common.Status; import org.apache.doris.common.util.DebugUtil; import org.apache.doris.metric.MetricRepo; +import org.apache.doris.proto.InternalService; import org.apache.doris.qe.RowBatch; import org.apache.doris.thrift.TUniqueId; import org.apache.logging.log4j.LogManager; @@ -37,13 +38,18 @@ public void setCacheInfo(CacheAnalyzer.CacheTable latestTable) { this.latestTable = latestTable; } - public CacheProxy.FetchCacheResult getCacheData(Status status) { - CacheProxy.FetchCacheRequest request = new CacheProxy.FetchCacheRequest(selectStmt.toSql()); - request.addParam(latestTable.latestPartitionId, latestTable.latestVersion, - latestTable.latestTime); - CacheProxy.FetchCacheResult cacheResult = proxy.fetchCache(request, 10000, status); + public InternalService.PFetchCacheResult getCacheData(Status status) { + InternalService.PFetchCacheRequest request = InternalService.PFetchCacheRequest.newBuilder() + .setSqlKey(CacheProxy.getMd5(selectStmt.toSql())) + .addParams(InternalService.PCacheParam.newBuilder() + .setPartitionKey(latestTable.latestPartitionId) + .setLastVersion(latestTable.latestVersion) + .setLastVersionTime(latestTable.latestTime)) + .build(); + + InternalService.PFetchCacheResult cacheResult = proxy.fetchCache(request, 10000, status); if (status.ok() && cacheResult != null) { - cacheResult.all_count = 1; + cacheResult = cacheResult.toBuilder().setAllCount(1).build(); MetricRepo.COUNTER_CACHE_HIT_SQL.increase(1L); hitRange = HitRange.Full; } @@ -66,15 +72,21 @@ public void updateCache() { return; } - CacheBeProxy.UpdateCacheRequest updateRequest = rowBatchBuilder.buildSqlUpdateRequest(selectStmt.toSql(), + InternalService.PUpdateCacheRequest updateRequest = rowBatchBuilder.buildSqlUpdateRequest(selectStmt.toSql(), latestTable.latestPartitionId, latestTable.latestVersion, latestTable.latestTime); - if (updateRequest.value_count > 0) { + if (updateRequest.getValuesCount() > 0) { CacheBeProxy proxy = new CacheBeProxy(); Status status = new Status(); proxy.updateCache(updateRequest, CacheProxy.UPDATE_TIMEOUT, status); + int rowCount = 0; + int dataSize = 0; + for (InternalService.PCacheValue value : updateRequest.getValuesList()) { + rowCount += value.getRowsCount(); + dataSize += value.getDataSize(); + } LOG.info("update cache model {}, queryid {}, sqlkey {}, value count {}, row count {}, data size {}", - CacheAnalyzer.CacheMode.Sql, DebugUtil.printId(queryId), DebugUtil.printId(updateRequest.sql_key), - updateRequest.value_count, updateRequest.row_count, updateRequest.data_size); + CacheAnalyzer.CacheMode.Sql, DebugUtil.printId(queryId), DebugUtil.printId(updateRequest.getSqlKey()), + updateRequest.getValuesCount(), rowCount, dataSize); } } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceClient.java 
b/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceClient.java
new file mode 100644
index 00000000000000..93cae029dd2e47
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceClient.java
@@ -0,0 +1,76 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.rpc;
+
+import org.apache.doris.proto.InternalService;
+import org.apache.doris.proto.PBackendServiceGrpc;
+import org.apache.doris.thrift.TNetworkAddress;
+
+import java.util.concurrent.Future;
+
+import io.grpc.ManagedChannel;
+import io.grpc.ManagedChannelBuilder;
+
+public class BackendServiceClient {
+    private final PBackendServiceGrpc.PBackendServiceFutureStub stub;
+    private final ManagedChannel channel;
+
+    public BackendServiceClient(TNetworkAddress address) {
+        this(ManagedChannelBuilder.forAddress(address.getHostname(), address.getPort()).usePlaintext());
+    }
+
+    public BackendServiceClient(ManagedChannelBuilder<?> channelBuilder) {
+        channel = channelBuilder.enableRetry().maxRetryAttempts(3).build();
+        stub = PBackendServiceGrpc.newFutureStub(channel);
+    }
+
+    public Future<InternalService.PExecPlanFragmentResult> execPlanFragmentAsync(
+            InternalService.PExecPlanFragmentRequest request) {
+        return stub.execPlanFragment(request);
+    }
+
+    public Future<InternalService.PCancelPlanFragmentResult> cancelPlanFragmentAsync(
+            InternalService.PCancelPlanFragmentRequest request) {
+        return stub.cancelPlanFragment(request);
+    }
+
+    public Future<InternalService.PFetchDataResult> fetchDataAsync(InternalService.PFetchDataRequest request) {
+        return stub.fetchData(request);
+    }
+
+    public Future<InternalService.PCacheResponse> updateCache(InternalService.PUpdateCacheRequest request) {
+        return stub.updateCache(request);
+    }
+
+    public Future<InternalService.PFetchCacheResult> fetchCache(InternalService.PFetchCacheRequest request) {
+        return stub.fetchCache(request);
+    }
+
+    public Future<InternalService.PCacheResponse> clearCache(InternalService.PClearCacheRequest request) {
+        return stub.clearCache(request);
+    }
+
+    public Future<InternalService.PTriggerProfileReportResult> triggerProfileReport(
+            InternalService.PTriggerProfileReportRequest request) {
+        return stub.triggerProfileReport(request);
+    }
+
+    public Future<InternalService.PProxyResult> getInfo(InternalService.PProxyRequest request) {
+        return stub.getInfo(request);
+    }
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java b/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java
index 5d2c3afe69b5d0..ad557e0ccf6491
--- a/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java
@@ -17,60 +17,28 @@
 package org.apache.doris.rpc;
 
-import org.apache.doris.common.Config;
-import org.apache.doris.common.util.JdkUtils;
-import org.apache.doris.proto.PCacheResponse;
-import org.apache.doris.proto.PCancelPlanFragmentRequest;
-import org.apache.doris.proto.PCancelPlanFragmentResult;
-import
org.apache.doris.proto.PClearCacheRequest;
-import org.apache.doris.proto.PExecPlanFragmentResult;
-import org.apache.doris.proto.PFetchCacheRequest;
-import org.apache.doris.proto.PFetchCacheResult;
-import org.apache.doris.proto.PFetchDataResult;
-import org.apache.doris.proto.PPlanFragmentCancelReason;
-import org.apache.doris.proto.PProxyRequest;
-import org.apache.doris.proto.PProxyResult;
-import org.apache.doris.proto.PTriggerProfileReportResult;
-import org.apache.doris.proto.PUniqueId;
-import org.apache.doris.proto.PUpdateCacheRequest;
+import org.apache.doris.proto.InternalService;
+import org.apache.doris.proto.Types;
 import org.apache.doris.thrift.TExecPlanFragmentParams;
 import org.apache.doris.thrift.TNetworkAddress;
 import org.apache.doris.thrift.TUniqueId;
-
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 import org.apache.thrift.TException;
-
-import com.baidu.bjf.remoting.protobuf.utils.JDKCompilerHelper;
-import com.baidu.bjf.remoting.protobuf.utils.compiler.JdkCompiler;
-import com.baidu.jprotobuf.pbrpc.client.ProtobufRpcProxy;
-import com.baidu.jprotobuf.pbrpc.transport.RpcClient;
-import com.baidu.jprotobuf.pbrpc.transport.RpcClientOptions;
-import com.google.common.collect.Maps;
+import org.apache.thrift.TSerializer;
 
 import java.util.Map;
-import java.util.NoSuchElementException;
 import java.util.concurrent.Future;
 
+import com.google.common.collect.Maps;
+import com.google.protobuf.ByteString;
+
 public class BackendServiceProxy {
     private static final Logger LOG = LogManager.getLogger(BackendServiceProxy.class);
-
-    private RpcClient rpcClient;
-    // TODO(zc): use TNetworkAddress,
-    private Map<TNetworkAddress, PBackendService> serviceMap;
-    private static volatile BackendServiceProxy INSTANCE;
-
-    static {
-        int javaRuntimeVersion = JdkUtils.getJavaVersionAsInteger(System.getProperty("java.version"));
-        JDKCompilerHelper.setCompiler(new JdkCompiler(JdkCompiler.class.getClassLoader(), String.valueOf(javaRuntimeVersion)));
-    }
+    private final Map<TNetworkAddress, BackendServiceClient> serviceMap;
 
     public BackendServiceProxy() {
-        final RpcClientOptions rpcOptions = new RpcClientOptions();
-        rpcOptions.setMaxWait(Config.brpc_idle_wait_max_time);
-        rpcOptions.setThreadPoolSize(Config.brpc_number_of_concurrent_requests_processed);
-        rpcClient = new RpcClient(rpcOptions);
         serviceMap = Maps.newHashMap();
     }
@@ -85,42 +53,24 @@ public static BackendServiceProxy getInstance() {
         return INSTANCE;
     }
 
-    private synchronized PBackendService getProxy(TNetworkAddress address) {
-        PBackendService service = serviceMap.get(address);
+    private synchronized BackendServiceClient getProxy(TNetworkAddress address) {
+        BackendServiceClient service = serviceMap.get(address);
         if (service != null) {
             return service;
         }
-        ProtobufRpcProxy<PBackendService> proxy = new ProtobufRpcProxy<PBackendService>(rpcClient, PBackendService.class);
-        proxy.setHost(address.getHostname());
-        proxy.setPort(address.getPort());
-        service = proxy.proxy();
+        service = new BackendServiceClient(address);
         serviceMap.put(address, service);
         return service;
     }
 
-    public Future<PExecPlanFragmentResult> execPlanFragmentAsync(
+    public Future<InternalService.PExecPlanFragmentResult> execPlanFragmentAsync(
             TNetworkAddress address, TExecPlanFragmentParams tRequest)
             throws TException, RpcException {
-        final PExecPlanFragmentRequest pRequest = new PExecPlanFragmentRequest();
-        pRequest.setRequest(tRequest);
+        final InternalService.PExecPlanFragmentRequest pRequest = InternalService.PExecPlanFragmentRequest.newBuilder()
+                .setRequest(ByteString.copyFrom(new TSerializer().serialize(tRequest))).build();
         try {
-            final PBackendService service = getProxy(address);
-            return
service.execPlanFragmentAsync(pRequest);
-        } catch (NoSuchElementException e) {
-            try {
-                // retry
-                try {
-                    Thread.sleep(10);
-                } catch (InterruptedException interruptedException) {
-                    // do nothing
-                }
-                final PBackendService service = getProxy(address);
-                return service.execPlanFragmentAsync(pRequest);
-            } catch (NoSuchElementException noSuchElementException) {
-                LOG.warn("Execute plan fragment retry failed, address={}:{}",
-                        address.getHostname(), address.getPort(), noSuchElementException);
-                throw new RpcException(address.hostname, e.getMessage());
-            }
+            final BackendServiceClient client = getProxy(address);
+            return client.execPlanFragmentAsync(pRequest);
         } catch (Throwable e) {
             LOG.warn("Execute plan fragment catch a exception, address={}:{}",
                     address.getHostname(), address.getPort(), e);
@@ -128,32 +78,17 @@ public Future<PExecPlanFragmentResult> execPlanFragmentAsync(
 
-    public Future<PCancelPlanFragmentResult> cancelPlanFragmentAsync(
-            TNetworkAddress address, TUniqueId finstId, PPlanFragmentCancelReason cancelReason) throws RpcException {
-        final PCancelPlanFragmentRequest pRequest = new PCancelPlanFragmentRequest();
-        PUniqueId uid = new PUniqueId();
-        uid.hi = finstId.hi;
-        uid.lo = finstId.lo;
-        pRequest.finst_id = uid;
-        pRequest.cancel_reason = cancelReason;
+    public Future<InternalService.PCancelPlanFragmentResult> cancelPlanFragmentAsync(
+            TNetworkAddress address, TUniqueId finstId, InternalService.PPlanFragmentCancelReason cancelReason)
+            throws RpcException {
+        final InternalService.PCancelPlanFragmentRequest pRequest = InternalService.PCancelPlanFragmentRequest
+                .newBuilder()
+                .setFinstId(
+                        Types.PUniqueId.newBuilder().setHi(finstId.hi).setLo(finstId.lo).build())
+                .setCancelReason(cancelReason).build();
         try {
-            final PBackendService service = getProxy(address);
-            return service.cancelPlanFragmentAsync(pRequest);
-        } catch (NoSuchElementException e) {
-            // retry
-            try {
-                try {
-                    Thread.sleep(10);
-                } catch (InterruptedException interruptedException) {
-                    // do nothing
-                }
-                final PBackendService service = getProxy(address);
-                return service.cancelPlanFragmentAsync(pRequest);
-            } catch (NoSuchElementException noSuchElementException) {
-                LOG.warn("Cancel plan fragment retry failed, address={}:{}",
-                        address.getHostname(), address.getPort(), noSuchElementException);
-                throw new RpcException(address.hostname, e.getMessage());
-            }
+            final BackendServiceClient client = getProxy(address);
+            return client.cancelPlanFragmentAsync(pRequest);
         } catch (Throwable e) {
             LOG.warn("Cancel plan fragment catch a exception, address={}:{}",
                     address.getHostname(), address.getPort(), e);
@@ -161,11 +96,11 @@ public Future<PCancelPlanFragmentResult> cancelPlanFragmentAsync(
 
-    public Future<PFetchDataResult> fetchDataAsync(
-            TNetworkAddress address, PFetchDataRequest request) throws RpcException {
+    public Future<InternalService.PFetchDataResult> fetchDataAsync(
+            TNetworkAddress address, InternalService.PFetchDataRequest request) throws RpcException {
         try {
-            PBackendService service = getProxy(address);
-            return service.fetchDataAsync(request);
+            final BackendServiceClient client = getProxy(address);
+            return client.fetchDataAsync(request);
         } catch (Throwable e) {
             LOG.warn("fetch data catch a exception, address={}:{}",
                     address.getHostname(), address.getPort(), e);
@@ -173,11 +108,11 @@ public Future<PFetchDataResult> fetchDataAsync(
 
-    public Future<PCacheResponse> updateCache(
-            TNetworkAddress address, PUpdateCacheRequest request) throws RpcException{
+    public Future<InternalService.PCacheResponse> updateCache(
+            TNetworkAddress address, InternalService.PUpdateCacheRequest request) throws RpcException {
         try {
-            PBackendService service = getProxy(address);
-            return service.updateCache(request);
+            final
BackendServiceClient client = getProxy(address);
+            return client.updateCache(request);
         } catch (Throwable e) {
             LOG.warn("update cache catch a exception, address={}:{}",
                     address.getHostname(), address.getPort(), e);
@@ -185,11 +120,11 @@ public Future<PCacheResponse> updateCache(
 
-    public Future<PFetchCacheResult> fetchCache(
-            TNetworkAddress address, PFetchCacheRequest request) throws RpcException {
+    public Future<InternalService.PFetchCacheResult> fetchCache(
+            TNetworkAddress address, InternalService.PFetchCacheRequest request) throws RpcException {
         try {
-            PBackendService service = getProxy(address);
-            return service.fetchCache(request);
+            final BackendServiceClient client = getProxy(address);
+            return client.fetchCache(request);
         } catch (Throwable e) {
             LOG.warn("fetch cache catch a exception, address={}:{}",
                     address.getHostname(), address.getPort(), e);
@@ -197,11 +132,11 @@ public Future<PFetchCacheResult> fetchCache(
 
-    public Future<PCacheResponse> clearCache(
-            TNetworkAddress address, PClearCacheRequest request) throws RpcException {
+    public Future<InternalService.PCacheResponse> clearCache(
+            TNetworkAddress address, InternalService.PClearCacheRequest request) throws RpcException {
         try {
-            PBackendService service = getProxy(address);
-            return service.clearCache(request);
+            final BackendServiceClient client = getProxy(address);
+            return client.clearCache(request);
         } catch (Throwable e) {
             LOG.warn("clear cache catch a exception, address={}:{}",
                     address.getHostname(), address.getPort(), e);
@@ -209,12 +144,11 @@ public Future<PCacheResponse> clearCache(
-
-    public Future<PTriggerProfileReportResult> triggerProfileReportAsync(
-            TNetworkAddress address, PTriggerProfileReportRequest request) throws RpcException {
+    public Future<InternalService.PTriggerProfileReportResult> triggerProfileReportAsync(
+            TNetworkAddress address, InternalService.PTriggerProfileReportRequest request) throws RpcException {
         try {
-            final PBackendService service = getProxy(address);
-            return service.triggerProfileReport(request);
+            final BackendServiceClient client = getProxy(address);
+            return client.triggerProfileReport(request);
         } catch (Throwable e) {
             LOG.warn("fetch data catch a exception, address={}:{}",
                     address.getHostname(), address.getPort(), e);
@@ -222,11 +156,11 @@ public Future<PTriggerProfileReportResult> triggerProfileReportAsync(
 
-    public Future<PProxyResult> getInfo(
-            TNetworkAddress address, PProxyRequest request) throws RpcException {
+    public Future<InternalService.PProxyResult> getInfo(
+            TNetworkAddress address, InternalService.PProxyRequest request) throws RpcException {
         try {
-            final PBackendService service = getProxy(address);
-            return service.getInfo(request);
+            final BackendServiceClient client = getProxy(address);
+            return client.getInfo(request);
         } catch (Throwable e) {
             LOG.warn("failed to get info, address={}:{}", address.getHostname(), address.getPort(), e);
             throw new RpcException(address.hostname, e.getMessage());
diff --git a/fe/fe-core/src/main/java/org/apache/doris/rpc/PBackendService.java b/fe/fe-core/src/main/java/org/apache/doris/rpc/PBackendService.java
deleted file mode 100644
index 6e95fe6c0ea0df..00000000000000
--- a/fe/fe-core/src/main/java/org/apache/doris/rpc/PBackendService.java
+++ /dev/null
@@ -1,67 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.
You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package org.apache.doris.rpc; - -import org.apache.doris.proto.PCacheResponse; -import org.apache.doris.proto.PCancelPlanFragmentRequest; -import org.apache.doris.proto.PCancelPlanFragmentResult; -import org.apache.doris.proto.PClearCacheRequest; -import org.apache.doris.proto.PExecPlanFragmentResult; -import org.apache.doris.proto.PFetchCacheRequest; -import org.apache.doris.proto.PFetchCacheResult; -import org.apache.doris.proto.PFetchDataResult; -import org.apache.doris.proto.PProxyRequest; -import org.apache.doris.proto.PProxyResult; -import org.apache.doris.proto.PTriggerProfileReportResult; -import org.apache.doris.proto.PUpdateCacheRequest; - -import com.baidu.jprotobuf.pbrpc.ProtobufRPC; - -import java.util.concurrent.Future; - -public interface PBackendService { - @ProtobufRPC(serviceName = "PBackendService", methodName = "exec_plan_fragment", - attachmentHandler = ThriftClientAttachmentHandler.class, onceTalkTimeout = 10000) - Future execPlanFragmentAsync(PExecPlanFragmentRequest request); - - @ProtobufRPC(serviceName = "PBackendService", methodName = "cancel_plan_fragment", - onceTalkTimeout = 5000) - Future cancelPlanFragmentAsync(PCancelPlanFragmentRequest request); - - // we set timeout to 1 day, because now there is no way to give different timeout for each RPC call - @ProtobufRPC(serviceName = "PBackendService", methodName = "fetch_data", - attachmentHandler = ThriftClientAttachmentHandler.class, onceTalkTimeout = 86400000) - Future fetchDataAsync(PFetchDataRequest request); - - @ProtobufRPC(serviceName = "PBackendService", methodName = "update_cache", onceTalkTimeout = 10000) - Future updateCache(PUpdateCacheRequest request); - - @ProtobufRPC(serviceName = "PBackendService", methodName = "fetch_cache", onceTalkTimeout = 10000) - Future fetchCache(PFetchCacheRequest request); - - @ProtobufRPC(serviceName = "PBackendService", methodName = "clear_cache", onceTalkTimeout = 10000) - Future clearCache(PClearCacheRequest request); - - @ProtobufRPC(serviceName = "PBackendService", methodName = "trigger_profile_report", - attachmentHandler = ThriftClientAttachmentHandler.class, onceTalkTimeout = 10000) - Future triggerProfileReport(PTriggerProfileReportRequest request); - - @ProtobufRPC(serviceName = "PBackendService", methodName = "get_info", onceTalkTimeout = 10000) - Future getInfo(PProxyRequest request); -} - diff --git a/fe/fe-core/src/main/java/org/apache/doris/rpc/PExecPlanFragmentRequest.java b/fe/fe-core/src/main/java/org/apache/doris/rpc/PExecPlanFragmentRequest.java deleted file mode 100644 index a183795435570f..00000000000000 --- a/fe/fe-core/src/main/java/org/apache/doris/rpc/PExecPlanFragmentRequest.java +++ /dev/null @@ -1,24 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. 
You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package org.apache.doris.rpc; - -import com.baidu.bjf.remoting.protobuf.annotation.ProtobufClass; - -@ProtobufClass -public class PExecPlanFragmentRequest extends AttachmentRequest { -} diff --git a/fe/fe-core/src/main/java/org/apache/doris/rpc/PFetchDataRequest.java b/fe/fe-core/src/main/java/org/apache/doris/rpc/PFetchDataRequest.java deleted file mode 100644 index 7cad9d44182559..00000000000000 --- a/fe/fe-core/src/main/java/org/apache/doris/rpc/PFetchDataRequest.java +++ /dev/null @@ -1,33 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package org.apache.doris.rpc; - -import org.apache.doris.proto.PUniqueId; - -import com.baidu.bjf.remoting.protobuf.annotation.Protobuf; -import com.baidu.bjf.remoting.protobuf.annotation.ProtobufClass; - -@ProtobufClass -public class PFetchDataRequest extends AttachmentRequest { - public PFetchDataRequest() {} - public PFetchDataRequest(PUniqueId finstId) { - this.finstId = finstId; - } - @Protobuf(order = 1, required = true) - public PUniqueId finstId; -} diff --git a/fe/fe-core/src/main/java/org/apache/doris/rpc/PTriggerProfileReportRequest.java b/fe/fe-core/src/main/java/org/apache/doris/rpc/PTriggerProfileReportRequest.java deleted file mode 100644 index 2bfce900273f83..00000000000000 --- a/fe/fe-core/src/main/java/org/apache/doris/rpc/PTriggerProfileReportRequest.java +++ /dev/null @@ -1,42 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. 
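With these jprotobuf wrapper classes gone, callers build requests straight from the protobuf-generated builders. A minimal sketch of what replaces the deleted PFetchDataRequest wrapper above (the example class and the literal id values are hypothetical; the builder and setter names assume the PFetchDataRequest and PUniqueId messages generated from internal_service.proto):

```java
package org.apache.doris.rpc;

import org.apache.doris.proto.InternalService;
import org.apache.doris.proto.Types;

// Hypothetical illustration: the fragment instance id that the deleted wrapper
// exposed as a public field is now set through the generated builder API.
public class FetchDataRequestExample {
    public static void main(String[] args) {
        Types.PUniqueId finstId = Types.PUniqueId.newBuilder()
                .setHi(1L)   // illustrative values only
                .setLo(2L)
                .build();
        InternalService.PFetchDataRequest request = InternalService.PFetchDataRequest.newBuilder()
                .setFinstId(finstId)
                .build();
        System.out.println(request);
    }
}
```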
- -package org.apache.doris.rpc; - -import org.apache.doris.proto.PUniqueId; - -import com.baidu.bjf.remoting.protobuf.FieldType; -import com.baidu.bjf.remoting.protobuf.annotation.Protobuf; -import com.baidu.bjf.remoting.protobuf.annotation.ProtobufClass; -import com.google.common.collect.Lists; - -import java.util.List; - -@ProtobufClass -public class PTriggerProfileReportRequest extends AttachmentRequest { - - @Protobuf(fieldType = FieldType.OBJECT, order = 1, required = false) - List instanceIds; - - public PTriggerProfileReportRequest() { - } - - public PTriggerProfileReportRequest(List instanceIds) { - this.instanceIds = Lists.newArrayList(); - this.instanceIds.addAll(instanceIds); - } -} diff --git a/fe/fe-core/src/main/java/org/apache/doris/rpc/ThriftClientAttachmentHandler.java b/fe/fe-core/src/main/java/org/apache/doris/rpc/ThriftClientAttachmentHandler.java deleted file mode 100644 index a6f05c92e198db..00000000000000 --- a/fe/fe-core/src/main/java/org/apache/doris/rpc/ThriftClientAttachmentHandler.java +++ /dev/null @@ -1,33 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package org.apache.doris.rpc; - -import com.baidu.jprotobuf.pbrpc.ClientAttachmentHandler; - -public class ThriftClientAttachmentHandler implements ClientAttachmentHandler { - @Override - public byte[] handleRequest(String serviceName, String methodName, Object... objects) { - AttachmentRequest request = (AttachmentRequest) objects[0]; - return request.getSerializedRequest(); - } - @Override - public void handleResponse(byte[] bytes, String serviceName, String methodName, Object... 
objects) { - AttachmentRequest result = (AttachmentRequest) objects[0]; - result.setSerializedResult(bytes); - } -} diff --git a/fe/fe-core/src/test/java/org/apache/doris/planner/QueryPlanTest.java b/fe/fe-core/src/test/java/org/apache/doris/planner/QueryPlanTest.java index 613aa300cf0b8a..f0b6c5ad5967f1 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/planner/QueryPlanTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/planner/QueryPlanTest.java @@ -1469,7 +1469,7 @@ public void testFromUnixTimeRewrite() throws Exception { explainString = UtFrameUtils.getSQLPlanOrErrorMsg(connectContext, "EXPLAIN " + sql); System.out.println("wangxixu-explain:"+explainString); Assert.assertTrue(explainString.contains("PREDICATES: `query_time` < 1614614400, `query_time` >= 0")); - + } @Test @@ -1493,6 +1493,3 @@ public void testSetVarInQueryStmt() throws Exception { } } } - - - diff --git a/fe/fe-core/src/test/java/org/apache/doris/qe/ConnectProcessorTest.java b/fe/fe-core/src/test/java/org/apache/doris/qe/ConnectProcessorTest.java index 74216081aa52ef..2a7fe57144e175 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/qe/ConnectProcessorTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/qe/ConnectProcessorTest.java @@ -28,7 +28,7 @@ import org.apache.doris.mysql.MysqlOkPacket; import org.apache.doris.mysql.MysqlSerializer; import org.apache.doris.plugin.AuditEvent.AuditEventBuilder; -import org.apache.doris.proto.PQueryStatistics; +import org.apache.doris.proto.Data; import org.apache.doris.thrift.TUniqueId; import org.junit.Assert; @@ -55,7 +55,7 @@ public class ConnectProcessorTest { @Mocked private static SocketChannel socketChannel; - private static PQueryStatistics statistics = new PQueryStatistics(); + private static Data.PQueryStatistics statistics = Data.PQueryStatistics.newBuilder().build(); @BeforeClass public static void setUpClass() { @@ -97,10 +97,7 @@ public static void setUpClass() { serializer.writeEofString(""); fieldListPacket = serializer.toByteBuffer(); } - - statistics.scan_bytes = 0L; - statistics.scan_rows = 0L; - statistics.cpu_ms = 0L; + statistics = statistics.toBuilder().setCpuMs(0L).setScanRows(0).setScanBytes(0).build(); MetricRepo.init(); } diff --git a/fe/fe-core/src/test/java/org/apache/doris/qe/PartitionCacheTest.java b/fe/fe-core/src/test/java/org/apache/doris/qe/PartitionCacheTest.java index 4f1525f9f62922..f2959f8ba600e7 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/qe/PartitionCacheTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/qe/PartitionCacheTest.java @@ -52,7 +52,7 @@ import org.apache.doris.planner.OlapScanNode; import org.apache.doris.planner.PlanNodeId; import org.apache.doris.planner.ScanNode; -import org.apache.doris.proto.PUniqueId; +import org.apache.doris.proto.Types; import org.apache.doris.qe.cache.Cache; import org.apache.doris.qe.cache.CacheAnalyzer; import org.apache.doris.qe.cache.CacheAnalyzer.CacheMode; @@ -433,15 +433,12 @@ public void testCacheNode() throws Exception { cp.addBackend(bd2); cp.addBackend(bd3); - PUniqueId key1 = new PUniqueId(); - key1.hi = 1L; - key1.lo = 1L; + Types.PUniqueId key1 = Types.PUniqueId.newBuilder().setHi(1L).setLo(1L).build(); Backend bk = cp.findBackend(key1); Assert.assertNotNull(bk); Assert.assertEquals(bk.getId(),3); - - key1.hi = 669560558156283345L; - key1.lo = 1L; + + key1 = key1.toBuilder().setHi(669560558156283345L).build(); bk = cp.findBackend(key1); Assert.assertNotNull(bk); Assert.assertEquals(bk.getId(),1); diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/utframe/MockedBackend.java b/fe/fe-core/src/test/java/org/apache/doris/utframe/MockedBackend.java index 844828b80fd058..0924fdc01457d8 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/utframe/MockedBackend.java +++ b/fe/fe-core/src/test/java/org/apache/doris/utframe/MockedBackend.java @@ -18,15 +18,14 @@ package org.apache.doris.utframe; import org.apache.doris.common.ThriftServer; -import org.apache.doris.common.util.JdkUtils; +import org.apache.doris.proto.PBackendServiceGrpc; import org.apache.doris.thrift.BackendService; import org.apache.doris.thrift.HeartbeatService; import org.apache.doris.thrift.TNetworkAddress; import org.apache.doris.utframe.MockedBackendFactory.BeThriftService; -import com.baidu.bjf.remoting.protobuf.utils.JDKCompilerHelper; -import com.baidu.bjf.remoting.protobuf.utils.compiler.JdkCompiler; -import com.baidu.jprotobuf.pbrpc.transport.RpcServer; +import io.grpc.Server; +import io.grpc.ServerBuilder; import org.apache.thrift.TProcessor; @@ -48,7 +47,7 @@ public class MockedBackend { private ThriftServer heartbeatServer; private ThriftServer beThriftServer; - private RpcServer rpcServer; + private Server backendServer; private String host; private int heartbeatPort; @@ -59,15 +58,10 @@ public class MockedBackend { // This must be set explicitly after creating mocked Backend private TNetworkAddress feAddress; - static { - int javaRuntimeVersion = JdkUtils.getJavaVersionAsInteger(System.getProperty("java.version")); - JDKCompilerHelper.setCompiler(new JdkCompiler(JdkCompiler.class.getClassLoader(), String.valueOf(javaRuntimeVersion))); - } - public MockedBackend(String host, int heartbeatPort, int thriftPort, int brpcPort, int httpPort, HeartbeatService.Iface hbService, - BeThriftService backendService, - Object pBackendService) throws IOException { + BeThriftService backendService, PBackendServiceGrpc.PBackendServiceImplBase pBackendService) + throws IOException { this.host = host; this.heartbeatPort = heartbeatPort; @@ -116,7 +110,7 @@ public void start() throws IOException { System.out.println("Be heartbeat service is started with port: " + heartbeatPort); beThriftServer.start(); System.out.println("Be thrift service is started with port: " + thriftPort); - rpcServer.start(brpcPort); + backendServer.start(); System.out.println("Be brpc service is started with port: " + brpcPort); } @@ -130,8 +124,8 @@ private void createBeThriftService(int beThriftPort, BackendService.Iface servic beThriftServer = new ThriftServer(beThriftPort, tprocessor); } - private void createBrpcService(int brpcPort, Object pBackendServiceImpl) { - rpcServer = new RpcServer(); - rpcServer.registerService(pBackendServiceImpl); + private void createBrpcService(int brpcPort, PBackendServiceGrpc.PBackendServiceImplBase pBackendServiceImpl) { + backendServer = ServerBuilder.forPort(brpcPort) + .addService(pBackendServiceImpl).build(); } } diff --git a/fe/fe-core/src/test/java/org/apache/doris/utframe/MockedBackendFactory.java b/fe/fe-core/src/test/java/org/apache/doris/utframe/MockedBackendFactory.java index 06b6ecd729a209..2dd7e803b822c7 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/utframe/MockedBackendFactory.java +++ b/fe/fe-core/src/test/java/org/apache/doris/utframe/MockedBackendFactory.java @@ -18,18 +18,10 @@ package org.apache.doris.utframe; import org.apache.doris.common.ClientPool; -import org.apache.doris.proto.PCancelPlanFragmentRequest; -import org.apache.doris.proto.PCancelPlanFragmentResult; -import 
org.apache.doris.proto.PExecPlanFragmentResult; -import org.apache.doris.proto.PFetchDataResult; -import org.apache.doris.proto.PProxyRequest; -import org.apache.doris.proto.PProxyResult; -import org.apache.doris.proto.PQueryStatistics; -import org.apache.doris.proto.PStatus; -import org.apache.doris.proto.PTriggerProfileReportResult; -import org.apache.doris.rpc.PExecPlanFragmentRequest; -import org.apache.doris.rpc.PFetchDataRequest; -import org.apache.doris.rpc.PTriggerProfileReportRequest; +import org.apache.doris.proto.Data; +import org.apache.doris.proto.InternalService; +import org.apache.doris.proto.PBackendServiceGrpc; +import org.apache.doris.proto.Status; import org.apache.doris.thrift.BackendService; import org.apache.doris.thrift.FrontendService; import org.apache.doris.thrift.HeartbeatService; @@ -70,9 +62,9 @@ import org.apache.doris.thrift.TTransmitDataResult; import org.apache.doris.thrift.TUniqueId; -import com.baidu.jprotobuf.pbrpc.ProtobufRPCService; import com.google.common.collect.Maps; import com.google.common.collect.Queues; +import io.grpc.stub.StreamObserver; import org.apache.thrift.TException; @@ -94,16 +86,10 @@ public class MockedBackendFactory { public static final int BE_DEFAULT_BRPC_PORT = 8060; public static final int BE_DEFAULT_HTTP_PORT = 8040; - // create a default mocked backend with 3 default rpc services - public static MockedBackend createDefaultBackend() throws IOException { - return createBackend(BE_DEFAULT_IP, BE_DEFAULT_HEARTBEAT_PORT, BE_DEFAULT_THRIFT_PORT, BE_DEFAULT_BRPC_PORT, BE_DEFAULT_HTTP_PORT, - new DefaultHeartbeatServiceImpl(BE_DEFAULT_THRIFT_PORT, BE_DEFAULT_HTTP_PORT, BE_DEFAULT_BRPC_PORT), - new DefaultBeThriftServiceImpl(), new DefaultPBackendServiceImpl()); - } - // create a mocked backend with customize parameters public static MockedBackend createBackend(String host, int heartbeatPort, int thriftPort, int brpcPort, int httpPort, - HeartbeatService.Iface hbService, BeThriftService beThriftService, Object pBackendService) + HeartbeatService.Iface hbService, BeThriftService beThriftService, + PBackendServiceGrpc.PBackendServiceImplBase pBackendService) throws IOException { MockedBackend backend = new MockedBackend(host, heartbeatPort, thriftPort, brpcPort, httpPort, hbService, beThriftService, pBackendService); @@ -284,54 +270,94 @@ public TScanCloseResult closeScanner(TScanCloseParams params) throws TException } // The default Brpc service. - // TODO(cmy): Currently this service cannot correctly simulate the processing of query requests. 
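The gRPC rewrite of this mock service follows below. As a usage illustration (not part of the patch), here is a hedged sketch of driving it over grpc-java's in-process transport; the test class and wiring are hypothetical and assume the in-process transport classes shipped with the grpc dependencies are on the test classpath:

```java
package org.apache.doris.utframe;

import org.apache.doris.proto.InternalService;
import org.apache.doris.proto.PBackendServiceGrpc;

import io.grpc.ManagedChannel;
import io.grpc.Server;
import io.grpc.inprocess.InProcessChannelBuilder;
import io.grpc.inprocess.InProcessServerBuilder;

// Hypothetical smoke test: host DefaultPBackendServiceImpl on an in-process
// server and call it through a blocking stub, with no real sockets involved.
public class MockedBackendSmokeTest {
    public static void main(String[] args) throws Exception {
        String name = InProcessServerBuilder.generateName();
        Server server = InProcessServerBuilder.forName(name).directExecutor()
                .addService(new MockedBackendFactory.DefaultPBackendServiceImpl())
                .build().start();
        ManagedChannel channel = InProcessChannelBuilder.forName(name).directExecutor().build();
        PBackendServiceGrpc.PBackendServiceBlockingStub stub = PBackendServiceGrpc.newBlockingStub(channel);
        InternalService.PExecPlanFragmentResult result =
                stub.execPlanFragment(InternalService.PExecPlanFragmentRequest.newBuilder().build());
        // The mock always answers with status code 0.
        System.out.println("exec_plan_fragment status: " + result.getStatus().getStatusCode());
        channel.shutdownNow();
        server.shutdownNow();
    }
}
```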
- public static class DefaultPBackendServiceImpl { - @ProtobufRPCService(serviceName = "PBackendService", methodName = "exec_plan_fragment") - public PExecPlanFragmentResult exec_plan_fragment(PExecPlanFragmentRequest request) { + public static class DefaultPBackendServiceImpl extends PBackendServiceGrpc.PBackendServiceImplBase { + @Override + public void transmitData(InternalService.PTransmitDataParams request, StreamObserver responseObserver) { + responseObserver.onNext(InternalService.PTransmitDataResult.newBuilder() + .setStatus(Status.PStatus.newBuilder().setStatusCode(0)).build()); + responseObserver.onCompleted(); + } + + @Override + public void execPlanFragment(InternalService.PExecPlanFragmentRequest request, StreamObserver responseObserver) { System.out.println("get exec_plan_fragment request"); - PExecPlanFragmentResult result = new PExecPlanFragmentResult(); - PStatus pStatus = new PStatus(); - pStatus.status_code = 0; - result.status = pStatus; - return result; + responseObserver.onNext(InternalService.PExecPlanFragmentResult.newBuilder() + .setStatus(Status.PStatus.newBuilder().setStatusCode(0)).build()); + responseObserver.onCompleted(); } - @ProtobufRPCService(serviceName = "PBackendService", methodName = "cancel_plan_fragment") - public PCancelPlanFragmentResult cancel_plan_fragment(PCancelPlanFragmentRequest request) { + @Override + public void cancelPlanFragment(InternalService.PCancelPlanFragmentRequest request, StreamObserver responseObserver) { System.out.println("get cancel_plan_fragment request"); - PCancelPlanFragmentResult result = new PCancelPlanFragmentResult(); - PStatus pStatus = new PStatus(); - pStatus.status_code = 0; - result.status = pStatus; - return result; + responseObserver.onNext(InternalService.PCancelPlanFragmentResult.newBuilder() + .setStatus(Status.PStatus.newBuilder().setStatusCode(0)).build()); + responseObserver.onCompleted(); } - @ProtobufRPCService(serviceName = "PBackendService", methodName = "fetch_data") - public PFetchDataResult fetchDataAsync(PFetchDataRequest request) { - System.out.println("get fetch_data"); - PFetchDataResult result = new PFetchDataResult(); - PStatus pStatus = new PStatus(); - pStatus.status_code = 0; + @Override + public void fetchData(InternalService.PFetchDataRequest request, StreamObserver responseObserver) { + System.out.println("get fetch_data request"); + responseObserver.onNext(InternalService.PFetchDataResult.newBuilder() + .setStatus(Status.PStatus.newBuilder().setStatusCode(0)) + .setQueryStatistics(Data.PQueryStatistics.newBuilder() + .setScanRows(0L) + .setScanBytes(0L)) + .setEos(true) + .setPacketSeq(0L) + .build()); + responseObserver.onCompleted(); + } - PQueryStatistics pQueryStatistics = new PQueryStatistics(); - pQueryStatistics.scan_rows = 0L; - pQueryStatistics.scan_bytes = 0L; + @Override + public void tabletWriterOpen(InternalService.PTabletWriterOpenRequest request, StreamObserver responseObserver) { + responseObserver.onNext(null); + responseObserver.onCompleted(); + } - result.status = pStatus; - result.packet_seq = 0L; - result.query_statistics = pQueryStatistics; - result.eos = true; - return result; + @Override + public void tabletWriterAddBatch(InternalService.PTabletWriterAddBatchRequest request, StreamObserver responseObserver) { + responseObserver.onNext(null); + responseObserver.onCompleted(); } - @ProtobufRPCService(serviceName = "PBackendService", methodName = "trigger_profile_report") - public PTriggerProfileReportResult triggerProfileReport(PTriggerProfileReportRequest 
request) { - return null; + @Override + public void tabletWriterCancel(InternalService.PTabletWriterCancelRequest request, StreamObserver responseObserver) { + responseObserver.onNext(null); + responseObserver.onCompleted(); } - @ProtobufRPCService(serviceName = "PBackendService", methodName = "get_info") - public PProxyResult getInfo(PProxyRequest request) { - return null; + @Override + public void triggerProfileReport(InternalService.PTriggerProfileReportRequest request, StreamObserver responseObserver) { + System.out.println("get triggerProfileReport request"); + responseObserver.onNext(InternalService.PTriggerProfileReportResult.newBuilder() + .setStatus(Status.PStatus.newBuilder().setStatusCode(0)).build()); + responseObserver.onCompleted(); + } + + @Override + public void getInfo(InternalService.PProxyRequest request, StreamObserver responseObserver) { + System.out.println("get get_info request"); + responseObserver.onNext(InternalService.PProxyResult.newBuilder() + .setStatus(Status.PStatus.newBuilder().setStatusCode(0)).build()); + responseObserver.onCompleted(); + } + + @Override + public void updateCache(InternalService.PUpdateCacheRequest request, StreamObserver responseObserver) { + responseObserver.onNext(null); + responseObserver.onCompleted(); + } + + @Override + public void fetchCache(InternalService.PFetchCacheRequest request, StreamObserver responseObserver) { + responseObserver.onNext(null); + responseObserver.onCompleted(); + } + + @Override + public void clearCache(InternalService.PClearCacheRequest request, StreamObserver responseObserver) { + responseObserver.onNext(null); + responseObserver.onCompleted(); } } } diff --git a/fe/pom.xml b/fe/pom.xml index 9419c871ce4ed7..6297db2ccfd17b 100644 --- a/fe/pom.xml +++ b/fe/pom.xml @@ -48,7 +48,8 @@ under the License. UTF-8 1.8 1.8 - 2.2.11 + 1.18.0 + 3.5.1 false @@ -273,40 +274,12 @@ under the License. 2.10.1 - - - com.baidu - jprotobuf - ${jprotobuf.version} - jar-with-dependencies - - - - - com.baidu - jprotobuf-rpc-common - 1.8 - - commons-io commons-io 2.6 - - - com.baidu - jprotobuf-rpc-core - 3.5.21 - - - com.baidu - jprotobuf - - - - org.json @@ -386,11 +359,32 @@ under the License. 
<version>2.1</version>
+
+        <dependency>
+            <groupId>io.grpc</groupId>
+            <artifactId>grpc-netty</artifactId>
+            <version>${grpc.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>io.grpc</groupId>
+            <artifactId>grpc-protobuf</artifactId>
+            <version>${grpc.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>io.grpc</groupId>
+            <artifactId>grpc-stub</artifactId>
+            <version>${grpc.version}</version>
+            <scope>provided</scope>
+        </dependency>
 
         <dependency>
             <groupId>com.google.protobuf</groupId>
             <artifactId>protobuf-java</artifactId>
-            <version>3.5.1</version>
+            <version>${protobuf.version}</version>
diff --git a/gensrc/proto/internal_service.proto b/gensrc/proto/internal_service.proto
index d5da5f45929193..5da20ce3521c51
--- a/gensrc/proto/internal_service.proto
+++ b/gensrc/proto/internal_service.proto
@@ -112,6 +112,7 @@ message PTabletWriterCancelResult {
 };
 
 message PExecPlanFragmentRequest {
+    optional bytes request = 1;
 };
 
 message PExecPlanFragmentResult {
@@ -145,6 +146,7 @@ message PFetchDataResult {
     optional int64 packet_seq = 2;
     optional bool eos = 3;
     optional PQueryStatistics query_statistics = 4;
+    optional bytes row_batch = 5;
 };
 
 //Add message definition to fetch and update cache
@@ -190,6 +192,7 @@ message PFetchCacheRequest {
 message PFetchCacheResult {
     required PCacheStatus status = 1;
     repeated PCacheValue values = 2;
+    optional int64 all_count = 3 [default = 0];
 };
 
 enum PClearType {

From e7dcbf98fac6da87621526233ce21b185280d7c7 Mon Sep 17 00:00:00 2001
From: yangzhg
Date: Fri, 26 Mar 2021 12:01:15 +0800
Subject: [PATCH 2/2] rebase master

---
 fe/fe-core/pom.xml                                      | 6 ++++++
 .../src/main/java/org/apache/doris/qe/Coordinator.java  | 5 ++---
 2 files changed, 8 insertions(+), 3 deletions(-)

diff --git a/fe/fe-core/pom.xml b/fe/fe-core/pom.xml
index 75ea786d014b64..d38adb9941141b
--- a/fe/fe-core/pom.xml
+++ b/fe/fe-core/pom.xml
@@ -281,6 +281,12 @@ under the License.
             <artifactId>mysql-connector-java</artifactId>
+
+        <dependency>
+            <groupId>io.netty</groupId>
+            <artifactId>netty-all</artifactId>
+        </dependency>
+
             <groupId>org.objenesis</groupId>
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
index 7d628ba095c503..e6202e9498444d
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
@@ -467,7 +467,8 @@ private void sendFragment() throws TException, RpcException, UserException {
                 int instanceNum = params.instanceExecParams.size();
                 Preconditions.checkState(instanceNum > 0);
                 List<TExecPlanFragmentParams> tParams = params.toThrift(backendIdx);
-                List<Pair<BackendExecState, Future<PExecPlanFragmentResult>>> futures = Lists.newArrayList();
+                List<Pair<BackendExecState, Future<InternalService.PExecPlanFragmentResult>>> futures =
+                        Lists.newArrayList();
 
                 // update memory limit for colocate join
                 if (colocateFragmentIds.contains(fragment.getFragmentId().asInt())) {
@@ -1208,7 +1209,6 @@ private void computeColocateJoinInstanceParam(PlanFragmentId fragmentId, int par
             for (List<Pair<Integer, Map<Long, List<TScanRangeParams>>>> perInstanceScanRange : perInstanceScanRanges) {
                 FInstanceExecParam instanceParam = new FInstanceExecParam(null, addressScanRange.getKey(), 0, params);
-
                 for (Pair<Integer, Map<Long, List<TScanRangeParams>>> nodeScanRangeMap : perInstanceScanRange) {
                     instanceParam.bucketSeqSet.add(nodeScanRangeMap.first);
                     for (Map.Entry<Long, List<TScanRangeParams>> nodeScanRange : nodeScanRangeMap.second.entrySet()) {
@@ -1691,7 +1691,6 @@ private void computeInstanceParam(PlanFragmentId fragmentId, int parallelExecIns
         }
     }
-
     private BucketSeqToScanRange bucketSeqToScanRange = new BucketSeqToScanRange();
     private Map<PlanFragmentId, Map<Integer, TNetworkAddress>> fragmentIdToSeqToAddressMap = Maps.newHashMap();
     // cache the fragment id to its scan node ids. Used for colocate join.
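Taken together with the FE-side change in BackendServiceProxy, the new optional "request" field above carries the thrift-serialized plan-fragment parameters inside the protobuf message itself, while old clients keep sending them in the brpc attachment and the BE falls back accordingly. A condensed sketch of the sending side (hypothetical example class; the serialization mirrors BackendServiceProxy.execPlanFragmentAsync):

```java
package org.apache.doris.rpc;

import org.apache.doris.proto.InternalService;
import org.apache.doris.thrift.TExecPlanFragmentParams;

import org.apache.thrift.TSerializer;

import com.google.protobuf.ByteString;

// Hypothetical illustration of the compatibility path: the thrift request is
// serialized into the new `request` bytes field instead of a brpc attachment.
public class ExecPlanFragmentRequestExample {
    public static void main(String[] args) throws Exception {
        TExecPlanFragmentParams tRequest = new TExecPlanFragmentParams();
        InternalService.PExecPlanFragmentRequest pRequest =
                InternalService.PExecPlanFragmentRequest.newBuilder()
                        .setRequest(ByteString.copyFrom(new TSerializer().serialize(tRequest)))
                        .build();
        System.out.println("serialized thrift payload: " + pRequest.getRequest().size() + " bytes");
    }
}
```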