From 5aa25620e0189b28d7040305272be2fda28126fb Mon Sep 17 00:00:00 2001 From: "Robert (Bobby) Evans" Date: Thu, 19 Jan 2017 13:50:32 -0600 Subject: [PATCH 1/5] ZOOKEEPER-2678: Discovery and Sync can take a very long time on large DBs --- .../zookeeper/server/ZooKeeperServer.java | 9 ++++-- .../zookeeper/server/quorum/Learner.java | 31 ++++++++++++------- .../zookeeper/server/quorum/Zab1_0Test.java | 7 +++++ 3 files changed, 33 insertions(+), 14 deletions(-) diff --git a/src/java/main/org/apache/zookeeper/server/ZooKeeperServer.java b/src/java/main/org/apache/zookeeper/server/ZooKeeperServer.java index 62ac466bb1a..2aee59f6845 100644 --- a/src/java/main/org/apache/zookeeper/server/ZooKeeperServer.java +++ b/src/java/main/org/apache/zookeeper/server/ZooKeeperServer.java @@ -507,9 +507,12 @@ public synchronized void shutdown() { if (firstProcessor != null) { firstProcessor.shutdown(); } - if (zkDb != null) { - zkDb.clear(); - } + + // There is no need to clear the database + // * When a new quorum is established we can still apply the diff + // on top of the same zkDb data + // * If we fetch a new snapshot from leader, the zkDb will be + // cleared anyway before loading the snapshot unregisterJMX(); } diff --git a/src/java/main/org/apache/zookeeper/server/quorum/Learner.java b/src/java/main/org/apache/zookeeper/server/quorum/Learner.java index 749b2741728..7d33cfb60c1 100644 --- a/src/java/main/org/apache/zookeeper/server/quorum/Learner.java +++ b/src/java/main/org/apache/zookeeper/server/quorum/Learner.java @@ -321,13 +321,16 @@ protected void syncWithLeader(long newLeaderZxid) throws IOException, Interrupte QuorumPacket ack = new QuorumPacket(Leader.ACK, 0, null, null); QuorumPacket qp = new QuorumPacket(); long newEpoch = ZxidUtils.getEpochFromZxid(newLeaderZxid); - - readPacket(qp); + //In the DIFF case we don't need to do a snapshot because the edits will sync on top of any existing snapshot + // For SNAP and TRUNC the snapshot is needed to save that history + boolean snapshotNeeded = true; + readPacket(qp); LinkedList packetsCommitted = new LinkedList(); LinkedList packetsNotCommitted = new LinkedList(); synchronized (zk) { if (qp.getType() == Leader.DIFF) { - LOG.info("Getting a diff from the leader 0x" + Long.toHexString(qp.getZxid())); + LOG.info("Getting a diff from the leader 0x{}", Long.toHexString(qp.getZxid())); + snapshotNeeded = false; } else if (qp.getType() == Leader.SNAP) { LOG.info("Getting a snapshot from leader"); @@ -364,10 +367,12 @@ else if (qp.getType() == Leader.SNAP) { long lastQueued = 0; - // in V1.0 we take a snapshot when we get the NEWLEADER message, but in pre V1.0 + // in Zab V1.0 (ZK 3.4+) we might take a snapshot when we get the NEWLEADER message, but in pre V1.0 // we take the snapshot at the UPDATE, since V1.0 also gets the UPDATE (after the NEWLEADER) // we need to make sure that we don't take the snapshot twice. - boolean snapshotTaken = false; + boolean isPreZAB1_0 = true; + //If we are not going to take the snapshot be sure the edits are not applied in memory + boolean writeToEditLog = !snapshotNeeded; // we are now going to start getting transactions to apply followed by an UPTODATE outerLoop: while (self.isRunning()) { @@ -387,7 +392,7 @@ else if (qp.getType() == Leader.SNAP) { packetsNotCommitted.add(pif); break; case Leader.COMMIT: - if (!snapshotTaken) { + if (!writeToEditLog) { pif = packetsNotCommitted.peekFirst(); if (pif.hdr.getZxid() != qp.getZxid()) { LOG.warn("Committing " + qp.getZxid() + ", but next proposal is " + pif.hdr.getZxid()); @@ -415,7 +420,7 @@ else if (qp.getType() == Leader.SNAP) { + Long.toHexString(lastQueued + 1)); } lastQueued = packet.hdr.getZxid(); - if (!snapshotTaken) { + if (!writeToEditLog) { // Apply to db directly if we haven't taken the snapshot zk.processTxn(packet.hdr, packet.rec); } else { @@ -424,13 +429,14 @@ else if (qp.getType() == Leader.SNAP) { } break; case Leader.UPTODATE: - if (!snapshotTaken) { // true for the pre v1.0 case + if (isPreZAB1_0) { zk.takeSnapshot(); self.setCurrentEpoch(newEpoch); } self.cnxnFactory.setZooKeeperServer(zk); break outerLoop; - case Leader.NEWLEADER: // it will be NEWLEADER in v1.0 + case Leader.NEWLEADER: // Getting NEWLEADER here instead of in discovery + // means this is Zab 1.0 // Create updatingEpoch file and remove it after current // epoch is set. QuorumPeer.loadDataBase() uses this file to // detect the case where the server was terminated after @@ -441,13 +447,16 @@ else if (qp.getType() == Leader.SNAP) { throw new IOException("Failed to create " + updating.toString()); } - zk.takeSnapshot(); + if (snapshotNeeded) { + zk.takeSnapshot(); + } self.setCurrentEpoch(newEpoch); if (!updating.delete()) { throw new IOException("Failed to delete " + updating.toString()); } - snapshotTaken = true; + writeToEditLog = true; //Anything after this needs to go to the edit log, not applied directly in memory + isPreZAB1_0 = false; writePacket(new QuorumPacket(Leader.ACK, newLeaderZxid, null, null), true); break; } diff --git a/src/java/test/org/apache/zookeeper/server/quorum/Zab1_0Test.java b/src/java/test/org/apache/zookeeper/server/quorum/Zab1_0Test.java index 52e7d279c7e..b5cfdf3a143 100644 --- a/src/java/test/org/apache/zookeeper/server/quorum/Zab1_0Test.java +++ b/src/java/test/org/apache/zookeeper/server/quorum/Zab1_0Test.java @@ -839,6 +839,13 @@ public void converseWithFollower(InputArchive ia, OutputArchive oa, Assert.assertEquals(1, f.self.getAcceptedEpoch()); Assert.assertEquals(1, f.self.getCurrentEpoch()); + //Wait for the edits to be written out + long start = System.currentTimeMillis(); + while (createSessionZxid != f.fzk.getLastProcessedZxid() && (System.currentTimeMillis() - start) < 50) { + Thread.sleep(1); + } + LOG.info("Took < {}ms to sync all edits", System.currentTimeMillis() - start); + Assert.assertEquals(createSessionZxid, f.fzk.getLastProcessedZxid()); // Make sure the data was recorded in the filesystem ok From f7052931a4cfb31523599bd04ed31a846c1d360a Mon Sep 17 00:00:00 2001 From: "Robert (Bobby) Evans" Date: Mon, 30 Jan 2017 09:41:35 -0600 Subject: [PATCH 2/5] ZOOKEEPER-2678: Addressed review comments --- .../apache/zookeeper/server/quorum/Learner.java | 15 ++++++++------- .../zookeeper/server/quorum/Zab1_0Test.java | 4 ++-- 2 files changed, 10 insertions(+), 9 deletions(-) diff --git a/src/java/main/org/apache/zookeeper/server/quorum/Learner.java b/src/java/main/org/apache/zookeeper/server/quorum/Learner.java index 7d33cfb60c1..ac37948de79 100644 --- a/src/java/main/org/apache/zookeeper/server/quorum/Learner.java +++ b/src/java/main/org/apache/zookeeper/server/quorum/Learner.java @@ -321,7 +321,7 @@ protected void syncWithLeader(long newLeaderZxid) throws IOException, Interrupte QuorumPacket ack = new QuorumPacket(Leader.ACK, 0, null, null); QuorumPacket qp = new QuorumPacket(); long newEpoch = ZxidUtils.getEpochFromZxid(newLeaderZxid); - //In the DIFF case we don't need to do a snapshot because the edits will sync on top of any existing snapshot + //In the DIFF case we don't need to do a snapshot because the transactions will sync on top of any existing snapshot // For SNAP and TRUNC the snapshot is needed to save that history boolean snapshotNeeded = true; readPacket(qp); @@ -368,11 +368,12 @@ else if (qp.getType() == Leader.SNAP) { long lastQueued = 0; // in Zab V1.0 (ZK 3.4+) we might take a snapshot when we get the NEWLEADER message, but in pre V1.0 - // we take the snapshot at the UPDATE, since V1.0 also gets the UPDATE (after the NEWLEADER) + // we take the snapshot on the UPDATE message, since Zab V1.0 also gets the UPDATE (after the NEWLEADER) // we need to make sure that we don't take the snapshot twice. boolean isPreZAB1_0 = true; - //If we are not going to take the snapshot be sure the edits are not applied in memory - boolean writeToEditLog = !snapshotNeeded; + //If we are not going to take the snapshot be sure the transactions are not applied in memory + // but written out to the transaction log + boolean writeToTxnLog = !snapshotNeeded; // we are now going to start getting transactions to apply followed by an UPTODATE outerLoop: while (self.isRunning()) { @@ -392,7 +393,7 @@ else if (qp.getType() == Leader.SNAP) { packetsNotCommitted.add(pif); break; case Leader.COMMIT: - if (!writeToEditLog) { + if (!writeToTxnLog) { pif = packetsNotCommitted.peekFirst(); if (pif.hdr.getZxid() != qp.getZxid()) { LOG.warn("Committing " + qp.getZxid() + ", but next proposal is " + pif.hdr.getZxid()); @@ -420,7 +421,7 @@ else if (qp.getType() == Leader.SNAP) { + Long.toHexString(lastQueued + 1)); } lastQueued = packet.hdr.getZxid(); - if (!writeToEditLog) { + if (!writeToTxnLog) { // Apply to db directly if we haven't taken the snapshot zk.processTxn(packet.hdr, packet.rec); } else { @@ -455,7 +456,7 @@ else if (qp.getType() == Leader.SNAP) { throw new IOException("Failed to delete " + updating.toString()); } - writeToEditLog = true; //Anything after this needs to go to the edit log, not applied directly in memory + writeToTxnLog = true; //Anything after this needs to go to the transaction log, not applied directly in memory isPreZAB1_0 = false; writePacket(new QuorumPacket(Leader.ACK, newLeaderZxid, null, null), true); break; diff --git a/src/java/test/org/apache/zookeeper/server/quorum/Zab1_0Test.java b/src/java/test/org/apache/zookeeper/server/quorum/Zab1_0Test.java index b5cfdf3a143..cc0c3064cfa 100644 --- a/src/java/test/org/apache/zookeeper/server/quorum/Zab1_0Test.java +++ b/src/java/test/org/apache/zookeeper/server/quorum/Zab1_0Test.java @@ -839,12 +839,12 @@ public void converseWithFollower(InputArchive ia, OutputArchive oa, Assert.assertEquals(1, f.self.getAcceptedEpoch()); Assert.assertEquals(1, f.self.getCurrentEpoch()); - //Wait for the edits to be written out + //Wait for the transactions to be written out. The thread that writes them out + // does not send anything back when it is done. long start = System.currentTimeMillis(); while (createSessionZxid != f.fzk.getLastProcessedZxid() && (System.currentTimeMillis() - start) < 50) { Thread.sleep(1); } - LOG.info("Took < {}ms to sync all edits", System.currentTimeMillis() - start); Assert.assertEquals(createSessionZxid, f.fzk.getLastProcessedZxid()); From f57c3842a284b314da172d5968c66923b74619b6 Mon Sep 17 00:00:00 2001 From: "Robert (Bobby) Evans" Date: Mon, 30 Jan 2017 09:58:46 -0600 Subject: [PATCH 3/5] ZOOKEEPER-2678: Fixed another race --- .../test/org/apache/zookeeper/server/quorum/Zab1_0Test.java | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/src/java/test/org/apache/zookeeper/server/quorum/Zab1_0Test.java b/src/java/test/org/apache/zookeeper/server/quorum/Zab1_0Test.java index cc0c3064cfa..a7695b0a59a 100644 --- a/src/java/test/org/apache/zookeeper/server/quorum/Zab1_0Test.java +++ b/src/java/test/org/apache/zookeeper/server/quorum/Zab1_0Test.java @@ -850,8 +850,14 @@ public void converseWithFollower(InputArchive ia, OutputArchive oa, // Make sure the data was recorded in the filesystem ok ZKDatabase zkDb2 = new ZKDatabase(new FileTxnSnapLog(logDir, snapDir)); + start = System.currentTimeMillis(); zkDb2.loadDataBase(); + while (zkDb2.getSessionWithTimeOuts().isEmpty() && (System.currentTimeMillis() - start) < 50) { + Thread.sleep(1); + zkDb2.loadDataBase(); + } LOG.info("zkdb2 sessions:" + zkDb2.getSessions()); + LOG.info("zkdb2 with timeouts:" + zkDb2.getSessionWithTimeOuts()); Assert.assertNotNull(zkDb2.getSessionWithTimeOuts().get(4L)); } finally { recursiveDelete(tmpDir); From dcbf325912a3f307a98904c107db10c36474ae61 Mon Sep 17 00:00:00 2001 From: "Robert (Bobby) Evans" Date: Tue, 31 Jan 2017 14:47:09 -0600 Subject: [PATCH 4/5] Addressed review comments --- .../apache/zookeeper/server/ZooKeeperServer.java | 15 +++++++++++++-- .../zookeeper/server/ZooKeeperServerMain.java | 2 +- .../apache/zookeeper/server/quorum/Learner.java | 2 +- 3 files changed, 15 insertions(+), 4 deletions(-) diff --git a/src/java/main/org/apache/zookeeper/server/ZooKeeperServer.java b/src/java/main/org/apache/zookeeper/server/ZooKeeperServer.java index 2aee59f6845..b0a834102aa 100644 --- a/src/java/main/org/apache/zookeeper/server/ZooKeeperServer.java +++ b/src/java/main/org/apache/zookeeper/server/ZooKeeperServer.java @@ -489,7 +489,15 @@ public boolean isRunning() { return state == State.RUNNING; } - public synchronized void shutdown() { + public void shutdown() { + shutdown(false); + } + + /** + * Shut down the server instance + * @param fullyShutDown true if another server using the same database will not replace this one in the same process + */ + public synchronized void shutdown(boolean fullyShutDown) { if (!canShutdown()) { LOG.debug("ZooKeeper server is not running, so not proceeding to shutdown!"); return; @@ -508,7 +516,10 @@ public synchronized void shutdown() { firstProcessor.shutdown(); } - // There is no need to clear the database + if (fullyShutDown && zkDb != null) { + zkDb.clear(); + } + // else there is no need to clear the database // * When a new quorum is established we can still apply the diff // on top of the same zkDb data // * If we fetch a new snapshot from leader, the zkDb will be diff --git a/src/java/main/org/apache/zookeeper/server/ZooKeeperServerMain.java b/src/java/main/org/apache/zookeeper/server/ZooKeeperServerMain.java index 612d2276b84..1b0f59fe383 100644 --- a/src/java/main/org/apache/zookeeper/server/ZooKeeperServerMain.java +++ b/src/java/main/org/apache/zookeeper/server/ZooKeeperServerMain.java @@ -124,7 +124,7 @@ public void runFromConfig(ServerConfig config) throws IOException { cnxnFactory.join(); if (zkServer.canShutdown()) { - zkServer.shutdown(); + zkServer.shutdown(true); } } catch (InterruptedException e) { // warn, but generally this is ok diff --git a/src/java/main/org/apache/zookeeper/server/quorum/Learner.java b/src/java/main/org/apache/zookeeper/server/quorum/Learner.java index ac37948de79..c54f6e6b797 100644 --- a/src/java/main/org/apache/zookeeper/server/quorum/Learner.java +++ b/src/java/main/org/apache/zookeeper/server/quorum/Learner.java @@ -321,7 +321,7 @@ protected void syncWithLeader(long newLeaderZxid) throws IOException, Interrupte QuorumPacket ack = new QuorumPacket(Leader.ACK, 0, null, null); QuorumPacket qp = new QuorumPacket(); long newEpoch = ZxidUtils.getEpochFromZxid(newLeaderZxid); - //In the DIFF case we don't need to do a snapshot because the transactions will sync on top of any existing snapshot + // In the DIFF case we don't need to do a snapshot because the transactions will sync on top of any existing snapshot // For SNAP and TRUNC the snapshot is needed to save that history boolean snapshotNeeded = true; readPacket(qp); From d079617bc84d5ce53e5f437588483ecc3f3b3fe1 Mon Sep 17 00:00:00 2001 From: "Robert (Bobby) Evans" Date: Mon, 6 Feb 2017 09:32:55 -0600 Subject: [PATCH 5/5] ZOOKEEPER-2678: Improved test to verify snapshot times --- .../zookeeper/server/quorum/Zab1_0Test.java | 16 +++++++++++++++- 1 file changed, 15 insertions(+), 1 deletion(-) diff --git a/src/java/test/org/apache/zookeeper/server/quorum/Zab1_0Test.java b/src/java/test/org/apache/zookeeper/server/quorum/Zab1_0Test.java index a7695b0a59a..3ed6097b31f 100644 --- a/src/java/test/org/apache/zookeeper/server/quorum/Zab1_0Test.java +++ b/src/java/test/org/apache/zookeeper/server/quorum/Zab1_0Test.java @@ -19,6 +19,9 @@ package org.apache.zookeeper.server.quorum; import static org.junit.Assert.assertEquals; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.verify; import java.io.BufferedInputStream; import java.io.BufferedReader; @@ -645,6 +648,8 @@ public void converseWithFollower(InputArchive ia, OutputArchive oa, tmpDir.mkdir(); File logDir = f.fzk.getTxnLogFactory().getDataDir().getParentFile(); File snapDir = f.fzk.getTxnLogFactory().getSnapDir().getParentFile(); + //Spy on ZK so we can check if a snapshot happened or not. + f.zk = spy(f.zk); try { Assert.assertEquals(0, f.self.getAcceptedEpoch()); Assert.assertEquals(0, f.self.getCurrentEpoch()); @@ -687,6 +692,10 @@ public void converseWithFollower(InputArchive ia, OutputArchive oa, oa.writeRecord(qp, null); zkDb.serializeSnapshot(oa); oa.writeString("BenWasHere", null); + Thread.sleep(10); //Give it some time to process the snap + //No Snapshot taken yet, the SNAP was applied in memory + verify(f.zk, never()).takeSnapshot(); + qp.setType(Leader.NEWLEADER); qp.setZxid(ZxidUtils.makeZxid(1, 0)); oa.writeRecord(qp, null); @@ -697,7 +706,8 @@ public void converseWithFollower(InputArchive ia, OutputArchive oa, Assert.assertEquals(ZxidUtils.makeZxid(1, 0), qp.getZxid()); Assert.assertEquals(1, f.self.getAcceptedEpoch()); Assert.assertEquals(1, f.self.getCurrentEpoch()); - + //Make sure that we did take the snapshot now + verify(f.zk).takeSnapshot(); Assert.assertEquals(firstZxid, f.fzk.getLastProcessedZxid()); // Make sure the data was recorded in the filesystem ok @@ -773,6 +783,8 @@ public void converseWithFollower(InputArchive ia, OutputArchive oa, tmpDir.mkdir(); File logDir = f.fzk.getTxnLogFactory().getDataDir().getParentFile(); File snapDir = f.fzk.getTxnLogFactory().getSnapDir().getParentFile(); + //Spy on ZK so we can check if a snapshot happened or not. + f.zk = spy(f.zk); try { Assert.assertEquals(0, f.self.getAcceptedEpoch()); Assert.assertEquals(0, f.self.getCurrentEpoch()); @@ -859,6 +871,8 @@ public void converseWithFollower(InputArchive ia, OutputArchive oa, LOG.info("zkdb2 sessions:" + zkDb2.getSessions()); LOG.info("zkdb2 with timeouts:" + zkDb2.getSessionWithTimeOuts()); Assert.assertNotNull(zkDb2.getSessionWithTimeOuts().get(4L)); + //Snapshot was never taken during very simple sync + verify(f.zk, never()).takeSnapshot(); } finally { recursiveDelete(tmpDir); }