Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions be/src/runtime/buffer_control_block.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,9 @@ void GetResultBatchCtx::on_data(TFetchDataResult* t_result, int64_t packet_seq,
ThriftSerializer ser(false, 4096);
auto st = ser.serialize(&t_result->result_batch, &len, &buf);
if (st.ok()) {
// TODO(yangzhengguo) this is just for compatible with old version, this should be removed in the release 0.15
cntl->response_attachment().append(buf, len);
result->set_row_batch(std::string(buf, buf + len));
result->set_packet_seq(packet_seq);
result->set_eos(eos);
} else {
Expand Down
11 changes: 8 additions & 3 deletions be/src/service/internal_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,13 @@ void PInternalServiceImpl<T>::exec_plan_fragment(google::protobuf::RpcController
google::protobuf::Closure* done) {
brpc::ClosureGuard closure_guard(done);
brpc::Controller* cntl = static_cast<brpc::Controller*>(cntl_base);
auto st = _exec_plan_fragment(cntl);
auto st = Status::OK();
if (request->has_request()) {
st = _exec_plan_fragment(request->request());
} else {
// TODO(yangzhengguo) this is just for compatible with old version, this should be removed in the release 0.15
st = _exec_plan_fragment(cntl->request_attachment().to_string());
}
if (!st.ok()) {
LOG(WARNING) << "exec plan fragment failed, errmsg=" << st.get_error_msg();
}
Expand Down Expand Up @@ -129,8 +135,7 @@ void PInternalServiceImpl<T>::tablet_writer_cancel(google::protobuf::RpcControll
}

template <typename T>
Status PInternalServiceImpl<T>::_exec_plan_fragment(brpc::Controller* cntl) {
auto ser_request = cntl->request_attachment().to_string();
Status PInternalServiceImpl<T>::_exec_plan_fragment(const std::string& ser_request) {
TExecPlanFragmentParams t_request;
{
const uint8_t* buf = (const uint8_t*)ser_request.data();
Expand Down
2 changes: 1 addition & 1 deletion be/src/service/internal_service.h
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ class PInternalServiceImpl : public T {
PCacheResponse* response, google::protobuf::Closure* done) override;

private:
Status _exec_plan_fragment(brpc::Controller* cntl);
Status _exec_plan_fragment(const std::string& s_request);

private:
ExecEnv* _exec_env;
Expand Down
101 changes: 44 additions & 57 deletions fe/fe-core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -229,36 +229,11 @@ under the License.
<artifactId>joda-time</artifactId>
</dependency>

<!-- https://mvnrepository.com/artifact/com.baidu/jprotobuf -->
<dependency>
<groupId>com.baidu</groupId>
<artifactId>jprotobuf</artifactId>
<classifier>jar-with-dependencies</classifier>
</dependency>

<!-- https://mvnrepository.com/artifact/com.baidu/jprotobuf-rpc-common -->
<dependency>
<groupId>com.baidu</groupId>
<artifactId>jprotobuf-rpc-common</artifactId>
</dependency>

<dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
</dependency>

<!-- https://mvnrepository.com/artifact/com.baidu/jprotobuf-rpc-core -->
<dependency>
<groupId>com.baidu</groupId>
<artifactId>jprotobuf-rpc-core</artifactId>
<exclusions>
<exclusion>
<artifactId>servlet-api</artifactId>
<groupId>javax.servlet</groupId>
</exclusion>
</exclusions>
</dependency>

<!-- https://mvnrepository.com/artifact/org.json/json -->
<dependency>
<groupId>org.json</groupId>
Expand Down Expand Up @@ -607,6 +582,20 @@ under the License.
<artifactId>tree-printer</artifactId>
</dependency>

<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-netty</artifactId>
</dependency>

<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-protobuf</artifactId>
</dependency>

<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-stub</artifactId>
</dependency>
</dependencies>

<build>
Expand Down Expand Up @@ -674,28 +663,45 @@ under the License.
</configuration>
</plugin>

<!-- run make to generate Version and builtin -->
<!-- also parse the proto for FE -->

<!-- protobuf -->
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>exec-maven-plugin</artifactId>
<version>3.0.0</version>
<groupId>com.github.os72</groupId>
<artifactId>protoc-jar-maven-plugin</artifactId>
<version>3.11.1</version>
<executions>
<execution>
<id>make-dir</id>
<phase>generate-sources</phase>
<goals>
<goal>exec</goal>
<goal>run</goal>
</goals>
<configuration>
<executable>mkdir</executable>
<arguments>
<argument>-p</argument>
<argument>${basedir}/target/generated-sources/proto</argument>
</arguments>
<skip>${skip.plugin}</skip>
<!-- <protocArtifact>com.google.protobuf:protoc:${protobuf.version}</protocArtifact>-->
<protocCommand>${doris.thirdparty}/installed/bin/protoc</protocCommand>
<protocVersion>${protobuf.version}</protocVersion>
<inputDirectories>
<include>${doris.home}/gensrc/proto</include>
</inputDirectories>
<outputTargets>
<outputTarget>
<type>java</type>
</outputTarget>
<outputTarget>
<type>grpc-java</type>
<pluginArtifact>io.grpc:protoc-gen-grpc-java:${grpc.version}</pluginArtifact>
</outputTarget>
</outputTargets>
</configuration>
</execution>
</executions>
</plugin>
<!-- run make to generate Version and builtin -->
<!-- also parse the proto for FE -->
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>exec-maven-plugin</artifactId>
<version>3.0.0</version>
<executions>
<execution>
<id>gensrc</id>
<phase>generate-sources</phase>
Expand All @@ -711,24 +717,6 @@ under the License.
<skip>${skip.plugin}</skip>
</configuration>
</execution>
<execution>
<id>gen_proto</id>
<phase>generate-sources</phase>
<goals>
<!-- DO NOT use goal 'java', it will terminate the VM after done -->
<goal>exec</goal>
</goals>
<configuration>
<executable>${java.home}/bin/java</executable>
<arguments>
<argument>-jar</argument>
<argument>${settings.localRepository}/com/baidu/jprotobuf/${jprotobuf.version}/jprotobuf-${jprotobuf.version}-jar-with-dependencies.jar</argument>
<argument>--java_out=${basedir}/target/generated-sources/proto</argument>
<argument>${doris.home}/gensrc/proto/internal_service.proto</argument>
</arguments>
<skip>${skip.plugin}</skip>
</configuration>
</execution>
</executions>
</plugin>

Expand All @@ -748,7 +736,6 @@ under the License.
<sources>
<!-- add arbitrary num of src dirs here -->
<source>${basedir}/target/generated-sources/build/</source>
<source>${basedir}/target/generated-sources/proto/</source>
</sources>
</configuration>
</execution>
Expand Down
8 changes: 4 additions & 4 deletions fe/fe-core/src/main/java/org/apache/doris/common/Status.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

package org.apache.doris.common;

import org.apache.doris.proto.PStatus;
import org.apache.doris.proto.Status.PStatus;
import org.apache.doris.thrift.TStatus;
import org.apache.doris.thrift.TStatusCode;

Expand Down Expand Up @@ -81,9 +81,9 @@ public void setStatus(String msg) {
}

public void setPstatus(PStatus status) {
this.errorCode = TStatusCode.findByValue(status.status_code);
if (status.error_msgs != null && !status.error_msgs.isEmpty()) {
this.errorMsg = status.error_msgs.get(0);
this.errorCode = TStatusCode.findByValue(status.getStatusCode());
if (!status.getErrorMsgsList().isEmpty()) {
this.errorMsg = status.getErrorMsgs(0);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,10 @@
import org.apache.doris.common.util.Counter;
import org.apache.doris.common.util.DebugUtil;
import org.apache.doris.common.util.RuntimeProfile;
import org.apache.doris.proto.PTriggerProfileReportResult;
import org.apache.doris.proto.PUniqueId;
import org.apache.doris.proto.InternalService;
import org.apache.doris.proto.Types;
import org.apache.doris.qe.QueryStatisticsItem;
import org.apache.doris.rpc.BackendServiceProxy;
import org.apache.doris.rpc.PTriggerProfileReportRequest;
import org.apache.doris.rpc.RpcException;
import org.apache.doris.system.Backend;
import org.apache.doris.thrift.TNetworkAddress;
Expand Down Expand Up @@ -203,23 +202,24 @@ private void triggerProfileReport(Collection<QueryStatisticsItem> items, boolean
}
// specified query instance which will report.
if (!allQuery) {
final PUniqueId pUId = new PUniqueId();
pUId.hi = instanceInfo.getInstanceId().hi;
pUId.lo = instanceInfo.getInstanceId().lo;
final Types.PUniqueId pUId = Types.PUniqueId.newBuilder()
.setHi(instanceInfo.getInstanceId().hi)
.setLo(instanceInfo.getInstanceId().lo)
.build();
request.addInstanceId(pUId);
}
}
}
recvResponse(sendRequest(requests));
}

private List<Pair<Request, Future<PTriggerProfileReportResult>>> sendRequest(
private List<Pair<Request, Future<InternalService.PTriggerProfileReportResult>>> sendRequest(
Map<TNetworkAddress, Request> requests) throws AnalysisException {
final List<Pair<Request, Future<PTriggerProfileReportResult>>> futures = Lists.newArrayList();
final List<Pair<Request, Future<InternalService.PTriggerProfileReportResult>>> futures = Lists.newArrayList();
for (TNetworkAddress address : requests.keySet()) {
final Request request = requests.get(address);
final PTriggerProfileReportRequest pbRequest =
new PTriggerProfileReportRequest(request.getInstanceIds());
final InternalService.PTriggerProfileReportRequest pbRequest = InternalService.PTriggerProfileReportRequest
.newBuilder().addAllInstanceIds(request.getInstanceIds()).build();
try {
futures.add(Pair.create(request, BackendServiceProxy.getInstance().
triggerProfileReportAsync(address, pbRequest)));
Expand All @@ -230,18 +230,18 @@ private List<Pair<Request, Future<PTriggerProfileReportResult>>> sendRequest(
return futures;
}

private void recvResponse(List<Pair<Request, Future<PTriggerProfileReportResult>>> futures)
private void recvResponse(List<Pair<Request, Future<InternalService.PTriggerProfileReportResult>>> futures)
throws AnalysisException {
final String reasonPrefix = "Fail to receive result.";
for (Pair<Request, Future<PTriggerProfileReportResult>> pair : futures) {
for (Pair<Request, Future<InternalService.PTriggerProfileReportResult>> pair : futures) {
try {
final PTriggerProfileReportResult result
final InternalService.PTriggerProfileReportResult result
= pair.second.get(2, TimeUnit.SECONDS);
final TStatusCode code = TStatusCode.findByValue(result.status.status_code);
final TStatusCode code = TStatusCode.findByValue(result.getStatus().getStatusCode());
if (code != TStatusCode.OK) {
String errMsg = "";
if (result.status.error_msgs != null && !result.status.error_msgs.isEmpty()) {
errMsg = result.status.error_msgs.get(0);
if (!result.getStatus().getErrorMsgsList().isEmpty()) {
errMsg = result.getStatus().getErrorMsgs(0);
}
throw new AnalysisException(reasonPrefix + " backend:" + pair.first.getAddress()
+ " reason:" + errMsg);
Expand Down Expand Up @@ -347,7 +347,7 @@ public long getScanBytes() {

private static class Request {
private final TNetworkAddress address;
private final List<PUniqueId> instanceIds;
private final List<Types.PUniqueId> instanceIds;

public Request(TNetworkAddress address) {
this.address = address;
Expand All @@ -358,11 +358,11 @@ public TNetworkAddress getAddress() {
return address;
}

public List<PUniqueId> getInstanceIds() {
public List<Types.PUniqueId> getInstanceIds() {
return instanceIds;
}

public void addInstanceId(PUniqueId instanceId) {
public void addInstanceId(Types.PUniqueId instanceId) {
this.instanceIds.add(instanceId);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
package org.apache.doris.common.util;

import org.apache.doris.common.Pair;
import org.apache.doris.proto.PUniqueId;
import org.apache.doris.proto.Types;
import org.apache.doris.thrift.TUniqueId;

import java.io.PrintWriter;
Expand Down Expand Up @@ -135,9 +135,9 @@ public static String printId(final UUID id) {
return builder.toString();
}

public static String printId(final PUniqueId id) {
public static String printId(final Types.PUniqueId id) {
StringBuilder builder = new StringBuilder();
builder.append(Long.toHexString(id.hi)).append("-").append(Long.toHexString(id.lo));
builder.append(Long.toHexString(id.getHi())).append("-").append(Long.toHexString(id.getLo()));
return builder.toString();
}

Expand Down
Loading