Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,9 @@ public static Pair<Double, String> getByteUint(long value) {
}

public static String printId(final TUniqueId id) {
if (id == null) {
return "";
}
StringBuilder builder = new StringBuilder();
builder.append(Long.toHexString(id.hi)).append("-").append(Long.toHexString(id.lo));
return builder.toString();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;

import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.util.ArrayList;
import java.util.Arrays;
Expand Down Expand Up @@ -59,8 +59,8 @@ public class ProfileManager {
public static final String SQL_STATEMENT = "Sql Statement";
public static final String USER = "User";
public static final String DEFAULT_DB = "Default Db";
public static final String IS_CACHED = "IsCached";
public static final String IS_CACHED = "Is Cached";

public static final ArrayList<String> PROFILE_HEADERS = new ArrayList(
Arrays.asList(QUERY_ID, USER, DEFAULT_DB, SQL_STATEMENT, QUERY_TYPE,
START_TIME, END_TIME, TOTAL_TIME, QUERY_STATE));
Expand Down
76 changes: 52 additions & 24 deletions fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
Original file line number Diff line number Diff line change
Expand Up @@ -589,41 +589,62 @@ private void handleSetStmt() {
context.getState().setOk();
}

private void sendChannel(MysqlChannel channel, List<CacheProxy.CacheValue> cacheValues, boolean isEos)
// send values from cache.
// return true if the meta fields has been sent, otherwise, return false.
// the meta fields must be sent right before the first batch of data(or eos flag).
// so if it has data(or eos is true), this method must return true.
private boolean sendCachedValues(MysqlChannel channel, List<CacheProxy.CacheValue> cacheValues,
SelectStmt selectStmt, boolean isSendFields, boolean isEos)
throws Exception {
RowBatch batch = null;
boolean isSend = isSendFields;
for (CacheBeProxy.CacheValue value : cacheValues) {
batch = value.getRowBatch();
if (!isSend) {
// send meta fields before sending first data batch.
sendFields(selectStmt.getColLabels(), selectStmt.getResultExprs());
isSend = true;
}
for (ByteBuffer row : batch.getBatch().getRows()) {
channel.sendOnePacket(row);
}
context.updateReturnRows(batch.getBatch().getRows().size());
}

if (isEos) {
if (batch != null) {
statisticsForAuditLog = batch.getQueryStatistics();
}
if (!isSend) {
sendFields(selectStmt.getColLabels(), selectStmt.getResultExprs());
isSend = true;
}
context.getState().setEof();
return;
}
return isSend;
}

private boolean handleCacheStmt(CacheAnalyzer cacheAnalyzer,MysqlChannel channel) throws Exception {
/**
* Handle the SelectStmt via Cache.
*/
private void handleCacheStmt(CacheAnalyzer cacheAnalyzer, MysqlChannel channel, SelectStmt selectStmt) throws Exception {
RowBatch batch = null;
CacheBeProxy.FetchCacheResult cacheResult = cacheAnalyzer.getCacheData();
CacheMode mode = cacheAnalyzer.getCacheMode();
SelectStmt newSelectStmt = selectStmt;
boolean isSendFields = false;
if (cacheResult != null) {
isCached = true;
if (cacheAnalyzer.getHitRange() == Cache.HitRange.Full) {
sendChannel(channel, cacheResult.getValueList(), true);
return true;
sendCachedValues(channel, cacheResult.getValueList(), newSelectStmt, isSendFields, true);
return;
}
//rewrite sql
// rewrite sql
if (mode == CacheMode.Partition) {
if (cacheAnalyzer.getHitRange() == Cache.HitRange.Left) {
sendChannel(channel, cacheResult.getValueList(), false);
isSendFields = sendCachedValues(channel, cacheResult.getValueList(), newSelectStmt, isSendFields, false);
}
SelectStmt newSelectStmt = cacheAnalyzer.getRewriteStmt();
newSelectStmt = cacheAnalyzer.getRewriteStmt();
newSelectStmt.reset();
analyzer = new Analyzer(context.getCatalog(), context);
newSelectStmt.analyze(analyzer);
Expand All @@ -641,6 +662,10 @@ private boolean handleCacheStmt(CacheAnalyzer cacheAnalyzer,MysqlChannel channel
batch = coord.getNext();
if (batch.getBatch() != null) {
cacheAnalyzer.copyRowBatch(batch);
if (!isSendFields) {
sendFields(newSelectStmt.getColLabels(), newSelectStmt.getResultExprs());
isSendFields = true;
}
for (ByteBuffer row : batch.getBatch().getRows()) {
channel.sendOnePacket(row);
}
Expand All @@ -652,13 +677,19 @@ private boolean handleCacheStmt(CacheAnalyzer cacheAnalyzer,MysqlChannel channel
}

if (cacheResult != null && cacheAnalyzer.getHitRange() == Cache.HitRange.Right) {
sendChannel(channel, cacheResult.getValueList(), false);
isSendFields = sendCachedValues(channel, cacheResult.getValueList(), newSelectStmt, isSendFields, false);
}

cacheAnalyzer.updateCache();

if (!isSendFields) {
sendFields(newSelectStmt.getColLabels(), newSelectStmt.getResultExprs());
isSendFields = true;
}

statisticsForAuditLog = batch.getQueryStatistics();
context.getState().setEof();
return false;
return;
}

// Process a select statement.
Expand All @@ -682,6 +713,17 @@ private void handleQueryStmt() throws Exception {
return;
}

RowBatch batch;
MysqlChannel channel = context.getMysqlChannel();
boolean isOutfileQuery = queryStmt.hasOutFileClause();

// Sql and PartitionCache
CacheAnalyzer cacheAnalyzer = new CacheAnalyzer(context, parsedStmt, planner);
if (cacheAnalyzer.enableCache() && !isOutfileQuery && queryStmt instanceof SelectStmt) {
handleCacheStmt(cacheAnalyzer, channel, (SelectStmt) queryStmt);
return;
}

// send result
// 1. If this is a query with OUTFILE clause, eg: select * from tbl1 into outfile xxx,
// We will not send real query result to client. Instead, we only send OK to client with
Expand All @@ -690,21 +732,7 @@ private void handleQueryStmt() throws Exception {
// Query OK, 10 rows affected (0.01 sec)
//
// 2. If this is a query, send the result expr fields first, and send result data back to client.
RowBatch batch;
MysqlChannel channel = context.getMysqlChannel();
boolean isOutfileQuery = queryStmt.hasOutFileClause();
boolean isSendFields = false;
if (!isOutfileQuery) {
sendFields(queryStmt.getColLabels(), queryStmt.getResultExprs());
}

//Sql and PartitionCache
CacheAnalyzer cacheAnalyzer = new CacheAnalyzer(context, parsedStmt, planner);
if (cacheAnalyzer.enableCache()) {
handleCacheStmt(cacheAnalyzer, channel);
return;
}

coord = new Coordinator(context, analyzer, planner);
QeProcessorImpl.INSTANCE.registerQuery(context.queryId(),
new QeProcessorImpl.QueryInfo(context, originStmt.originStmt, coord));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,23 +28,24 @@
import org.apache.doris.analysis.SlotRef;
import org.apache.doris.analysis.StatementBase;
import org.apache.doris.analysis.TableRef;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.catalog.RangePartitionInfo;
import org.apache.doris.catalog.PartitionType;
import org.apache.doris.catalog.Partition;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.PartitionType;
import org.apache.doris.catalog.RangePartitionInfo;
import org.apache.doris.common.Config;
import org.apache.doris.common.Status;
import org.apache.doris.common.util.DebugUtil;
import org.apache.doris.metric.MetricRepo;
import org.apache.doris.planner.OlapScanNode;
import org.apache.doris.planner.Planner;
import org.apache.doris.planner.ScanNode;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.RowBatch;
import org.apache.doris.common.Config;
import org.apache.doris.common.Status;
import org.apache.doris.thrift.TUniqueId;

import com.google.common.collect.Lists;
import org.apache.doris.thrift.TUniqueId;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

Expand Down Expand Up @@ -179,9 +180,11 @@ public void checkCacheMode(long now) {

private CacheMode innerCheckCacheMode(long now) {
if (!enableCache()) {
LOG.debug("cache is disabled. queryid {}", DebugUtil.printId(queryId));
return CacheMode.NoNeed;
}
if (!(parsedStmt instanceof SelectStmt) || scanNodes.size() == 0) {
LOG.debug("not a select stmt or no scan node. queryid {}", DebugUtil.printId(queryId));
return CacheMode.NoNeed;
}
MetricRepo.COUNTER_QUERY_TABLE.increase(1L);
Expand All @@ -192,6 +195,7 @@ private CacheMode innerCheckCacheMode(long now) {
for (int i = 0; i < scanNodes.size(); i++) {
ScanNode node = scanNodes.get(i);
if (!(node instanceof OlapScanNode)) {
LOG.debug("query contains non-olap table. queryid {}", DebugUtil.printId(queryId));
return CacheMode.None;
}
OlapScanNode oNode = (OlapScanNode) node;
Expand All @@ -217,40 +221,42 @@ private CacheMode innerCheckCacheMode(long now) {
}

if (!enablePartitionCache()) {
LOG.debug("partition query cache is disabled. queryid {}", DebugUtil.printId(queryId));
return CacheMode.None;
}

//Check if selectStmt matches partition key
//Only one table can be updated in Config.cache_last_version_interval_second range
for (int i = 1; i < tblTimeList.size(); i++) {
if ((now - tblTimeList.get(i).latestTime) < Config.cache_last_version_interval_second * 1000) {
LOG.info("the time of other tables is newer than {}", Config.cache_last_version_interval_second);
LOG.debug("the time of other tables is newer than {} s, queryid {}",
Config.cache_last_version_interval_second, DebugUtil.printId(queryId));
return CacheMode.None;
}
}
olapTable = latestTable.olapTable;
if (olapTable.getPartitionInfo().getType() != PartitionType.RANGE) {
LOG.debug("the partition of OlapTable not RANGE type");
LOG.debug("the partition of OlapTable not RANGE type, queryid {}", DebugUtil.printId(queryId));
return CacheMode.None;
}
partitionInfo = (RangePartitionInfo) olapTable.getPartitionInfo();
List<Column> columns = partitionInfo.getPartitionColumns();
//Partition key has only one column
if (columns.size() != 1) {
LOG.info("the size of columns for partition key is {}", columns.size());
LOG.debug("more than one partition column, queryid {}", columns.size(), DebugUtil.printId(queryId));
return CacheMode.None;
}
partColumn = columns.get(0);
//Check if group expr contain partition column
if (!checkGroupByPartitionKey(this.selectStmt, partColumn)) {
LOG.info("not group by partition key, key {}", partColumn.getName());
LOG.debug("group by columns does not contains all partition column, queryid {}", DebugUtil.printId(queryId));
return CacheMode.None;
}
//Check if whereClause have one CompoundPredicate of partition column
List<CompoundPredicate> compoundPredicates = Lists.newArrayList();
getPartitionKeyFromSelectStmt(this.selectStmt, partColumn, compoundPredicates);
if (compoundPredicates.size() != 1) {
LOG.info("the predicate size include partition key has {}", compoundPredicates.size());
LOG.debug("empty or more than one predicates contain partition column, queryid {}", DebugUtil.printId(queryId));
return CacheMode.None;
}
partitionPredicate = compoundPredicates.get(0);
Expand All @@ -265,22 +271,21 @@ public CacheBeProxy.FetchCacheResult getCacheData() {
CacheProxy.FetchCacheResult cacheResult = null;
cacheMode = innerCheckCacheMode(0);
if (cacheMode == CacheMode.NoNeed) {
return cacheResult;
return null;
}
if (cacheMode == CacheMode.None) {
LOG.info("check cache mode {}, queryid {}", cacheMode, DebugUtil.printId(queryId));
return cacheResult;
return null;
}
Status status = new Status();
cacheResult = cache.getCacheData(status);

if (status.ok() && cacheResult != null) {
LOG.info("hit cache, mode {}, queryid {}, all count {}, value count {}, row count {}, data size {}",
LOG.debug("hit cache, mode {}, queryid {}, all count {}, value count {}, row count {}, data size {}",
cacheMode, DebugUtil.printId(queryId),
cacheResult.all_count, cacheResult.value_count,
cacheResult.row_count, cacheResult.data_size);
} else {
LOG.info("miss cache, mode {}, queryid {}, code {}, msg {}", cacheMode,
LOG.debug("miss cache, mode {}, queryid {}, code {}, msg {}", cacheMode,
DebugUtil.printId(queryId), status.getErrorCode(), status.getErrorMsg());
cacheResult = null;
}
Expand Down