Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 16 additions & 2 deletions src/java/main/org/apache/zookeeper/server/ZooKeeperServer.java
Original file line number Diff line number Diff line change
Expand Up @@ -526,7 +526,15 @@ public boolean isRunning() {
return state == State.RUNNING;
}

public synchronized void shutdown() {
public void shutdown() {
shutdown(false);
}

/**
* Shut down the server instance
* @param fullyShutDown true if another server using the same database will not replace this one in the same process
*/
public synchronized void shutdown(boolean fullyShutDown) {
if (!canShutdown()) {
LOG.debug("ZooKeeper server is not running, so not proceeding to shutdown!");
return;
Expand All @@ -544,9 +552,15 @@ public synchronized void shutdown() {
if (firstProcessor != null) {
firstProcessor.shutdown();
}
if (zkDb != null) {

if (fullyShutDown && zkDb != null) {
zkDb.clear();
}
// else there is no need to clear the database
// * When a new quorum is established we can still apply the diff
// on top of the same zkDb data
// * If we fetch a new snapshot from leader, the zkDb will be
// cleared anyway before loading the snapshot

unregisterJMX();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ public void runFromConfig(ServerConfig config)
secureCnxnFactory.join();
}
if (zkServer.canShutdown()) {
zkServer.shutdown();
zkServer.shutdown(true);
}
} catch (InterruptedException e) {
// warn, but generally this is ok
Expand Down
37 changes: 24 additions & 13 deletions src/java/main/org/apache/zookeeper/server/quorum/Learner.java
Original file line number Diff line number Diff line change
Expand Up @@ -358,12 +358,16 @@ protected void syncWithLeader(long newLeaderZxid) throws Exception{

QuorumVerifier newLeaderQV = null;

readPacket(qp);
// In the DIFF case we don't need to do a snapshot because the transactions will sync on top of any existing snapshot
// For SNAP and TRUNC the snapshot is needed to save that history
boolean snapshotNeeded = true;
readPacket(qp);
LinkedList<Long> packetsCommitted = new LinkedList<Long>();
LinkedList<PacketInFlight> packetsNotCommitted = new LinkedList<PacketInFlight>();
synchronized (zk) {
if (qp.getType() == Leader.DIFF) {
LOG.info("Getting a diff from the leader 0x" + Long.toHexString(qp.getZxid()));
LOG.info("Getting a diff from the leader 0x{}", Long.toHexString(qp.getZxid()));
snapshotNeeded = false;
}
else if (qp.getType() == Leader.SNAP) {
LOG.info("Getting a snapshot from leader");
Expand Down Expand Up @@ -400,10 +404,13 @@ else if (qp.getType() == Leader.SNAP) {

long lastQueued = 0;

// in V1.0 we take a snapshot when we get the NEWLEADER message, but in pre V1.0
// we take the snapshot at the UPDATE, since V1.0 also gets the UPDATE (after the NEWLEADER)
// in Zab V1.0 (ZK 3.4+) we might take a snapshot when we get the NEWLEADER message, but in pre V1.0
// we take the snapshot on the UPDATE message, since Zab V1.0 also gets the UPDATE (after the NEWLEADER)
// we need to make sure that we don't take the snapshot twice.
boolean snapshotTaken = false;
boolean isPreZAB1_0 = true;
//If we are not going to take the snapshot be sure the transactions are not applied in memory
// but written out to the transaction log
boolean writeToTxnLog = !snapshotNeeded;
// we are now going to start getting transactions to apply followed by an UPTODATE
outerLoop:
while (self.isRunning()) {
Expand Down Expand Up @@ -440,7 +447,7 @@ else if (qp.getType() == Leader.SNAP) {
throw new Exception("changes proposed in reconfig");
}
}
if (!snapshotTaken) {
if (!writeToTxnLog) {
if (pif.hdr.getZxid() != qp.getZxid()) {
LOG.warn("Committing " + qp.getZxid() + ", but next proposal is " + pif.hdr.getZxid());
} else {
Expand Down Expand Up @@ -479,8 +486,7 @@ else if (qp.getType() == Leader.SNAP) {
}
lastQueued = packet.hdr.getZxid();
}

if (!snapshotTaken) {
if (!writeToTxnLog) {
// Apply to db directly if we haven't taken the snapshot
zk.processTxn(packet.hdr, packet.rec);
} else {
Expand All @@ -498,14 +504,15 @@ else if (qp.getType() == Leader.SNAP) {
throw new Exception("changes proposed in reconfig");
}
}
if (!snapshotTaken) { // true for the pre v1.0 case
zk.takeSnapshot();
if (isPreZAB1_0) {
zk.takeSnapshot();
self.setCurrentEpoch(newEpoch);
}
self.setZooKeeperServer(zk);
self.adminServer.setZooKeeperServer(zk);
break outerLoop;
case Leader.NEWLEADER: // it will be NEWLEADER in v1.0
case Leader.NEWLEADER: // Getting NEWLEADER here instead of in discovery
// means this is Zab 1.0
LOG.info("Learner received NEWLEADER message");
if (qp.getData()!=null && qp.getData().length > 1) {
try {
Expand All @@ -516,10 +523,14 @@ else if (qp.getType() == Leader.SNAP) {
e.printStackTrace();
}
}

if (snapshotNeeded) {
zk.takeSnapshot();
}

zk.takeSnapshot();
self.setCurrentEpoch(newEpoch);
snapshotTaken = true;
writeToTxnLog = true; //Anything after this needs to go to the transaction log, not applied directly in memory
isPreZAB1_0 = false;
writePacket(new QuorumPacket(Leader.ACK, newLeaderZxid, null, null), true);
break;
}
Expand Down
29 changes: 28 additions & 1 deletion src/java/test/org/apache/zookeeper/server/quorum/Zab1_0Test.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@
package org.apache.zookeeper.server.quorum;

import static org.junit.Assert.assertEquals;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;

import java.io.BufferedReader;
import java.io.ByteArrayOutputStream;
Expand Down Expand Up @@ -652,6 +655,8 @@ public void converseWithFollower(InputArchive ia, OutputArchive oa,
tmpDir.mkdir();
File logDir = f.fzk.getTxnLogFactory().getDataDir().getParentFile();
File snapDir = f.fzk.getTxnLogFactory().getSnapDir().getParentFile();
//Spy on ZK so we can check if a snapshot happened or not.
f.zk = spy(f.zk);
try {
Assert.assertEquals(0, f.self.getAcceptedEpoch());
Assert.assertEquals(0, f.self.getCurrentEpoch());
Expand Down Expand Up @@ -694,6 +699,10 @@ public void converseWithFollower(InputArchive ia, OutputArchive oa,
oa.writeRecord(qp, null);
zkDb.serializeSnapshot(oa);
oa.writeString("BenWasHere", null);
Thread.sleep(10); //Give it some time to process the snap
//No Snapshot taken yet, the SNAP was applied in memory
verify(f.zk, never()).takeSnapshot();

qp.setType(Leader.NEWLEADER);
qp.setZxid(ZxidUtils.makeZxid(1, 0));
oa.writeRecord(qp, null);
Expand All @@ -704,7 +713,8 @@ public void converseWithFollower(InputArchive ia, OutputArchive oa,
Assert.assertEquals(ZxidUtils.makeZxid(1, 0), qp.getZxid());
Assert.assertEquals(1, f.self.getAcceptedEpoch());
Assert.assertEquals(1, f.self.getCurrentEpoch());

//Make sure that we did take the snapshot now
verify(f.zk).takeSnapshot();
Assert.assertEquals(firstZxid, f.fzk.getLastProcessedZxid());

// Make sure the data was recorded in the filesystem ok
Expand Down Expand Up @@ -780,6 +790,8 @@ public void converseWithFollower(InputArchive ia, OutputArchive oa,
tmpDir.mkdir();
File logDir = f.fzk.getTxnLogFactory().getDataDir().getParentFile();
File snapDir = f.fzk.getTxnLogFactory().getSnapDir().getParentFile();
//Spy on ZK so we can check if a snapshot happened or not.
f.zk = spy(f.zk);
try {
Assert.assertEquals(0, f.self.getAcceptedEpoch());
Assert.assertEquals(0, f.self.getCurrentEpoch());
Expand Down Expand Up @@ -847,13 +859,28 @@ public void converseWithFollower(InputArchive ia, OutputArchive oa,
Assert.assertEquals(1, f.self.getAcceptedEpoch());
Assert.assertEquals(1, f.self.getCurrentEpoch());

//Wait for the transactions to be written out. The thread that writes them out
// does not send anything back when it is done.
long start = System.currentTimeMillis();
while (createSessionZxid != f.fzk.getLastProcessedZxid() && (System.currentTimeMillis() - start) < 50) {
Thread.sleep(1);
}

Assert.assertEquals(createSessionZxid, f.fzk.getLastProcessedZxid());

// Make sure the data was recorded in the filesystem ok
ZKDatabase zkDb2 = new ZKDatabase(new FileTxnSnapLog(logDir, snapDir));
start = System.currentTimeMillis();
zkDb2.loadDataBase();
while (zkDb2.getSessionWithTimeOuts().isEmpty() && (System.currentTimeMillis() - start) < 50) {
Thread.sleep(1);
zkDb2.loadDataBase();
}
LOG.info("zkdb2 sessions:" + zkDb2.getSessions());
LOG.info("zkdb2 with timeouts:" + zkDb2.getSessionWithTimeOuts());
Assert.assertNotNull(zkDb2.getSessionWithTimeOuts().get(4L));
//Snapshot was never taken during very simple sync
verify(f.zk, never()).takeSnapshot();
} finally {
TestUtils.deleteFileRecursively(tmpDir);
}
Expand Down