From 917ae6c96b31185052c45ecac13224027a7b1b9a Mon Sep 17 00:00:00 2001 From: Brian Nixon Date: Tue, 2 Jul 2019 07:34:21 +0200 Subject: [PATCH 1/2] =?UTF-8?q?ZOOKEEPER-3240:=20Close=20socket=20on=20Lea?= =?UTF-8?q?rner=20shutdown=20to=20avoid=20dangling=20so=E2=80=A6?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit …cket Author: Brian Nixon Reviewers: hanm@apache.org, eolivelli@apache.org, andor@apache.org Closes #996 from enixon/learner-close-socket and squashes the following commits: 57f5e891 [Brian Nixon] delete ReconfigFailureCasesTest::testLeaderTimesoutOnNewQuorum as it relies on the fixed buggy behavior 1598b3ad [Brian Nixon] remove extraneous check on socket status 381889da [Brian Nixon] ZOOKEEPER-3240: Close socket on Learner shutdown to avoid dangling socket --- .../zookeeper/server/quorum/Follower.java | 6 +-- .../zookeeper/server/quorum/Learner.java | 11 +++++ .../zookeeper/server/quorum/Observer.java | 6 +-- .../server/quorum/QuorumPeerMainTest.java | 3 ++ .../quorum/ReconfigFailureCasesTest.java | 43 ------------------- 5 files changed, 16 insertions(+), 53 deletions(-) diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Follower.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Follower.java index 65086017aac..b79f5702c6a 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Follower.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Follower.java @@ -94,11 +94,7 @@ void followLeader() throws InterruptedException { } } catch (Exception e) { LOG.warn("Exception when following the leader", e); - try { - sock.close(); - } catch (IOException e1) { - e1.printStackTrace(); - } + closeSocket(); // clear pending revalidations pendingRevalidations.clear(); diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Learner.java 
b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Learner.java index 2da5add4e12..9c8a019a060 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Learner.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Learner.java @@ -658,6 +658,7 @@ public void shutdown() { self.setZooKeeperServer(null); self.closeAllConnections(); self.adminServer.setZooKeeperServer(null); + closeSocket(); // shutdown previous zookeeper if (zk != null) { zk.shutdown(); @@ -667,4 +668,14 @@ public void shutdown() { boolean isRunning() { return self.isRunning() && zk.isRunning(); } + + void closeSocket() { + try { + if (sock != null) { + sock.close(); + } + } catch (IOException e) { + LOG.warn("Ignoring error closing connection to leader", e); + } + } } diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Observer.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Observer.java index f0f724e5e6f..050582d623f 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Observer.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Observer.java @@ -79,11 +79,7 @@ void observeLeader() throws Exception { } } catch (Exception e) { LOG.warn("Exception when observing the leader", e); - try { - sock.close(); - } catch (IOException e1) { - e1.printStackTrace(); - } + closeSocket(); // clear pending revalidations pendingRevalidations.clear(); diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/QuorumPeerMainTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/QuorumPeerMainTest.java index b3e1e07d2d3..ff42204db74 100644 --- a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/QuorumPeerMainTest.java +++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/QuorumPeerMainTest.java @@ -970,7 +970,10 @@ public void testFailedTxnAsPartOfQuorumLoss() throws Exception { 
int leader = servers.findLeader(); Map outstanding = servers.mt[leader].main.quorumPeer.leader.outstandingProposals; // increase the tick time to delay the leader going to looking + int previousTick = servers.mt[leader].main.quorumPeer.tickTime; servers.mt[leader].main.quorumPeer.tickTime = LEADER_TIMEOUT_MS; + // wait for the tick already in progress on the leader to run out so that the new tick time takes effect + Thread.sleep(previousTick); LOG.warn("LEADER {}", leader); for (int i = 0; i < SERVER_COUNT; i++) { diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/ReconfigFailureCasesTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/ReconfigFailureCasesTest.java index 8120d0fa28a..e41849094b5 100644 --- a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/ReconfigFailureCasesTest.java +++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/ReconfigFailureCasesTest.java @@ -158,49 +158,6 @@ public void testReconfigVersionConditionFails() throws Exception { ReconfigTest.closeAllHandles(zkArr, zkAdminArr); } - /* - * Tests that if a quorum of a new config is synced with the leader and a reconfig - * is allowed to start but then the new quorum is lost, the leader will time out and - * we go to leader election. - */ - @Test - public void testLeaderTimesoutOnNewQuorum() throws Exception { - qu = new QuorumUtil(1); // create 3 servers - qu.disableJMXTest = true; - qu.startAll(); - ZooKeeper[] zkArr = ReconfigTest.createHandles(qu); - ZooKeeperAdmin[] zkAdminArr = ReconfigTest.createAdminHandles(qu); - - List leavingServers = new ArrayList(); - leavingServers.add("3"); - qu.shutdown(2); - try { - // Since we just shut down server 2, its still considered "synced" - // by the leader, which allows us to start the reconfig - // (PrepRequestProcessor checks that a quorum of the new - // config is synced before starting a reconfig). 
- // We try to remove server 3, which requires a quorum of {1,2,3} - // (we have that) and of {1,2}, but 2 is down so we won't get a - // quorum of new config ACKs. - zkAdminArr[1].reconfigure(null, leavingServers, null, -1, null); - Assert.fail("Reconfig should have failed since we don't have quorum of new config"); - } catch (KeeperException.ConnectionLossException e) { - // We expect leader to lose quorum of proposed config and time out - } catch (Exception e) { - Assert.fail("Should have been ConnectionLossException!"); - } - - // The leader should time out and remaining servers should go into - // LOOKING state. A new leader won't be established since that - // would require completing the reconfig, which is not possible while - // 2 is down. - Assert.assertEquals(QuorumStats.Provider.LOOKING_STATE, - qu.getPeer(1).peer.getServerState()); - Assert.assertEquals(QuorumStats.Provider.LOOKING_STATE, - qu.getPeer(3).peer.getServerState()); - ReconfigTest.closeAllHandles(zkArr, zkAdminArr); - } - /* * Converting an observer into a participant may sometimes fail with a * NewConfigNoQuorum exception. This test-case demonstrates the scenario. From de08845c51ed1f1f4c644561080564da8ca0e255 Mon Sep 17 00:00:00 2001 From: fanyang Date: Fri, 26 Mar 2021 19:28:02 +0800 Subject: [PATCH 2/2] ZOOKEEPER-4262: Backport ZOOKEEPER-3911 to branch-3.5.9 ZOOKEEPER-3911 requires ZOOKEEPER-3240 to pass the unit test. 
--- .../zookeeper/server/ZooKeeperServer.java | 16 +- .../zookeeper/server/quorum/Learner.java | 18 +- .../quorum/DIFFSyncConsistencyTest.java | 293 ++++++++++++++++++ 3 files changed, 323 insertions(+), 4 deletions(-) create mode 100644 zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/DIFFSyncConsistencyTest.java diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java index 4e4feaf6807..23591c27251 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java @@ -467,6 +467,19 @@ public void startdata() } public synchronized void startup() { + startupWithServerState(State.RUNNING); + } + + public synchronized void startupWithoutServing() { + startupWithServerState(State.INITIAL); + } + + public synchronized void startServing() { + setState(State.RUNNING); + notifyAll(); + } + + private void startupWithServerState(State state) { if (sessionTracker == null) { createSessionTracker(); } @@ -475,7 +488,8 @@ public synchronized void startup() { registerJMX(); - setState(State.RUNNING); + setState(state); + notifyAll(); } diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Learner.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Learner.java index 9c8a019a060..3f282cac23b 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Learner.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Learner.java @@ -559,8 +559,21 @@ else if (qp.getType() == Leader.SNAP) { } self.setCurrentEpoch(newEpoch); - writeToTxnLog = true; //Anything after this needs to go to the transaction log, not applied directly in memory + writeToTxnLog = true; + // Anything after this needs to go to the transaction log rather than being applied directly in memory 
isPreZAB1_0 = false; + + // ZOOKEEPER-3911: make sure the uncommitted packets are logged before the ACK for NEWLEADER is sent. + sock.setSoTimeout(self.tickTime * self.syncLimit); + zk.startupWithoutServing(); + if (zk instanceof FollowerZooKeeperServer) { + FollowerZooKeeperServer fzk = (FollowerZooKeeperServer) zk; + for (PacketInFlight p : packetsNotCommitted) { + fzk.logRequest(p.hdr, p.rec); + } + packetsNotCommitted.clear(); + } + writePacket(new QuorumPacket(Leader.ACK, newLeaderZxid, null, null), true); break; } @@ -568,8 +581,7 @@ else if (qp.getType() == Leader.SNAP) { } ack.setZxid(ZxidUtils.makeZxid(newEpoch, 0)); writePacket(ack, true); - sock.setSoTimeout(self.tickTime * self.syncLimit); - zk.startup(); + zk.startServing(); /* * Update the election vote here to ensure that all members of the * ensemble report the same vote to new servers that start up and diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/DIFFSyncConsistencyTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/DIFFSyncConsistencyTest.java new file mode 100644 index 00000000000..103cbe35268 --- /dev/null +++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/DIFFSyncConsistencyTest.java @@ -0,0 +1,293 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zookeeper.server.quorum; + +import static org.apache.zookeeper.server.quorum.QuorumPeerMainTest.waitForOne; +import static org.apache.zookeeper.test.ClientBase.CONNECTION_TIMEOUT; +import static org.junit.Assert.assertNotEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; +import java.io.IOException; +import java.net.SocketTimeoutException; +import java.util.Map; +import javax.security.sasl.SaslException; +import org.apache.zookeeper.CreateMode; +import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.PortAssignment; +import org.apache.zookeeper.ZooDefs.Ids; +import org.apache.zookeeper.ZooDefs.OpCode; +import org.apache.zookeeper.ZooKeeper; +import org.apache.zookeeper.ZooKeeper.States; +import org.apache.zookeeper.data.Stat; +import org.apache.zookeeper.server.persistence.FileTxnSnapLog; +import org.apache.zookeeper.server.quorum.Leader.Proposal; +import org.apache.zookeeper.test.ClientBase; +import org.apache.zookeeper.test.ClientBase.CountdownWatcher; +import org.junit.After; +import org.junit.Test; + +public class DIFFSyncConsistencyTest extends QuorumPeerTestBase { + + private static final int SERVER_COUNT = 3; + private MainThread[] mt = new MainThread[SERVER_COUNT]; + + @Test(timeout = 120 * 1000) + public void testInconsistentDueToUncommittedLog() throws Exception { + final int LEADER_TIMEOUT_MS = 10_000; + final int[] clientPorts = new int[SERVER_COUNT]; + + StringBuilder sb = new StringBuilder(); + String server; + for (int i = 0; i < SERVER_COUNT; i++) { + clientPorts[i] = PortAssignment.unique(); + server = "server." 
+ i + "=127.0.0.1:" + PortAssignment.unique() + ":" + PortAssignment.unique() + + ":participant;127.0.0.1:" + clientPorts[i]; + sb.append(server + "\n"); + } + String currentQuorumCfgSection = sb.toString(); + + for (int i = 0; i < SERVER_COUNT; i++) { + mt[i] = new MainThread(i, clientPorts[i], currentQuorumCfgSection, false) { + @Override + public TestQPMain getTestQPMain() { + return new MockTestQPMain(); + } + }; + mt[i].start(); + } + + for (int i = 0; i < SERVER_COUNT; i++) { + assertTrue("waiting for server " + i + " being up", + ClientBase.waitForServerUp("127.0.0.1:" + clientPorts[i], CONNECTION_TIMEOUT)); + } + + int leader = findLeader(mt); + CountdownWatcher watch = new CountdownWatcher(); + ZooKeeper zk = new ZooKeeper("127.0.0.1:" + clientPorts[leader], ClientBase.CONNECTION_TIMEOUT, watch); + watch.waitForConnected(ClientBase.CONNECTION_TIMEOUT); + + Map<Long, Proposal> outstanding = mt[leader].main.quorumPeer.leader.outstandingProposals; + // Increase the tick time to delay the leader going to LOOKING, which lets us propose a transaction while the + // followers are offline. + int previousTick = mt[leader].main.quorumPeer.tickTime; + mt[leader].main.quorumPeer.tickTime = LEADER_TIMEOUT_MS; + // Let the previous tick on the leader exhaust itself so the new tick time takes effect + Thread.sleep(previousTick); + + LOG.info("LEADER ELECTED {}", leader); + + // Shut down the followers so that the proposal we are about to make is not accidentally sent to them. + // In other words, we want to make sure the followers get the proposal later through DIFF sync. + for (int i = 0; i < SERVER_COUNT; i++) { + if (i != leader) { + mt[i].shutdown(); + } + } + + // Send a create request to old leader and make sure it's synced to disk. 
+ try { + zk.create("/zk" + leader, "zk".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); + fail("create /zk" + leader + " should have failed"); + } catch (KeeperException e) { // expected: with the followers down the create cannot be committed + } + + // Make sure that we actually did get it in process at the leader; there can be extra sessionClose proposals. + assertTrue(outstanding.size() > 0); + Proposal p = findProposalOfType(outstanding, OpCode.create); + LOG.info("Old leader id: {}. All proposals: {}", leader, outstanding); + assertNotNull("Old leader doesn't have 'create' proposal", p); + + // Make sure the leader has synced the proposal to disk. + int sleepTime = 0; + Long longLeader = (long) leader; + while (!p.qvAcksetPairs.get(0).getAckset().contains(longLeader)) { + if (sleepTime > 2000) { + fail("Transaction not synced to disk within 2 seconds " + p.qvAcksetPairs.get(0).getAckset() + " expected " + leader); + } + Thread.sleep(100); + sleepTime += 100; + } + + // Start controlled followers that are deliberately made to fail once they receive the UPTODATE + // message from the leader. Because followers only persist proposals from DIFF sync after UPTODATE, this + // deterministically simulates the situation where followers ACK NEWLEADER (which makes the leader think it has + // quorum support, although it does not afterwards) but fail immediately afterwards without persisting the + // proposals from DIFF sync. + for (int i = 0; i < SERVER_COUNT; i++) { + if (i == leader) { + continue; + } + + mt[i].start(); + int sleepCount = 0; + while (mt[i].getQuorumPeer() == null) { + ++sleepCount; + if (sleepCount > 100) { + fail("Can't start follower " + i + " !"); + } + Thread.sleep(100); + } + + ((CustomQuorumPeer) mt[i].getQuorumPeer()).setInjectError(true); + LOG.info("Follower {} started.", i); + } + + // Verify leader can see it. 
The fact that leader can see it implies that + // leader should, at this point in time, get a quorum of ACK of NEWLEADER + // from two followers so leader can start serving requests; this also implies + // that DIFF sync from leader to followers is finished at this point in time. + // We then verify later that followers should have the same view after we shutdown + // this leader, otherwise it's a violation of ZAB / sequential consistency. + int c = 0; + while (c < 100) { + ++c; + try { + Stat stat = zk.exists("/zk" + leader, false); + assertNotNull("server " + leader + " should have /zk", stat); + break; + } catch (KeeperException.ConnectionLossException e) { + // connection to the leader was lost; retry after a short sleep + } + Thread.sleep(100); + } + + // Shutdown all servers + for (int i = 0; i < SERVER_COUNT; i++) { + mt[i].shutdown(); + } + waitForOne(zk, States.CONNECTING); + + // Now restart all servers except the old leader. Only the old leader has the transaction synced to disk. + // The old followers only had in memory view of the transaction, and they didn't have a chance + // to sync to disk because we made them fail at UPTODATE. + for (int i = 0; i < SERVER_COUNT; i++) { + if (i == leader) { + continue; + } + mt[i].start(); + int sleepCount = 0; + while (mt[i].getQuorumPeer() == null) { + ++sleepCount; + if (sleepCount > 100) { + fail("Can't start follower " + i + " !"); + } + Thread.sleep(100); + } + + ((CustomQuorumPeer) mt[i].getQuorumPeer()).setInjectError(false); + LOG.info("Follower {} started again.", i); + } + + int newLeader = findLeader(mt); + assertNotEquals("new leader is still the old leader " + leader + " !!", newLeader, leader); + + // This simulates the case where clients connected to the old leader had a view of the data + // "/zkX", but clients connected to the new leader do not have the same view of the data (missing "/zkX"). + // This inconsistent view of the quorum exposed from leaders is a violation of ZAB. 
+ for (int i = 0; i < SERVER_COUNT; i++) { + if (i != newLeader) { + continue; + } + zk.close(); + zk = new ZooKeeper("127.0.0.1:" + clientPorts[i], ClientBase.CONNECTION_TIMEOUT, watch); + watch.waitForConnected(ClientBase.CONNECTION_TIMEOUT); + Stat val = zk.exists("/zk" + leader, false); + assertNotNull("Data inconsistency detected! Server " + i + " should have a view of /zk" + leader + "!", + val); + } + + zk.close(); + } + + @After + public void tearDown() { + for (int i = 0; i < mt.length; i++) { + try { + mt[i].shutdown(); + } catch (InterruptedException e) { + LOG.warn("Quorum Peer interrupted while shutting it down", e); + } + } + } + + static class CustomQuorumPeer extends QuorumPeer { + + private volatile boolean injectError = false; + + public CustomQuorumPeer() throws SaslException { + + } + + @Override + protected Follower makeFollower(FileTxnSnapLog logFactory) throws IOException { + return new Follower(this, new FollowerZooKeeperServer(logFactory, this, this.getZkDb())) { + + @Override + void readPacket(QuorumPacket pp) throws IOException { + /** + * In a real deployment a SocketTimeoutException is raised while + * reading a packet from the leader because of a network problem; + * here it is thrown after an UPTODATE packet has been read, + * if error injection is enabled. 
+ */ + super.readPacket(pp); + if (injectError && pp.getType() == Leader.UPTODATE) { + String type = LearnerHandler.packetToString(pp); + throw new SocketTimeoutException("Socket timeout while reading the packet for operation " + + type); + } + } + + }; + } + + public void setInjectError(boolean injectError) { + this.injectError = injectError; + } + + } + + static class MockTestQPMain extends TestQPMain { + + @Override + protected QuorumPeer getQuorumPeer() throws SaslException { + return new CustomQuorumPeer(); + } + + } + + private Proposal findProposalOfType(Map<Long, Proposal> proposals, int type) { + for (Proposal proposal : proposals.values()) { + if (proposal.request.getHdr().getType() == type) { + return 
proposal; + } + } + return null; + } + + private int findLeader(MainThread[] mt) { + for (int i = 0; i < mt.length; i++) { + if (mt[i].main.quorumPeer.leader != null) { + return i; + } + } + return -1; + } +}