From a189c93efc7309c74895eb28ba9ff1f84d01b786 Mon Sep 17 00:00:00 2001
From: MingyuChen
Date: Sun, 1 Oct 2017 18:09:45 +0800
Subject: [PATCH] fix add frontend problem to make it safer; reorganize the
 debug log of selecting rollup

---
 fe/src/com/baidu/palo/analysis/Analyzer.java |   4 +-
 fe/src/com/baidu/palo/catalog/Catalog.java   |  53 +++++---
 fe/src/com/baidu/palo/persist/EditLog.java   |   2 +-
 .../com/baidu/palo/planner/OlapScanNode.java | 123 +++++++++---------
 4 files changed, 102 insertions(+), 80 deletions(-)

diff --git a/fe/src/com/baidu/palo/analysis/Analyzer.java b/fe/src/com/baidu/palo/analysis/Analyzer.java
index bb65eff488aa78..591fdeb71b1da0 100644
--- a/fe/src/com/baidu/palo/analysis/Analyzer.java
+++ b/fe/src/com/baidu/palo/analysis/Analyzer.java
@@ -809,8 +809,8 @@ public void createAuxEquivPredicate(Expr lhs, Expr rhs) {
         // create an eq predicate between lhs and rhs
         BinaryPredicate p = new BinaryPredicate(BinaryPredicate.Operator.EQ, lhs, rhs);
         p.setIsAuxExpr();
-        if (LOG.isTraceEnabled()) {
-            LOG.trace("register equiv predicate: " + p.toSql() + " " + p.debugString());
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("register equiv predicate: " + p.toSql() + " " + p.debugString());
         }
         registerConjunct(p);
     }
diff --git a/fe/src/com/baidu/palo/catalog/Catalog.java b/fe/src/com/baidu/palo/catalog/Catalog.java
index a5720e3781f393..f555b89408884f 100644
--- a/fe/src/com/baidu/palo/catalog/Catalog.java
+++ b/fe/src/com/baidu/palo/catalog/Catalog.java
@@ -281,16 +281,15 @@ public class Catalog {
     private MetaReplayState metaReplayState;
 
-    // TODO(zc):
     private PullLoadJobMgr pullLoadJobMgr;
     private BrokerMgr brokerMgr;
 
     public List<Frontend> getFrontends() {
-        return frontends;
+        return Lists.newArrayList(frontends);
     }
 
     public List<Frontend> getRemovedFrontends() {
-        return removedFrontends;
+        return Lists.newArrayList(removedFrontends);
    }
 
    public JournalObservable getJournalObservable() {
@@ -505,10 +504,18 @@ private void getClusterIdAndRole() throws IOException {
             System.exit(-1);
         }
 
+        // ATTN:
+        // If the version file and role file do not exist and the helper node is itself,
+        // this should be the very first startup of the cluster, so we create the ROLE and VERSION
+        // files, set isFirstTimeStartUp to true, and add this node to the frontends list.
+        // If the ROLE and VERSION files have been deleted for some reason, we may arbitrarily start
+        // this node as a FOLLOWER, which may cause UNDEFINED behavior.
+        // Everything may be OK if the original role was exactly FOLLOWER,
+        // but if not, the FE process will exit.
         Storage storage = new Storage(IMAGE_DIR);
         if (!roleFile.exists()) {
             // The very first time to start the first node of the cluster.
-            // It should be one REPLICA node.
+            // It should become the Master node (a Master node's role is also FOLLOWER, which means electable).
             storage.writeFrontendRole(FrontendNodeType.FOLLOWER);
             role = FrontendNodeType.FOLLOWER;
         } else {
@@ -525,18 +532,12 @@ private void getClusterIdAndRole() throws IOException {
                     Storage.newToken() : Config.auth_token;
             storage = new Storage(clusterId, token, IMAGE_DIR);
             storage.writeClusterIdAndToken();
-            // If the version file and role file does not exist and the
-            // helper node is itself,
-            // it must be the very beginning startup of the cluster
+
+            isFirstTimeStartUp = true;
             Frontend self = new Frontend(role, selfNode.first, selfNode.second);
-            // In normal case, checkFeExist will return null.
-            // However, if version file and role file are deleted by some
-            // reason,
-            // the self node may already added to the frontends list.
-            if (checkFeExist(selfNode.first, selfNode.second) == null) {
-                frontends.add(self);
-            }
+            // We don't need to check whether the frontends collection already contains self:
+            // it must be empty, because no image has been loaded and no journal has been replayed yet.
+            frontends.add(self);
         } else {
             clusterId = storage.getClusterID();
             if (storage.getToken() == null) {
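The ATTN comment in the hunk above reduces to a file-existence decision at startup. A minimal sketch of that decision follows; the file names come from the ROLE and VERSION files named in the comment, but the class, method, and the helperIsSelf parameter are illustrative stand-ins rather than the actual getClusterIdAndRole code:

    import java.io.File;

    public class StartupRoleSketch {
        // True only in the "very first startup" case described above:
        // neither metadata file exists and this node is its own helper.
        static boolean isFirstTimeStartUp(File imageDir, boolean helperIsSelf) {
            File roleFile = new File(imageDir, "ROLE");
            File versionFile = new File(imageDir, "VERSION");
            return !roleFile.exists() && !versionFile.exists() && helperIsSelf;
        }
    }

If only one of the two files is missing, the startup code cannot distinguish "new cluster" from "damaged metadata", which is exactly why the comment warns about UNDEFINED behavior.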
@@ -800,7 +801,7 @@ private void transferToNonMaster() {
         canWrite = false;
         isMaster = false;
 
-        // donot set canRead
+        // do not set canRead
         // let canRead be what it was
 
         if (formerFeType == FrontendNodeType.OBSERVER && feType == FrontendNodeType.UNKNOWN) {
@@ -1026,7 +1027,7 @@ public long loadFrontends(DataInputStream dis, long checksum) throws IOException
         long newChecksum = checksum ^ size;
         for (int i = 0; i < size; i++) {
             Frontend fe = Frontend.read(dis);
-            frontends.add(fe);
+            addFrontendWithCheck(fe);
         }
 
         size = dis.readInt();
@@ -3679,11 +3680,25 @@ public void replayDeleteReplica(ReplicaPersistInfo info) {
         }
     }
 
-    public void replayAddFrontend(Frontend fe) {
+    public void addFrontendWithCheck(Frontend fe) {
         writeLock();
         try {
-            if (checkFeExist(fe.getHost(), fe.getPort()) != null) {
-                LOG.warn("Fe {} already exist.", fe);
+            Frontend existFe = checkFeExist(fe.getHost(), fe.getPort());
+            if (existFe != null) {
+                LOG.warn("fe {} already exists.", existFe);
+                if (existFe.getRole() != fe.getRole()) {
+                    /*
+                     * This may happen if:
+                     * 1. First, a FE is added as an OBSERVER.
+                     * 2. This OBSERVER is restarted with its ROLE and VERSION files DELETED.
+                     *    In this case, the OBSERVER will start as a FOLLOWER and add itself to the frontends.
+                     * 3. This "FOLLOWER" then begins to load the image or replay the journal,
+                     *    and finds the original OBSERVER in the image or journal.
+                     * This will cause UNDEFINED behavior, so it is better to exit and fix it manually.
+                     */
+                    LOG.error("Trying to add an already existing FE with a different role: {}", fe.getRole());
+                    System.exit(-1);
+                }
                 return;
             }
             frontends.add(fe);
diff --git a/fe/src/com/baidu/palo/persist/EditLog.java b/fe/src/com/baidu/palo/persist/EditLog.java
index 87c05623ee0174..de33531111b36b 100644
--- a/fe/src/com/baidu/palo/persist/EditLog.java
+++ b/fe/src/com/baidu/palo/persist/EditLog.java
@@ -403,7 +403,7 @@ public static void loadJournal(Catalog catalog, JournalEntity journal) {
             case OperationType.OP_ADD_FIRST_FRONTEND:
             case OperationType.OP_ADD_FRONTEND: {
                 Frontend fe = (Frontend) journal.getData();
-                catalog.replayAddFrontend(fe);
+                catalog.addFrontendWithCheck(fe);
                 break;
             }
             case OperationType.OP_REMOVE_FRONTEND: {
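The Catalog and EditLog changes above make the add-frontend path idempotent under replay while failing fast on a role mismatch. A minimal sketch of that contract, with simplified stand-in types, no write lock, and an exception where the real code logs an error and calls System.exit(-1):

    import java.util.ArrayList;
    import java.util.List;

    class ReplayAddSketch {
        static class Fe {
            final String host; final int port; final String role;
            Fe(String host, int port, String role) {
                this.host = host; this.port = port; this.role = role;
            }
        }

        final List<Fe> frontends = new ArrayList<Fe>();

        // Adding the same FE twice is a no-op; the same host:port with a
        // different role means on-disk state diverged from the journal.
        void addFrontendWithCheck(Fe fe) {
            for (Fe exist : frontends) {
                if (exist.host.equals(fe.host) && exist.port == fe.port) {
                    if (!exist.role.equals(fe.role)) {
                        throw new IllegalStateException("FE role mismatch: " + fe.host);
                    }
                    return; // duplicate add/replay is a no-op
                }
            }
            frontends.add(fe);
        }
    }

Routing both image loading (loadFrontends) and journal replay (OP_ADD_FRONTEND) through this single checked method is what makes the duplicate-and-mismatch detection reliable.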
diff --git a/fe/src/com/baidu/palo/planner/OlapScanNode.java b/fe/src/com/baidu/palo/planner/OlapScanNode.java
index 3ef4037a64f981..6b1a9ee5eac00e 100644
--- a/fe/src/com/baidu/palo/planner/OlapScanNode.java
+++ b/fe/src/com/baidu/palo/planner/OlapScanNode.java
@@ -56,6 +56,7 @@
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Range;
+import com.google.common.collect.Sets;
 
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
@@ -67,6 +68,7 @@
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 
 /**
  * Full scan of an Olap table.
@@ -157,57 +159,58 @@ public void finalize(Analyzer analyzer) throws InternalException {
     // }
 
     private List<MaterializedIndex> selectRollupIndex(Partition partition) throws InternalException {
-        ArrayList<MaterializedIndex> containTupleIndices = Lists.newArrayList();
-
         if (olapTable.getKeysType() == KeysType.DUP_KEYS) {
             isPreAggregation = true;
         }
 
-        // 4.1 find table has tuple column
         List<MaterializedIndex> allIndices = Lists.newArrayList();
         allIndices.add(partition.getBaseIndex());
         allIndices.addAll(partition.getRollupIndices());
-        LOG.debug("rollup size={} isPreAggregation={}", allIndices.size(), isPreAggregation);
+        LOG.debug("num of rollup (base included): {}, pre aggr: {}", allIndices.size(), isPreAggregation);
 
+        // 1. find all rollup indexes which contain all tuple columns
+        List<MaterializedIndex> containTupleIndexes = Lists.newArrayList();
         List<Column> baseIndexKeyColumns = olapTable.getKeyColumnsByIndexId(partition.getBaseIndex().getId());
         for (MaterializedIndex index : allIndices) {
-            LOG.debug("index id = " + index.getId());
-            HashSet<String> indexColumns = new HashSet<String>();
+            Set<String> indexColNames = Sets.newHashSet();
             for (Column col : olapTable.getSchemaByIndexId(index.getId())) {
-                indexColumns.add(col.getName());
+                indexColNames.add(col.getName());
             }
 
-            if (indexColumns.containsAll(tupleColumns)) {
+            if (indexColNames.containsAll(tupleColumns)) {
                 // If preAggregation is off, we can only use the base table
-                // or those rollup tables whose key columns is the same with base table
+                // or those rollup tables whose key columns are the same as the base table's
                 // (often in different order)
                 if (isPreAggregation) {
-                    containTupleIndices.add(index);
+                    LOG.debug("preAggregation is on. add index {} which contains all tuple columns", index.getId());
+                    containTupleIndexes.add(index);
                 } else if (olapTable.getKeyColumnsByIndexId(index.getId()).size() == baseIndexKeyColumns.size()) {
-                    LOG.debug("preAggregation is off, but index id (" + index.getId()
-                            + ") have same key columns with base index.");
-                    containTupleIndices.add(index);
+                    LOG.debug("preAggregation is off, but index {} has the same key columns as the base index.",
+                            index.getId());
+                    containTupleIndexes.add(index);
                 }
+            } else {
+                LOG.debug("exclude index {} because it does not contain all tuple columns", index.getId());
             }
         }
 
-        if (containTupleIndices.isEmpty()) {
+        if (containTupleIndexes.isEmpty()) {
             throw new InternalException("Failed to select index, no match index");
         }
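Step 1 above is a set-containment filter plus a key-width guard. A compact sketch of the rule, using simplified stand-ins for the Palo MaterializedIndex and Column types:

    import java.util.ArrayList;
    import java.util.List;
    import java.util.Set;

    class Step1Sketch {
        static class Index {
            long id;
            Set<String> columnNames; // all column names of this index
            int keyColumnCount;      // number of key columns of this index
        }

        // An index qualifies if it covers every tuple column; with
        // pre-aggregation off it must also keep the full base key width.
        static List<Index> selectContaining(List<Index> allIndices, Set<String> tupleColumns,
                                            boolean isPreAggregation, int baseKeyColumnCount) {
            List<Index> result = new ArrayList<Index>();
            for (Index idx : allIndices) {
                if (!idx.columnNames.containsAll(tupleColumns)) {
                    continue; // missing a required column, cannot serve this scan
                }
                if (isPreAggregation || idx.keyColumnCount == baseKeyColumnCount) {
                    result.add(idx);
                }
            }
            return result;
        }
    }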
 
-        // 4.2 find table match index
-        ArrayList<MaterializedIndex> predicateIndexMatchIndices = new ArrayList<MaterializedIndex>();
-        int maxIndexMatchCount = 0;
-        int indexMatchCount = 0;
-        for (MaterializedIndex index : containTupleIndices) {
-            LOG.debug("containTupleIndex: " + index.getId());
-            indexMatchCount = 0;
+        // 2. from containTupleIndexes, find all indexes which best match the column prefix,
+        // based on the sort column and in-predicate columns.
+        List<MaterializedIndex> prefixMatchedIndexes = Lists.newArrayList();
+        int maxPrefixMatchCount = 0;
+        int prefixMatchCount = 0;
+        for (MaterializedIndex index : containTupleIndexes) {
+            prefixMatchCount = 0;
             for (Column col : olapTable.getSchemaByIndexId(index.getId())) {
                 if (sortColumn != null) {
                     if (inPredicateColumns.contains(col.getName())) {
-                        indexMatchCount++;
+                        prefixMatchCount++;
                     } else if (sortColumn.equals(col.getName())) {
-                        indexMatchCount++;
+                        prefixMatchCount++;
                         break;
                     } else {
                         break;
                     }
@@ -218,65 +221,69 @@ private List<MaterializedIndex> selectRollupIndex(Partition partition) throws In
                     }
                 }
             }
 
-            if (indexMatchCount == maxIndexMatchCount) {
-                predicateIndexMatchIndices.add(index);
-            } else if (indexMatchCount > maxIndexMatchCount) {
-                maxIndexMatchCount = indexMatchCount;
-                predicateIndexMatchIndices.clear();
-                predicateIndexMatchIndices.add(index);
+            if (prefixMatchCount == maxPrefixMatchCount) {
+                LOG.debug("s2: found an equal prefix match index {}. match count: {}", index.getId(), prefixMatchCount);
+                prefixMatchedIndexes.add(index);
+            } else if (prefixMatchCount > maxPrefixMatchCount) {
+                LOG.debug("s2: found a better prefix match index {}. match count: {}", index.getId(), prefixMatchCount);
+                maxPrefixMatchCount = prefixMatchCount;
+                prefixMatchedIndexes.clear();
+                prefixMatchedIndexes.add(index);
             }
         }
 
-        ArrayList<MaterializedIndex> eqJoinIndexMatchIndices = new ArrayList<MaterializedIndex>();
-        maxIndexMatchCount = 0;
-        indexMatchCount = 0;
-        for (MaterializedIndex index : containTupleIndices) {
-            indexMatchCount = 0;
+        // 3. from containTupleIndexes, find all indexes which best match the column prefix,
+        // based on the equal-join columns.
+        List<MaterializedIndex> eqJoinPrefixMatchedIndexes = Lists.newArrayList();
+        maxPrefixMatchCount = 0;
+        for (MaterializedIndex index : containTupleIndexes) {
+            prefixMatchCount = 0;
             for (Column col : olapTable.getSchemaByIndexId(index.getId())) {
                 if (eqJoinColumns.contains(col.getName()) || predicateColumns.contains(col.getName())) {
-                    indexMatchCount++;
+                    prefixMatchCount++;
                 } else {
                     break;
                 }
             }
 
-            if (indexMatchCount == maxIndexMatchCount) {
-                eqJoinIndexMatchIndices.add(index);
-            } else if (indexMatchCount > maxIndexMatchCount) {
-                maxIndexMatchCount = indexMatchCount;
-                eqJoinIndexMatchIndices.clear();
-                eqJoinIndexMatchIndices.add(index);
+            if (prefixMatchCount == maxPrefixMatchCount) {
+                LOG.debug("s3: found an equal prefix match index {}. match count: {}", index.getId(), prefixMatchCount);
+                eqJoinPrefixMatchedIndexes.add(index);
+            } else if (prefixMatchCount > maxPrefixMatchCount) {
+                LOG.debug("s3: found a better prefix match index {}. match count: {}", index.getId(), prefixMatchCount);
+                maxPrefixMatchCount = prefixMatchCount;
+                eqJoinPrefixMatchedIndexes.clear();
+                eqJoinPrefixMatchedIndexes.add(index);
             }
         }
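Steps 2 and 3 share the same counting idea: walk the index's columns in declared order and count how many leading columns are useful to the query, stopping at the first miss. A sketch of that counter; the class and parameter names are illustrative:

    import java.util.List;
    import java.util.Set;

    class PrefixMatchSketch {
        // Counts useful leading columns; a single non-useful column ends the
        // prefix, since later matches cannot serve as a sort/filter prefix.
        static int prefixMatchCount(List<String> indexColumnsInOrder, Set<String> usefulColumns) {
            int count = 0;
            for (String col : indexColumnsInOrder) {
                if (!usefulColumns.contains(col)) {
                    break;
                }
                count++;
            }
            return count;
        }
    }

The surrounding loops keep every index tied for the maximum count, which is why they clear the candidate list whenever a strictly better match appears.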
 
-        ArrayList<MaterializedIndex> indexMatchIndices = new ArrayList<MaterializedIndex>();
-        for (MaterializedIndex index : predicateIndexMatchIndices) {
-            LOG.debug("predicateIndexMatchIndex: " + index.getId());
-            for (MaterializedIndex oneIndex : eqJoinIndexMatchIndices) {
+        // 4. find the intersection of prefixMatchedIndexes and eqJoinPrefixMatchedIndexes
+        // as the candidate indexes
+        List<MaterializedIndex> finalCandidateIndexes = Lists.newArrayList();
+        for (MaterializedIndex index : prefixMatchedIndexes) {
+            for (MaterializedIndex oneIndex : eqJoinPrefixMatchedIndexes) {
                 if (oneIndex.getId() == index.getId()) {
-                    indexMatchIndices.add(index);
-                    LOG.debug("Add indexMatchId: " + index.getId());
+                    finalCandidateIndexes.add(index);
+                    LOG.debug("found a matched index {} in the intersection of "
+                            + "prefixMatchedIndexes and eqJoinPrefixMatchedIndexes",
+                            index.getId());
                 }
             }
         }
 
-        if (indexMatchIndices.isEmpty()) {
-            indexMatchIndices = predicateIndexMatchIndices;
-        }
-
-        // 4.3 return all the candidate index
-        List<MaterializedIndex> selectedIndex = new ArrayList<MaterializedIndex>();
-        for (MaterializedIndex table : indexMatchIndices) {
-            selectedIndex.add(table);
+        // there may be no intersection between prefixMatchedIndexes and eqJoinPrefixMatchedIndexes;
+        // in this case, use prefixMatchedIndexes.
+        if (finalCandidateIndexes.isEmpty()) {
+            finalCandidateIndexes = prefixMatchedIndexes;
         }
 
-        Collections.sort(selectedIndex, new Comparator<MaterializedIndex>() {
+        // 5. sort the final candidate indexes by index id, to make sure that the candidate
+        // indexes found in all partitions are returned in the same order
+        Collections.sort(finalCandidateIndexes, new Comparator<MaterializedIndex>() {
             @Override
             public int compare(MaterializedIndex index1, MaterializedIndex index2) {
                 return (int) (index1.getId() - index2.getId());
             }
         });
 
-        return selectedIndex;
+        return finalCandidateIndexes;
     }
 
     private void normalizePredicate(Analyzer analyzer) throws InternalException {
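One caveat about the comparator kept in the context lines above: casting the long difference to int can overflow and mis-order ids that differ by more than Integer.MAX_VALUE. An overflow-safe variant (a suggestion, not part of this patch) would be:

    Collections.sort(finalCandidateIndexes, new Comparator<MaterializedIndex>() {
        @Override
        public int compare(MaterializedIndex index1, MaterializedIndex index2) {
            // Long.compare avoids the int-overflow hazard of subtraction
            return Long.compare(index1.getId(), index2.getId());
        }
    });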