From f4c5b0e2749777cad3eacf992dc9e3e84f09d8bd Mon Sep 17 00:00:00 2001 From: "Robert (Bobby) Evans" Date: Thu, 19 Jan 2017 13:50:32 -0600 Subject: [PATCH 1/4] ZOOKEEPER-2678: Discovery and Sync can take a very long time on large DBs --- .../zookeeper/server/ZooKeeperServer.java | 9 ++-- .../zookeeper/server/quorum/Learner.java | 41 +++++++++++++------ .../zookeeper/server/quorum/Zab1_0Test.java | 13 ++++++ 3 files changed, 47 insertions(+), 16 deletions(-) diff --git a/src/java/main/org/apache/zookeeper/server/ZooKeeperServer.java b/src/java/main/org/apache/zookeeper/server/ZooKeeperServer.java index 079d80930b4..88e69324ef6 100644 --- a/src/java/main/org/apache/zookeeper/server/ZooKeeperServer.java +++ b/src/java/main/org/apache/zookeeper/server/ZooKeeperServer.java @@ -544,9 +544,12 @@ public synchronized void shutdown() { if (firstProcessor != null) { firstProcessor.shutdown(); } - if (zkDb != null) { - zkDb.clear(); - } + + // There is no need to clear the database + // * When a new quorum is established we can still apply the diff + // on top of the same zkDb data + // * If we fetch a new snapshot from leader, the zkDb will be + // cleared anyway before loading the snapshot unregisterJMX(); } diff --git a/src/java/main/org/apache/zookeeper/server/quorum/Learner.java b/src/java/main/org/apache/zookeeper/server/quorum/Learner.java index 9803197c57d..021e56eb4ce 100644 --- a/src/java/main/org/apache/zookeeper/server/quorum/Learner.java +++ b/src/java/main/org/apache/zookeeper/server/quorum/Learner.java @@ -358,12 +358,16 @@ protected void syncWithLeader(long newLeaderZxid) throws Exception{ QuorumVerifier newLeaderQV = null; - readPacket(qp); + //In the DIFF case we don't need to do a snapshot because the transactions will sync on top of any existing snapshot + // For SNAP and TRUNC the snapshot is needed to save that history + boolean snapshotNeeded = true; + readPacket(qp); LinkedList packetsCommitted = new LinkedList(); LinkedList packetsNotCommitted = new LinkedList(); synchronized (zk) { if (qp.getType() == Leader.DIFF) { - LOG.info("Getting a diff from the leader 0x" + Long.toHexString(qp.getZxid())); + LOG.info("Getting a diff from the leader 0x{}", Long.toHexString(qp.getZxid())); + snapshotNeeded = false; } else if (qp.getType() == Leader.SNAP) { LOG.info("Getting a snapshot from leader"); @@ -400,10 +404,13 @@ else if (qp.getType() == Leader.SNAP) { long lastQueued = 0; - // in V1.0 we take a snapshot when we get the NEWLEADER message, but in pre V1.0 - // we take the snapshot at the UPDATE, since V1.0 also gets the UPDATE (after the NEWLEADER) + // in Zab V1.0 (ZK 3.4+) we might take a snapshot when we get the NEWLEADER message, but in pre V1.0 + // we take the snapshot on the UPDATE message, since Zab V1.0 also gets the UPDATE (after the NEWLEADER) // we need to make sure that we don't take the snapshot twice. - boolean snapshotTaken = false; + boolean isPreZAB1_0 = true; + //If we are not going to take the snapshot be sure the transactions are not applied in memory + // but written out to the transaction log + boolean writeToTxnLog = !snapshotNeeded; // we are now going to start getting transactions to apply followed by an UPTODATE outerLoop: while (self.isRunning()) { @@ -440,7 +447,7 @@ else if (qp.getType() == Leader.SNAP) { throw new Exception("changes proposed in reconfig"); } } - if (!snapshotTaken) { + if (!writeToTxnLog) { if (pif.hdr.getZxid() != qp.getZxid()) { LOG.warn("Committing " + qp.getZxid() + ", but next proposal is " + pif.hdr.getZxid()); } else { @@ -479,8 +486,7 @@ else if (qp.getType() == Leader.SNAP) { } lastQueued = packet.hdr.getZxid(); } - - if (!snapshotTaken) { + if (!writeToTxnLog) { // Apply to db directly if we haven't taken the snapshot zk.processTxn(packet.hdr, packet.rec); } else { @@ -498,14 +504,19 @@ else if (qp.getType() == Leader.SNAP) { throw new Exception("changes proposed in reconfig"); } } - if (!snapshotTaken) { // true for the pre v1.0 case - zk.takeSnapshot(); + if (isPreZAB1_0) { + zk.takeSnapshot(); self.setCurrentEpoch(newEpoch); } self.setZooKeeperServer(zk); self.adminServer.setZooKeeperServer(zk); break outerLoop; - case Leader.NEWLEADER: // it will be NEWLEADER in v1.0 + case Leader.NEWLEADER: // Getting NEWLEADER here instead of in discovery + // means this is Zab 1.0 + // Create updatingEpoch file and remove it after current + // epoch is set. QuorumPeer.loadDataBase() uses this file to + // detect the case where the server was terminated after + // taking a snapshot but before setting the current epoch. LOG.info("Learner received NEWLEADER message"); if (qp.getData()!=null && qp.getData().length > 1) { try { @@ -516,10 +527,14 @@ else if (qp.getType() == Leader.SNAP) { e.printStackTrace(); } } + + if (snapshotNeeded) { + zk.takeSnapshot(); + } - zk.takeSnapshot(); self.setCurrentEpoch(newEpoch); - snapshotTaken = true; + writeToTxnLog = true; //Anything after this needs to go to the transaction log, not applied directly in memory + isPreZAB1_0 = false; writePacket(new QuorumPacket(Leader.ACK, newLeaderZxid, null, null), true); break; } diff --git a/src/java/test/org/apache/zookeeper/server/quorum/Zab1_0Test.java b/src/java/test/org/apache/zookeeper/server/quorum/Zab1_0Test.java index 8c92e8f4993..ff9fc4b9bea 100644 --- a/src/java/test/org/apache/zookeeper/server/quorum/Zab1_0Test.java +++ b/src/java/test/org/apache/zookeeper/server/quorum/Zab1_0Test.java @@ -847,12 +847,25 @@ public void converseWithFollower(InputArchive ia, OutputArchive oa, Assert.assertEquals(1, f.self.getAcceptedEpoch()); Assert.assertEquals(1, f.self.getCurrentEpoch()); + //Wait for the transactions to be written out. The thread that writes them out + // does not send anything back when it is done. + long start = System.currentTimeMillis(); + while (createSessionZxid != f.fzk.getLastProcessedZxid() && (System.currentTimeMillis() - start) < 50) { + Thread.sleep(1); + } + Assert.assertEquals(createSessionZxid, f.fzk.getLastProcessedZxid()); // Make sure the data was recorded in the filesystem ok ZKDatabase zkDb2 = new ZKDatabase(new FileTxnSnapLog(logDir, snapDir)); + start = System.currentTimeMillis(); zkDb2.loadDataBase(); + while (zkDb2.getSessionWithTimeOuts().isEmpty() && (System.currentTimeMillis() - start) < 50) { + Thread.sleep(1); + zkDb2.loadDataBase(); + } LOG.info("zkdb2 sessions:" + zkDb2.getSessions()); + LOG.info("zkdb2 with timeouts:" + zkDb2.getSessionWithTimeOuts()); Assert.assertNotNull(zkDb2.getSessionWithTimeOuts().get(4L)); } finally { TestUtils.deleteFileRecursively(tmpDir); From 742367e6e88a97782744514365f7c77746e511c0 Mon Sep 17 00:00:00 2001 From: "Robert (Bobby) Evans" Date: Tue, 31 Jan 2017 14:47:09 -0600 Subject: [PATCH 2/4] Addressed review comments --- .../apache/zookeeper/server/ZooKeeperServer.java | 15 +++++++++++++-- .../zookeeper/server/ZooKeeperServerMain.java | 2 +- .../apache/zookeeper/server/quorum/Learner.java | 2 +- 3 files changed, 15 insertions(+), 4 deletions(-) diff --git a/src/java/main/org/apache/zookeeper/server/ZooKeeperServer.java b/src/java/main/org/apache/zookeeper/server/ZooKeeperServer.java index 88e69324ef6..92d3c1366db 100644 --- a/src/java/main/org/apache/zookeeper/server/ZooKeeperServer.java +++ b/src/java/main/org/apache/zookeeper/server/ZooKeeperServer.java @@ -526,7 +526,15 @@ public boolean isRunning() { return state == State.RUNNING; } - public synchronized void shutdown() { + public void shutdown() { + shutdown(false); + } + + /** + * Shut down the server instance + * @param fullyShutDown true if another server using the same database will not replace this one in the same process + */ + public synchronized void shutdown(boolean fullyShutDown) { if (!canShutdown()) { LOG.debug("ZooKeeper server is not running, so not proceeding to shutdown!"); return; @@ -545,7 +553,10 @@ public synchronized void shutdown() { firstProcessor.shutdown(); } - // There is no need to clear the database + if (fullyShutDown && zkDb != null) { + zkDb.clear(); + } + // else there is no need to clear the database // * When a new quorum is established we can still apply the diff // on top of the same zkDb data // * If we fetch a new snapshot from leader, the zkDb will be diff --git a/src/java/main/org/apache/zookeeper/server/ZooKeeperServerMain.java b/src/java/main/org/apache/zookeeper/server/ZooKeeperServerMain.java index 5bfeed3e647..372c78a0a06 100644 --- a/src/java/main/org/apache/zookeeper/server/ZooKeeperServerMain.java +++ b/src/java/main/org/apache/zookeeper/server/ZooKeeperServerMain.java @@ -167,7 +167,7 @@ public void runFromConfig(ServerConfig config) secureCnxnFactory.join(); } if (zkServer.canShutdown()) { - zkServer.shutdown(); + zkServer.shutdown(true); } } catch (InterruptedException e) { // warn, but generally this is ok diff --git a/src/java/main/org/apache/zookeeper/server/quorum/Learner.java b/src/java/main/org/apache/zookeeper/server/quorum/Learner.java index 021e56eb4ce..bddccaa7e19 100644 --- a/src/java/main/org/apache/zookeeper/server/quorum/Learner.java +++ b/src/java/main/org/apache/zookeeper/server/quorum/Learner.java @@ -358,7 +358,7 @@ protected void syncWithLeader(long newLeaderZxid) throws Exception{ QuorumVerifier newLeaderQV = null; - //In the DIFF case we don't need to do a snapshot because the transactions will sync on top of any existing snapshot + // In the DIFF case we don't need to do a snapshot because the transactions will sync on top of any existing snapshot // For SNAP and TRUNC the snapshot is needed to save that history boolean snapshotNeeded = true; readPacket(qp); From a43264270f364ea1aa3b2bc31e68cfd6105c93ce Mon Sep 17 00:00:00 2001 From: "Robert (Bobby) Evans" Date: Mon, 6 Feb 2017 09:32:55 -0600 Subject: [PATCH 3/4] ZOOKEEPER-2678: Improved test to verify snapshot times --- .../zookeeper/server/quorum/Zab1_0Test.java | 16 +++++++++++++++- 1 file changed, 15 insertions(+), 1 deletion(-) diff --git a/src/java/test/org/apache/zookeeper/server/quorum/Zab1_0Test.java b/src/java/test/org/apache/zookeeper/server/quorum/Zab1_0Test.java index ff9fc4b9bea..e7dc90bd9d3 100644 --- a/src/java/test/org/apache/zookeeper/server/quorum/Zab1_0Test.java +++ b/src/java/test/org/apache/zookeeper/server/quorum/Zab1_0Test.java @@ -19,6 +19,9 @@ package org.apache.zookeeper.server.quorum; import static org.junit.Assert.assertEquals; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.verify; import java.io.BufferedReader; import java.io.ByteArrayOutputStream; @@ -652,6 +655,8 @@ public void converseWithFollower(InputArchive ia, OutputArchive oa, tmpDir.mkdir(); File logDir = f.fzk.getTxnLogFactory().getDataDir().getParentFile(); File snapDir = f.fzk.getTxnLogFactory().getSnapDir().getParentFile(); + //Spy on ZK so we can check if a snapshot happened or not. + f.zk = spy(f.zk); try { Assert.assertEquals(0, f.self.getAcceptedEpoch()); Assert.assertEquals(0, f.self.getCurrentEpoch()); @@ -694,6 +699,10 @@ public void converseWithFollower(InputArchive ia, OutputArchive oa, oa.writeRecord(qp, null); zkDb.serializeSnapshot(oa); oa.writeString("BenWasHere", null); + Thread.sleep(10); //Give it some time to process the snap + //No Snapshot taken yet, the SNAP was applied in memory + verify(f.zk, never()).takeSnapshot(); + qp.setType(Leader.NEWLEADER); qp.setZxid(ZxidUtils.makeZxid(1, 0)); oa.writeRecord(qp, null); @@ -704,7 +713,8 @@ public void converseWithFollower(InputArchive ia, OutputArchive oa, Assert.assertEquals(ZxidUtils.makeZxid(1, 0), qp.getZxid()); Assert.assertEquals(1, f.self.getAcceptedEpoch()); Assert.assertEquals(1, f.self.getCurrentEpoch()); - + //Make sure that we did take the snapshot now + verify(f.zk).takeSnapshot(); Assert.assertEquals(firstZxid, f.fzk.getLastProcessedZxid()); // Make sure the data was recorded in the filesystem ok @@ -780,6 +790,8 @@ public void converseWithFollower(InputArchive ia, OutputArchive oa, tmpDir.mkdir(); File logDir = f.fzk.getTxnLogFactory().getDataDir().getParentFile(); File snapDir = f.fzk.getTxnLogFactory().getSnapDir().getParentFile(); + //Spy on ZK so we can check if a snapshot happened or not. + f.zk = spy(f.zk); try { Assert.assertEquals(0, f.self.getAcceptedEpoch()); Assert.assertEquals(0, f.self.getCurrentEpoch()); @@ -867,6 +879,8 @@ public void converseWithFollower(InputArchive ia, OutputArchive oa, LOG.info("zkdb2 sessions:" + zkDb2.getSessions()); LOG.info("zkdb2 with timeouts:" + zkDb2.getSessionWithTimeOuts()); Assert.assertNotNull(zkDb2.getSessionWithTimeOuts().get(4L)); + //Snapshot was never taken during very simple sync + verify(f.zk, never()).takeSnapshot(); } finally { TestUtils.deleteFileRecursively(tmpDir); } From 69fbe196a2f00703d1ab03d664f92d72c58edff9 Mon Sep 17 00:00:00 2001 From: "Robert (Bobby) Evans" Date: Fri, 10 Feb 2017 09:06:34 -0600 Subject: [PATCH 4/4] ZOOKEEPER-2678: Addressed review comments --- src/java/main/org/apache/zookeeper/server/quorum/Learner.java | 4 ---- 1 file changed, 4 deletions(-) diff --git a/src/java/main/org/apache/zookeeper/server/quorum/Learner.java b/src/java/main/org/apache/zookeeper/server/quorum/Learner.java index bddccaa7e19..f048da8a4d9 100644 --- a/src/java/main/org/apache/zookeeper/server/quorum/Learner.java +++ b/src/java/main/org/apache/zookeeper/server/quorum/Learner.java @@ -513,10 +513,6 @@ else if (qp.getType() == Leader.SNAP) { break outerLoop; case Leader.NEWLEADER: // Getting NEWLEADER here instead of in discovery // means this is Zab 1.0 - // Create updatingEpoch file and remove it after current - // epoch is set. QuorumPeer.loadDataBase() uses this file to - // detect the case where the server was terminated after - // taking a snapshot but before setting the current epoch. LOG.info("Learner received NEWLEADER message"); if (qp.getData()!=null && qp.getData().length > 1) { try {