Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,12 @@ public void addChildren(List <? extends NodeType > n) {
public ArrayList<NodeType> getChildren() { return children; }
public void clearChildren() { children.clear(); }

public void removeNode(int i){
if (children != null && i>=0 && i< children.size()) {
children.remove(i);
}
}

/**
* Count the total number of nodes in this tree. Leaf node will return 1.
* Non-leaf node will include all its children.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ public class ProfileManager {
public static final String SQL_STATEMENT = "Sql Statement";
public static final String USER = "User";
public static final String DEFAULT_DB = "Default Db";
public static final String IS_CACHED = "IsCached";

public static final ArrayList<String> PROFILE_HEADERS = new ArrayList(
Arrays.asList(QUERY_ID, USER, DEFAULT_DB, SQL_STATEMENT, QUERY_TYPE,
Expand Down
105 changes: 97 additions & 8 deletions fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,11 @@
import org.apache.doris.planner.Planner;
import org.apache.doris.proto.PQueryStatistics;
import org.apache.doris.qe.QueryState.MysqlStateType;
import org.apache.doris.qe.cache.Cache;
import org.apache.doris.qe.cache.CacheAnalyzer;
import org.apache.doris.qe.cache.CacheAnalyzer.CacheMode;
import org.apache.doris.qe.cache.CacheBeProxy;
import org.apache.doris.qe.cache.CacheProxy;
import org.apache.doris.rewrite.ExprRewriter;
import org.apache.doris.rewrite.mvrewrite.MVSelectFailedException;
import org.apache.doris.rpc.RpcException;
Expand Down Expand Up @@ -114,6 +119,7 @@ public class StmtExecutor {
private boolean isProxy;
private ShowResultSet proxyResultSet = null;
private PQueryStatistics statisticsForAuditLog;
private boolean isCached;

// this constructor is mainly for proxy
public StmtExecutor(ConnectContext context, OriginStatement originStmt, boolean isProxy) {
Expand Down Expand Up @@ -155,6 +161,8 @@ public void initProfile(long beginTimeInNanoSecond) {
summaryProfile.addInfoString(ProfileManager.USER, context.getQualifiedUser());
summaryProfile.addInfoString(ProfileManager.DEFAULT_DB, context.getDatabase());
summaryProfile.addInfoString(ProfileManager.SQL_STATEMENT, originStmt.originStmt);
summaryProfile.addInfoString(ProfileManager.IS_CACHED, isCached ? "Yes" : "No");

profile.addChild(summaryProfile);
if (coord != null) {
coord.getQueryProfile().getCounterTotalTime().setValue(TimeUtils.getEstimatedTime(beginTimeInNanoSecond));
Expand Down Expand Up @@ -581,6 +589,78 @@ private void handleSetStmt() {
context.getState().setOk();
}

private void sendChannel(MysqlChannel channel, List<CacheProxy.CacheValue> cacheValues, boolean isEos)
throws Exception {
RowBatch batch = null;
for (CacheBeProxy.CacheValue value : cacheValues) {
batch = value.getRowBatch();
for (ByteBuffer row : batch.getBatch().getRows()) {
channel.sendOnePacket(row);
}
context.updateReturnRows(batch.getBatch().getRows().size());
}
if (isEos) {
if (batch != null) {
statisticsForAuditLog = batch.getQueryStatistics();
}
context.getState().setEof();
return;
}
}

private boolean handleCacheStmt(CacheAnalyzer cacheAnalyzer,MysqlChannel channel) throws Exception {
RowBatch batch = null;
CacheBeProxy.FetchCacheResult cacheResult = cacheAnalyzer.getCacheData();
CacheMode mode = cacheAnalyzer.getCacheMode();
if (cacheResult != null) {
isCached = true;
if (cacheAnalyzer.getHitRange() == Cache.HitRange.Full) {
sendChannel(channel, cacheResult.getValueList(), true);
return true;
}
//rewrite sql
if (mode == CacheMode.Partition) {
if (cacheAnalyzer.getHitRange() == Cache.HitRange.Left) {
sendChannel(channel, cacheResult.getValueList(), false);
}
SelectStmt newSelectStmt = cacheAnalyzer.getRewriteStmt();
newSelectStmt.reset();
analyzer = new Analyzer(context.getCatalog(), context);
newSelectStmt.analyze(analyzer);
planner = new Planner();
planner.plan(newSelectStmt, analyzer, context.getSessionVariable().toThrift());
}
}

coord = new Coordinator(context, analyzer, planner);
QeProcessorImpl.INSTANCE.registerQuery(context.queryId(),
new QeProcessorImpl.QueryInfo(context, originStmt.originStmt, coord));
coord.exec();

while (true) {
batch = coord.getNext();
if (batch.getBatch() != null) {
cacheAnalyzer.copyRowBatch(batch);
for (ByteBuffer row : batch.getBatch().getRows()) {
channel.sendOnePacket(row);
}
context.updateReturnRows(batch.getBatch().getRows().size());
}
if (batch.isEos()) {
break;
}
}

if (cacheResult != null && cacheAnalyzer.getHitRange() == Cache.HitRange.Right) {
sendChannel(channel, cacheResult.getValueList(), false);
}

cacheAnalyzer.updateCache();
statisticsForAuditLog = batch.getQueryStatistics();
context.getState().setEof();
return false;
}

// Process a select statement.
private void handleQueryStmt() throws Exception {
// Every time set no send flag and clean all data in buffer
Expand All @@ -601,12 +681,6 @@ private void handleQueryStmt() throws Exception {
handleExplainStmt(explainString);
return;
}
coord = new Coordinator(context, analyzer, planner);

QeProcessorImpl.INSTANCE.registerQuery(context.queryId(),
new QeProcessorImpl.QueryInfo(context, originStmt.originStmt, coord));

coord.exec();

// send result
// 1. If this is a query with OUTFILE clause, eg: select * from tbl1 into outfile xxx,
Expand All @@ -620,6 +694,21 @@ private void handleQueryStmt() throws Exception {
MysqlChannel channel = context.getMysqlChannel();
boolean isOutfileQuery = queryStmt.hasOutFileClause();
boolean isSendFields = false;
if (!isOutfileQuery) {
sendFields(queryStmt.getColLabels(), queryStmt.getResultExprs());
}

//Sql and PartitionCache
CacheAnalyzer cacheAnalyzer = new CacheAnalyzer(context, parsedStmt, planner);
if (cacheAnalyzer.enableCache()) {
handleCacheStmt(cacheAnalyzer, channel);
return;
}

coord = new Coordinator(context, analyzer, planner);
QeProcessorImpl.INSTANCE.registerQuery(context.queryId(),
new QeProcessorImpl.QueryInfo(context, originStmt.originStmt, coord));
coord.exec();
while (true) {
batch = coord.getNext();
// for outfile query, there will be only one empty batch send back with eos flag
Expand All @@ -632,8 +721,8 @@ private void handleQueryStmt() throws Exception {
}
for (ByteBuffer row : batch.getBatch().getRows()) {
channel.sendOnePacket(row);
}
context.updateReturnRows(batch.getBatch().getRows().size());
}
context.updateReturnRows(batch.getBatch().getRows().size());
}
if (batch.isEos()) {
break;
Expand Down
10 changes: 4 additions & 6 deletions fe/fe-core/src/main/java/org/apache/doris/qe/cache/Cache.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
package org.apache.doris.qe.cache;

import org.apache.doris.analysis.SelectStmt;
//import org.apache.doris.common.Config;
import org.apache.doris.common.Config;
import org.apache.doris.common.Status;
import org.apache.doris.qe.RowBatch;
import org.apache.doris.thrift.TUniqueId;
Expand All @@ -38,8 +38,8 @@ public enum HitRange {

protected TUniqueId queryId;
protected SelectStmt selectStmt;
//protected RowBatchBuilder rowBatchBuilder;
//protected CacheAnalyzer.CacheTable latestTable;
protected RowBatchBuilder rowBatchBuilder;
protected CacheAnalyzer.CacheTable latestTable;
protected CacheProxy proxy;
protected HitRange hitRange;

Expand Down Expand Up @@ -72,7 +72,6 @@ public HitRange getHitRange() {
public abstract void updateCache();

protected boolean checkRowLimit() {
/*
if (rowBatchBuilder == null) {
return false;
}
Expand All @@ -82,7 +81,6 @@ protected boolean checkRowLimit() {
return false;
} else {
return true;
}*/
return false;
}
}
}
Loading