Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 8 additions & 32 deletions fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@

import org.apache.doris.analysis.BoolLiteral;
import org.apache.doris.analysis.DecimalLiteral;
import org.apache.doris.analysis.Expr;
import org.apache.doris.analysis.FloatLiteral;
import org.apache.doris.analysis.IntLiteral;
import org.apache.doris.analysis.LiteralExpr;
Expand Down Expand Up @@ -63,11 +62,11 @@
import org.apache.doris.plugin.AuditEvent.AuditEventBuilder;
import org.apache.doris.resource.Tag;
import org.apache.doris.service.arrowflight.results.FlightSqlChannel;
import org.apache.doris.service.arrowflight.results.FlightSqlEndpointsLocation;
import org.apache.doris.statistics.ColumnStatistic;
import org.apache.doris.statistics.Histogram;
import org.apache.doris.system.Backend;
import org.apache.doris.task.LoadTaskInfo;
import org.apache.doris.thrift.TNetworkAddress;
import org.apache.doris.thrift.TResultSinkType;
import org.apache.doris.thrift.TStatusCode;
import org.apache.doris.thrift.TUniqueId;
Expand Down Expand Up @@ -134,10 +133,7 @@ public enum ConnectType {
protected volatile String peerIdentity;
private final Map<String, String> preparedQuerys = new HashMap<>();
private String runningQuery;
private TNetworkAddress resultFlightServerAddr;
private TNetworkAddress resultInternalServiceAddr;
private ArrayList<Expr> resultOutputExprs;
private TUniqueId finstId;
private final List<FlightSqlEndpointsLocation> flightSqlEndpointsLocations = Lists.newArrayList();
private boolean returnResultFromLocal = true;
// mysql net
protected volatile MysqlChannel mysqlChannel;
Expand Down Expand Up @@ -730,36 +726,16 @@ public String getRunningQuery() {
return runningQuery;
}

public void setResultFlightServerAddr(TNetworkAddress resultFlightServerAddr) {
this.resultFlightServerAddr = resultFlightServerAddr;
/**
 * Appends one endpoint location to the list of Arrow Flight SQL endpoint locations
 * held by this context.
 *
 * @param flightSqlEndpointsLocation the endpoint location to append
 */
public void addFlightSqlEndpointsLocation(FlightSqlEndpointsLocation flightSqlEndpointsLocation) {
this.flightSqlEndpointsLocations.add(flightSqlEndpointsLocation);
}

public TNetworkAddress getResultFlightServerAddr() {
return resultFlightServerAddr;
/**
 * Returns the Arrow Flight SQL endpoint locations added to this context so far.
 *
 * <p>The returned list is the context's own backing list, not a copy, so changes made
 * through it are visible to this context.
 *
 * @return the list of endpoint locations held by this context
 */
public List<FlightSqlEndpointsLocation> getFlightSqlEndpointsLocations() {
return flightSqlEndpointsLocations;
}

public void setResultInternalServiceAddr(TNetworkAddress resultInternalServiceAddr) {
this.resultInternalServiceAddr = resultInternalServiceAddr;
}

public TNetworkAddress getResultInternalServiceAddr() {
return resultInternalServiceAddr;
}

public void setResultOutputExprs(ArrayList<Expr> resultOutputExprs) {
this.resultOutputExprs = resultOutputExprs;
}

public ArrayList<Expr> getResultOutputExprs() {
return resultOutputExprs;
}

public void setFinstId(TUniqueId finstId) {
this.finstId = finstId;
}

public TUniqueId getFinstId() {
return finstId;
/**
 * Removes every Arrow Flight SQL endpoint location currently held by this context,
 * leaving the list empty.
 */
public void clearFlightSqlEndpointsLocations() {
flightSqlEndpointsLocations.clear();
}

public void setReturnResultFromLocal(boolean returnResultFromLocal) {
Expand Down
32 changes: 16 additions & 16 deletions fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@
import org.apache.doris.rpc.RpcException;
import org.apache.doris.service.ExecuteEnv;
import org.apache.doris.service.FrontendOptions;
import org.apache.doris.service.arrowflight.results.FlightSqlEndpointsLocation;
import org.apache.doris.system.Backend;
import org.apache.doris.system.SystemInfoService;
import org.apache.doris.task.LoadEtlTask;
Expand Down Expand Up @@ -733,29 +734,27 @@ protected void execInternal() throws Exception {
enableParallelResultSink = queryOptions.isEnableParallelOutfile();
}

TNetworkAddress execBeAddr = topParams.instanceExecParams.get(0).host;
Set<TNetworkAddress> addrs = new HashSet<>();
for (FInstanceExecParam param : topParams.instanceExecParams) {
if (addrs.contains(param.host)) {
continue;
}
addrs.add(param.host);
receivers.add(new ResultReceiver(queryId, param.instanceId, addressToBackendID.get(param.host),
toBrpcHost(param.host), this.timeoutDeadline,
context.getSessionVariable().getMaxMsgSizeOfResultReceiver(), enableParallelResultSink));
}

if (!context.isReturnResultFromLocal()) {
Preconditions.checkState(context.getConnectType().equals(ConnectType.ARROW_FLIGHT_SQL));
if (enableParallelResultSink) {
context.setFinstId(queryId);
if (context.isReturnResultFromLocal()) {
receivers.add(new ResultReceiver(queryId, param.instanceId, addressToBackendID.get(param.host),
toBrpcHost(param.host), this.timeoutDeadline,
context.getSessionVariable().getMaxMsgSizeOfResultReceiver(), enableParallelResultSink));
} else {
context.setFinstId(topParams.instanceExecParams.get(0).instanceId);
Preconditions.checkState(context.getConnectType().equals(ConnectType.ARROW_FLIGHT_SQL));
TUniqueId finstId;
if (enableParallelResultSink) {
finstId = queryId;
} else {
finstId = topParams.instanceExecParams.get(0).instanceId;
}
context.addFlightSqlEndpointsLocation(new FlightSqlEndpointsLocation(finstId,
toArrowFlightHost(param.host), toBrpcHost(param.host), fragments.get(0).getOutputExprs()));
}
context.setFinstId(topParams.instanceExecParams.get(0).instanceId);
context.setResultFlightServerAddr(toArrowFlightHost(execBeAddr));
context.setResultInternalServiceAddr(toBrpcHost(execBeAddr));
context.setResultOutputExprs(fragments.get(0).getOutputExprs());
}

LOG.info("dispatch result sink of query {} to {}", DebugUtil.printId(queryId),
Expand All @@ -766,7 +765,8 @@ protected void execInternal() throws Exception {
// set the broker address for OUTFILE sink
ResultFileSink topResultFileSink = (ResultFileSink) topDataSink;
FsBroker broker = Env.getCurrentEnv().getBrokerMgr()
.getBroker(topResultFileSink.getBrokerName(), execBeAddr.getHostname());
.getBroker(topResultFileSink.getBrokerName(),
topParams.instanceExecParams.get(0).host.getHostname());
topResultFileSink.setBrokerAddr(broker.host, broker.port);
}
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
import org.apache.doris.qe.runtime.ThriftPlansBuilder;
import org.apache.doris.resource.workloadgroup.QueryQueue;
import org.apache.doris.resource.workloadgroup.QueueToken;
import org.apache.doris.service.arrowflight.results.FlightSqlEndpointsLocation;
import org.apache.doris.system.Backend;
import org.apache.doris.thrift.TErrorTabletInfo;
import org.apache.doris.thrift.TNetworkAddress;
Expand Down Expand Up @@ -90,7 +91,7 @@ public NereidsCoordinator(ConnectContext context, Analyzer analyzer,
this.coordinatorContext.setJobProcessor(buildJobProcessor(coordinatorContext));

Preconditions.checkState(!planner.getFragments().isEmpty()
&& coordinatorContext.instanceNum.get() > 0, "Fragment and Instance can not be empty˚");
&& coordinatorContext.instanceNum.get() > 0, "Fragment and Instance can not be empty˚");
}

// broker load
Expand Down Expand Up @@ -431,18 +432,22 @@ private void setForArrowFlight(CoordinatorContext coordinatorContext, PipelineDi
if (dataSink instanceof ResultSink || dataSink instanceof ResultFileSink) {
if (connectContext != null && !connectContext.isReturnResultFromLocal()) {
Preconditions.checkState(connectContext.getConnectType().equals(ConnectType.ARROW_FLIGHT_SQL));

AssignedJob firstInstance = topPlan.getInstanceJobs().get(0);
BackendWorker worker = (BackendWorker) firstInstance.getAssignedWorker();
Backend backend = worker.getBackend();

connectContext.setFinstId(firstInstance.instanceId());
if (backend.getArrowFlightSqlPort() < 0) {
throw new IllegalStateException("be arrow_flight_sql_port cannot be empty.");
for (AssignedJob instance : topPlan.getInstanceJobs()) {
BackendWorker worker = (BackendWorker) instance.getAssignedWorker();
Backend backend = worker.getBackend();
if (backend.getArrowFlightSqlPort() < 0) {
throw new IllegalStateException("be arrow_flight_sql_port cannot be empty.");
}
TUniqueId finstId;
if (connectContext.getSessionVariable().enableParallelResultSink()) {
finstId = getQueryId();
} else {
finstId = instance.instanceId();
}
connectContext.addFlightSqlEndpointsLocation(new FlightSqlEndpointsLocation(finstId,
backend.getArrowFlightAddress(), backend.getBrpcAddress(),
topPlan.getFragmentJob().getFragment().getOutputExprs()));
}
connectContext.setResultFlightServerAddr(backend.getArrowFlightAddress());
connectContext.setResultInternalServiceAddr(backend.getBrpcAddress());
connectContext.setResultOutputExprs(topPlan.getFragmentJob().getFragment().getOutputExprs());
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,13 @@
import org.apache.doris.mysql.MysqlCommand;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.QueryState.MysqlStateType;
import org.apache.doris.service.arrowflight.results.FlightSqlEndpointsLocation;
import org.apache.doris.service.arrowflight.results.FlightSqlResultCacheEntry;
import org.apache.doris.service.arrowflight.sessions.FlightSessionsManager;
import org.apache.doris.thrift.TUniqueId;

import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.protobuf.Any;
import com.google.protobuf.ByteString;
import com.google.protobuf.Message;
Expand Down Expand Up @@ -187,6 +189,7 @@ private FlightInfo executeQueryStatement(String peerIdentity, ConnectContext con
Preconditions.checkState(!query.isEmpty());
// The previous query may have finished without any getStreamStatement call fetching its result,
// so clear that leftover state before running the new query.
connectContext.getFlightSqlChannel().reset();
connectContext.clearFlightSqlEndpointsLocations();
try (FlightSqlConnectProcessor flightSQLConnectProcessor = new FlightSqlConnectProcessor(connectContext)) {
flightSQLConnectProcessor.handleQuery(query);
if (connectContext.getState().getStateType() == MysqlStateType.ERR) {
Expand Down Expand Up @@ -225,50 +228,52 @@ private FlightInfo executeQueryStatement(String peerIdentity, ConnectContext con
}
} else {
// Now only query stmt will pull results from BE.
Schema schema = flightSQLConnectProcessor.fetchArrowFlightSchema(5000);
if (schema == null) {
flightSQLConnectProcessor.fetchArrowFlightSchema(5000);
if (flightSQLConnectProcessor.getArrowSchema() == null) {
throw CallStatus.INTERNAL.withDescription("fetch arrow flight schema is null")
.toRuntimeException();
}

TUniqueId queryId = connectContext.queryId();
if (!connectContext.getSessionVariable().enableParallelResultSink()) {
// only one instance
queryId = connectContext.getFinstId();
}
// Ticket contains the IP and Brpc Port of the Doris BE node where the query result is located.
final ByteString handle = ByteString.copyFromUtf8(
DebugUtil.printId(queryId) + "&" + connectContext.getResultInternalServiceAddr().hostname
+ "&" + connectContext.getResultInternalServiceAddr().port + "&" + query);
TicketStatementQuery ticketStatement = TicketStatementQuery.newBuilder().setStatementHandle(handle)
.build();
Ticket ticket = new Ticket(Any.pack(ticketStatement).toByteArray());
// TODO Support multiple endpoints.
Location location;
if (flightSQLConnectProcessor.getPublicAccessAddr().isSetHostname()) {
// In a production environment, it is often inconvenient to expose Doris BE nodes
// to the external network.
// However, a reverse proxy (such as nginx) can be added to all Doris BE nodes,
// and the external client will be randomly routed to a Doris BE node when connecting to nginx.
// The query results of Arrow Flight SQL will be randomly saved on a Doris BE node.
// If it is different from the Doris BE node randomly routed by nginx,
// data forwarding needs to be done inside the Doris BE node.
if (flightSQLConnectProcessor.getPublicAccessAddr().isSetPort()) {
location = Location.forGrpcInsecure(
flightSQLConnectProcessor.getPublicAccessAddr().hostname,
flightSQLConnectProcessor.getPublicAccessAddr().port);
List<FlightEndpoint> endpoints = Lists.newArrayList();
for (FlightSqlEndpointsLocation endpointLoc : connectContext.getFlightSqlEndpointsLocations()) {
TUniqueId tid = endpointLoc.getFinstId();
// Ticket contains the IP and Brpc Port of the Doris BE node where the query result is located.
final ByteString handle = ByteString.copyFromUtf8(
DebugUtil.printId(tid) + "&" + endpointLoc.getResultInternalServiceAddr().hostname + "&"
+ endpointLoc.getResultInternalServiceAddr().port + "&" + query);
TicketStatementQuery ticketStatement = TicketStatementQuery.newBuilder()
.setStatementHandle(handle).build();
Ticket ticket = new Ticket(Any.pack(ticketStatement).toByteArray());
Location location;
if (endpointLoc.getResultPublicAccessAddr().isSetHostname()) {
// In a production environment, it is often inconvenient to expose Doris BE nodes
// to the external network.
// However, a reverse proxy (such as nginx) can be added to all Doris BE nodes,
// and the external client will be randomly routed to a Doris BE node when connecting
// to nginx.
// The query results of Arrow Flight SQL will be randomly saved on a Doris BE node.
// If it is different from the Doris BE node randomly routed by nginx,
// data forwarding needs to be done inside the Doris BE node.
if (endpointLoc.getResultPublicAccessAddr().isSetPort()) {
location = Location.forGrpcInsecure(endpointLoc.getResultPublicAccessAddr().hostname,
endpointLoc.getResultPublicAccessAddr().port);
} else {
location = Location.forGrpcInsecure(endpointLoc.getResultPublicAccessAddr().hostname,
endpointLoc.getResultFlightServerAddr().port);
}
} else {
location = Location.forGrpcInsecure(
flightSQLConnectProcessor.getPublicAccessAddr().hostname,
connectContext.getResultFlightServerAddr().port);
location = Location.forGrpcInsecure(endpointLoc.getResultFlightServerAddr().hostname,
endpointLoc.getResultFlightServerAddr().port);
}
} else {
location = Location.forGrpcInsecure(connectContext.getResultFlightServerAddr().hostname,
connectContext.getResultFlightServerAddr().port);
// By default, the query results of all BE nodes are aggregated to one BE node, so the
// ADBC Client receives only one endpoint and pulls data from the BE node
// corresponding to that endpoint.
// Run `set global enable_parallel_result_sink=true;` to let each BE return its query results
// separately; the ADBC Client then receives multiple endpoints and pulls data from each one.
endpoints.add(new FlightEndpoint(ticket, location));
}
List<FlightEndpoint> endpoints = Collections.singletonList(new FlightEndpoint(ticket, location));
// TODO Set in BE callback after query end, Client will not callback.
return new FlightInfo(schema, descriptor, endpoints, -1, -1);
return new FlightInfo(flightSQLConnectProcessor.getArrowSchema(), descriptor, endpoints, -1, -1);
}
}
} catch (Exception e) {
Expand Down
Loading
Loading