65 changes: 33 additions & 32 deletions fe/src/com/baidu/palo/catalog/Catalog.java
@@ -255,8 +255,8 @@ public class Catalog {
private String metaDir;
private EditLog editLog;
private int clusterId;
private long replayedJournalId; // For checkpoint and observer memory
// replayed marker
// For checkpoint and observer memory replayed marker
private AtomicLong replayedJournalId;

private static Catalog CHECKPOINT = null;
private static long checkpointThreadId = -1;
@@ -329,7 +329,7 @@ private Catalog() {

this.canWrite = false;
this.canRead = false;
this.replayedJournalId = 0;
this.replayedJournalId = new AtomicLong(0L);
this.isMaster = false;
this.isElectable = false;
this.synchronizedTimeMs = 0;
@@ -464,14 +464,15 @@ public void initialize(String[] args) throws Exception {
}
}

// 3. get cluster id and role (Observer or Replica)
// 3. get cluster id and role (Observer or Follower)
getClusterIdAndRole();

// 4. Load image first and replay edits
this.editLog = new EditLog();
loadImage(IMAGE_DIR); // load image file
editLog.open(); // open bdb env or local output stream
this.userPropertyMgr.setEditLog(editLog);

// 5. start load label cleaner thread
createCleaner();
cleaner.setName("labelCleaner");
@@ -532,10 +533,9 @@ private void getClusterIdAndRole() throws IOException {
frontends.add(self);
}
} else {
storage = new Storage(IMAGE_DIR);
clusterId = storage.getClusterID();
}
} else {
} else {
// Designate one helper node. Get the role and version info
// from the helper node
Storage storage = null;
@@ -566,19 +566,19 @@ private void getClusterIdAndRole() throws IOException {
storage.writeFrontendRole(role);
}
if (!versionFile.exists()) {
// If the version file doesn't exist, download it from helper
// node
// If the version file doesn't exist, download it from helper node
if (!getVersionFile()) {
LOG.error("fail to download version file from " + helperNode.first + " will exit.");
System.exit(-1);
}

// NOTE: cluster_id is initialized when the Storage object is constructed,
// so we create a new one.
storage = new Storage(IMAGE_DIR);
clusterId = storage.getClusterID();
} else {
// If the version file exists, read the cluster id and check the
// id with helper node
// to make sure they are identical
// id with helper node to make sure they are identical
clusterId = storage.getClusterID();
try {
URL idURL = new URL("http://" + helperNode.first + ":" + Config.http_port + "/check");
@@ -593,7 +593,8 @@ private void getClusterIdAndRole() throws IOException {
System.exit(-1);
}
} catch (Exception e) {
LOG.warn(e);
LOG.warn("fail to check cluster_id from helper node.", e);
System.exit(-1);
}
}
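The check above asks the helper node for its cluster id over HTTP and refuses to start on mismatch; the patch also upgrades the failure path from a bare LOG.warn(e) to a logged message plus System.exit(-1). A self-contained sketch of that handshake, where the host, port, endpoint, and plain-text response format are assumptions for illustration:

```java
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.URL;

// Fetch the remote cluster id and refuse to start on mismatch.
// Host, port, endpoint, and response format are illustrative only.
public class ClusterIdCheck {
    public static void main(String[] args) {
        int localClusterId = 12345; // would come from the local version file
        try {
            URL idURL = new URL("http://helper-host:8030/check");
            HttpURLConnection conn = (HttpURLConnection) idURL.openConnection();
            conn.setConnectTimeout(2_000);
            conn.setReadTimeout(2_000);
            try (BufferedReader in = new BufferedReader(
                    new InputStreamReader(conn.getInputStream()))) {
                int remoteClusterId = Integer.parseInt(in.readLine().trim());
                if (remoteClusterId != localClusterId) {
                    System.err.println("cluster id mismatch with helper node, will exit");
                    System.exit(-1);
                }
            }
        } catch (Exception e) {
            // fail hard, matching the patch: a node that cannot verify its
            // cluster id must not join the group
            System.err.println("fail to check cluster_id from helper node: " + e);
            System.exit(-1);
        }
    }
}
```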

@@ -884,7 +885,7 @@ public void loadImage(String imageDir) throws IOException, DdlException {
LOG.info("image does not exist: {}", curFile.getAbsolutePath());
return;
}
replayedJournalId = storage.getImageSeq();
replayedJournalId.set(storage.getImageSeq());
LOG.info("start load image from {}. is ckpt: {}", curFile.getAbsolutePath(), Catalog.isCheckpointThread());
long loadImageStartTime = System.currentTimeMillis();
DataInputStream dis = new DataInputStream(new BufferedInputStream(new FileInputStream(curFile)));
@@ -1253,9 +1254,9 @@ public long loadRecycleBin(DataInputStream dis, long checksum) throws IOException
public void saveImage() throws IOException {
// Write image.ckpt
Storage storage = new Storage(IMAGE_DIR);
File curFile = storage.getImageFile(replayedJournalId);
File curFile = storage.getImageFile(replayedJournalId.get());
File ckpt = new File(IMAGE_DIR, Storage.IMAGE_NEW);
saveImage(ckpt, replayedJournalId);
saveImage(ckpt, replayedJournalId.get());

// Move image.ckpt to image.dataVersion
LOG.info("Move " + ckpt.getAbsolutePath() + " to " + curFile.getAbsolutePath());
@@ -1761,14 +1762,14 @@ public synchronized boolean replayJournal(long toJournalId) {
if (toJournalId == -1) {
toJournalId = getMaxJournalId();
}
if (toJournalId <= replayedJournalId) {
if (toJournalId <= replayedJournalId.get()) {
return false;
}

LOG.info("replayed journal id is {}, replay to journal id is {}", replayedJournalId, toJournalId);
JournalCursor cursor = editLog.read(replayedJournalId + 1, toJournalId);
JournalCursor cursor = editLog.read(replayedJournalId.get() + 1, toJournalId);
if (cursor == null) {
LOG.warn("failed to get cursor from {} to {}", replayedJournalId + 1, toJournalId);
LOG.warn("failed to get cursor from {} to {}", replayedJournalId.get() + 1, toJournalId);
return false;
}

@@ -1781,10 +1782,10 @@ public synchronized boolean replayJournal(long toJournalId) {
}
hasLog = true;
EditLog.loadJournal(this, entity);
replayedJournalId++;
replayedJournalId.incrementAndGet();
LOG.debug("journal {} replayed.", replayedJournalId);
if (!isMaster) {
journalObservable.notifyObservers(replayedJournalId);
journalObservable.notifyObservers(replayedJournalId.get());
}
}
long cost = System.currentTimeMillis() - startTime;
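The replay loop reads every entry past the last replayed id, applies it, then advances the counter with incrementAndGet() so concurrent readers always see a consistent high-water mark. A simplified, runnable sketch with stand-in types (a List plays the role of the journal cursor):

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;

// Replay all journal entries in (replayedJournalId, toJournalId],
// advancing the counter atomically after each applied entry.
public class ReplayLoopDemo {
    private final AtomicLong replayedJournalId = new AtomicLong(0L);

    public boolean replayJournal(List<String> journal, long toJournalId) {
        if (toJournalId <= replayedJournalId.get()) {
            return false; // nothing new to replay
        }
        boolean hasLog = false;
        for (long id = replayedJournalId.get() + 1; id <= toJournalId; id++) {
            String entity = journal.get((int) id - 1); // cursor.next() stand-in
            // EditLog.loadJournal(this, entity) would apply the entry here
            hasLog = true;
            replayedJournalId.incrementAndGet();
        }
        return hasLog;
    }

    public static void main(String[] args) {
        ReplayLoopDemo demo = new ReplayLoopDemo();
        List<String> journal = List.of("e1", "e2", "e3");
        System.out.println(demo.replayJournal(journal, 3)); // true
        System.out.println(demo.replayedJournalId.get());   // 3
        System.out.println(demo.replayJournal(journal, 3)); // false, caught up
    }
}
```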
@@ -3699,13 +3700,13 @@ public List<String> getClusterDbNames(String clusterName) throws AnalysisException
final Cluster cluster = nameToCluster.get(clusterName);
if (cluster == null) {
throw new AnalysisException("No cluster selected");
}
}
List<String> dbNames = Lists.newArrayList(cluster.getDbNames());
return dbNames;
} finally {
readUnlock();
}
}
}
}

public List<Long> getDbIds() {
readLock();
@@ -3866,7 +3867,7 @@ public Clone getCloneInstance() {
}

public long getReplayedJournalId() {
return this.replayedJournalId;
return this.replayedJournalId.get();
}

public HAProtocol getHaProtocol() {
@@ -4268,7 +4269,7 @@ public void replayRenameColumn(TableInfo tableInfo) throws DdlException {
}

/*
* used for handling AlterClusterStmt
* used for handling AlterClusterStmt
* (for the client, this is the ALTER CLUSTER command).
*/
public void alterCluster(AlterSystemStmt stmt) throws DdlException, InternalException {
Expand Down Expand Up @@ -4603,7 +4604,7 @@ public void processModifyCluster(AlterClusterStmt stmt) throws DdlException {
.append(backend.getHeartbeatPort()).toString());
}

// here we reuse the process of decommission backends. but set backend's decommission type to
// here we reuse the process of decommission backends. but set backend's decommission type to
// ClusterDecommission, which means this backend will not be removed from the system
// after decommission is done.
final DecommissionBackendClause clause = new DecommissionBackendClause(hostPortList);
@@ -4717,7 +4718,7 @@ public void migrateDb(MigrateDbStmt stmt) throws DdlException {

/**
* return max replicationNum of a db
*
*
* @param db
* @return
*/
@@ -4940,10 +4941,10 @@ public long loadCluster(DataInputStream dis, long checksum) throws IOException,
final Cluster cluster = new Cluster();
cluster.readFields(dis);
checksum ^= cluster.getId();
// BE is in default_cluster when added, therefore it is possible that the BEs
// in default_cluster are not the latest, because the cluster can't be updated when
// loadCluster runs after loadBackend. Because of forgetting to remove the BE's id from the

// BE is in default_cluster when added, therefore it is possible that the BEs
// in default_cluster are not the latest, because the cluster can't be updated when
// loadCluster runs after loadBackend. Because of forgetting to remove the BE's id from the
// cluster when a BE is dropped or decommissioned in the latest versions, we need to update
// the cluster's BEs.
List<Long> latestBackendIds = systemInfo.getClusterBackendIds(cluster.getName());
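The comment explains that a persisted cluster may still list backend ids that were dropped or decommissioned, so loadCluster overwrites the cluster's ids with the system's current list. A tiny sketch of that reconciliation, with simplified stand-ins for Cluster and SystemInfoService:

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Replace a cluster's persisted backend ids with the system's latest view,
// dropping stale entries and picking up ones the old image missed.
public class BackendReconcileDemo {
    public static void main(String[] args) {
        // ids recorded in the loaded cluster image (may contain stale entries)
        Set<Long> clusterBackendIds = new HashSet<>(List.of(1L, 2L, 3L));
        // ids the system currently knows for this cluster
        List<Long> latestBackendIds = List.of(1L, 3L, 4L);

        clusterBackendIds.clear();
        clusterBackendIds.addAll(latestBackendIds); // stale id 2 gone, 4 added
        System.out.println(clusterBackendIds);      // e.g. [1, 3, 4]
    }
}
```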
@@ -5075,11 +5076,11 @@ public void replayUpdateClusterAndBackends(BackendIdsUpdateInfo info) {
final Cluster cluster = nameToCluster.get(backend.getOwnerClusterName());
cluster.removeBackend(id);
} finally {
writeUnlock();
writeUnlock();
}
backend.setDecommissioned(false);
backend.clearClusterName();
backend.setBackendState(BackendState.free);
backend.setBackendState(BackendState.free);
}
}
