4 changes: 2 additions & 2 deletions fe/src/com/baidu/palo/analysis/Analyzer.java
@@ -809,8 +809,8 @@ public void createAuxEquivPredicate(Expr lhs, Expr rhs) {
// create an eq predicate between lhs and rhs
BinaryPredicate p = new BinaryPredicate(BinaryPredicate.Operator.EQ, lhs, rhs);
p.setIsAuxExpr();
if (LOG.isTraceEnabled()) {
LOG.trace("register equiv predicate: " + p.toSql() + " " + p.debugString());
if (LOG.isDebugEnabled()) {
LOG.debug("register equiv predicate: " + p.toSql() + " " + p.debugString());
}
registerConjunct(p);
}
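This hunk lowers the message from trace to debug and keeps the matching is*Enabled() guard. The guard matters because the arguments (p.toSql() + p.debugString()) are concatenated eagerly before the logger is even called. A minimal, standalone sketch of the pattern, assuming the same log4j2 API the project already imports (GuardedLogging and expensiveDebugString() are hypothetical stand-ins, not Palo code):

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

public class GuardedLogging {
    private static final Logger LOG = LogManager.getLogger(GuardedLogging.class);

    // Stands in for p.toSql() + p.debugString(): string building we only
    // want to pay for when debug logging is actually enabled.
    private static String expensiveDebugString() {
        return "register equiv predicate: `a` = `b`";
    }

    public static void main(String[] args) {
        // Without the guard, the argument string is built on every call,
        // even when the debug level is disabled.
        if (LOG.isDebugEnabled()) {
            LOG.debug(expensiveDebugString());
        }
    }
}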
53 changes: 34 additions & 19 deletions fe/src/com/baidu/palo/catalog/Catalog.java
@@ -281,16 +281,15 @@ public class Catalog {

private MetaReplayState metaReplayState;

// TODO(zc):
private PullLoadJobMgr pullLoadJobMgr;
private BrokerMgr brokerMgr;

public List<Frontend> getFrontends() {
return frontends;
return Lists.newArrayList(frontends);
}

public List<Frontend> getRemovedFrontends() {
return removedFrontends;
return Lists.newArrayList(removedFrontends);
}

public JournalObservable getJournalObservable() {
@@ -505,10 +504,18 @@ private void getClusterIdAndRole() throws IOException {
System.exit(-1);
}

// ATTN:
// If the version file and role file do not exist and the helper node is itself,
// this should be the very first startup of the cluster, so we create the ROLE and VERSION files,
// set isFirstTimeStartUp to true, and add this node to the frontends list.
// If the ROLE and VERSION files are deleted for some reason, we may arbitrarily start this node as
// FOLLOWER, which may cause UNDEFINED behavior.
// Everything may be OK if the original role was exactly FOLLOWER,
// but if not, the FE process will eventually exit.
Storage storage = new Storage(IMAGE_DIR);
if (!roleFile.exists()) {
// The very first time to start the first node of the cluster.
// It should be one REPLICA node.
// It should become the Master node (a Master node's role is also FOLLOWER, which means it is electable)
storage.writeFrontendRole(FrontendNodeType.FOLLOWER);
role = FrontendNodeType.FOLLOWER;
} else {
@@ -525,18 +532,12 @@ private void getClusterIdAndRole() throws IOException {
Storage.newToken() : Config.auth_token;
storage = new Storage(clusterId, token, IMAGE_DIR);
storage.writeClusterIdAndToken();
// If the version file and role file does not exist and the
// helper node is itself,
// it must be the very beginning startup of the cluster

isFirstTimeStartUp = true;
Frontend self = new Frontend(role, selfNode.first, selfNode.second);
// In normal case, checkFeExist will return null.
// However, if version file and role file are deleted by some
// reason,
// the self node may already added to the frontends list.
if (checkFeExist(selfNode.first, selfNode.second) == null) {
frontends.add(self);
}
// We don't need to check whether the frontends collection already contains self.
// The frontends collection must be empty, because no image has been loaded and no journal has been replayed yet.
frontends.add(self);
} else {
clusterId = storage.getClusterID();
if (storage.getToken() == null) {
@@ -800,7 +801,7 @@ private void transferToNonMaster() {
canWrite = false;
isMaster = false;

// donot set canRead
// do not set canRead
// let canRead be what it was

if (formerFeType == FrontendNodeType.OBSERVER && feType == FrontendNodeType.UNKNOWN) {
@@ -1026,7 +1027,7 @@ public long loadFrontends(DataInputStream dis, long checksum) throws IOException
long newChecksum = checksum ^ size;
for (int i = 0; i < size; i++) {
Frontend fe = Frontend.read(dis);
frontends.add(fe);
addFrontendWithCheck(fe);
}

size = dis.readInt();
@@ -3679,11 +3680,25 @@ public void replayDeleteReplica(ReplicaPersistInfo info) {
}
}

public void replayAddFrontend(Frontend fe) {
public void addFrontendWithCheck(Frontend fe) {
writeLock();
try {
if (checkFeExist(fe.getHost(), fe.getPort()) != null) {
LOG.warn("Fe {} already exist.", fe);
Frontend existFe = checkFeExist(fe.getHost(), fe.getPort());
if (existFe != null) {
LOG.warn("fe {} already exist.", existFe);
if (existFe.getRole() != fe.getRole()) {
/*
* This may happen if:
* 1. First, a FE is added as an OBSERVER.
* 2. This OBSERVER is restarted with its ROLE and VERSION files DELETED.
* In this case, the OBSERVER starts as a FOLLOWER and adds itself to the frontends list.
* 3. This "FOLLOWER" then begins to load the image or replay the journal,
* and finds the original OBSERVER in the image or journal.
* This will cause UNDEFINED behavior, so it is better to exit and fix it manually.
*/
LOG.error("Try to add an already exist FE with different role: {}", fe.getRole());
System.exit(-1);
}
return;
}
frontends.add(fe);
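Taken together, the Catalog.java changes make frontend registration safe to run both at live addition and during journal replay: getFrontends()/getRemovedFrontends() now return defensive copies, and addFrontendWithCheck() is idempotent on (host, port) but fails fast on a role mismatch. A simplified, hypothetical model of that check; FrontendRegistry, Fe, and addWithCheck are illustrative names, not the actual Palo API:

import java.util.ArrayList;
import java.util.List;

class FrontendRegistry {
    enum Role { FOLLOWER, OBSERVER }

    static class Fe {
        final String host;
        final int port;
        final Role role;
        Fe(String host, int port, Role role) {
            this.host = host;
            this.port = port;
            this.role = role;
        }
    }

    private final List<Fe> frontends = new ArrayList<>();

    // Idempotent on (host, port); fatal on a role mismatch, since that
    // indicates a node restarted with its ROLE/VERSION files deleted.
    synchronized void addWithCheck(Fe fe) {
        for (Fe exist : frontends) {
            if (exist.host.equals(fe.host) && exist.port == fe.port) {
                if (exist.role != fe.role) {
                    // Mirrors the System.exit(-1) above: continuing would
                    // leave the cluster metadata in an undefined state.
                    throw new IllegalStateException(
                            "FE already exists with a different role: " + exist.role);
                }
                return; // already present: the replayed add is a no-op
            }
        }
        frontends.add(fe);
    }
}

Exiting on a role mismatch is deliberate: replaying the journal any further would act on a frontend whose recorded role no longer matches the running one.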
2 changes: 1 addition & 1 deletion fe/src/com/baidu/palo/persist/EditLog.java
@@ -403,7 +403,7 @@ public static void loadJournal(Catalog catalog, JournalEntity journal) {
case OperationType.OP_ADD_FIRST_FRONTEND:
case OperationType.OP_ADD_FRONTEND: {
Frontend fe = (Frontend) journal.getData();
catalog.replayAddFrontend(fe);
catalog.addFrontendWithCheck(fe);
break;
}
case OperationType.OP_REMOVE_FRONTEND: {
123 changes: 65 additions & 58 deletions fe/src/com/baidu/palo/planner/OlapScanNode.java
@@ -56,6 +56,7 @@
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Range;
import com.google.common.collect.Sets;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -67,6 +68,7 @@
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/**
* Full scan of an Olap table.
@@ -157,57 +159,58 @@ public void finalize(Analyzer analyzer) throws InternalException {
// }

private List<MaterializedIndex> selectRollupIndex(Partition partition) throws InternalException {
ArrayList<MaterializedIndex> containTupleIndices = Lists.newArrayList();

if (olapTable.getKeysType() == KeysType.DUP_KEYS) {
isPreAggregation = true;
}

// 4.1 find table has tuple column
List<MaterializedIndex> allIndices = Lists.newArrayList();
allIndices.add(partition.getBaseIndex());
allIndices.addAll(partition.getRollupIndices());
LOG.debug("rollup size={} isPreAggregation={}", allIndices.size(), isPreAggregation);
LOG.debug("num of rollup(base included): {}, pre aggr: {}", allIndices.size(), isPreAggregation);

// 1. find all rollup indexes which contain all tuple columns
List<MaterializedIndex> containTupleIndexes = Lists.newArrayList();
List<Column> baseIndexKeyColumns = olapTable.getKeyColumnsByIndexId(partition.getBaseIndex().getId());
for (MaterializedIndex index : allIndices) {
LOG.debug("index id = " + index.getId());
HashSet<String> indexColumns = new HashSet<String>();
Set<String> indexColNames = Sets.newHashSet();
for (Column col : olapTable.getSchemaByIndexId(index.getId())) {
indexColumns.add(col.getName());
indexColNames.add(col.getName());
}

if (indexColumns.containsAll(tupleColumns)) {
if (indexColNames.containsAll(tupleColumns)) {
// If preAggregation is off, we can only use the base table
// or those rollup tables whose key columns is the same with base table
// or those rollup tables whose key columns are the same as the base table's
// (often in different order)
if (isPreAggregation) {
containTupleIndices.add(index);
LOG.debug("preAggregation is on. add index {} which contains all tuple columns", index.getId());
containTupleIndexes.add(index);
} else if (olapTable.getKeyColumnsByIndexId(index.getId()).size() == baseIndexKeyColumns.size()) {
LOG.debug("preAggregation is off, but index id (" + index.getId()
+ ") have same key columns with base index.");
containTupleIndices.add(index);
LOG.debug("preAggregation is off, but index {} have same key columns with base index.",
index.getId());
containTupleIndexes.add(index);
}
} else {
LOG.debug("exclude index {} because it does not contain all tuple columns", index.getId());
}
}

if (containTupleIndices.isEmpty()) {
if (containTupleIndexes.isEmpty()) {
throw new InternalException("Failed to select index, no match index");
}

// 4.2 find table match index
ArrayList<MaterializedIndex> predicateIndexMatchIndices = new ArrayList<MaterializedIndex>();
int maxIndexMatchCount = 0;
int indexMatchCount = 0;
for (MaterializedIndex index : containTupleIndices) {
LOG.debug("containTupleIndex: " + index.getId());
indexMatchCount = 0;
// 2. from containTupleIndexes, find the indexes whose prefix best matches
// the predicate/sort/in-predicate columns.
List<MaterializedIndex> prefixMatchedIndexes = Lists.newArrayList();
int maxPrefixMatchCount = 0;
int prefixMatchCount = 0;
for (MaterializedIndex index : containTupleIndexes) {
prefixMatchCount = 0;
for (Column col : olapTable.getSchemaByIndexId(index.getId())) {
if (sortColumn != null) {
if (inPredicateColumns.contains(col.getName())) {
indexMatchCount++;
prefixMatchCount++;
} else if (sortColumn.equals(col.getName())) {
indexMatchCount++;
prefixMatchCount++;
break;
} else {
break;
@@ -218,65 +221,69 @@ private List<MaterializedIndex> selectRollupIndex(Partition partition) throws InternalException {
}
}
}
if (indexMatchCount == maxIndexMatchCount) {
predicateIndexMatchIndices.add(index);
} else if (indexMatchCount > maxIndexMatchCount) {
maxIndexMatchCount = indexMatchCount;
predicateIndexMatchIndices.clear();
predicateIndexMatchIndices.add(index);
if (prefixMatchCount == maxPrefixMatchCount) {
LOG.debug("s2: find a equal prefix match index {}. match count: {}", index.getId(), prefixMatchCount);
prefixMatchedIndexes.add(index);
} else if (prefixMatchCount > maxPrefixMatchCount) {
LOG.debug("s2: find a better prefix match index {}. match count: {}", index.getId(), prefixMatchCount);
maxPrefixMatchCount = prefixMatchCount;
prefixMatchedIndexes.clear();
prefixMatchedIndexes.add(index);
}
}

ArrayList<MaterializedIndex> eqJoinIndexMatchIndices = new ArrayList<MaterializedIndex>();
maxIndexMatchCount = 0;
indexMatchCount = 0;
for (MaterializedIndex index : containTupleIndices) {
indexMatchCount = 0;
// 3. from containTupleIndexes, find the indexes whose prefix best matches
// the equal-join and predicate columns.
List<MaterializedIndex> eqJoinPrefixMatchedIndexes = Lists.newArrayList();
maxPrefixMatchCount = 0;
for (MaterializedIndex index : containTupleIndexes) {
prefixMatchCount = 0;
for (Column col : olapTable.getSchemaByIndexId(index.getId())) {
if (eqJoinColumns.contains(col.getName()) || predicateColumns.contains(col.getName())) {
indexMatchCount++;
prefixMatchCount++;
} else {
break;
}
}
if (indexMatchCount == maxIndexMatchCount) {
eqJoinIndexMatchIndices.add(index);
} else if (indexMatchCount > maxIndexMatchCount) {
maxIndexMatchCount = indexMatchCount;
eqJoinIndexMatchIndices.clear();
eqJoinIndexMatchIndices.add(index);
if (prefixMatchCount == maxPrefixMatchCount) {
LOG.debug("s3: find a equal prefix match index {}. match count: {}", index.getId(), prefixMatchCount);
eqJoinPrefixMatchedIndexes.add(index);
} else if (prefixMatchCount > maxPrefixMatchCount) {
LOG.debug("s3: find a better prefix match index {}. match count: {}", index.getId(), prefixMatchCount);
maxPrefixMatchCount = prefixMatchCount;
eqJoinPrefixMatchedIndexes.clear();
eqJoinPrefixMatchedIndexes.add(index);
}
}

ArrayList<MaterializedIndex> indexMatchIndices = new ArrayList<MaterializedIndex>();
for (MaterializedIndex index : predicateIndexMatchIndices) {
LOG.debug("predicateIndexMatchIndex: " + index.getId());
for (MaterializedIndex oneIndex : eqJoinIndexMatchIndices) {
// 4. take the intersection of prefixMatchedIndexes and eqJoinPrefixMatchedIndexes as the candidate indexes
List<MaterializedIndex> finalCandidateIndexes = Lists.newArrayList();
for (MaterializedIndex index : prefixMatchedIndexes) {
for (MaterializedIndex oneIndex : eqJoinPrefixMatchedIndexes) {
if (oneIndex.getId() == index.getId()) {
indexMatchIndices.add(index);
LOG.debug("Add indexMatchId: " + index.getId());
finalCandidateIndexes.add(index);
LOG.debug("find a matched index {} in intersection of "
+ "prefixMatchIndices and eqJoinPrefixMatchIndices",
index.getId());
}
}
}

if (indexMatchIndices.isEmpty()) {
indexMatchIndices = predicateIndexMatchIndices;
}

// 4.3 return all the candidate index
List<MaterializedIndex> selectedIndex = new ArrayList<MaterializedIndex>();
for (MaterializedIndex table : indexMatchIndices) {
selectedIndex.add(table);
// maybe there is no intersection between prefixMatchedIndexes and eqJoinPrefixMatchedIndexes.
// in this case, fall back to prefixMatchedIndexes.
if (finalCandidateIndexes.isEmpty()) {
finalCandidateIndexes = prefixMatchedIndexes;
}

Collections.sort(selectedIndex, new Comparator<MaterializedIndex>() {
// 5. sort the final candidate indexes by index id
// to make sure that the candidate indexes found in all partitions are returned in the same order
Collections.sort(finalCandidateIndexes, new Comparator<MaterializedIndex>() {
@Override
public int compare(MaterializedIndex index1, MaterializedIndex index2)
{
return (int) (index1.getId() - index2.getId());
}
});
return selectedIndex;
return finalCandidateIndexes;
}

private void normalizePredicate(Analyzer analyzer) throws InternalException {
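The rewritten selectRollupIndex() boils down to a prefix-match count: walk an index's key columns in order, count how many leading columns the query actually uses, and keep the indexes with the highest count; sorting the survivors by index id (step 5) then makes every partition return its candidates in the same order. A stripped-down, hypothetical sketch of that counting step; PrefixMatch, prefixMatchCount, and the column names are illustrative, not the actual Palo code:

import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

final class PrefixMatch {
    // Count how many leading key columns of an index are covered by the
    // set of columns the query uses; the first miss ends the prefix.
    static int prefixMatchCount(List<String> indexColumns, Set<String> usedColumns) {
        int count = 0;
        for (String col : indexColumns) {
            if (!usedColumns.contains(col)) {
                break;
            }
            count++;
        }
        return count;
    }

    public static void main(String[] args) {
        List<String> indexColumns = Arrays.asList("a", "b", "c");
        Set<String> usedColumns = new HashSet<>(Arrays.asList("a", "c"));
        // Prints 1: "a" matches, then "b" breaks the prefix before "c" is reached.
        System.out.println(prefixMatchCount(indexColumns, usedColumns));
    }
}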