From 54ad31a03feb7208e9ab447c9be00904c5b44810 Mon Sep 17 00:00:00 2001 From: marising Date: Mon, 10 Aug 2020 21:24:37 +0800 Subject: [PATCH] [Feature][Cache] Sql cache and partition cache #2581 1. Analyze what mode of cache can be used by query 2. Query cache before executing query in StmtExecutor 3. Two cache mode, sqlcache and partitioncache, are implemented --- .../org/apache/doris/common/TreeNode.java | 6 + .../doris/common/util/ProfileManager.java | 1 + .../org/apache/doris/qe/StmtExecutor.java | 105 ++- .../java/org/apache/doris/qe/cache/Cache.java | 10 +- .../apache/doris/qe/cache/CacheAnalyzer.java | 451 ++++++++++ .../apache/doris/qe/cache/PartitionCache.java | 217 +++++ .../apache/doris/qe/cache/PartitionRange.java | 604 +++++++++++++ .../doris/qe/cache/RowBatchBuilder.java | 158 ++++ .../org/apache/doris/qe/cache/SqlCache.java | 80 ++ .../apache/doris/qe/PartitionCacheTest.java | 851 ++++++++++++++++++ 10 files changed, 2469 insertions(+), 14 deletions(-) create mode 100644 fe/fe-core/src/main/java/org/apache/doris/qe/cache/CacheAnalyzer.java create mode 100644 fe/fe-core/src/main/java/org/apache/doris/qe/cache/PartitionCache.java create mode 100644 fe/fe-core/src/main/java/org/apache/doris/qe/cache/PartitionRange.java create mode 100644 fe/fe-core/src/main/java/org/apache/doris/qe/cache/RowBatchBuilder.java create mode 100644 fe/fe-core/src/main/java/org/apache/doris/qe/cache/SqlCache.java create mode 100644 fe/fe-core/src/test/java/org/apache/doris/qe/PartitionCacheTest.java diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/TreeNode.java b/fe/fe-core/src/main/java/org/apache/doris/common/TreeNode.java index a75563d4c15ff8..cb5a0df0ea6a56 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/TreeNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/TreeNode.java @@ -47,6 +47,12 @@ public void addChildren(List n) { public ArrayList getChildren() { return children; } public void clearChildren() { children.clear(); } + public void removeNode(int i){ + if (children != null && i>=0 && i< children.size()) { + children.remove(i); + } + } + /** * Count the total number of nodes in this tree. Leaf node will return 1. * Non-leaf node will include all its children. diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/util/ProfileManager.java b/fe/fe-core/src/main/java/org/apache/doris/common/util/ProfileManager.java index 1705bb632ccd88..d1d621d45de73e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/util/ProfileManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/ProfileManager.java @@ -59,6 +59,7 @@ public class ProfileManager { public static final String SQL_STATEMENT = "Sql Statement"; public static final String USER = "User"; public static final String DEFAULT_DB = "Default Db"; + public static final String IS_CACHED = "IsCached"; public static final ArrayList PROFILE_HEADERS = new ArrayList( Arrays.asList(QUERY_ID, USER, DEFAULT_DB, SQL_STATEMENT, QUERY_TYPE, diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java index be7405c2d5dad2..86331081529311 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java @@ -65,6 +65,11 @@ import org.apache.doris.planner.Planner; import org.apache.doris.proto.PQueryStatistics; import org.apache.doris.qe.QueryState.MysqlStateType; +import org.apache.doris.qe.cache.Cache; +import org.apache.doris.qe.cache.CacheAnalyzer; +import org.apache.doris.qe.cache.CacheAnalyzer.CacheMode; +import org.apache.doris.qe.cache.CacheBeProxy; +import org.apache.doris.qe.cache.CacheProxy; import org.apache.doris.rewrite.ExprRewriter; import org.apache.doris.rewrite.mvrewrite.MVSelectFailedException; import org.apache.doris.rpc.RpcException; @@ -114,6 +119,7 @@ public class StmtExecutor { private boolean isProxy; private ShowResultSet proxyResultSet = null; private PQueryStatistics statisticsForAuditLog; + private boolean isCached; // this constructor is mainly for proxy public StmtExecutor(ConnectContext context, OriginStatement originStmt, boolean isProxy) { @@ -155,6 +161,8 @@ public void initProfile(long beginTimeInNanoSecond) { summaryProfile.addInfoString(ProfileManager.USER, context.getQualifiedUser()); summaryProfile.addInfoString(ProfileManager.DEFAULT_DB, context.getDatabase()); summaryProfile.addInfoString(ProfileManager.SQL_STATEMENT, originStmt.originStmt); + summaryProfile.addInfoString(ProfileManager.IS_CACHED, isCached ? "Yes" : "No"); + profile.addChild(summaryProfile); if (coord != null) { coord.getQueryProfile().getCounterTotalTime().setValue(TimeUtils.getEstimatedTime(beginTimeInNanoSecond)); @@ -581,6 +589,78 @@ private void handleSetStmt() { context.getState().setOk(); } + private void sendChannel(MysqlChannel channel, List cacheValues, boolean isEos) + throws Exception { + RowBatch batch = null; + for (CacheBeProxy.CacheValue value : cacheValues) { + batch = value.getRowBatch(); + for (ByteBuffer row : batch.getBatch().getRows()) { + channel.sendOnePacket(row); + } + context.updateReturnRows(batch.getBatch().getRows().size()); + } + if (isEos) { + if (batch != null) { + statisticsForAuditLog = batch.getQueryStatistics(); + } + context.getState().setEof(); + return; + } + } + + private boolean handleCacheStmt(CacheAnalyzer cacheAnalyzer,MysqlChannel channel) throws Exception { + RowBatch batch = null; + CacheBeProxy.FetchCacheResult cacheResult = cacheAnalyzer.getCacheData(); + CacheMode mode = cacheAnalyzer.getCacheMode(); + if (cacheResult != null) { + isCached = true; + if (cacheAnalyzer.getHitRange() == Cache.HitRange.Full) { + sendChannel(channel, cacheResult.getValueList(), true); + return true; + } + //rewrite sql + if (mode == CacheMode.Partition) { + if (cacheAnalyzer.getHitRange() == Cache.HitRange.Left) { + sendChannel(channel, cacheResult.getValueList(), false); + } + SelectStmt newSelectStmt = cacheAnalyzer.getRewriteStmt(); + newSelectStmt.reset(); + analyzer = new Analyzer(context.getCatalog(), context); + newSelectStmt.analyze(analyzer); + planner = new Planner(); + planner.plan(newSelectStmt, analyzer, context.getSessionVariable().toThrift()); + } + } + + coord = new Coordinator(context, analyzer, planner); + QeProcessorImpl.INSTANCE.registerQuery(context.queryId(), + new QeProcessorImpl.QueryInfo(context, originStmt.originStmt, coord)); + coord.exec(); + + while (true) { + batch = coord.getNext(); + if (batch.getBatch() != null) { + cacheAnalyzer.copyRowBatch(batch); + for (ByteBuffer row : batch.getBatch().getRows()) { + channel.sendOnePacket(row); + } + context.updateReturnRows(batch.getBatch().getRows().size()); + } + if (batch.isEos()) { + break; + } + } + + if (cacheResult != null && cacheAnalyzer.getHitRange() == Cache.HitRange.Right) { + sendChannel(channel, cacheResult.getValueList(), false); + } + + cacheAnalyzer.updateCache(); + statisticsForAuditLog = batch.getQueryStatistics(); + context.getState().setEof(); + return false; + } + // Process a select statement. private void handleQueryStmt() throws Exception { // Every time set no send flag and clean all data in buffer @@ -601,12 +681,6 @@ private void handleQueryStmt() throws Exception { handleExplainStmt(explainString); return; } - coord = new Coordinator(context, analyzer, planner); - - QeProcessorImpl.INSTANCE.registerQuery(context.queryId(), - new QeProcessorImpl.QueryInfo(context, originStmt.originStmt, coord)); - - coord.exec(); // send result // 1. If this is a query with OUTFILE clause, eg: select * from tbl1 into outfile xxx, @@ -620,6 +694,21 @@ private void handleQueryStmt() throws Exception { MysqlChannel channel = context.getMysqlChannel(); boolean isOutfileQuery = queryStmt.hasOutFileClause(); boolean isSendFields = false; + if (!isOutfileQuery) { + sendFields(queryStmt.getColLabels(), queryStmt.getResultExprs()); + } + + //Sql and PartitionCache + CacheAnalyzer cacheAnalyzer = new CacheAnalyzer(context, parsedStmt, planner); + if (cacheAnalyzer.enableCache()) { + handleCacheStmt(cacheAnalyzer, channel); + return; + } + + coord = new Coordinator(context, analyzer, planner); + QeProcessorImpl.INSTANCE.registerQuery(context.queryId(), + new QeProcessorImpl.QueryInfo(context, originStmt.originStmt, coord)); + coord.exec(); while (true) { batch = coord.getNext(); // for outfile query, there will be only one empty batch send back with eos flag @@ -632,8 +721,8 @@ private void handleQueryStmt() throws Exception { } for (ByteBuffer row : batch.getBatch().getRows()) { channel.sendOnePacket(row); - } - context.updateReturnRows(batch.getBatch().getRows().size()); + } + context.updateReturnRows(batch.getBatch().getRows().size()); } if (batch.isEos()) { break; diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/cache/Cache.java b/fe/fe-core/src/main/java/org/apache/doris/qe/cache/Cache.java index 9504513edf0dec..c2a054e54749b0 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/cache/Cache.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/cache/Cache.java @@ -18,7 +18,7 @@ package org.apache.doris.qe.cache; import org.apache.doris.analysis.SelectStmt; -//import org.apache.doris.common.Config; +import org.apache.doris.common.Config; import org.apache.doris.common.Status; import org.apache.doris.qe.RowBatch; import org.apache.doris.thrift.TUniqueId; @@ -38,8 +38,8 @@ public enum HitRange { protected TUniqueId queryId; protected SelectStmt selectStmt; - //protected RowBatchBuilder rowBatchBuilder; - //protected CacheAnalyzer.CacheTable latestTable; + protected RowBatchBuilder rowBatchBuilder; + protected CacheAnalyzer.CacheTable latestTable; protected CacheProxy proxy; protected HitRange hitRange; @@ -72,7 +72,6 @@ public HitRange getHitRange() { public abstract void updateCache(); protected boolean checkRowLimit() { - /* if (rowBatchBuilder == null) { return false; } @@ -82,7 +81,6 @@ protected boolean checkRowLimit() { return false; } else { return true; - }*/ - return false; + } } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/cache/CacheAnalyzer.java b/fe/fe-core/src/main/java/org/apache/doris/qe/cache/CacheAnalyzer.java new file mode 100644 index 00000000000000..2f2fba993f69c8 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/cache/CacheAnalyzer.java @@ -0,0 +1,451 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.qe.cache; + +import org.apache.doris.analysis.AggregateInfo; +import org.apache.doris.analysis.BinaryPredicate; +import org.apache.doris.analysis.CastExpr; +import org.apache.doris.analysis.CompoundPredicate; +import org.apache.doris.analysis.Expr; +import org.apache.doris.analysis.InlineViewRef; +import org.apache.doris.analysis.QueryStmt; +import org.apache.doris.analysis.SelectStmt; +import org.apache.doris.analysis.SlotRef; +import org.apache.doris.analysis.StatementBase; +import org.apache.doris.analysis.TableRef; +import org.apache.doris.catalog.OlapTable; +import org.apache.doris.catalog.RangePartitionInfo; +import org.apache.doris.catalog.PartitionType; +import org.apache.doris.catalog.Partition; +import org.apache.doris.catalog.Column; +import org.apache.doris.common.util.DebugUtil; +import org.apache.doris.metric.MetricRepo; +import org.apache.doris.planner.OlapScanNode; +import org.apache.doris.planner.Planner; +import org.apache.doris.planner.ScanNode; +import org.apache.doris.qe.ConnectContext; +import org.apache.doris.qe.RowBatch; +import org.apache.doris.common.Config; +import org.apache.doris.common.Status; + +import com.google.common.collect.Lists; +import org.apache.doris.thrift.TUniqueId; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +/** + * Analyze which caching mode a SQL is suitable for + * 1. T + 1 update is suitable for SQL mode + * 2. Partition by date, update the data of the day in near real time, which is suitable for Partition mode + */ +public class CacheAnalyzer { + private static final Logger LOG = LogManager.getLogger(CacheAnalyzer.class); + + /** + * NoNeed : disable config or variable, not query, not scan table etc. + */ + public enum CacheMode { + NoNeed, + None, + TTL, + Sql, + Partition + } + + private ConnectContext context; + private boolean enableSqlCache = false; + private boolean enablePartitionCache = false; + private TUniqueId queryId; + private CacheMode cacheMode; + private CacheTable latestTable; + private StatementBase parsedStmt; + private SelectStmt selectStmt; + private List scanNodes; + private OlapTable olapTable; + private RangePartitionInfo partitionInfo; + private Column partColumn; + private CompoundPredicate partitionPredicate; + private Cache cache; + + public Cache getCache() { + return cache; + } + + public CacheAnalyzer(ConnectContext context, StatementBase parsedStmt, Planner planner) { + this.context = context; + this.queryId = context.queryId(); + this.parsedStmt = parsedStmt; + scanNodes = planner.getScanNodes(); + latestTable = new CacheTable(); + checkCacheConfig(); + } + + //for unit test + public CacheAnalyzer(ConnectContext context, StatementBase parsedStmt, List scanNodes) { + this.context = context; + this.parsedStmt = parsedStmt; + this.scanNodes = scanNodes; + checkCacheConfig(); + } + + private void checkCacheConfig() { + if (Config.cache_enable_sql_mode) { + if (context.getSessionVariable().isEnableSqlCache()) { + enableSqlCache = true; + } + } + if (Config.cache_enable_partition_mode) { + if (context.getSessionVariable().isEnablePartitionCache()) { + enablePartitionCache = true; + } + } + } + + public CacheMode getCacheMode() { + return cacheMode; + } + + public class CacheTable implements Comparable { + public OlapTable olapTable; + public long latestPartitionId; + public long latestVersion; + public long latestTime; + + public CacheTable() { + olapTable = null; + latestPartitionId = 0; + latestVersion = 0; + latestTime = 0; + } + + @Override + public int compareTo(CacheTable table) { + return (int) (table.latestTime - this.latestTime); + } + + public void Debug() { + LOG.debug("table {}, partition id {}, ver {}, time {}", olapTable.getName(), latestPartitionId, latestVersion, latestTime); + } + } + + public boolean enableCache() { + return enableSqlCache || enablePartitionCache; + } + + public boolean enableSqlCache() { + return enableSqlCache; + } + + public boolean enablePartitionCache() { + return enablePartitionCache; + } + + /** + * Check cache mode with SQL and table + * 1、Only Olap table + * 2、The update time of the table is before Config.last_version_interval_time + * 2、PartitionType is PartitionType.RANGE, and partition key has only one column + * 4、Partition key must be included in the group by clause + * 5、Where clause must contain only one partition key predicate + * CacheMode.Sql + * xxx FROM user_profile, updated before Config.last_version_interval_time + * CacheMode.Partition, partition by event_date, only the partition of today will be updated. + * SELECT xxx FROM app_event WHERE event_date >= 20191201 AND event_date <= 20191207 GROUP BY event_date + * SELECT xxx FROM app_event INNER JOIN user_Profile ON app_event.user_id = user_profile.user_id xxx + * SELECT xxx FROM app_event INNER JOIN user_profile ON xxx INNER JOIN site_channel ON xxx + */ + public void checkCacheMode(long now) { + cacheMode = innerCheckCacheMode(now); + } + + private CacheMode innerCheckCacheMode(long now) { + if (!enableCache()) { + return CacheMode.NoNeed; + } + if (!(parsedStmt instanceof SelectStmt) || scanNodes.size() == 0) { + return CacheMode.NoNeed; + } + MetricRepo.COUNTER_QUERY_TABLE.increase(1L); + + this.selectStmt = (SelectStmt) parsedStmt; + //Check the last version time of the table + List tblTimeList = Lists.newArrayList(); + for (int i = 0; i < scanNodes.size(); i++) { + ScanNode node = scanNodes.get(i); + if (!(node instanceof OlapScanNode)) { + return CacheMode.None; + } + OlapScanNode oNode = (OlapScanNode) node; + OlapTable oTable = oNode.getOlapTable(); + CacheTable cTable = getLastUpdateTime(oTable); + tblTimeList.add(cTable); + } + MetricRepo.COUNTER_QUERY_OLAP_TABLE.increase(1L); + Collections.sort(tblTimeList); + latestTable = tblTimeList.get(0); + latestTable.Debug(); + + if (now == 0) { + now = nowtime(); + } + if (enableSqlCache() && + (now - latestTable.latestTime) >= Config.cache_last_version_interval_second * 1000) { + LOG.debug("TIME:{},{},{}", now, latestTable.latestTime, Config.cache_last_version_interval_second*1000); + cache = new SqlCache(this.queryId, this.selectStmt); + ((SqlCache) cache).setCacheInfo(this.latestTable); + MetricRepo.COUNTER_CACHE_MODE_SQL.increase(1L); + return CacheMode.Sql; + } + + if (!enablePartitionCache()) { + return CacheMode.None; + } + + //Check if selectStmt matches partition key + //Only one table can be updated in Config.cache_last_version_interval_second range + for (int i = 1; i < tblTimeList.size(); i++) { + if ((now - tblTimeList.get(i).latestTime) < Config.cache_last_version_interval_second * 1000) { + LOG.info("the time of other tables is newer than {}", Config.cache_last_version_interval_second); + return CacheMode.None; + } + } + olapTable = latestTable.olapTable; + if (olapTable.getPartitionInfo().getType() != PartitionType.RANGE) { + LOG.debug("the partition of OlapTable not RANGE type"); + return CacheMode.None; + } + partitionInfo = (RangePartitionInfo) olapTable.getPartitionInfo(); + List columns = partitionInfo.getPartitionColumns(); + //Partition key has only one column + if (columns.size() != 1) { + LOG.info("the size of columns for partition key is {}", columns.size()); + return CacheMode.None; + } + partColumn = columns.get(0); + //Check if group expr contain partition column + if (!checkGroupByPartitionKey(this.selectStmt, partColumn)) { + LOG.info("not group by partition key, key {}", partColumn.getName()); + return CacheMode.None; + } + //Check if whereClause have one CompoundPredicate of partition column + List compoundPredicates = Lists.newArrayList(); + getPartitionKeyFromSelectStmt(this.selectStmt, partColumn, compoundPredicates); + if (compoundPredicates.size() != 1) { + LOG.info("the predicate size include partition key has {}", compoundPredicates.size()); + return CacheMode.None; + } + partitionPredicate = compoundPredicates.get(0); + cache = new PartitionCache(this.queryId, this.selectStmt); + ((PartitionCache) cache).setCacheInfo(this.latestTable, this.partitionInfo, this.partColumn, + this.partitionPredicate); + MetricRepo.COUNTER_CACHE_MODE_PARTITION.increase(1L); + return CacheMode.Partition; + } + + public CacheBeProxy.FetchCacheResult getCacheData() { + CacheProxy.FetchCacheResult cacheResult = null; + cacheMode = innerCheckCacheMode(0); + if (cacheMode == CacheMode.NoNeed) { + return cacheResult; + } + if (cacheMode == CacheMode.None) { + LOG.info("check cache mode {}, queryid {}", cacheMode, DebugUtil.printId(queryId)); + return cacheResult; + } + Status status = new Status(); + cacheResult = cache.getCacheData(status); + + if (status.ok() && cacheResult != null) { + LOG.info("hit cache, mode {}, queryid {}, all count {}, value count {}, row count {}, data size {}", + cacheMode, DebugUtil.printId(queryId), + cacheResult.all_count, cacheResult.value_count, + cacheResult.row_count, cacheResult.data_size); + } else { + LOG.info("miss cache, mode {}, queryid {}, code {}, msg {}", cacheMode, + DebugUtil.printId(queryId), status.getErrorCode(), status.getErrorMsg()); + cacheResult = null; + } + return cacheResult; + } + + public long nowtime() { + return System.currentTimeMillis(); + } + + private void getPartitionKeyFromSelectStmt(SelectStmt stmt, Column partColumn, + List compoundPredicates) { + getPartitionKeyFromWhereClause(stmt.getWhereClause(), partColumn, compoundPredicates); + List tableRefs = stmt.getTableRefs(); + for (TableRef tblRef : tableRefs) { + if (tblRef instanceof InlineViewRef) { + InlineViewRef viewRef = (InlineViewRef) tblRef; + QueryStmt queryStmt = viewRef.getViewStmt(); + if (queryStmt instanceof SelectStmt) { + getPartitionKeyFromSelectStmt((SelectStmt) queryStmt, partColumn, compoundPredicates); + } + } + } + } + + /** + * Only support case 1 + * 1.key >= a and key <= b + * 2.key = a or key = b + * 3.key in(a,b,c) + */ + private void getPartitionKeyFromWhereClause(Expr expr, Column partColumn, + List compoundPredicates) { + if (expr == null) { + return; + } + if (expr instanceof CompoundPredicate) { + CompoundPredicate cp = (CompoundPredicate) expr; + if (cp.getOp() == CompoundPredicate.Operator.AND) { + if (cp.getChildren().size() == 2 && cp.getChild(0) instanceof BinaryPredicate && + cp.getChild(1) instanceof BinaryPredicate) { + BinaryPredicate leftPre = (BinaryPredicate) cp.getChild(0); + BinaryPredicate rightPre = (BinaryPredicate) cp.getChild(1); + String leftColumn = getColumnName(leftPre); + String rightColumn = getColumnName(rightPre); + if (leftColumn.equalsIgnoreCase(partColumn.getName()) && + rightColumn.equalsIgnoreCase(partColumn.getName())) { + compoundPredicates.add(cp); + } + } + } + for (Expr subExpr : expr.getChildren()) { + getPartitionKeyFromWhereClause(subExpr, partColumn, compoundPredicates); + } + } + } + + private String getColumnName(BinaryPredicate predicate) { + SlotRef slot = null; + if (predicate.getChild(0) instanceof SlotRef) { + slot = (SlotRef) predicate.getChild(0); + } else if (predicate.getChild(0) instanceof CastExpr) { + CastExpr expr = (CastExpr) predicate.getChild(0); + if (expr.getChild(0) instanceof SlotRef) { + slot = (SlotRef) expr.getChild(0); + } + } + + if (slot != null) { + return slot.getColumnName(); + } + return ""; + } + + /** + * Check the selectStmt and tableRefs always group by partition key + * 1. At least one group by + * 2. group by must contain partition key + */ + private boolean checkGroupByPartitionKey(SelectStmt stmt, Column partColumn) { + List aggInfoList = Lists.newArrayList(); + getAggInfoList(stmt, aggInfoList); + int groupbyCount = 0; + for (AggregateInfo aggInfo : aggInfoList) { + /* + Support COUNT(DISTINCT xxx) now,next version will remove the code + if (aggInfo.isDistinctAgg()) { + return false; + }*/ + ArrayList groupExprs = aggInfo.getGroupingExprs(); + if (groupExprs == null) { + continue; + } + groupbyCount += 1; + boolean matched = false; + for (Expr groupExpr : groupExprs) { + SlotRef slot = (SlotRef) groupExpr; + if (partColumn.getName().equals(slot.getColumnName())) { + matched = true; + break; + } + } + if (!matched) { + return false; + } + } + return groupbyCount > 0 ? true : false; + } + + private void getAggInfoList(SelectStmt stmt, List aggInfoList) { + AggregateInfo aggInfo = stmt.getAggInfo(); + if (aggInfo != null) { + aggInfoList.add(aggInfo); + } + List tableRefs = stmt.getTableRefs(); + for (TableRef tblRef : tableRefs) { + if (tblRef instanceof InlineViewRef) { + InlineViewRef viewRef = (InlineViewRef) tblRef; + QueryStmt queryStmt = viewRef.getViewStmt(); + if (queryStmt instanceof SelectStmt) { + getAggInfoList((SelectStmt) queryStmt, aggInfoList); + } + } + } + } + + private CacheTable getLastUpdateTime(OlapTable olapTable) { + CacheTable table = new CacheTable(); + table.olapTable = olapTable; + for (Partition partition : olapTable.getPartitions()) { + if (partition.getVisibleVersionTime() >= table.latestTime && + partition.getVisibleVersion() > table.latestVersion) { + table.latestPartitionId = partition.getId(); + table.latestTime = partition.getVisibleVersionTime(); + table.latestVersion = partition.getVisibleVersion(); + } + } + return table; + } + + public Cache.HitRange getHitRange() { + if (cacheMode == CacheMode.None) { + return Cache.HitRange.None; + } + return cache.getHitRange(); + } + + public SelectStmt getRewriteStmt() { + if (cacheMode != CacheMode.Partition) { + return null; + } + return cache.getRewriteStmt(); + } + + public void copyRowBatch(RowBatch rowBatch) { + if (cacheMode == CacheMode.None || cacheMode == CacheMode.NoNeed) { + return; + } + cache.copyRowBatch(rowBatch); + } + + public void updateCache() { + if (cacheMode == CacheMode.None || cacheMode == CacheMode.NoNeed) { + return; + } + cache.updateCache(); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/cache/PartitionCache.java b/fe/fe-core/src/main/java/org/apache/doris/qe/cache/PartitionCache.java new file mode 100644 index 00000000000000..d88f6ac65ca7ff --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/cache/PartitionCache.java @@ -0,0 +1,217 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.qe.cache; + +import com.google.common.collect.Lists; +import org.apache.doris.analysis.CompoundPredicate; +import org.apache.doris.analysis.Expr; +import org.apache.doris.analysis.InlineViewRef; +import org.apache.doris.analysis.QueryStmt; +import org.apache.doris.analysis.SelectStmt; +import org.apache.doris.analysis.TableRef; +import org.apache.doris.catalog.Column; +import org.apache.doris.catalog.OlapTable; +import org.apache.doris.catalog.RangePartitionInfo; +import org.apache.doris.common.Status; +import org.apache.doris.common.util.DebugUtil; +import org.apache.doris.metric.MetricRepo; +import org.apache.doris.qe.RowBatch; +import org.apache.doris.thrift.TUniqueId; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.util.List; + +public class PartitionCache extends Cache { + private static final Logger LOG = LogManager.getLogger(PartitionCache.class); + private SelectStmt nokeyStmt; + private SelectStmt rewriteStmt; + private CompoundPredicate partitionPredicate; + private OlapTable olapTable; + private RangePartitionInfo partitionInfo; + private Column partColumn; + + private PartitionRange range; + private List newRangeList; + + public SelectStmt getRewriteStmt() { + return rewriteStmt; + } + + public SelectStmt getNokeyStmt() { + return nokeyStmt; + } + + public PartitionCache(TUniqueId queryId, SelectStmt selectStmt) { + super(queryId, selectStmt); + } + + public void setCacheInfo(CacheAnalyzer.CacheTable latestTable, RangePartitionInfo partitionInfo, Column partColumn, + CompoundPredicate partitionPredicate) { + this.latestTable = latestTable; + this.olapTable = latestTable.olapTable; + this.partitionInfo = partitionInfo; + this.partColumn = partColumn; + this.partitionPredicate = partitionPredicate; + this.newRangeList = Lists.newArrayList(); + } + + public CacheProxy.FetchCacheResult getCacheData(Status status) { + CacheProxy.FetchCacheRequest request; + rewriteSelectStmt(null); + request = new CacheBeProxy.FetchCacheRequest(nokeyStmt.toSql()); + range = new PartitionRange(this.partitionPredicate, this.olapTable, + this.partitionInfo); + if (!range.analytics()) { + status.setStatus("analytics range error"); + return null; + } + + for (PartitionRange.PartitionSingle single : range.getPartitionSingleList()) { + request.addParam(single.getCacheKey().realValue(), + single.getPartition().getVisibleVersion(), + single.getPartition().getVisibleVersionTime() + ); + } + + CacheProxy.FetchCacheResult cacheResult = proxy.fetchCache(request, 10000, status); + if (status.ok() && cacheResult != null) { + cacheResult.all_count = range.getPartitionSingleList().size(); + for (CacheBeProxy.CacheValue value : cacheResult.getValueList()) { + range.setCacheFlag(value.param.partition_key); + } + MetricRepo.COUNTER_CACHE_HIT_PARTITION.increase(1L); + MetricRepo.COUNTER_CACHE_PARTITION_ALL.increase((long) range.getPartitionSingleList().size()); + MetricRepo.COUNTER_CACHE_PARTITION_HIT.increase((long) cacheResult.getValueList().size()); + } + + range.setTooNewByID(latestTable.latestPartitionId); + //build rewrite sql + this.hitRange = range.buildDiskPartitionRange(newRangeList); + if (newRangeList != null && newRangeList.size() > 0) { + rewriteSelectStmt(newRangeList); + } + return cacheResult; + } + + public void copyRowBatch(RowBatch rowBatch) { + if (rowBatchBuilder == null) { + rowBatchBuilder = new RowBatchBuilder(CacheAnalyzer.CacheMode.Partition); + rowBatchBuilder.buildPartitionIndex(selectStmt.getResultExprs(), selectStmt.getColLabels(), + partColumn, range.buildUpdatePartitionRange()); + } + rowBatchBuilder.copyRowData(rowBatch); + } + + public void updateCache() { + if (!super.checkRowLimit()) { + return; + } + + CacheBeProxy.UpdateCacheRequest updateRequest = rowBatchBuilder.buildPartitionUpdateRequest(nokeyStmt.toSql()); + if (updateRequest.value_count > 0) { + CacheBeProxy proxy = new CacheBeProxy(); + Status status = new Status(); + proxy.updateCache(updateRequest, CacheProxy.UPDATE_TIMEOUT, status); + LOG.info("update cache model {}, queryid {}, sqlkey {}, value count {}, row count {}, data size {}", + CacheAnalyzer.CacheMode.Partition, DebugUtil.printId(queryId), + DebugUtil.printId(updateRequest.sql_key), + updateRequest.value_count, updateRequest.row_count, updateRequest.data_size); + } + } + + /** + * Set the predicate containing partition key to null + */ + public void rewriteSelectStmt(List newRangeList) { + if (newRangeList == null || newRangeList.size() == 0) { + this.nokeyStmt = (SelectStmt) this.selectStmt.clone(); + rewriteSelectStmt(nokeyStmt, this.partitionPredicate, null); + } else { + this.rewriteStmt = (SelectStmt) this.selectStmt.clone(); + rewriteSelectStmt(rewriteStmt, this.partitionPredicate, newRangeList); + } + } + + private void rewriteSelectStmt(SelectStmt newStmt, CompoundPredicate predicate, + List newRangeList) { + newStmt.setWhereClause( + rewriteWhereClause(newStmt.getWhereClause(), predicate, newRangeList) + ); + List tableRefs = newStmt.getTableRefs(); + for (TableRef tblRef : tableRefs) { + if (tblRef instanceof InlineViewRef) { + InlineViewRef viewRef = (InlineViewRef) tblRef; + QueryStmt queryStmt = viewRef.getViewStmt(); + if (queryStmt instanceof SelectStmt) { + rewriteSelectStmt((SelectStmt) queryStmt, predicate, newRangeList); + } + } + } + } + + /** + * Rewrite the query scope of partition key in the where condition + * origin expr : where eventdate>="2020-01-12" and eventdate<="2020-01-15" + * rewrite expr : where eventdate>="2020-01-14" and eventdate<="2020=01-15" + */ + private Expr rewriteWhereClause(Expr expr, CompoundPredicate predicate, + List newRangeList) { + if (expr == null) { + return null; + } + if (!(expr instanceof CompoundPredicate)) { + return expr; + } + if (expr.equals(predicate)) { + if (newRangeList == null) { + return null; + } else { + getPartitionRange().rewritePredicate((CompoundPredicate) expr, newRangeList); + return expr; + } + } + + for (int i = 0; i < expr.getChildren().size(); i++) { + Expr child = rewriteWhereClause(expr.getChild(i), predicate, newRangeList); + if (child == null) { + expr.removeNode(i); + i--; + } else { + expr.setChild(i, child); + } + } + if (expr.getChildren().size() == 0) { + return null; + } else if (expr.getChildren().size() == 1) { + return expr.getChild(0); + } else { + return expr; + } + } + + public PartitionRange getPartitionRange() { + if (range == null) { + range = new PartitionRange(this.partitionPredicate, + this.olapTable, this.partitionInfo); + return range; + } else { + return range; + } + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/cache/PartitionRange.java b/fe/fe-core/src/main/java/org/apache/doris/qe/cache/PartitionRange.java new file mode 100644 index 00000000000000..194d786c16c873 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/cache/PartitionRange.java @@ -0,0 +1,604 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.qe.cache; + +import org.apache.doris.analysis.CompoundPredicate; +import org.apache.doris.analysis.BinaryPredicate; +import org.apache.doris.analysis.DateLiteral; +import org.apache.doris.analysis.InPredicate; +import org.apache.doris.analysis.PartitionValue; +import org.apache.doris.analysis.Expr; +import org.apache.doris.analysis.LiteralExpr; +import org.apache.doris.analysis.IntLiteral; +import org.apache.doris.catalog.OlapTable; +import org.apache.doris.catalog.PrimitiveType; +import org.apache.doris.catalog.RangePartitionInfo; +import org.apache.doris.catalog.Column; +import org.apache.doris.catalog.Partition; +import org.apache.doris.catalog.PartitionKey; +import org.apache.doris.catalog.Type; +import org.apache.doris.common.Config; +import org.apache.doris.planner.PartitionColumnFilter; + +import org.apache.doris.common.AnalysisException; + +import com.google.common.collect.Lists; +import com.google.common.collect.Range; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.text.SimpleDateFormat; +import java.util.Date; +import java.util.List; +import java.util.Map; + +/** + * Convert the range of the partition to the list + * all partition by day/week/month split to day list + */ +public class PartitionRange { + private static final Logger LOG = LogManager.getLogger(PartitionRange.class); + + public class PartitionSingle { + private Partition partition; + private PartitionKey partitionKey; + private long partitionId; + private PartitionKeyType cacheKey; + private boolean fromCache; + private boolean tooNew; + + public Partition getPartition() { + return partition; + } + + public void setPartition(Partition partition) { + this.partition = partition; + } + + public PartitionKey getPartitionKey() { + return partitionKey; + } + + public void setPartitionKey(PartitionKey key) { + this.partitionKey = key; + } + + public long getPartitionId() { + return partitionId; + } + + public void setPartitionId(long partitionId) { + this.partitionId = partitionId; + } + + public PartitionKeyType getCacheKey() { + return cacheKey; + } + + public void setCacheKey(PartitionKeyType cacheKey) { + this.cacheKey.clone(cacheKey); + } + + public boolean isFromCache() { + return fromCache; + } + + public void setFromCache(boolean fromCache) { + this.fromCache = fromCache; + } + + public boolean isTooNew() { + return tooNew; + } + + public void setTooNew(boolean tooNew) { + this.tooNew = tooNew; + } + + public PartitionSingle() { + this.partitionId = 0; + this.cacheKey = new PartitionKeyType(); + this.fromCache = false; + this.tooNew = false; + } + + public void Debug() { + if (partition != null) { + LOG.info("partition id {}, cacheKey {}, version {}, time {}, fromCache {}, tooNew {} ", + partitionId, cacheKey.realValue(), + partition.getVisibleVersion(), partition.getVisibleVersionTime(), + fromCache, tooNew); + } else { + LOG.info("partition id {}, cacheKey {}, fromCache {}, tooNew {} ", partitionId, + cacheKey.realValue(), fromCache, tooNew); + } + } + } + + public enum KeyType { + DEFAULT, + LONG, + DATE, + DATETIME, + TIME + } + + public static class PartitionKeyType { + private SimpleDateFormat df8 = new SimpleDateFormat("yyyyMMdd"); + private SimpleDateFormat df10 = new SimpleDateFormat("yyyy-MM-dd"); + + public KeyType keyType = KeyType.DEFAULT; + public long value; + public Date date; + + public boolean init(Type type, String str) { + if (type.getPrimitiveType() == PrimitiveType.DATE) { + try { + date = df10.parse(str); + } catch (Exception e) { + LOG.warn("parse error str{}.", str); + return false; + } + keyType = KeyType.DATE; + } else { + value = Long.valueOf(str); + keyType = KeyType.LONG; + } + return true; + } + + public boolean init(Type type, LiteralExpr expr) { + switch (type.getPrimitiveType()) { + case BOOLEAN: + case TIME: + case DATETIME: + case FLOAT: + case DOUBLE: + case DECIMAL: + case DECIMALV2: + case CHAR: + case VARCHAR: + case LARGEINT: + LOG.info("PartitionCache not support such key type {}", type.toSql()); + return false; + case DATE: + date = getDateValue(expr); + keyType = KeyType.DATE; + break; + case TINYINT: + case SMALLINT: + case INT: + case BIGINT: + value = expr.getLongValue(); + keyType = KeyType.LONG; + break; + } + return true; + } + + public void clone(PartitionKeyType key) { + keyType = key.keyType; + value = key.value; + date = key.date; + } + + public boolean equals(PartitionKeyType key) { + return realValue() == key.realValue(); + } + + public void add(int num) { + if (keyType == KeyType.DATE) { + date = new Date(date.getTime() + num * 3600 * 24 * 1000); + } else { + value += num; + } + } + + public String toString() { + if (keyType == KeyType.DEFAULT) { + return ""; + } else if (keyType == KeyType.DATE) { + return df10.format(date); + } else { + return String.valueOf(value); + } + } + + public long realValue() { + if (keyType == KeyType.DATE) { + return Long.parseLong(df8.format(date)); + } else { + return value; + } + } + + private Date getDateValue(LiteralExpr expr) { + value = expr.getLongValue() / 1000000; + Date dt = null; + try { + dt = df8.parse(String.valueOf(value)); + } catch (Exception e) { + } + return dt; + } + } + + private CompoundPredicate partitionKeyPredicate; + private OlapTable olapTable; + private RangePartitionInfo rangePartitionInfo; + private Column partitionColumn; + private List partitionSingleList; + + public CompoundPredicate getPartitionKeyPredicate() { + return partitionKeyPredicate; + } + + public void setPartitionKeyPredicate(CompoundPredicate partitionKeyPredicate) { + this.partitionKeyPredicate = partitionKeyPredicate; + } + + public RangePartitionInfo getRangePartitionInfo() { + return rangePartitionInfo; + } + + public void setRangePartitionInfo(RangePartitionInfo rangePartitionInfo) { + this.rangePartitionInfo = rangePartitionInfo; + } + + public Column getPartitionColumn() { + return partitionColumn; + } + + public void setPartitionColumn(Column partitionColumn) { + this.partitionColumn = partitionColumn; + } + + public List getPartitionSingleList() { + return partitionSingleList; + } + + public PartitionRange() { + } + + public PartitionRange(CompoundPredicate partitionKeyPredicate, OlapTable olapTable, + RangePartitionInfo rangePartitionInfo) { + this.partitionKeyPredicate = partitionKeyPredicate; + this.olapTable = olapTable; + this.rangePartitionInfo = rangePartitionInfo; + this.partitionSingleList = Lists.newArrayList(); + } + + /** + * analytics PartitionKey and PartitionInfo + * + * @return + */ + public boolean analytics() { + if (rangePartitionInfo.getPartitionColumns().size() != 1) { + return false; + } + partitionColumn = rangePartitionInfo.getPartitionColumns().get(0); + PartitionColumnFilter filter = createPartitionFilter(this.partitionKeyPredicate, partitionColumn); + try { + if (!buildPartitionKeyRange(filter, partitionColumn)) { + return false; + } + getTablePartitionList(olapTable); + } catch (AnalysisException e) { + LOG.warn("get partition range failed, because:", e); + return false; + } + return true; + } + + public boolean setCacheFlag(long cacheKey) { + boolean find = false; + for (PartitionSingle single : partitionSingleList) { + if (single.getCacheKey().realValue() == cacheKey) { + single.setFromCache(true); + find = true; + break; + } + } + return find; + } + + public boolean setTooNewByID(long partitionId) { + boolean find = false; + for (PartitionSingle single : partitionSingleList) { + if (single.getPartition().getId() == partitionId) { + single.setTooNew(true); + find = true; + break; + } + } + return find; + } + + public boolean setTooNewByKey(long cacheKey) { + boolean find = false; + for (PartitionSingle single : partitionSingleList) { + if (single.getCacheKey().realValue() == cacheKey) { + single.setTooNew(true); + find = true; + break; + } + } + return find; + } + + /** + * Only the range query of the key of the partition is supported, and the separated partition key query is not supported. + * Because a query can only be divided into two parts, part1 get data from cache, part2 fetch_data by scan node from BE. + * Partion cache : 20191211-20191215 + * Hit cache parameter : [20191211 - 20191215], [20191212 - 20191214], [20191212 - 20191216],[20191210 - 20191215] + * Miss cache parameter: [20191210 - 20191216] + * So hit range is full, left or right, not support middle now + */ + public Cache.HitRange buildDiskPartitionRange(List rangeList) { + Cache.HitRange hitRange = Cache.HitRange.None; + if (partitionSingleList.size() == 0) { + return hitRange; + } + int begin = partitionSingleList.size() - 1; + int end = 0; + for (int i = 0; i < partitionSingleList.size(); i++) { + if (!partitionSingleList.get(i).isFromCache()) { + if (begin > i) { + begin = i; + } + if (end < i) { + end = i; + } + } + } + if (end < begin) { + hitRange = Cache.HitRange.Full; + return hitRange; + } + + if (end == partitionSingleList.size() - 1) { + hitRange = Cache.HitRange.Left; + } + if (begin == 0) { + hitRange = Cache.HitRange.Right; + } + + rangeList.add(partitionSingleList.get(begin)); + rangeList.add(partitionSingleList.get(end)); + LOG.info("the new range for scan be is [{},{}], hit range", rangeList.get(0).getCacheKey().realValue(), + rangeList.get(1).getCacheKey().realValue(), hitRange); + return hitRange; + } + + /** + * Gets the partition range that needs to be updated + * @return + */ + public List buildUpdatePartitionRange() { + List updateList = Lists.newArrayList(); + for (PartitionSingle single : partitionSingleList) { + if (!single.isFromCache() && !single.isTooNew()) { + updateList.add(single); + } + } + return updateList; + } + + public boolean rewritePredicate(CompoundPredicate predicate, List rangeList) { + if (predicate.getOp() != CompoundPredicate.Operator.AND) { + LOG.debug("predicate op {}", predicate.getOp().toString()); + return false; + } + for (Expr expr : predicate.getChildren()) { + if (expr instanceof BinaryPredicate) { + BinaryPredicate binPredicate = (BinaryPredicate) expr; + BinaryPredicate.Operator op = binPredicate.getOp(); + if (binPredicate.getChildren().size() != 2) { + LOG.info("binary predicate children size {}", binPredicate.getChildren().size()); + continue; + } + if (op == BinaryPredicate.Operator.NE) { + LOG.info("binary predicate op {}", op.toString()); + continue; + } + PartitionKeyType key = new PartitionKeyType(); + switch (op) { + case LE: //<= + key.clone(rangeList.get(1).getCacheKey()); + break; + case LT: //< + key.clone(rangeList.get(1).getCacheKey()); + key.add(1); + break; + case GE: //>= + key.clone(rangeList.get(0).getCacheKey()); + break; + case GT: //> + key.clone(rangeList.get(0).getCacheKey()); + key.add(-1); + break; + default: + break; + } + LiteralExpr newLiteral; + if (key.keyType == KeyType.DATE) { + try { + newLiteral = new DateLiteral(key.toString(), Type.DATE); + } catch (Exception e) { + LOG.warn("Date's format is error {},{}", key.toString(), e); + continue; + } + } else if (key.keyType == KeyType.LONG) { + newLiteral = new IntLiteral(key.realValue()); + } else { + LOG.warn("Partition cache not support type {}", key.keyType); + continue; + } + + if (binPredicate.getChild(1) instanceof LiteralExpr) { + binPredicate.removeNode(1); + binPredicate.addChild(newLiteral); + } else if (binPredicate.getChild(0) instanceof LiteralExpr) { + binPredicate.removeNode(0); + binPredicate.setChild(0, newLiteral); + } else { + continue; + } + } else if (expr instanceof InPredicate) { + InPredicate inPredicate = (InPredicate) expr; + if (!inPredicate.isLiteralChildren() || inPredicate.isNotIn()) { + continue; + } + } + } + return true; + } + + /** + * Get partition info from SQL Predicate and OlapTable + * Pair + * PARTITION BY RANGE(`olap_date`) + * ( PARTITION p20200101 VALUES [("20200101"), ("20200102")), + * PARTITION p20200102 VALUES [("20200102"), ("20200103")) ) + */ + private void getTablePartitionList(OlapTable table) { + Map> range = rangePartitionInfo.getIdToRange(false); + for (Map.Entry> entry : range.entrySet()) { + Long partId = entry.getKey(); + for (PartitionSingle single : partitionSingleList) { + if (entry.getValue().contains(single.getPartitionKey())) { + if (single.getPartitionId() == 0) { + single.setPartitionId(partId); + } + } + } + } + + for (PartitionSingle single : partitionSingleList) { + single.setPartition(table.getPartition(single.getPartitionId())); + } + } + + /** + * Get value range of partition column from predicate + */ + private boolean buildPartitionKeyRange(PartitionColumnFilter partitionColumnFilter, + Column partitionColumn) throws AnalysisException { + if (partitionColumnFilter.lowerBound == null || partitionColumnFilter.upperBound == null) { + LOG.info("filter is null"); + return false; + } + PartitionKeyType begin = new PartitionKeyType(); + PartitionKeyType end = new PartitionKeyType(); + begin.init(partitionColumn.getType(), partitionColumnFilter.lowerBound); + end.init(partitionColumn.getType(), partitionColumnFilter.upperBound); + + if (!partitionColumnFilter.lowerBoundInclusive) { + begin.add(1); + } + if (!partitionColumnFilter.upperBoundInclusive) { + end.add(-1); + } + if (begin.realValue() > end.realValue()) { + LOG.info("partition range begin {}, end {}", begin, end); + return false; + } + + if (end.realValue() - begin.realValue() > Config.cache_result_max_row_count) { + LOG.info("partition key range is too large, begin {}, end {}", begin.realValue(), end.realValue()); + return false; + } + + while (begin.realValue() <= end.realValue()) { + PartitionKey key = PartitionKey.createPartitionKey( + Lists.newArrayList(new PartitionValue(begin.toString())), + Lists.newArrayList(partitionColumn)); + PartitionSingle single = new PartitionSingle(); + single.setCacheKey(begin); + single.setPartitionKey(key); + partitionSingleList.add(single); + begin.add(1); + } + return true; + } + + private PartitionColumnFilter createPartitionFilter(CompoundPredicate partitionKeyPredicate, + Column partitionColumn) { + if (partitionKeyPredicate.getOp() != CompoundPredicate.Operator.AND) { + LOG.debug("not and op"); + return null; + } + PartitionColumnFilter partitionColumnFilter = new PartitionColumnFilter(); + ; + for (Expr expr : partitionKeyPredicate.getChildren()) { + if (expr instanceof BinaryPredicate) { + BinaryPredicate binPredicate = (BinaryPredicate) expr; + BinaryPredicate.Operator op = binPredicate.getOp(); + if (binPredicate.getChildren().size() != 2) { + LOG.warn("child size {}", binPredicate.getChildren().size()); + continue; + } + if (binPredicate.getOp() == BinaryPredicate.Operator.NE) { + LOG.debug("not support NE operator"); + continue; + } + Expr slotBinding; + if (binPredicate.getChild(1) instanceof LiteralExpr) { + slotBinding = binPredicate.getChild(1); + } else if (binPredicate.getChild(0) instanceof LiteralExpr) { + slotBinding = binPredicate.getChild(0); + } else { + LOG.debug("not find LiteralExpr"); + continue; + } + + LiteralExpr literal = (LiteralExpr) slotBinding; + switch (op) { + case EQ: //= + partitionColumnFilter.setLowerBound(literal, true); + partitionColumnFilter.setUpperBound(literal, true); + break; + case LE: //<= + partitionColumnFilter.setUpperBound(literal, true); + break; + case LT: //< + partitionColumnFilter.setUpperBound(literal, false); + break; + case GE: //>= + partitionColumnFilter.setLowerBound(literal, true); + + break; + case GT: //> + partitionColumnFilter.setLowerBound(literal, false); + break; + default: + break; + } + } else if (expr instanceof InPredicate) { + InPredicate inPredicate = (InPredicate) expr; + if (!inPredicate.isLiteralChildren() || inPredicate.isNotIn()) { + continue; + } + partitionColumnFilter.setInPredicate(inPredicate); + } + } + return partitionColumnFilter; + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/cache/RowBatchBuilder.java b/fe/fe-core/src/main/java/org/apache/doris/qe/cache/RowBatchBuilder.java new file mode 100644 index 00000000000000..e7f3a3afe31871 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/cache/RowBatchBuilder.java @@ -0,0 +1,158 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.qe.cache; + +import com.google.common.collect.Lists; +import org.apache.doris.analysis.Expr; +import org.apache.doris.catalog.Column; +import org.apache.doris.catalog.Type; +import org.apache.doris.qe.RowBatch; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; + +/** + * According to the query partition range and cache hit, the rowbatch to update the cache is constructed + */ +public class RowBatchBuilder { + private static final Logger LOG = LogManager.getLogger(RowBatchBuilder.class); + + private CacheBeProxy.UpdateCacheRequest updateRequest; + private CacheAnalyzer.CacheMode cacheMode; + private int keyIndex; + private Type keyType; + private HashMap cachePartMap; + private List rowList; + private int batchSize; + private int rowSize; + private int dataSize; + + public int getRowSize() { + return rowSize; + } + + public RowBatchBuilder(CacheAnalyzer.CacheMode model) { + cacheMode = model; + keyIndex = 0; + keyType = Type.INVALID; + rowList = Lists.newArrayList(); + cachePartMap = new HashMap<>(); + batchSize = 0; + rowSize = 0; + dataSize = 0; + } + + public void buildPartitionIndex(ArrayList resultExpr, + List columnLabel, Column partColumn, + List newSingleList) { + if (cacheMode != CacheAnalyzer.CacheMode.Partition) { + return; + } + + for (int i = 0; i < columnLabel.size(); i++) { + if (columnLabel.get(i).equalsIgnoreCase(partColumn.getName())) { + keyType = resultExpr.get(i).getType(); + keyIndex = i; + break; + } + } + if (newSingleList != null) { + for (PartitionRange.PartitionSingle single : newSingleList) { + cachePartMap.put(single.getCacheKey().realValue(), single); + } + } else { + LOG.info("no new partition single list "); + } + } + + public void copyRowData(RowBatch rowBatch) { + batchSize++; + rowSize += rowBatch.getBatch().getRowsSize(); + for (ByteBuffer buf : rowBatch.getBatch().getRows()) { + byte[] bytes = Arrays.copyOfRange(buf.array(), buf.position(), buf.limit()); + dataSize += bytes.length; + rowList.add(bytes); + } + } + + public CacheBeProxy.UpdateCacheRequest buildSqlUpdateRequest(String sql, long partitionKey, long lastVersion, long lastestTime) { + if (updateRequest == null) { + updateRequest = new CacheBeProxy.UpdateCacheRequest(sql); + } + updateRequest.addValue(partitionKey, lastVersion, lastestTime, rowList); + return updateRequest; + } + + public PartitionRange.PartitionKeyType getKeyFromRow(byte[] row, int index, Type type) { + PartitionRange.PartitionKeyType key = new PartitionRange.PartitionKeyType(); + ByteBuffer buf = ByteBuffer.wrap(row); + int len; + for (int i = 0; i <= index; i++) { + len = buf.get(); + if (i < index) { + buf.position(buf.position() + len); + } + if (i == index) { + byte[] content = Arrays.copyOfRange(buf.array(), buf.position(), buf.position() + len); + String str = new String(content); + key.init(type, str.toString()); + } + } + return key; + } + + /** + * Rowbatch split to Row + */ + public CacheBeProxy.UpdateCacheRequest buildPartitionUpdateRequest(String sql) { + if (updateRequest == null) { + updateRequest = new CacheBeProxy.UpdateCacheRequest(sql); + } + HashMap> partRowMap = new HashMap<>(); + List partitionRowList; + PartitionRange.PartitionKeyType cacheKey; + for (byte[] row : rowList) { + cacheKey = getKeyFromRow(row, keyIndex, keyType); + if (!cachePartMap.containsKey(cacheKey.realValue())) { + LOG.info("cant find partition key {}", cacheKey.realValue()); + continue; + } + if (!partRowMap.containsKey(cacheKey.realValue())) { + partitionRowList = Lists.newArrayList(); + partitionRowList.add(row); + partRowMap.put(cacheKey.realValue(), partitionRowList); + } else { + partRowMap.get(cacheKey).add(row); + } + } + + for (HashMap.Entry> entry : partRowMap.entrySet()) { + Long key = entry.getKey(); + PartitionRange.PartitionSingle partition = cachePartMap.get(key); + partitionRowList = entry.getValue(); + updateRequest.addValue(key, partition.getPartition().getVisibleVersion(), + partition.getPartition().getVisibleVersionTime(), partitionRowList); + } + return updateRequest; + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/cache/SqlCache.java b/fe/fe-core/src/main/java/org/apache/doris/qe/cache/SqlCache.java new file mode 100644 index 00000000000000..dd67c6e87e364b --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/cache/SqlCache.java @@ -0,0 +1,80 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.qe.cache; + +import org.apache.doris.analysis.SelectStmt; +import org.apache.doris.common.Status; +import org.apache.doris.common.util.DebugUtil; +import org.apache.doris.metric.MetricRepo; +import org.apache.doris.qe.RowBatch; +import org.apache.doris.thrift.TUniqueId; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +public class SqlCache extends Cache { + private static final Logger LOG = LogManager.getLogger(SqlCache.class); + + public SqlCache(TUniqueId queryId, SelectStmt selectStmt) { + super(queryId, selectStmt); + } + + public void setCacheInfo(CacheAnalyzer.CacheTable latestTable) { + this.latestTable = latestTable; + } + + public CacheProxy.FetchCacheResult getCacheData(Status status) { + CacheProxy.FetchCacheRequest request = new CacheProxy.FetchCacheRequest(selectStmt.toSql()); + request.addParam(latestTable.latestPartitionId, latestTable.latestVersion, + latestTable.latestTime); + CacheProxy.FetchCacheResult cacheResult = proxy.fetchCache(request, 10000, status); + if (status.ok() && cacheResult != null) { + cacheResult.all_count = 1; + MetricRepo.COUNTER_CACHE_HIT_SQL.increase(1L); + hitRange = HitRange.Full; + } + return cacheResult; + } + + public SelectStmt getRewriteStmt() { + return null; + } + + public void copyRowBatch(RowBatch rowBatch) { + if (rowBatchBuilder == null) { + rowBatchBuilder = new RowBatchBuilder(CacheAnalyzer.CacheMode.Sql); + } + rowBatchBuilder.copyRowData(rowBatch); + } + + public void updateCache() { + if (!super.checkRowLimit()) { + return; + } + + CacheBeProxy.UpdateCacheRequest updateRequest = rowBatchBuilder.buildSqlUpdateRequest(selectStmt.toSql(), + latestTable.latestPartitionId, latestTable.latestVersion, latestTable.latestTime); + if (updateRequest.value_count > 0) { + CacheBeProxy proxy = new CacheBeProxy(); + Status status = new Status(); + proxy.updateCache(updateRequest, CacheProxy.UPDATE_TIMEOUT, status); + LOG.info("update cache model {}, queryid {}, sqlkey {}, value count {}, row count {}, data size {}", + CacheAnalyzer.CacheMode.Sql, DebugUtil.printId(queryId), DebugUtil.printId(updateRequest.sql_key), + updateRequest.value_count, updateRequest.row_count, updateRequest.data_size); + } + } +} diff --git a/fe/fe-core/src/test/java/org/apache/doris/qe/PartitionCacheTest.java b/fe/fe-core/src/test/java/org/apache/doris/qe/PartitionCacheTest.java new file mode 100644 index 00000000000000..5b3d2978e46c80 --- /dev/null +++ b/fe/fe-core/src/test/java/org/apache/doris/qe/PartitionCacheTest.java @@ -0,0 +1,851 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.qe; + +import org.apache.doris.catalog.Type; +import org.apache.doris.common.Config; +import org.apache.doris.common.util.SqlParserUtils; +import org.apache.doris.common.AnalysisException; +import org.apache.doris.common.DdlException; +import org.apache.doris.common.UserException; +import org.apache.doris.thrift.TStorageType; + +import org.apache.doris.qe.ConnectScheduler; +import org.apache.doris.qe.cache.Cache; +import org.apache.doris.qe.cache.CacheCoordinator; +import org.apache.doris.qe.cache.PartitionCache; +import org.apache.doris.qe.cache.PartitionRange; +import org.apache.doris.qe.cache.CacheAnalyzer; +import org.apache.doris.qe.cache.CacheAnalyzer.CacheMode; +import org.apache.doris.qe.cache.RowBatchBuilder; +import org.apache.doris.analysis.StatementBase; +import org.apache.doris.analysis.SqlParser; +import org.apache.doris.analysis.SqlScanner; +import org.apache.doris.analysis.SelectStmt; +import org.apache.doris.analysis.Analyzer; +import org.apache.doris.analysis.TupleDescriptor; +import org.apache.doris.analysis.TupleId; +import org.apache.doris.analysis.SetPassVar; +import org.apache.doris.planner.ScanNode; +import org.apache.doris.planner.Planner; +import org.apache.doris.planner.PlanNodeId; +import org.apache.doris.planner.OlapScanNode; +import org.apache.doris.system.Backend; +import org.apache.doris.metric.MetricRepo; +import org.apache.doris.service.FrontendOptions; +import org.apache.doris.proto.PUniqueId; +import org.apache.doris.alter.SchemaChangeHandler; +import org.apache.doris.catalog.BrokerMgr; +import org.apache.doris.catalog.Catalog; +import org.apache.doris.catalog.Column; +import org.apache.doris.catalog.Database; +import org.apache.doris.catalog.KeysType; +import org.apache.doris.catalog.MaterializedIndex; +import org.apache.doris.catalog.MaterializedIndex.IndexState; +import org.apache.doris.catalog.OlapTable; +import org.apache.doris.catalog.Partition; +import org.apache.doris.catalog.PrimitiveType; +import org.apache.doris.catalog.RandomDistributionInfo; +import org.apache.doris.catalog.SinglePartitionInfo; +import org.apache.doris.catalog.ScalarType; +import org.apache.doris.catalog.PartitionInfo; +import org.apache.doris.catalog.RangePartitionInfo; +import org.apache.doris.catalog.DistributionInfo; +import org.apache.doris.load.Load; +import org.apache.doris.mysql.privilege.PaloAuth; +import org.apache.doris.mysql.privilege.PrivPredicate; +import org.apache.doris.mysql.privilege.MockedAuth; +import org.apache.doris.mysql.MysqlChannel; +import org.apache.doris.mysql.MysqlSerializer; +import org.apache.doris.persist.EditLog; +import org.apache.doris.qe.ConnectContext; +import org.apache.doris.system.SystemInfoService; +import org.apache.doris.thrift.TUniqueId; + +import mockit.Mocked; +import mockit.Tested; +import mockit.Injectable; +import mockit.Expectations; +import org.apache.doris.common.jmockit.Deencapsulation; + +import com.google.common.collect.Lists; + +import org.junit.Assert; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.runner.RunWith; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.io.StringReader; +import java.io.IOException; +import java.net.UnknownHostException; +import java.util.List; + +public class PartitionCacheTest { + private static final Logger LOG = LogManager.getLogger(PartitionCacheTest.class); + public static String clusterName = "testCluster"; + public static String dbName = "testDb"; + public static String fullDbName = "testCluster:testDb"; + public static String tableName = "testTbl"; + public static String userName = "testUser"; + + private static ConnectContext context; + + private List newRangeList; + private Cache.HitRange hitRange; + private Analyzer analyzer; + private Database db; + + @Mocked + private PaloAuth auth; + @Mocked + private SystemInfoService service; + @Mocked + private Catalog catalog; + @Mocked + private ConnectContext ctx; + @Mocked + MysqlChannel channel; + @Mocked + ConnectScheduler scheduler; + + @BeforeClass + public static void start() { + MetricRepo.init(); + try { + FrontendOptions.init(); + context = new ConnectContext(null); + Config.cache_enable_sql_mode = true; + Config.cache_enable_partition_mode = true; + context.getSessionVariable().setEnableSqlCache(true); + context.getSessionVariable().setEnablePartitionCache(true); + Config.cache_last_version_interval_second = 7200; + } catch (UnknownHostException e) { + e.printStackTrace(); + } + } + + @Before + public void setUp() { + MockedAuth.mockedAuth(auth); + MockedAuth.mockedConnectContext(ctx, "root", "192.168.1.1"); + + db = new Database(1L, fullDbName); + + OlapTable tbl1 = createOrderTable(); + OlapTable tbl2 = createProfileTable(); + OlapTable tbl3 = createEventTable(); + db.createTable(tbl1); + db.createTable(tbl2); + db.createTable(tbl3); + + new Expectations(catalog) { + { + catalog.getAuth(); + minTimes = 0; + result = auth; + + Deencapsulation.invoke(Catalog.class, "getCurrentSystemInfo"); + minTimes = 0; + result = service; + + catalog.getDb(fullDbName); + minTimes = 0; + result = db; + + catalog.getDb(dbName); + minTimes = 0; + result = db; + + catalog.getDb(db.getId()); + minTimes = 0; + result = db; + + catalog.getDbNames(); + minTimes = 0; + result = Lists.newArrayList(fullDbName); + } + }; + + QueryState state = new QueryState(); + channel.reset(); + + new Expectations(ctx) { + { + ctx.getMysqlChannel(); + minTimes = 0; + result = channel; + + ctx.getClusterName(); + minTimes = 0; + result = clusterName; + + ctx.getSerializer(); + minTimes = 0; + result = MysqlSerializer.newInstance(); + + ctx.getCatalog(); + minTimes = 0; + result = catalog; + + ctx.getState(); + minTimes = 0; + result = state; + + ctx.getConnectScheduler(); + minTimes = 0; + result = scheduler; + + ctx.getConnectionId(); + minTimes = 0; + result = 1; + + ctx.getQualifiedUser(); + minTimes = 0; + result = userName; + + ctx.getForwardedStmtId(); + minTimes = 0; + result = 123L; + + ctx.setKilled(); + minTimes = 0; + ctx.updateReturnRows(anyInt); + minTimes = 0; + ctx.setQueryId((TUniqueId) any); + minTimes = 0; + + ctx.queryId(); + minTimes = 0; + result = new TUniqueId(); + + ctx.getStartTime(); + minTimes = 0; + result = 0L; + + ctx.getDatabase(); + minTimes = 0; + result = dbName; + + SessionVariable sessionVariable = new SessionVariable(); + ctx.getSessionVariable(); + minTimes = 0; + result = sessionVariable; + + ctx.setStmtId(anyLong); + minTimes = 0; + + ctx.getStmtId(); + minTimes = 0; + result = 1L; + } + }; + + analyzer = new Analyzer(catalog, ctx); + newRangeList = Lists.newArrayList(); + } + + private void test1() { + new Expectations(catalog) { + { + catalog.getAuth(); + result = auth; + } + }; + } + + private OlapTable createOrderTable() { + Column column1 = new Column("date", ScalarType.INT); + Column column2 = new Column("id", ScalarType.INT); + Column column3 = new Column("value", ScalarType.INT); + List columns = Lists.newArrayList(column1, column2, column3); + + MaterializedIndex baseIndex = new MaterializedIndex(10001, IndexState.NORMAL); + RandomDistributionInfo distInfo = new RandomDistributionInfo(10); + + PartitionInfo partInfo = new RangePartitionInfo(Lists.newArrayList(column1)); + + Partition part12 = new Partition(20200112, "p20200112", baseIndex, distInfo); + part12.setVisibleVersion(1,1578762000000L,1); //2020-01-12 1:00:00 + Partition part13 = new Partition(20200113, "p20200113", baseIndex, distInfo); + part13.setVisibleVersion(1,1578848400000L,1); //2020-01-13 1:00:00 + Partition part14 = new Partition(20200114, "p20200114", baseIndex, distInfo); + part14.setVisibleVersion(1,1578934800000L,1); //2020-01-14 1:00:00 + Partition part15 = new Partition(20200115, "p20200115", baseIndex, distInfo); + part15.setVisibleVersion(2,1579053661000L,2); //2020-01-15 10:01:01 + + OlapTable table = new OlapTable(10000L, "order", columns,KeysType.DUP_KEYS, partInfo, distInfo); + + short shortKeyColumnCount = 1; + table.setIndexMeta(10001, "group1", columns, 1, 1, shortKeyColumnCount,TStorageType.COLUMN, KeysType.DUP_KEYS); + + List idx_columns = Lists.newArrayList(); + idx_columns.add(column1); + table.setIndexMeta(new Long(1), "test", idx_columns, 1, 1, shortKeyColumnCount, TStorageType.COLUMN, KeysType.DUP_KEYS); + Deencapsulation.setField(table, "baseIndexId", 1000); + + table.addPartition(part12); + table.addPartition(part13); + table.addPartition(part14); + table.addPartition(part15); + + return table; + } + + private ScanNode createOrderScanNode() { + OlapTable table = createOrderTable(); + TupleDescriptor desc = new TupleDescriptor(new TupleId(10004)); + desc.setTable(table); + ScanNode node = new OlapScanNode(new PlanNodeId(10008), desc, "ordernode"); + return node; + } + + private OlapTable createProfileTable() { + Column column2 = new Column("eventdate", ScalarType.DATE); + Column column3 = new Column("userid", ScalarType.INT); + Column column4 = new Column("country", ScalarType.INT); + List columns = Lists.newArrayList(column2, column3, column4); + + MaterializedIndex baseIndex = new MaterializedIndex(20001, IndexState.NORMAL); + RandomDistributionInfo distInfo = new RandomDistributionInfo(10); + + PartitionInfo partInfo = new RangePartitionInfo(Lists.newArrayList(column2)); + + Partition part12 = new Partition(2020112, "p20200112", baseIndex, distInfo); + part12.setVisibleVersion(1,1578762000000L,1); //2020-01-12 1:00:00 + Partition part13 = new Partition(2020113, "p20200113", baseIndex, distInfo); + part13.setVisibleVersion(1,1578848400000L,1); //2020-01-13 1:00:00 + Partition part14 = new Partition(2020114, "p20200114", baseIndex, distInfo); + part14.setVisibleVersion(1,1578934800000L,1); //2020-01-14 1:00:00 + Partition part15 = new Partition(2020115, "p20200115", baseIndex, distInfo); + part15.setVisibleVersion(2,1579021200000L,2); //2020-01-15 1:00:00 + + OlapTable table = new OlapTable(20000L, "userprofile", columns,KeysType.AGG_KEYS, partInfo, distInfo); + + short shortKeyColumnCount = 1; + table.setIndexMeta(20001, "group1", columns, 1, 1, shortKeyColumnCount, TStorageType.COLUMN, KeysType.AGG_KEYS); + + List idx_columns = Lists.newArrayList(); + idx_columns.add(column2); + table.setIndexMeta(new Long(2), "test", idx_columns, 1, 1, shortKeyColumnCount, TStorageType.COLUMN, KeysType.AGG_KEYS); + + Deencapsulation.setField(table, "baseIndexId", 1000); + + table.addPartition(part12); + table.addPartition(part13); + table.addPartition(part14); + table.addPartition(part15); + + return table; + } + + private ScanNode createProfileScanNode(){ + OlapTable table = createProfileTable(); + TupleDescriptor desc = new TupleDescriptor(new TupleId(20004)); + desc.setTable(table); + ScanNode node = new OlapScanNode(new PlanNodeId(20008), desc, "userprofilenode"); + return node; + } + + /** + * table appevent(date(pk), userid, eventid, eventtime), stream load every 5 miniutes + */ + private OlapTable createEventTable() { + Column column1 = new Column("eventdate", ScalarType.DATE); + Column column2 = new Column("userid", ScalarType.INT); + Column column3 = new Column("eventid", ScalarType.INT); + Column column4 = new Column("eventtime", ScalarType.DATETIME); + List columns = Lists.newArrayList(column1, column2, column3,column4); + PartitionInfo partInfo = new RangePartitionInfo(Lists.newArrayList(column1)); + MaterializedIndex baseIndex = new MaterializedIndex(30001, IndexState.NORMAL); + RandomDistributionInfo distInfo = new RandomDistributionInfo(10); + + Partition part12 = new Partition(20200112, "p20200112", baseIndex, distInfo); + part12.setVisibleVersion(1,1578762000000L,1); //2020-01-12 1:00:00 + Partition part13 = new Partition(20200113, "p20200113", baseIndex, distInfo); + part13.setVisibleVersion(1,1578848400000L,1); //2020-01-13 1:00:00 + Partition part14 = new Partition(20200114, "p20200114", baseIndex, distInfo); + part14.setVisibleVersion(1,1578934800000L,1); //2020-01-14 1:00:00 + Partition part15 = new Partition(20200115, "p20200115", baseIndex, distInfo); + part15.setVisibleVersion(2,1579053661000L,2); //2020-01-15 10:01:01 + + OlapTable table = new OlapTable(30000L, "appevent", columns,KeysType.DUP_KEYS, partInfo, distInfo); + + short shortKeyColumnCount = 1; + table.setIndexMeta(30001, "group1", columns, 1,1,shortKeyColumnCount,TStorageType.COLUMN, KeysType.AGG_KEYS); + + List column = Lists.newArrayList(); + column.add(column1); + + table.setIndexMeta(new Long(2), "test", column, 1, 1, shortKeyColumnCount, TStorageType.COLUMN, KeysType.AGG_KEYS); + Deencapsulation.setField(table, "baseIndexId", 1000); + + table.addPartition(part12); + table.addPartition(part13); + table.addPartition(part14); + table.addPartition(part15); + + return table; + } + + private ScanNode createEventScanNode(){ + OlapTable table = createEventTable(); + TupleDescriptor desc = new TupleDescriptor(new TupleId(30002)); + desc.setTable(table); + ScanNode node = new OlapScanNode(new PlanNodeId(30004), desc, "appeventnode"); + return node; + } + + private StatementBase parseSql(String sql){ + SqlParser parser = new SqlParser(new SqlScanner(new StringReader(sql))); + StatementBase parseStmt = null; + try { + parseStmt = SqlParserUtils.getFirstStmt(parser); + parseStmt.analyze(analyzer); + } catch (AnalysisException e) { + LOG.warn("Part,an_ex={}", e); + Assert.fail(e.getMessage()); + } catch (UserException e) { + LOG.warn("Part,ue_ex={}", e); + Assert.fail(e.getMessage()); + } catch (Exception e) { + LOG.warn("Part,cm_ex={}", e); + Assert.fail(e.getMessage()); + } + return parseStmt; + } + + @Test + public void testCacheNode() throws Exception { + CacheCoordinator cp = CacheCoordinator.getInstance(); + cp.DebugModel = true; + Backend bd1 = new Backend(1, "", 1000); + bd1.updateOnce(0,0,0); + Backend bd2 = new Backend(2, "", 2000); + bd2.updateOnce(0,0,0); + Backend bd3 = new Backend(3, "", 3000); + bd3.updateOnce(0,0,0); + cp.addBackend(bd1); + cp.addBackend(bd2); + cp.addBackend(bd3); + + PUniqueId key1 = new PUniqueId(); + key1.hi = 1L; + key1.lo = 1L; + Backend bk = cp.findBackend(key1); + Assert.assertNotNull(bk); + Assert.assertEquals(bk.getId(),3); + + key1.hi = 669560558156283345L; + key1.lo = 1L; + bk = cp.findBackend(key1); + Assert.assertNotNull(bk); + Assert.assertEquals(bk.getId(),1); + } + + @Test + public void testCacheModeNone() throws Exception { + StatementBase parseStmt = parseSql("select @@version_comment limit 1"); + List scanNodes = Lists.newArrayList(); + CacheAnalyzer ca = new CacheAnalyzer(context, parseStmt, scanNodes); + ca.checkCacheMode(0); + Assert.assertEquals(ca.getCacheMode(), CacheMode.NoNeed); + } + + @Test + public void testCacheModeTable() throws Exception { + StatementBase parseStmt = parseSql( + "SELECT country, COUNT(userid) FROM userprofile GROUP BY country" + ); + List scanNodes = Lists.newArrayList(createProfileScanNode()); + CacheAnalyzer ca = new CacheAnalyzer(context,parseStmt, scanNodes); + ca.checkCacheMode(0); + Assert.assertEquals(ca.getCacheMode(), CacheMode.Sql); + } + + @Test + public void testWithinMinTime() throws Exception { + StatementBase parseStmt = parseSql( + "SELECT country, COUNT(userid) FROM userprofile GROUP BY country" + ); + List scanNodes = Lists.newArrayList(createProfileScanNode()); + CacheAnalyzer ca = new CacheAnalyzer(context,parseStmt, scanNodes); + ca.checkCacheMode(1579024800000L); //2020-1-15 02:00:00 + Assert.assertEquals(ca.getCacheMode(), CacheMode.None); + } + + @Test + public void testPartitionModel() throws Exception { + StatementBase parseStmt = parseSql( + "SELECT eventdate, COUNT(DISTINCT userid) FROM appevent WHERE eventdate>=\"2020-01-12\" and " + + "eventdate<=\"2020-01-15\" GROUP BY eventdate" + ); + List scanNodes = Lists.newArrayList(createEventScanNode()); + CacheAnalyzer ca = new CacheAnalyzer(context,parseStmt, scanNodes); + ca.checkCacheMode(1579053661000L); //2020-1-15 10:01:01 + Assert.assertEquals(ca.getCacheMode(), CacheMode.Partition); + } + + @Test + public void testParseByte() throws Exception { + RowBatchBuilder sb = new RowBatchBuilder(CacheMode.Partition); + byte[] buffer = new byte[]{10, 50, 48, 50, 48, 45, 48, 51, 45, 49, 48, 1, 51, 2, 67, 78}; + PartitionRange.PartitionKeyType key1 = sb.getKeyFromRow(buffer, 0, Type.DATE); + LOG.info("real value key1 {}",key1.realValue()); + Assert.assertEquals(key1.realValue(), 20200310); + PartitionRange.PartitionKeyType key2 = sb.getKeyFromRow(buffer, 1, Type.INT); + LOG.info("real value key2 {}",key2.realValue()); + Assert.assertEquals(key2.realValue(), 3); + } + + @Test + public void testPartitionIntTypeSql() throws Exception { + StatementBase parseStmt = parseSql( + "SELECT `date`, COUNT(id) FROM `order` WHERE `date`>=20200112 and `date`<=20200115 GROUP BY date" + ); + List scanNodes = Lists.newArrayList(createOrderScanNode()); + CacheAnalyzer ca = new CacheAnalyzer(context,parseStmt, scanNodes); + ca.checkCacheMode(1579053661000L); //2020-1-15 10:01:01 + Assert.assertEquals(ca.getCacheMode(), CacheMode.Partition); //assert cache model first + try { + PartitionCache cache = (PartitionCache) ca.getCache(); + cache.rewriteSelectStmt(null); + Assert.assertEquals(cache.getNokeyStmt().getWhereClause(), null); + + PartitionRange range = cache.getPartitionRange(); + boolean flag = range.analytics(); + Assert.assertEquals(flag, true); + + int size = range.getPartitionSingleList().size(); + LOG.warn("Rewrite partition range size={}", size); + Assert.assertEquals(size, 4); + + String sql; + range.setCacheFlag(20200112L); //get data from cache + range.setCacheFlag(20200113L); //get data from cache + + hitRange = range.buildDiskPartitionRange(newRangeList); + Assert.assertEquals(hitRange, Cache.HitRange.Left); + Assert.assertEquals(newRangeList.size(), 2); + Assert.assertEquals(newRangeList.get(0).getCacheKey().realValue(), 20200114); + Assert.assertEquals(newRangeList.get(1).getCacheKey().realValue(), 20200115); + + cache.rewriteSelectStmt(newRangeList); + sql = ca.getRewriteStmt().getWhereClause().toSql(); + Assert.assertEquals(sql, "(`date` >= 20200114) AND (`date` <= 20200115)"); + } catch (Exception e) { + LOG.warn("ex={}", e); + Assert.fail(e.getMessage()); + } + } + + @Test + public void testSimpleCacheSql() throws Exception { + StatementBase parseStmt = parseSql( + "SELECT eventdate, COUNT(userid) FROM appevent WHERE eventdate>=\"2020-01-12\" and eventdate<=\"2020-01-15\" GROUP BY eventdate" + ); + List scanNodes = Lists.newArrayList(createEventScanNode()); + CacheAnalyzer ca = new CacheAnalyzer(context,parseStmt, scanNodes); + ca.checkCacheMode(1579053661000L); //2020-1-15 10:01:01 + Assert.assertEquals(ca.getCacheMode(), CacheMode.Partition); //assert cache model first + SelectStmt selectStmt = (SelectStmt) parseStmt; + + try{ + PartitionCache cache = (PartitionCache) ca.getCache(); + cache.rewriteSelectStmt(null); + Assert.assertEquals(cache.getNokeyStmt().getWhereClause(),null); + + PartitionRange range = cache.getPartitionRange(); + boolean flag = range.analytics(); + Assert.assertEquals(flag,true); + + int size = range.getPartitionSingleList().size(); + LOG.warn("Rewrite partition range size={}", size); + Assert.assertEquals(size, 4); + + String sql; + range.setCacheFlag(20200112L); //get data from cache + range.setCacheFlag(20200113L); //get data from cache + + hitRange = range.buildDiskPartitionRange(newRangeList); + cache.rewriteSelectStmt(newRangeList); + sql = ca.getRewriteStmt().getWhereClause().toSql(); + Assert.assertEquals(sql,"(`eventdate` >= '2020-01-14') AND (`eventdate` <= '2020-01-15')"); + } catch(Exception e){ + LOG.warn("ex={}",e); + Assert.fail(e.getMessage()); + } + } + + @Test + public void testHitPartPartition() throws Exception { + StatementBase parseStmt = parseSql( + "SELECT eventdate, COUNT(userid) FROM appevent WHERE eventdate>=\"2020-01-12\" and eventdate<=\"2020-01-14\" GROUP BY eventdate" + ); + List scanNodes = Lists.newArrayList(createEventScanNode()); + CacheAnalyzer ca = new CacheAnalyzer(context, parseStmt, scanNodes); + ca.checkCacheMode(1579053661000L); //2020-1-15 10:01:01 + Assert.assertEquals(ca.getCacheMode(), CacheMode.Partition); //assert cache model first + + try { + PartitionCache cache = (PartitionCache) ca.getCache(); + + cache.rewriteSelectStmt(null); + Assert.assertEquals(cache.getNokeyStmt().getWhereClause(), null); + + PartitionRange range = cache.getPartitionRange(); + boolean flag = range.analytics(); + Assert.assertEquals(flag, true); + + int size = range.getPartitionSingleList().size(); + LOG.warn("Rewrite partition range size={}", size); + Assert.assertEquals(size, 3); + + String sql; + range.setCacheFlag(20200113); + range.setCacheFlag(20200114); + + hitRange = range.buildDiskPartitionRange(newRangeList); + Assert.assertEquals(hitRange,Cache.HitRange.Right); + Assert.assertEquals(newRangeList.size(), 2); + Assert.assertEquals(newRangeList.get(0).getCacheKey().realValue(), 20200112); + Assert.assertEquals(newRangeList.get(1).getCacheKey().realValue(), 20200112); + + List updateRangeList = range.buildUpdatePartitionRange(); + Assert.assertEquals(updateRangeList.size(), 1); + Assert.assertEquals(updateRangeList.get(0).getCacheKey().realValue(), 20200112); + } catch (Exception e) { + LOG.warn("ex={}", e); + Assert.fail(e.getMessage()); + } + } + + @Test + public void testNoUpdatePartition() throws Exception { + StatementBase parseStmt = parseSql( + "SELECT eventdate, COUNT(userid) FROM appevent WHERE eventdate>=\"2020-01-12\" and eventdate<=\"2020-01-14\" GROUP BY eventdate" + ); + List scanNodes = Lists.newArrayList(createEventScanNode()); + CacheAnalyzer ca = new CacheAnalyzer(context, parseStmt, scanNodes); + ca.checkCacheMode(1579053661000L); //2020-1-15 10:01:01 + Assert.assertEquals(ca.getCacheMode(), CacheMode.Partition); //assert cache model first + + try { + PartitionCache cache = (PartitionCache) ca.getCache(); + + cache.rewriteSelectStmt(null); + Assert.assertEquals(cache.getNokeyStmt().getWhereClause(), null); + + PartitionRange range = cache.getPartitionRange(); + boolean flag = range.analytics(); + Assert.assertEquals(flag, true); + + int size = range.getPartitionSingleList().size(); + LOG.warn("Rewrite partition range size={}", size); + Assert.assertEquals(size, 3); + + String sql; + range.setCacheFlag(20200112); //get data from cache + range.setCacheFlag(20200113); + range.setCacheFlag(20200114); + + hitRange = range.buildDiskPartitionRange(newRangeList); + Assert.assertEquals(hitRange, Cache.HitRange.Full); + Assert.assertEquals(newRangeList.size(), 0); + } catch (Exception e) { + LOG.warn("ex={}", e); + Assert.fail(e.getMessage()); + } + } + + + @Test + public void testUpdatePartition() throws Exception { + StatementBase parseStmt = parseSql( + "SELECT eventdate, COUNT(userid) FROM appevent WHERE eventdate>=\"2020-01-12\" and eventdate<=\"2020-01-15\" GROUP BY eventdate" + ); + List scanNodes = Lists.newArrayList(createEventScanNode()); + CacheAnalyzer ca = new CacheAnalyzer(context,parseStmt, scanNodes); + ca.checkCacheMode(1579053661000L); //2020-1-15 10:01:01 + Assert.assertEquals(ca.getCacheMode(), CacheMode.Partition); //assert cache model first + + try { + PartitionCache cache = (PartitionCache) ca.getCache(); + + cache.rewriteSelectStmt(null); + Assert.assertEquals(cache.getNokeyStmt().getWhereClause(), null); + + PartitionRange range = cache.getPartitionRange(); + boolean flag = range.analytics(); + Assert.assertEquals(flag, true); + + int size = range.getPartitionSingleList().size(); + LOG.warn("Rewrite partition range size={}", size); + Assert.assertEquals(size, 4); + + String sql; + range.setCacheFlag(20200112L); //get data from cache + range.setTooNewByKey(20200115); + + range.buildDiskPartitionRange(newRangeList); + Assert.assertEquals(newRangeList.size(), 2); + cache.rewriteSelectStmt(newRangeList); + + sql = ca.getRewriteStmt().getWhereClause().toSql(); + Assert.assertEquals(sql, "(`eventdate` >= '2020-01-13') AND (`eventdate` <= '2020-01-15')"); + + List updateRangeList = range.buildUpdatePartitionRange(); + Assert.assertEquals(updateRangeList.size(), 2); + Assert.assertEquals(updateRangeList.get(0).getCacheKey().realValue(), 20200113); + Assert.assertEquals(updateRangeList.get(1).getCacheKey().realValue(), 20200114); + } catch (Exception e) { + LOG.warn("ex={}", e); + Assert.fail(e.getMessage()); + } + } + + @Test + public void testRewriteMultiPredicate1() throws Exception { + StatementBase parseStmt = parseSql( + "SELECT eventdate, COUNT(userid) FROM appevent WHERE eventdate>\"2020-01-11\" and eventdate<\"2020-01-16\"" + + " and eventid=1 GROUP BY eventdate" + ); + List scanNodes = Lists.newArrayList(createEventScanNode()); + CacheAnalyzer ca = new CacheAnalyzer(context,parseStmt, scanNodes); + ca.checkCacheMode(1579053661000L); //2020-1-15 10:01:01 + Assert.assertEquals(ca.getCacheMode(), CacheMode.Partition); //assert cache model first + try{ + PartitionCache cache = (PartitionCache) ca.getCache(); + + cache.rewriteSelectStmt(null); + LOG.warn("Nokey multi={}", cache.getNokeyStmt().getWhereClause().toSql()); + Assert.assertEquals(cache.getNokeyStmt().getWhereClause().toSql(),"`eventid` = 1"); + + PartitionRange range = cache.getPartitionRange(); + boolean flag = range.analytics(); + Assert.assertEquals(flag,true); + + int size = range.getPartitionSingleList().size(); + Assert.assertEquals(size, 4); + + String sql; + range.setCacheFlag(20200112L); //get data from cache + range.setCacheFlag(20200113L); //get data from cache + + range.buildDiskPartitionRange(newRangeList); + + cache.rewriteSelectStmt(newRangeList); + sql = ca.getRewriteStmt().getWhereClause().toSql(); + LOG.warn("MultiPredicate={}", sql); + Assert.assertEquals(sql,"((`eventdate` > '2020-01-13') AND (`eventdate` < '2020-01-16')) AND (`eventid` = 1)"); + } catch(Exception e){ + LOG.warn("multi ex={}",e); + Assert.fail(e.getMessage()); + } + } + + @Test + public void testRewriteJoin() throws Exception { + StatementBase parseStmt = parseSql( + "SELECT appevent.eventdate, country, COUNT(appevent.userid) FROM appevent" + + " INNER JOIN userprofile ON appevent.userid = userprofile.userid" + + " WHERE appevent.eventdate>=\"2020-01-12\" and appevent.eventdate<=\"2020-01-15\"" + + " and eventid=1 GROUP BY appevent.eventdate, country" + ); + List scanNodes = Lists.newArrayList(createEventScanNode()); + CacheAnalyzer ca = new CacheAnalyzer(context,parseStmt, scanNodes); + ca.checkCacheMode(1579053661000L); //2020-1-15 10:01:01 + Assert.assertEquals(ca.getCacheMode(), CacheMode.Partition); //assert cache model first + try{ + PartitionCache cache = (PartitionCache) ca.getCache(); + cache.rewriteSelectStmt(null); + LOG.warn("Join nokey={}", cache.getNokeyStmt().getWhereClause().toSql()); + Assert.assertEquals(cache.getNokeyStmt().getWhereClause().toSql(),"`eventid` = 1"); + + PartitionRange range = cache.getPartitionRange(); + boolean flag = range.analytics(); + Assert.assertEquals(flag,true); + + int size = range.getPartitionSingleList().size(); + Assert.assertEquals(size, 4); + + String sql; + range.setCacheFlag(20200112L); //get data from cache + range.setCacheFlag(20200113L); //get data from cache + + range.buildDiskPartitionRange(newRangeList); + + cache.rewriteSelectStmt(newRangeList); + sql = ca.getRewriteStmt().getWhereClause().toSql(); + LOG.warn("Join rewrite={}", sql); + Assert.assertEquals(sql,"((`appevent`.`eventdate` >= '2020-01-14')" + + " AND (`appevent`.`eventdate` <= '2020-01-15')) AND (`eventid` = 1)"); + } catch(Exception e){ + LOG.warn("Join ex={}",e); + Assert.fail(e.getMessage()); + } + } + + @Test + public void testSubSelect() throws Exception { + StatementBase parseStmt = parseSql( + "SELECT eventdate, sum(pv) FROM (SELECT eventdate, COUNT(userid) AS pv FROM appevent WHERE eventdate>\"2020-01-11\" AND eventdate<\"2020-01-16\"" + + " AND eventid=1 GROUP BY eventdate) tbl GROUP BY eventdate" + ); + List scanNodes = Lists.newArrayList(createEventScanNode()); + CacheAnalyzer ca = new CacheAnalyzer(context,parseStmt, scanNodes); + ca.checkCacheMode(1579053661000L); //2020-1-15 10:01:01 + Assert.assertEquals(ca.getCacheMode(), CacheMode.Partition); //assert cache model first + try{ + PartitionCache cache = (PartitionCache) ca.getCache(); + + cache.rewriteSelectStmt(null); + LOG.warn("Sub nokey={}", cache.getNokeyStmt().toSql()); + Assert.assertEquals(cache.getNokeyStmt().toSql(),"SELECT `eventdate` AS `eventdate`, sum(`pv`) AS `sum(``pv``)` FROM (" + + "SELECT `eventdate` AS `eventdate`, count(`userid`) AS `pv` FROM `testCluster:testDb`.`appevent` WHERE `eventid` = 1" + + " GROUP BY `eventdate`) tbl GROUP BY `eventdate`"); + + PartitionRange range = cache.getPartitionRange(); + boolean flag = range.analytics(); + Assert.assertEquals(flag,true); + + int size = range.getPartitionSingleList().size(); + Assert.assertEquals(size, 4); + + String sql; + range.setCacheFlag(20200112L); //get data from cache + range.setCacheFlag(20200113L); //get data from cache + + range.buildDiskPartitionRange(newRangeList); + + cache.rewriteSelectStmt(newRangeList); + sql = ca.getRewriteStmt().toSql(); + LOG.warn("Sub rewrite={}", sql); + Assert.assertEquals(sql,"SELECT `eventdate` AS `eventdate`, sum(`pv`) AS `sum(``pv``)` FROM (" + + "SELECT `eventdate` AS `eventdate`, count(`userid`) AS `pv` FROM `testCluster:testDb`.`appevent` WHERE " + + "((`eventdate` > '2020-01-13') AND (`eventdate` < '2020-01-16')) AND (`eventid` = 1) GROUP BY `eventdate`) tbl GROUP BY `eventdate`"); + } catch(Exception e){ + LOG.warn("sub ex={}",e); + Assert.fail(e.getMessage()); + } + } +} +