From ea2f7df3a3d3f5f1f0faf92071f9e532ec512bba Mon Sep 17 00:00:00 2001 From: morningman Date: Thu, 3 Sep 2020 15:20:17 +0800 Subject: [PATCH 1/3] [Bug] Fix bug that query meta fields has been sent twice --- .../org/apache/doris/qe/StmtExecutor.java | 72 ++++++++++++------- 1 file changed, 48 insertions(+), 24 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java index 86331081529311..4884699959c82b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java @@ -589,41 +589,62 @@ private void handleSetStmt() { context.getState().setOk(); } - private void sendChannel(MysqlChannel channel, List cacheValues, boolean isEos) + // send values from cache. + // return true if the meta fields has been sent, otherwise, return false. + // the meta fields must be sent right before the first batch of data(or eos flag). + // so if it has data(or eos is true), this method must return true. + private boolean sendCachedValues(MysqlChannel channel, List cacheValues, + SelectStmt selectStmt, boolean isSendFields, boolean isEos) throws Exception { RowBatch batch = null; + boolean isSend = isSendFields; for (CacheBeProxy.CacheValue value : cacheValues) { batch = value.getRowBatch(); + if (!isSend) { + // send meta fields before sending first data batch. + sendFields(selectStmt.getColLabels(), selectStmt.getResultExprs()); + isSend = true; + } for (ByteBuffer row : batch.getBatch().getRows()) { channel.sendOnePacket(row); } context.updateReturnRows(batch.getBatch().getRows().size()); } + if (isEos) { if (batch != null) { statisticsForAuditLog = batch.getQueryStatistics(); } + if (!isSend) { + sendFields(selectStmt.getColLabels(), selectStmt.getResultExprs()); + isSend = true; + } context.getState().setEof(); - return; } + return isSend; } - private boolean handleCacheStmt(CacheAnalyzer cacheAnalyzer,MysqlChannel channel) throws Exception { + /** + * Handle the SelectStmt via Cache. + */ + private void handleCacheStmt(CacheAnalyzer cacheAnalyzer, MysqlChannel channel, SelectStmt selectStmt) throws Exception { RowBatch batch = null; CacheBeProxy.FetchCacheResult cacheResult = cacheAnalyzer.getCacheData(); CacheMode mode = cacheAnalyzer.getCacheMode(); + SelectStmt newSelectStmt = selectStmt; + boolean isSendFields = false; if (cacheResult != null) { isCached = true; if (cacheAnalyzer.getHitRange() == Cache.HitRange.Full) { - sendChannel(channel, cacheResult.getValueList(), true); - return true; + sendCachedValues(channel, cacheResult.getValueList(), newSelectStmt, isSendFields, true); + return; } - //rewrite sql + // rewrite sql if (mode == CacheMode.Partition) { if (cacheAnalyzer.getHitRange() == Cache.HitRange.Left) { - sendChannel(channel, cacheResult.getValueList(), false); + isSendFields = sendCachedValues(channel, cacheResult.getValueList(), newSelectStmt, isSendFields, false); } - SelectStmt newSelectStmt = cacheAnalyzer.getRewriteStmt(); + newSelectStmt = cacheAnalyzer.getRewriteStmt(); newSelectStmt.reset(); analyzer = new Analyzer(context.getCatalog(), context); newSelectStmt.analyze(analyzer); @@ -652,13 +673,19 @@ private boolean handleCacheStmt(CacheAnalyzer cacheAnalyzer,MysqlChannel channel } if (cacheResult != null && cacheAnalyzer.getHitRange() == Cache.HitRange.Right) { - sendChannel(channel, cacheResult.getValueList(), false); + isSendFields = sendCachedValues(channel, cacheResult.getValueList(), newSelectStmt, isSendFields, false); } cacheAnalyzer.updateCache(); + + if (!isSendFields) { + sendFields(newSelectStmt.getColLabels(), newSelectStmt.getResultExprs()); + isSendFields = true; + } + statisticsForAuditLog = batch.getQueryStatistics(); context.getState().setEof(); - return false; + return; } // Process a select statement. @@ -682,6 +709,17 @@ private void handleQueryStmt() throws Exception { return; } + RowBatch batch; + MysqlChannel channel = context.getMysqlChannel(); + boolean isOutfileQuery = queryStmt.hasOutFileClause(); + + // Sql and PartitionCache + CacheAnalyzer cacheAnalyzer = new CacheAnalyzer(context, parsedStmt, planner); + if (cacheAnalyzer.enableCache() && !isOutfileQuery && queryStmt instanceof SelectStmt) { + handleCacheStmt(cacheAnalyzer, channel, (SelectStmt) queryStmt); + return; + } + // send result // 1. If this is a query with OUTFILE clause, eg: select * from tbl1 into outfile xxx, // We will not send real query result to client. Instead, we only send OK to client with @@ -690,21 +728,7 @@ private void handleQueryStmt() throws Exception { // Query OK, 10 rows affected (0.01 sec) // // 2. If this is a query, send the result expr fields first, and send result data back to client. - RowBatch batch; - MysqlChannel channel = context.getMysqlChannel(); - boolean isOutfileQuery = queryStmt.hasOutFileClause(); boolean isSendFields = false; - if (!isOutfileQuery) { - sendFields(queryStmt.getColLabels(), queryStmt.getResultExprs()); - } - - //Sql and PartitionCache - CacheAnalyzer cacheAnalyzer = new CacheAnalyzer(context, parsedStmt, planner); - if (cacheAnalyzer.enableCache()) { - handleCacheStmt(cacheAnalyzer, channel); - return; - } - coord = new Coordinator(context, analyzer, planner); QeProcessorImpl.INSTANCE.registerQuery(context.queryId(), new QeProcessorImpl.QueryInfo(context, originStmt.originStmt, coord)); From 263d8465faffba54c608decbecaf2355a0219f0c Mon Sep 17 00:00:00 2001 From: morningman Date: Thu, 3 Sep 2020 15:46:51 +0800 Subject: [PATCH 2/3] add logs --- .../doris/common/util/ProfileManager.java | 6 +-- .../org/apache/doris/qe/StmtExecutor.java | 4 ++ .../apache/doris/qe/cache/CacheAnalyzer.java | 37 +++++++++++-------- 3 files changed, 28 insertions(+), 19 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/util/ProfileManager.java b/fe/fe-core/src/main/java/org/apache/doris/common/util/ProfileManager.java index d1d621d45de73e..11e3ef36fbd7b0 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/util/ProfileManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/ProfileManager.java @@ -21,8 +21,8 @@ import com.google.common.collect.Lists; import com.google.common.collect.Maps; -import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; import java.util.ArrayList; import java.util.Arrays; @@ -59,8 +59,8 @@ public class ProfileManager { public static final String SQL_STATEMENT = "Sql Statement"; public static final String USER = "User"; public static final String DEFAULT_DB = "Default Db"; - public static final String IS_CACHED = "IsCached"; - + public static final String IS_CACHED = "Is Cached"; + public static final ArrayList PROFILE_HEADERS = new ArrayList( Arrays.asList(QUERY_ID, USER, DEFAULT_DB, SQL_STATEMENT, QUERY_TYPE, START_TIME, END_TIME, TOTAL_TIME, QUERY_STATE)); diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java index 4884699959c82b..5674661774dba0 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java @@ -662,6 +662,10 @@ private void handleCacheStmt(CacheAnalyzer cacheAnalyzer, MysqlChannel channel, batch = coord.getNext(); if (batch.getBatch() != null) { cacheAnalyzer.copyRowBatch(batch); + if (!isSendFields) { + sendFields(newSelectStmt.getColLabels(), newSelectStmt.getResultExprs()); + isSendFields = true; + } for (ByteBuffer row : batch.getBatch().getRows()) { channel.sendOnePacket(row); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/cache/CacheAnalyzer.java b/fe/fe-core/src/main/java/org/apache/doris/qe/cache/CacheAnalyzer.java index 2f2fba993f69c8..32e287846bcccb 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/cache/CacheAnalyzer.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/cache/CacheAnalyzer.java @@ -28,11 +28,13 @@ import org.apache.doris.analysis.SlotRef; import org.apache.doris.analysis.StatementBase; import org.apache.doris.analysis.TableRef; +import org.apache.doris.catalog.Column; import org.apache.doris.catalog.OlapTable; -import org.apache.doris.catalog.RangePartitionInfo; -import org.apache.doris.catalog.PartitionType; import org.apache.doris.catalog.Partition; -import org.apache.doris.catalog.Column; +import org.apache.doris.catalog.PartitionType; +import org.apache.doris.catalog.RangePartitionInfo; +import org.apache.doris.common.Config; +import org.apache.doris.common.Status; import org.apache.doris.common.util.DebugUtil; import org.apache.doris.metric.MetricRepo; import org.apache.doris.planner.OlapScanNode; @@ -40,11 +42,10 @@ import org.apache.doris.planner.ScanNode; import org.apache.doris.qe.ConnectContext; import org.apache.doris.qe.RowBatch; -import org.apache.doris.common.Config; -import org.apache.doris.common.Status; +import org.apache.doris.thrift.TUniqueId; import com.google.common.collect.Lists; -import org.apache.doris.thrift.TUniqueId; + import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -179,9 +180,11 @@ public void checkCacheMode(long now) { private CacheMode innerCheckCacheMode(long now) { if (!enableCache()) { + LOG.debug("cache is disabled. queryid {}", DebugUtil.printId(queryId)); return CacheMode.NoNeed; } if (!(parsedStmt instanceof SelectStmt) || scanNodes.size() == 0) { + LOG.debug("not a select stmt or no scan node. queryid {}", DebugUtil.printId(queryId)); return CacheMode.NoNeed; } MetricRepo.COUNTER_QUERY_TABLE.increase(1L); @@ -192,6 +195,7 @@ private CacheMode innerCheckCacheMode(long now) { for (int i = 0; i < scanNodes.size(); i++) { ScanNode node = scanNodes.get(i); if (!(node instanceof OlapScanNode)) { + LOG.debug("query contains non-olap table. queryid {}", DebugUtil.printId(queryId)); return CacheMode.None; } OlapScanNode oNode = (OlapScanNode) node; @@ -217,6 +221,7 @@ private CacheMode innerCheckCacheMode(long now) { } if (!enablePartitionCache()) { + LOG.debug("partition query cache is disabled. queryid {}", DebugUtil.printId(queryId)); return CacheMode.None; } @@ -224,33 +229,34 @@ private CacheMode innerCheckCacheMode(long now) { //Only one table can be updated in Config.cache_last_version_interval_second range for (int i = 1; i < tblTimeList.size(); i++) { if ((now - tblTimeList.get(i).latestTime) < Config.cache_last_version_interval_second * 1000) { - LOG.info("the time of other tables is newer than {}", Config.cache_last_version_interval_second); + LOG.debug("the time of other tables is newer than {} s, queryid {}", + Config.cache_last_version_interval_second, DebugUtil.printId(queryId)); return CacheMode.None; } } olapTable = latestTable.olapTable; if (olapTable.getPartitionInfo().getType() != PartitionType.RANGE) { - LOG.debug("the partition of OlapTable not RANGE type"); + LOG.debug("the partition of OlapTable not RANGE type, queryid {}", DebugUtil.printId(queryId)); return CacheMode.None; } partitionInfo = (RangePartitionInfo) olapTable.getPartitionInfo(); List columns = partitionInfo.getPartitionColumns(); //Partition key has only one column if (columns.size() != 1) { - LOG.info("the size of columns for partition key is {}", columns.size()); + LOG.debug("more than one partition column, queryid {}", columns.size(), DebugUtil.printId(queryId)); return CacheMode.None; } partColumn = columns.get(0); //Check if group expr contain partition column if (!checkGroupByPartitionKey(this.selectStmt, partColumn)) { - LOG.info("not group by partition key, key {}", partColumn.getName()); + LOG.debug("group by columns does not contains all partition column, queryid {}", DebugUtil.printId(queryId)); return CacheMode.None; } //Check if whereClause have one CompoundPredicate of partition column List compoundPredicates = Lists.newArrayList(); getPartitionKeyFromSelectStmt(this.selectStmt, partColumn, compoundPredicates); if (compoundPredicates.size() != 1) { - LOG.info("the predicate size include partition key has {}", compoundPredicates.size()); + LOG.debug("empty or more than one predicates contain partition column, queryid {}", DebugUtil.printId(queryId)); return CacheMode.None; } partitionPredicate = compoundPredicates.get(0); @@ -265,22 +271,21 @@ public CacheBeProxy.FetchCacheResult getCacheData() { CacheProxy.FetchCacheResult cacheResult = null; cacheMode = innerCheckCacheMode(0); if (cacheMode == CacheMode.NoNeed) { - return cacheResult; + return null; } if (cacheMode == CacheMode.None) { - LOG.info("check cache mode {}, queryid {}", cacheMode, DebugUtil.printId(queryId)); - return cacheResult; + return null; } Status status = new Status(); cacheResult = cache.getCacheData(status); if (status.ok() && cacheResult != null) { - LOG.info("hit cache, mode {}, queryid {}, all count {}, value count {}, row count {}, data size {}", + LOG.debug("hit cache, mode {}, queryid {}, all count {}, value count {}, row count {}, data size {}", cacheMode, DebugUtil.printId(queryId), cacheResult.all_count, cacheResult.value_count, cacheResult.row_count, cacheResult.data_size); } else { - LOG.info("miss cache, mode {}, queryid {}, code {}, msg {}", cacheMode, + LOG.debug("miss cache, mode {}, queryid {}, code {}, msg {}", cacheMode, DebugUtil.printId(queryId), status.getErrorCode(), status.getErrorMsg()); cacheResult = null; } From c81df79b80abf6dd32c66675eae5236d191411b2 Mon Sep 17 00:00:00 2001 From: morningman Date: Thu, 3 Sep 2020 16:35:06 +0800 Subject: [PATCH 3/3] fix ut bug --- .../src/main/java/org/apache/doris/common/util/DebugUtil.java | 3 +++ 1 file changed, 3 insertions(+) diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/util/DebugUtil.java b/fe/fe-core/src/main/java/org/apache/doris/common/util/DebugUtil.java index 9348447aea2027..012a5ee3e2b13e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/util/DebugUtil.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/DebugUtil.java @@ -120,6 +120,9 @@ public static Pair getByteUint(long value) { } public static String printId(final TUniqueId id) { + if (id == null) { + return ""; + } StringBuilder builder = new StringBuilder(); builder.append(Long.toHexString(id.hi)).append("-").append(Long.toHexString(id.lo)); return builder.toString();