diff --git a/src/java/main/org/apache/zookeeper/server/ZooKeeperServer.java b/src/java/main/org/apache/zookeeper/server/ZooKeeperServer.java index 06848d29943..b84c418ad7a 100644 --- a/src/java/main/org/apache/zookeeper/server/ZooKeeperServer.java +++ b/src/java/main/org/apache/zookeeper/server/ZooKeeperServer.java @@ -543,9 +543,12 @@ public synchronized void shutdown() { if (firstProcessor != null) { firstProcessor.shutdown(); } - if (zkDb != null) { - zkDb.clear(); - } + + // There is no need to clear the database + // * When a new quorum is established we can still apply the diff + // on top of the same zkDb data + // * If we fetch a new snapshot from leader, the zkDb will be + // cleared anyway before loading the snapshot unregisterJMX(); } diff --git a/src/java/main/org/apache/zookeeper/server/quorum/Learner.java b/src/java/main/org/apache/zookeeper/server/quorum/Learner.java index 9803197c57d..021e56eb4ce 100644 --- a/src/java/main/org/apache/zookeeper/server/quorum/Learner.java +++ b/src/java/main/org/apache/zookeeper/server/quorum/Learner.java @@ -358,12 +358,16 @@ protected void syncWithLeader(long newLeaderZxid) throws Exception{ QuorumVerifier newLeaderQV = null; - readPacket(qp); + //In the DIFF case we don't need to do a snapshot because the transactions will sync on top of any existing snapshot + // For SNAP and TRUNC the snapshot is needed to save that history + boolean snapshotNeeded = true; + readPacket(qp); LinkedList packetsCommitted = new LinkedList(); LinkedList packetsNotCommitted = new LinkedList(); synchronized (zk) { if (qp.getType() == Leader.DIFF) { - LOG.info("Getting a diff from the leader 0x" + Long.toHexString(qp.getZxid())); + LOG.info("Getting a diff from the leader 0x{}", Long.toHexString(qp.getZxid())); + snapshotNeeded = false; } else if (qp.getType() == Leader.SNAP) { LOG.info("Getting a snapshot from leader"); @@ -400,10 +404,13 @@ else if (qp.getType() == Leader.SNAP) { long lastQueued = 0; - // in V1.0 we take a snapshot when we get the NEWLEADER message, but in pre V1.0 - // we take the snapshot at the UPDATE, since V1.0 also gets the UPDATE (after the NEWLEADER) + // in Zab V1.0 (ZK 3.4+) we might take a snapshot when we get the NEWLEADER message, but in pre V1.0 + // we take the snapshot on the UPDATE message, since Zab V1.0 also gets the UPDATE (after the NEWLEADER) // we need to make sure that we don't take the snapshot twice. - boolean snapshotTaken = false; + boolean isPreZAB1_0 = true; + //If we are not going to take the snapshot be sure the transactions are not applied in memory + // but written out to the transaction log + boolean writeToTxnLog = !snapshotNeeded; // we are now going to start getting transactions to apply followed by an UPTODATE outerLoop: while (self.isRunning()) { @@ -440,7 +447,7 @@ else if (qp.getType() == Leader.SNAP) { throw new Exception("changes proposed in reconfig"); } } - if (!snapshotTaken) { + if (!writeToTxnLog) { if (pif.hdr.getZxid() != qp.getZxid()) { LOG.warn("Committing " + qp.getZxid() + ", but next proposal is " + pif.hdr.getZxid()); } else { @@ -479,8 +486,7 @@ else if (qp.getType() == Leader.SNAP) { } lastQueued = packet.hdr.getZxid(); } - - if (!snapshotTaken) { + if (!writeToTxnLog) { // Apply to db directly if we haven't taken the snapshot zk.processTxn(packet.hdr, packet.rec); } else { @@ -498,14 +504,19 @@ else if (qp.getType() == Leader.SNAP) { throw new Exception("changes proposed in reconfig"); } } - if (!snapshotTaken) { // true for the pre v1.0 case - zk.takeSnapshot(); + if (isPreZAB1_0) { + zk.takeSnapshot(); self.setCurrentEpoch(newEpoch); } self.setZooKeeperServer(zk); self.adminServer.setZooKeeperServer(zk); break outerLoop; - case Leader.NEWLEADER: // it will be NEWLEADER in v1.0 + case Leader.NEWLEADER: // Getting NEWLEADER here instead of in discovery + // means this is Zab 1.0 + // Create updatingEpoch file and remove it after current + // epoch is set. QuorumPeer.loadDataBase() uses this file to + // detect the case where the server was terminated after + // taking a snapshot but before setting the current epoch. LOG.info("Learner received NEWLEADER message"); if (qp.getData()!=null && qp.getData().length > 1) { try { @@ -516,10 +527,14 @@ else if (qp.getType() == Leader.SNAP) { e.printStackTrace(); } } + + if (snapshotNeeded) { + zk.takeSnapshot(); + } - zk.takeSnapshot(); self.setCurrentEpoch(newEpoch); - snapshotTaken = true; + writeToTxnLog = true; //Anything after this needs to go to the transaction log, not applied directly in memory + isPreZAB1_0 = false; writePacket(new QuorumPacket(Leader.ACK, newLeaderZxid, null, null), true); break; } diff --git a/src/java/test/org/apache/zookeeper/server/quorum/Zab1_0Test.java b/src/java/test/org/apache/zookeeper/server/quorum/Zab1_0Test.java index 37f6cc24988..d32d57c23ae 100644 --- a/src/java/test/org/apache/zookeeper/server/quorum/Zab1_0Test.java +++ b/src/java/test/org/apache/zookeeper/server/quorum/Zab1_0Test.java @@ -846,12 +846,25 @@ public void converseWithFollower(InputArchive ia, OutputArchive oa, Assert.assertEquals(1, f.self.getAcceptedEpoch()); Assert.assertEquals(1, f.self.getCurrentEpoch()); + //Wait for the transactions to be written out. The thread that writes them out + // does not send anything back when it is done. + long start = System.currentTimeMillis(); + while (createSessionZxid != f.fzk.getLastProcessedZxid() && (System.currentTimeMillis() - start) < 50) { + Thread.sleep(1); + } + Assert.assertEquals(createSessionZxid, f.fzk.getLastProcessedZxid()); // Make sure the data was recorded in the filesystem ok ZKDatabase zkDb2 = new ZKDatabase(new FileTxnSnapLog(logDir, snapDir)); + start = System.currentTimeMillis(); zkDb2.loadDataBase(); + while (zkDb2.getSessionWithTimeOuts().isEmpty() && (System.currentTimeMillis() - start) < 50) { + Thread.sleep(1); + zkDb2.loadDataBase(); + } LOG.info("zkdb2 sessions:" + zkDb2.getSessions()); + LOG.info("zkdb2 with timeouts:" + zkDb2.getSessionWithTimeOuts()); Assert.assertNotNull(zkDb2.getSessionWithTimeOuts().get(4L)); } finally { TestUtils.deleteFileRecursively(tmpDir);