diff --git a/fe/src/com/baidu/palo/catalog/Catalog.java b/fe/src/com/baidu/palo/catalog/Catalog.java
index 7731693bf8f44b..f7d8dfb9608d1f 100644
--- a/fe/src/com/baidu/palo/catalog/Catalog.java
+++ b/fe/src/com/baidu/palo/catalog/Catalog.java
@@ -255,8 +255,8 @@ public class Catalog {
     private String metaDir;
     private EditLog editLog;
     private int clusterId;
-    private long replayedJournalId; // For checkpoint and observer memory
-                                    // replayed marker
+    // For checkpoint and observer memory replayed marker
+    private AtomicLong replayedJournalId;
 
     private static Catalog CHECKPOINT = null;
     private static long checkpointThreadId = -1;
@@ -329,7 +329,7 @@ private Catalog() {
         this.canWrite = false;
         this.canRead = false;
-        this.replayedJournalId = 0;
+        this.replayedJournalId = new AtomicLong(0L);
         this.isMaster = false;
         this.isElectable = false;
         this.synchronizedTimeMs = 0;
@@ -464,7 +464,7 @@ public void initialize(String[] args) throws Exception {
             }
         }
 
-        // 3. get cluster id and role (Observer or Replica)
+        // 3. get cluster id and role (Observer or Follower)
         getClusterIdAndRole();
 
         // 4. Load image first and replay edits
@@ -472,6 +472,7 @@
         loadImage(IMAGE_DIR); // load image file
         editLog.open(); // open bdb env or local output stream
         this.userPropertyMgr.setEditLog(editLog);
 
+        // 5. start load label cleaner thread
         createCleaner();
         cleaner.setName("labelCleaner");
@@ -532,10 +533,9 @@ private void getClusterIdAndRole() throws IOException {
                     frontends.add(self);
                 }
             } else {
-                storage = new Storage(IMAGE_DIR);
                 clusterId = storage.getClusterID();
             }
-        } else { 
+        } else {
             // Designate one helper node. Get the roll and version info
             // from the helper node
             Storage storage = null;
@@ -566,19 +566,19 @@
                 storage.writeFrontendRole(role);
             }
             if (!versionFile.exists()) {
-                // If the version file doesn't exist, download it from helper
-                // node
+                // If the version file doesn't exist, download it from helper node
                 if (!getVersionFile()) {
                     LOG.error("fail to download version file from " + helperNode.first + " will exit.");
                     System.exit(-1);
                 }
+                // NOTE: cluster_id is initialized when the Storage object is
+                // constructed, so we create a new one here.
                 storage = new Storage(IMAGE_DIR);
                 clusterId = storage.getClusterID();
             } else {
                 // If the version file exist, read the cluster id and check the
-                // id with helper node
-                // to make sure they are identical
+                // id with helper node to make sure they are identical
                 clusterId = storage.getClusterID();
                 try {
                     URL idURL = new URL("http://" + helperNode.first + ":" + Config.http_port + "/check");
@@ -593,7 +593,8 @@
                         System.exit(-1);
                     }
                 } catch (Exception e) {
-                    LOG.warn(e);
+                    LOG.warn("fail to check cluster_id from helper node.", e);
+                    System.exit(-1);
                 }
             }
@@ -884,7 +885,7 @@ public void loadImage(String imageDir) throws IOException, DdlException {
             LOG.info("image does not exist: {}", curFile.getAbsolutePath());
             return;
         }
-        replayedJournalId = storage.getImageSeq();
+        replayedJournalId.set(storage.getImageSeq());
         LOG.info("start load image from {}. is ckpt: {}", curFile.getAbsolutePath(), Catalog.isCheckpointThread());
         long loadImageStartTime = System.currentTimeMillis();
         DataInputStream dis = new DataInputStream(new BufferedInputStream(new FileInputStream(curFile)));
@@ -1253,9 +1254,9 @@ public long loadRecycleBin(DataInputStream dis, long checksum) throws IOExceptio
     public void saveImage() throws IOException {
         // Write image.ckpt
         Storage storage = new Storage(IMAGE_DIR);
-        File curFile = storage.getImageFile(replayedJournalId);
+        File curFile = storage.getImageFile(replayedJournalId.get());
         File ckpt = new File(IMAGE_DIR, Storage.IMAGE_NEW);
-        saveImage(ckpt, replayedJournalId);
+        saveImage(ckpt, replayedJournalId.get());
 
         // Move image.ckpt to image.dataVersion
         LOG.info("Move " + ckpt.getAbsolutePath() + " to " + curFile.getAbsolutePath());
@@ -1761,14 +1762,14 @@ public synchronized boolean replayJournal(long toJournalId) {
         if (toJournalId == -1) {
             toJournalId = getMaxJournalId();
         }
-        if (toJournalId <= replayedJournalId) {
+        if (toJournalId <= replayedJournalId.get()) {
             return false;
         }
         LOG.info("replayed journal id is {}, replay to journal id is {}", replayedJournalId, toJournalId);
 
-        JournalCursor cursor = editLog.read(replayedJournalId + 1, toJournalId);
+        JournalCursor cursor = editLog.read(replayedJournalId.get() + 1, toJournalId);
         if (cursor == null) {
-            LOG.warn("failed to get cursor from {} to {}", replayedJournalId + 1, toJournalId);
+            LOG.warn("failed to get cursor from {} to {}", replayedJournalId.get() + 1, toJournalId);
             return false;
         }
 
@@ -1781,10 +1782,10 @@
             }
             hasLog = true;
             EditLog.loadJournal(this, entity);
-            replayedJournalId++;
+            replayedJournalId.incrementAndGet();
             LOG.debug("journal {} replayed.", replayedJournalId);
             if (!isMaster) {
-                journalObservable.notifyObservers(replayedJournalId);
+                journalObservable.notifyObservers(replayedJournalId.get());
             }
         }
         long cost = System.currentTimeMillis() - startTime;
@@ -3699,13 +3700,13 @@ public List<String> getClusterDbNames(String clusterName) throws AnalysisExcepti
             final Cluster cluster = nameToCluster.get(clusterName);
             if (cluster == null) {
                 throw new AnalysisException("No cluster selected");
-            } 
+            }
             List<String> dbNames = Lists.newArrayList(cluster.getDbNames());
             return dbNames;
         } finally {
             readUnlock();
-        } 
-    } 
+        }
+    }
 
     public List<Long> getDbIds() {
         readLock();
@@ -3866,7 +3867,7 @@ public Clone getCloneInstance() {
     }
 
     public long getReplayedJournalId() {
-        return this.replayedJournalId;
+        return this.replayedJournalId.get();
     }
 
     public HAProtocol getHaProtocol() {
@@ -4268,7 +4269,7 @@ public void replayRenameColumn(TableInfo tableInfo) throws DdlException {
     }
 
     /*
-     * used for handling AlterClusterStmt 
+     * used for handling AlterClusterStmt
      * (for client is the ALTER CLUSTER command).
     */
     public void alterCluster(AlterSystemStmt stmt) throws DdlException, InternalException {
@@ -4603,7 +4604,7 @@ public void processModifyCluster(AlterClusterStmt stmt) throws DdlException {
                     .append(backend.getHeartbeatPort()).toString());
         }
 
-        // here we reuse the process of decommission backends. but set backend's decommission type to 
+        // here we reuse the process of decommission backends, but set backend's decommission type to
        // ClusterDecommission, which means this backend will not be removed from the system
        // after decommission is done.
        final DecommissionBackendClause clause = new DecommissionBackendClause(hostPortList);
@@ -4717,7 +4718,7 @@ public void migrateDb(MigrateDbStmt stmt) throws DdlException {
    /**
     * return max replicationNum of a db
-     * 
+     *
     * @param db
     * @return
     */
@@ -4940,10 +4941,10 @@ public long loadCluster(DataInputStream dis, long checksum) throws IOException,
            final Cluster cluster = new Cluster();
            cluster.readFields(dis);
            checksum ^= cluster.getId();
- 
-            // BE is in default_cluster when added , therefore it is possible that the BE 
-            // in default_cluster are not the latest because cluster cant't be updated when 
-            // loadCluster is after loadBackend. Because of forgeting to remove BE's id in 
+
+            // BE is in default_cluster when added, therefore it is possible that the BE
+            // in default_cluster are not the latest because cluster can't be updated when
+            // loadCluster is after loadBackend. Because of forgetting to remove BE's id in
            // cluster when drop BE or decommission in latest versions, need to update cluster's
            // BE.
            List<Long> latestBackendIds = systemInfo.getClusterBackendIds(cluster.getName());
@@ -5075,11 +5076,11 @@ public void replayUpdateClusterAndBackends(BackendIdsUpdateInfo info) {
            final Cluster cluster = nameToCluster.get(backend.getOwnerClusterName());
            cluster.removeBackend(id);
        } finally {
-            writeUnlock(); 
+            writeUnlock();
        }
        backend.setDecommissioned(false);
        backend.clearClusterName();
-        backend.setBackendState(BackendState.free); 
+        backend.setBackendState(BackendState.free);
    }
}
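Note on the long -> AtomicLong change: replayedJournalId is written by the replay loop in replayJournal() while the checkpoint thread (saveImage(), getReplayedJournalId()) and journal observers read it from other threads. A plain long field guarantees neither atomicity (64-bit accesses may tear on 32-bit JVMs, JLS 17.7) nor visibility of updates across threads, so a reader could observe a stale or torn journal id. Below is a minimal, self-contained sketch of the hazard and the fix; it is illustrative only, and none of the class or method names come from the Palo codebase:

    import java.util.concurrent.atomic.AtomicLong;

    // Illustrative stand-in for Catalog's replay bookkeeping (not Palo code).
    public class ReplayCounterDemo {
        // With a plain "long" here, the reader thread below might never see
        // the writer's updates, and 64-bit accesses are not guaranteed to be
        // atomic without "volatile" (JLS 17.7). AtomicLong gives both
        // atomicity and visibility.
        private final AtomicLong replayedJournalId = new AtomicLong(0L);

        // Called only by the replay thread, once per applied journal entry.
        void onJournalReplayed() {
            replayedJournalId.incrementAndGet();
        }

        // Safe to call from any thread, e.g. a checkpoint thread deciding
        // which image version to write.
        long getReplayedJournalId() {
            return replayedJournalId.get();
        }

        public static void main(String[] args) throws InterruptedException {
            ReplayCounterDemo demo = new ReplayCounterDemo();
            final long total = 1_000_000L;

            Thread replayer = new Thread(() -> {
                for (long i = 0; i < total; i++) {
                    demo.onJournalReplayed();
                }
            });

            Thread reader = new Thread(() -> {
                long last = 0;
                while (last < total) {
                    long seen = demo.getReplayedJournalId();
                    // AtomicLong guarantees we never observe a torn value or
                    // one that moves backwards.
                    if (seen < last) {
                        throw new IllegalStateException("journal id went backwards: " + seen);
                    }
                    last = seen;
                }
            });

            replayer.start();
            reader.start();
            replayer.join();
            reader.join();
            System.out.println("final replayed id: " + demo.getReplayedJournalId());
        }
    }

Note that AtomicLong only makes the individual reads and writes safe; the compound read-then-compare at the top of replayJournal() is still serialized by that method's synchronized modifier, which the patch keeps.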
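Note on the cluster_id check: the new catch block turns a failed "/check" probe into a fatal error (LOG.warn plus System.exit(-1)) instead of a warning that let a frontend keep starting with an unverified cluster id. The sketch below shows the shape of such a compare-and-abort handshake; only the URL format and the exit-on-mismatch/exit-on-error behavior come from the diff, while the response format (a plain-text cluster id in the body) and all names here are assumptions for illustration:

    import java.io.BufferedReader;
    import java.io.InputStreamReader;
    import java.net.HttpURLConnection;
    import java.net.URL;

    // Hypothetical sketch of the bootstrap handshake (not Palo code); the
    // helper's response format is assumed, not taken from the patch.
    public class ClusterIdCheck {
        static void checkClusterId(String helperHost, int httpPort, int localClusterId) {
            try {
                URL idURL = new URL("http://" + helperHost + ":" + httpPort + "/check");
                HttpURLConnection conn = (HttpURLConnection) idURL.openConnection();
                conn.setConnectTimeout(2000);
                conn.setReadTimeout(2000);
                try (BufferedReader in = new BufferedReader(
                        new InputStreamReader(conn.getInputStream()))) {
                    int remoteClusterId = Integer.parseInt(in.readLine().trim());
                    if (remoteClusterId != localClusterId) {
                        // Joining a foreign cluster would corrupt metadata, so
                        // refusing to start is the only safe reaction.
                        System.err.println("cluster id mismatch: local=" + localClusterId
                                + " helper=" + remoteClusterId);
                        System.exit(-1);
                    }
                }
            } catch (Exception e) {
                // Mirrors the patched behavior: a failed check is now fatal
                // rather than a warning that lets a misconfigured node run on.
                System.err.println("fail to check cluster_id from helper node: " + e);
                System.exit(-1);
            }
        }
    }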