diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ObserverBean.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ObserverBean.java index 167c96d27d8..e648f155393 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ObserverBean.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ObserverBean.java @@ -21,6 +21,7 @@ import org.apache.zookeeper.server.quorum.Observer; import org.apache.zookeeper.server.quorum.ObserverMXBean; import org.apache.zookeeper.server.quorum.QuorumPeer; +import java.net.InetSocketAddress; /** * ObserverBean @@ -52,7 +53,8 @@ public String getLearnerMaster() { if (learnerMaster == null || learnerMaster.addr == null) { return "Unknown"; } - return learnerMaster.addr.getAddress().getHostAddress() + ":" + learnerMaster.addr.getPort(); + InetSocketAddress address = learnerMaster.addr.getReachableOrOne(); + return address.getAddress().getHostAddress() + ":" + address.getPort(); } public void setLearnerMaster(String learnerMaster) { diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java index 01f5f9a148b..c3ecac5333b 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java @@ -1186,7 +1186,7 @@ public void processPacket(ServerCnxn cnxn, ByteBuffer incomingBuffer) throws IOE // there might be a race condition that it enabled recv after // processing request and then disabled when check throttling. // - // Be aware that we're actually checking the global outstanding + // Be aware that we're actually checking the global outstanding // request before this request. // // It's fine if the IOException thrown before we decrease the count diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/AuthFastLeaderElection.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/AuthFastLeaderElection.java index 933cbfd486c..a3f9aa74b65 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/AuthFastLeaderElection.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/AuthFastLeaderElection.java @@ -22,6 +22,7 @@ import java.io.IOException; import java.net.DatagramPacket; import java.net.DatagramSocket; +import java.net.InetAddress; import java.net.InetSocketAddress; import java.net.SocketException; import java.nio.ByteBuffer; @@ -44,8 +45,6 @@ import org.apache.zookeeper.jmx.MBeanRegistry; import org.apache.zookeeper.server.ZooKeeperThread; -import org.apache.zookeeper.server.quorum.Election; -import org.apache.zookeeper.server.quorum.Vote; import org.apache.zookeeper.server.quorum.QuorumPeer.QuorumServer; import org.apache.zookeeper.server.quorum.QuorumPeer.ServerState; @@ -732,8 +731,8 @@ private void process(ToSend m) { } for (QuorumServer server : self.getVotingView().values()) { - InetSocketAddress saddr = new InetSocketAddress(server.addr - .getAddress(), port); + InetAddress address = server.addr.getReachableOrOne().getAddress(); + InetSocketAddress saddr = new InetSocketAddress(address, port); addrChallengeMap.put(saddr, new ConcurrentHashMap()); } @@ -763,7 +762,7 @@ public AuthFastLeaderElection(QuorumPeer self) { private void starter(QuorumPeer self) { this.self = self; - port = self.getVotingView().get(self.getId()).electionAddr.getPort(); + port = self.getVotingView().get(self.getId()).electionAddr.getAllPorts().get(0); proposedLeader = -1; proposedZxid = -1; @@ -786,11 +785,10 @@ private void leaveInstance() { private void sendNotifications() { for (QuorumServer server : self.getView().values()) { - + InetSocketAddress address = self.getView().get(server.id).electionAddr.getReachableOrOne(); ToSend notmsg = new ToSend(ToSend.mType.notification, AuthFastLeaderElection.sequencer++, proposedLeader, - proposedZxid, logicalclock.get(), QuorumPeer.ServerState.LOOKING, - self.getView().get(server.id).electionAddr); + proposedZxid, logicalclock.get(), QuorumPeer.ServerState.LOOKING, address); sendqueue.offer(notmsg); } diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Follower.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Follower.java index 49280d3dd3c..12d5964c785 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Follower.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Follower.java @@ -19,7 +19,6 @@ package org.apache.zookeeper.server.quorum; import java.io.IOException; -import java.net.InetSocketAddress; import java.nio.ByteBuffer; import org.apache.jute.Record; @@ -76,13 +75,14 @@ void followLeader() throws InterruptedException { self.start_fle = 0; self.end_fle = 0; fzk.registerJMX(new FollowerBean(this, zk), self.jmxLocalPeerBean); + try { QuorumServer leaderServer = findLeader(); try { connectToLeader(leaderServer.addr, leaderServer.hostname); long newEpochZxid = registerWithLeader(Leader.FOLLOWERINFO); if (self.isReconfigStateChange()) - throw new Exception("learned about role change"); + throw new Exception("learned about role change"); //check to see if the leader zxid is lower than ours //this should never happen but is just a safety check long newEpoch = ZxidUtils.getEpochFromZxid(newEpochZxid); @@ -119,7 +119,6 @@ void followLeader() throws InterruptedException { } catch (IOException e1) { e1.printStackTrace(); } - // clear pending revalidations pendingRevalidations.clear(); } @@ -127,7 +126,7 @@ void followLeader() throws InterruptedException { if (om != null) { om.stop(); } - zk.unregisterJMX((Learner)this); + zk.unregisterJMX((Learner) this); } } diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Leader.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Leader.java index c284debfc49..fc58d478ac8 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Leader.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Leader.java @@ -25,6 +25,7 @@ import java.io.DataOutputStream; import java.io.IOException; import java.net.BindException; +import java.net.InetSocketAddress; import java.net.ServerSocket; import java.net.Socket; import java.net.SocketAddress; @@ -35,13 +36,20 @@ import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; +import java.util.LinkedList; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.Set; -import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; +import java.util.stream.Collectors; import javax.security.sasl.SaslException; @@ -257,41 +265,44 @@ public boolean isQuorumSynced(QuorumVerifier qv) { return qv.containsQuorum(ids); } - private final ServerSocket ss; + private List serverSockets; Leader(QuorumPeer self,LeaderZooKeeperServer zk) throws IOException { this.self = self; this.proposalStats = new BufferStats(); + + Set addresses; + if (self.getQuorumListenOnAllIPs()) { + addresses = self.getQuorumAddress().getWildcardAddresses(); + } else { + addresses = self.getQuorumAddress().getAllAddresses(); + } + + serverSockets = new LinkedList<>(); + for (InetSocketAddress address : addresses) + serverSockets.add(createServerSocket(address, self.shouldUsePortUnification(), self.isSslQuorum())); + + this.zk = zk; + this.learnerSnapshotThrottler = createLearnerSnapshotThrottler( + maxConcurrentSnapshots, maxConcurrentSnapshotTimeout); + } + + ServerSocket createServerSocket(InetSocketAddress address, boolean portUnification, boolean sslQuorum) + throws IOException { + ServerSocket serverSocket; try { - if (self.shouldUsePortUnification() || self.isSslQuorum()) { - boolean allowInsecureConnection = self.shouldUsePortUnification(); - if (self.getQuorumListenOnAllIPs()) { - ss = new UnifiedServerSocket(self.getX509Util(), allowInsecureConnection, self.getQuorumAddress().getPort()); - } else { - ss = new UnifiedServerSocket(self.getX509Util(), allowInsecureConnection); - } + if (portUnification || sslQuorum) { + serverSocket = new UnifiedServerSocket(self.getX509Util(), portUnification); } else { - if (self.getQuorumListenOnAllIPs()) { - ss = new ServerSocket(self.getQuorumAddress().getPort()); - } else { - ss = new ServerSocket(); - } - } - ss.setReuseAddress(true); - if (!self.getQuorumListenOnAllIPs()) { - ss.bind(self.getQuorumAddress()); + serverSocket = new ServerSocket(); } + serverSocket.setReuseAddress(true); + serverSocket.bind(address); + return serverSocket; } catch (BindException e) { - if (self.getQuorumListenOnAllIPs()) { - LOG.error("Couldn't bind to port " + self.getQuorumAddress().getPort(), e); - } else { - LOG.error("Couldn't bind to " + self.getQuorumAddress(), e); - } + LOG.error("Couldn't bind to " + self.getQuorumAddress(), e); throw e; } - this.zk = zk; - this.learnerSnapshotThrottler = createLearnerSnapshotThrottler( - maxConcurrentSnapshots, maxConcurrentSnapshotTimeout); } /** @@ -405,70 +416,112 @@ public boolean isQuorumSynced(QuorumVerifier qv) { protected final Proposal newLeaderProposal = new Proposal(); class LearnerCnxAcceptor extends ZooKeeperCriticalThread { - private volatile boolean stop = false; + private AtomicBoolean stop; + private AtomicBoolean fail; public LearnerCnxAcceptor() { - super("LearnerCnxAcceptor-" + ss.getLocalSocketAddress(), zk + super("LearnerCnxAcceptor-" + + serverSockets.stream().map(ServerSocket::getLocalSocketAddress).map(Objects::toString) + .collect(Collectors.joining(",")), zk .getZooKeeperServerListener()); + stop = new AtomicBoolean(false); + fail = new AtomicBoolean(false); } @Override public void run() { - try { - while (!stop) { - Socket s = null; - boolean error = false; - try { - s = ss.accept(); - - // start with the initLimit, once the ack is processed - // in LearnerHandler switch to the syncLimit - s.setSoTimeout(self.tickTime * self.initLimit); - s.setTcpNoDelay(nodelay); - - BufferedInputStream is = new BufferedInputStream( - s.getInputStream()); - LearnerHandler fh = new LearnerHandler(s, is, Leader.this); - fh.start(); - } catch (SocketException e) { - error = true; - if (stop) { - LOG.info("exception while shutting down acceptor: " - + e); - - // When Leader.shutdown() calls ss.close(), - // the call to accept throws an exception. - // We catch and set stop to true. - stop = true; - } else { - throw e; - } - } catch (SaslException e){ - LOG.error("Exception while connecting to quorum learner", e); - error = true; - } catch (Exception e) { - error = true; + if (!stop.get() && serverSockets != null) { + ExecutorService executor = Executors.newFixedThreadPool(serverSockets.size()); + CountDownLatch latch = new CountDownLatch(serverSockets.size()); + + serverSockets.forEach(serverSocket -> + executor.submit(new LearnerCnxAcceptorHandler(serverSocket, latch))); + + try { + latch.await(); + } catch (InterruptedException ie) { + LOG.error("Interrupted while sleeping. " + + "Ignoring exception", ie); + } finally { + closeSockets(); + } + } + } + + public void halt() { + stop.set(true); + closeSockets(); + } + + class LearnerCnxAcceptorHandler implements Runnable { + private ServerSocket serverSocket; + private CountDownLatch latch; + + LearnerCnxAcceptorHandler(ServerSocket serverSocket, CountDownLatch latch) { + this.serverSocket = serverSocket; + this.latch = latch; + } + + @Override + public void run() { + try { + Thread.currentThread().setName("LearnerCnxAcceptorHandler-" + serverSocket.getLocalSocketAddress()); + + while (!stop.get()) { + acceptConnections(); + } + } catch (Exception e) { + LOG.warn("Exception while accepting follower", e); + if (fail.compareAndSet(false, true)) { + handleException(getName(), e); + halt(); + } + } finally { + latch.countDown(); + } + } + + private void acceptConnections() throws IOException { + Socket socket = null; + boolean error = false; + try { + socket = serverSocket.accept(); + + // start with the initLimit, once the ack is processed + // in LearnerHandler switch to the syncLimit + socket.setSoTimeout(self.tickTime * self.initLimit); + socket.setTcpNoDelay(nodelay); + + BufferedInputStream is = new BufferedInputStream(socket.getInputStream()); + LearnerHandler fh = new LearnerHandler(socket, is, Leader.this); + fh.start(); + } catch (SocketException e) { + error = true; + if (stop.get()) { + LOG.info("Exception while shutting down acceptor", e); + } else { throw e; - } finally { - // Don't leak sockets on errors - if (error && s != null && !s.isClosed()) { - try { - s.close(); - } catch (IOException e) { - LOG.warn("Error closing socket", e); - } + } + } catch (SaslException e) { + LOG.error("Exception while connecting to quorum learner", e); + error = true; + } catch (Exception e) { + error = true; + throw e; + } finally { + // Don't leak sockets on errors + if (error && socket != null && !socket.isClosed()) { + try { + socket.close(); + } catch (IOException e) { + LOG.warn("Error closing socket", e); } } } - } catch (Exception e) { - LOG.warn("Exception while accepting follower", e.getMessage()); - handleException(this.getName(), e); } - } - public void halt() { - stop = true; } + } StateSummary leaderStateSummary; @@ -726,16 +779,13 @@ void shutdown(String reason) { if (cnxAcceptor != null) { cnxAcceptor.halt(); + } else { + closeSockets(); } // NIO should not accept conenctions self.setZooKeeperServer(null); self.adminServer.setZooKeeperServer(null); - try { - ss.close(); - } catch (IOException e) { - LOG.warn("Ignoring unexpected exception during close",e); - } self.closeAllConnections(); // shutdown the previous zk if (zk != null) { @@ -752,6 +802,19 @@ void shutdown(String reason) { isShutdown = true; } + synchronized void closeSockets() { + if (serverSockets != null) + for (ServerSocket serverSocket : serverSockets) { + if (!serverSocket.isClosed()) { + try { + serverSocket.close(); + } catch (IOException e) { + LOG.warn("Ignoring unexpected exception during close {}", serverSocket, e); + } + } + } + } + /** In a reconfig operation, this method attempts to find the best leader for next configuration. * If the current leader is a voter in the next configuartion, then it remains the leader. * Otherwise, choose one of the new voters that acked the reconfiguartion, such that it is as diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Learner.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Learner.java index 307b644fc13..23c7e4d3d23 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Learner.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Learner.java @@ -24,14 +24,18 @@ import java.io.DataInputStream; import java.io.DataOutputStream; import java.io.IOException; -import java.net.ConnectException; import java.net.InetSocketAddress; import java.net.Socket; import java.nio.ByteBuffer; import java.util.LinkedList; import java.util.Map; import java.util.Map.Entry; +import java.util.Set; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.atomic.AtomicReference; import org.apache.jute.BinaryInputArchive; import org.apache.jute.BinaryOutputArchive; @@ -249,53 +253,23 @@ protected void sockConnect(Socket sock, InetSocketAddress addr, int timeout) * @param addr - the address of the Peer to connect to. * @throws IOException - if the socket connection fails on the 5th attempt *
  • if there is an authentication failure while connecting to leader
  • - * @throws ConnectException * @throws InterruptedException */ - protected void connectToLeader(InetSocketAddress addr, String hostname) - throws IOException, InterruptedException, X509Exception { - this.sock = createSocket(); + protected void connectToLeader(MultipleAddresses addr, String hostname) + throws IOException, InterruptedException { - int initLimitTime = self.tickTime * self.initLimit; - int remainingInitLimitTime; - long startNanoTime = nanoTime(); + Set addresses = addr.getAllAddresses(); + ExecutorService executor = Executors.newFixedThreadPool(addresses.size()); + CountDownLatch latch = new CountDownLatch(addresses.size()); + AtomicReference socket = new AtomicReference<>(null); + addresses.stream().map(address -> new LeaderConnector(address, socket, latch)).forEach(executor::submit); - for (int tries = 0; tries < 5; tries++) { - try { - // recalculate the init limit time because retries sleep for 1000 milliseconds - remainingInitLimitTime = initLimitTime - (int)((nanoTime() - startNanoTime) / 1000000); - if (remainingInitLimitTime <= 0) { - LOG.error("initLimit exceeded on retries."); - throw new IOException("initLimit exceeded on retries."); - } + latch.await(); - sockConnect(sock, addr, Math.min(self.tickTime * self.syncLimit, remainingInitLimitTime)); - if (self.isSslQuorum()) { - ((SSLSocket) sock).startHandshake(); - } - sock.setTcpNoDelay(nodelay); - break; - } catch (IOException e) { - remainingInitLimitTime = initLimitTime - (int)((nanoTime() - startNanoTime) / 1000000); - - if (remainingInitLimitTime <= 1000) { - LOG.error("Unexpected exception, initLimit exceeded. tries=" + tries + - ", remaining init limit=" + remainingInitLimitTime + - ", connecting to " + addr,e); - throw e; - } else if (tries >= 4) { - LOG.error("Unexpected exception, retries exceeded. tries=" + tries + - ", remaining init limit=" + remainingInitLimitTime + - ", connecting to " + addr,e); - throw e; - } else { - LOG.warn("Unexpected exception, tries=" + tries + - ", remaining init limit=" + remainingInitLimitTime + - ", connecting to " + addr,e); - this.sock = createSocket(); - } - } - Thread.sleep(leaderConnectDelayDuringRetryMs); + if (socket.get() == null) { + throw new IOException("Failed connect to " + addr); + } else { + sock = socket.get(); } self.authLearner.authenticate(sock, hostname); @@ -306,6 +280,81 @@ protected void connectToLeader(InetSocketAddress addr, String hostname) leaderOs = BinaryOutputArchive.getArchive(bufferedOutput); } + class LeaderConnector implements Runnable { + + private AtomicReference socket; + private InetSocketAddress address; + private CountDownLatch latch; + + LeaderConnector(InetSocketAddress address, AtomicReference socket, CountDownLatch latch) { + this.address = address; + this.socket = socket; + this.latch = latch; + } + + @Override + public void run() { + try { + Thread.currentThread().setName("LeaderConnector-" + address); + Socket sock = connectToLeader(); + + if (sock != null && sock.isConnected() && !socket.compareAndSet(null, sock)) { + LOG.info("Connection to the leader is already established, close the redundant connection"); + sock.close(); + } + + } catch (Exception e) { + LOG.error("Failed connect to {}", address, e); + } finally { + latch.countDown(); + } + } + + private Socket connectToLeader() throws IOException, X509Exception, InterruptedException { + Socket sock = createSocket(); + + int initLimitTime = self.tickTime * self.initLimit; + int remainingInitLimitTime; + long startNanoTime = nanoTime(); + + for (int tries = 0; tries < 5 && socket.get() == null; tries++) { + try { + // recalculate the init limit time because retries sleep for 1000 milliseconds + remainingInitLimitTime = initLimitTime - (int) ((nanoTime() - startNanoTime) / 1000000); + if (remainingInitLimitTime <= 0) { + LOG.error("initLimit exceeded on retries."); + throw new IOException("initLimit exceeded on retries."); + } + sockConnect(sock, address, Math.min(self.tickTime * self.syncLimit, remainingInitLimitTime)); + if (self.isSslQuorum()) { + ((SSLSocket) sock).startHandshake(); + } + sock.setTcpNoDelay(nodelay); + break; + } catch (IOException e) { + remainingInitLimitTime = initLimitTime - (int) ((nanoTime() - startNanoTime) / 1000000); + + if (remainingInitLimitTime <= leaderConnectDelayDuringRetryMs) { + LOG.error("Unexpected exception, initLimit exceeded. tries={}, remaining init limit={}, " + + "connecting to {}", tries, remainingInitLimitTime, address, e); + throw e; + } else if (tries >= 4) { + LOG.error("Unexpected exception, retries exceeded. tries={}, remaining init limit={}, " + + "connecting to {}", tries, remainingInitLimitTime, address, e); + throw e; + } else { + LOG.warn("Unexpected exception, tries={}, remaining init limit={}, connecting to {}", tries, + remainingInitLimitTime, address, e); + sock = createSocket(); + } + } + Thread.sleep(leaderConnectDelayDuringRetryMs); + } + + return sock; + } + } + private Socket createSocket() throws X509Exception, IOException { Socket sock; if (self.isSslQuorum()) { diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LocalPeerBean.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LocalPeerBean.java index 170d712ee1e..4ea99cef59e 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LocalPeerBean.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LocalPeerBean.java @@ -18,6 +18,8 @@ package org.apache.zookeeper.server.quorum; +import org.apache.zookeeper.common.NetUtils; +import java.util.stream.Collectors; import static org.apache.zookeeper.common.NetUtils.formatInetAddr; /** @@ -79,7 +81,8 @@ public String getState() { } public String getQuorumAddress() { - return formatInetAddr(peer.getQuorumAddress()); + return peer.getQuorumAddress().getAllAddresses().stream().map(NetUtils::formatInetAddr) + .collect(Collectors.joining(",")); } public int getElectionType() { @@ -87,7 +90,8 @@ public int getElectionType() { } public String getElectionAddress() { - return formatInetAddr(peer.getElectionAddress()); + return peer.getElectionAddress().getAllAddresses().stream().map(NetUtils::formatInetAddr) + .collect(Collectors.joining(",")); } public String getClientAddress() { diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/MultipleAddresses.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/MultipleAddresses.java new file mode 100644 index 00000000000..a26b28d0165 --- /dev/null +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/MultipleAddresses.java @@ -0,0 +1,217 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

    + * http://www.apache.org/licenses/LICENSE-2.0 + *

    + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zookeeper.server.quorum; + +import java.io.IOException; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.net.NoRouteToHostException; +import java.net.UnknownHostException; +import java.util.Collections; +import java.util.List; +import java.util.Objects; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicReference; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +/** + * This class allows to store several quorum and electing addresses. + * + * See ZOOKEEPER-3188 for a discussion of this feature. + */ +public class MultipleAddresses { + private final static int DEFAULT_TIMEOUT = 100; + + private Set addresses; + private int timeout; + + public MultipleAddresses() { + addresses = Collections.newSetFromMap(new ConcurrentHashMap<>()); + timeout = DEFAULT_TIMEOUT; + } + + public MultipleAddresses(List addresses) { + this(addresses, DEFAULT_TIMEOUT); + } + + public MultipleAddresses(InetSocketAddress address) { + this(address, DEFAULT_TIMEOUT); + } + + public MultipleAddresses(List addresses, int timeout) { + this.addresses = Collections.newSetFromMap(new ConcurrentHashMap<>()); + this.addresses.addAll(addresses); + this.timeout = timeout; + } + + public MultipleAddresses(InetSocketAddress address, int timeout) { + addresses = Collections.newSetFromMap(new ConcurrentHashMap<>()); + addresses.add(address); + this.timeout = timeout; + } + + public int getTimeout() { + return timeout; + } + + public void setTimeout(int timeout) { + this.timeout = timeout; + } + + public boolean isEmpty() { + return addresses.isEmpty(); + } + + /** + * Returns all addresses. + * + * @return set of all InetSocketAddress + */ + public Set getAllAddresses() { + return Collections.unmodifiableSet(addresses); + } + + /** + * Returns wildcard addresses for all ports + * + * @return set of InetSocketAddress with wildcards for all ports + */ + public Set getWildcardAddresses() { + return addresses.stream().map(a -> new InetSocketAddress(a.getPort())).collect(Collectors.toSet()); + } + + /** + * Returns all ports + * + * @return list of all ports + */ + public List getAllPorts() { + return addresses.stream().map(InetSocketAddress::getPort).distinct().collect(Collectors.toList()); + } + + public void addAddress(InetSocketAddress address) { + addresses.add(address); + } + + /** + * Returns reachable address. If none is reachable than throws exception. + * + * @return address which is reachable. + * @throws NoRouteToHostException if none address is reachable + */ + public InetSocketAddress getReachableAddress() throws NoRouteToHostException { + AtomicReference address = new AtomicReference<>(null); + getInetSocketAddressStream().forEach(addr -> checkIfAddressIsReachableAndSet(addr, address)); + + if (address.get() != null) { + return address.get(); + } else { + throw new NoRouteToHostException("No valid address among " + addresses); + } + } + + /** + * Returns reachable address or first one, if none is reachable. + * + * @return address which is reachable or fist one. + */ + public InetSocketAddress getReachableOrOne() { + InetSocketAddress address; + try { + address = getReachableAddress(); + } catch (NoRouteToHostException e) { + address = getOne(); + } + return address; + } + + /** + * Performs a DNS lookup for addresses. + * + * If the DNS lookup fails, than address remain unmodified. + */ + public void recreateSocketAddresses() { + Set temp = Collections.newSetFromMap(new ConcurrentHashMap<>()); + temp.addAll(getInetSocketAddressStream().map(this::recreateSocketAddress).collect(Collectors.toSet())); + addresses = temp; + } + + /** + * Returns first address from set. + * + * @return address from a set. + */ + public InetSocketAddress getOne() { + return addresses.iterator().next(); + } + + private void checkIfAddressIsReachableAndSet(InetSocketAddress address, + AtomicReference reachableAddress) { + for (int i = 0; i < 5 && reachableAddress.get() == null; i++) { + try { + if (address.getAddress().isReachable((i + 1) * timeout)) { + reachableAddress.compareAndSet(null, address); + break; + } + Thread.sleep(timeout); + } catch (NullPointerException | IOException | InterruptedException ignored) { + } + } + } + + private InetSocketAddress recreateSocketAddress(InetSocketAddress address) { + try { + return new InetSocketAddress(InetAddress.getByName(address.getHostString()), address.getPort()); + } catch (UnknownHostException e) { + return address; + } + } + + private Stream getInetSocketAddressStream() { + if (addresses.size() > 1) { + return addresses.parallelStream(); + } else { + return addresses.stream(); + } + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } else if (o == null || getClass() != o.getClass()) { + return false; + } + + MultipleAddresses that = (MultipleAddresses) o; + return Objects.equals(addresses, that.addresses); + } + + @Override + public int hashCode() { + return Objects.hash(addresses); + } + + @Override + public String toString() { + return addresses.stream().map(InetSocketAddress::toString).collect(Collectors.joining(",")); + } +} diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/ObserverMaster.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/ObserverMaster.java index 7308f6597a6..da78873ce4b 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/ObserverMaster.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/ObserverMaster.java @@ -26,6 +26,7 @@ import java.io.ByteArrayInputStream; import java.io.DataInputStream; import java.io.IOException; +import java.net.InetAddress; import java.net.ServerSocket; import java.net.Socket; import java.net.SocketAddress; @@ -435,23 +436,19 @@ synchronized public void start() throws IOException { } listenerRunning = true; int backlog = 10; // dog science + InetAddress address = self.getQuorumAddress().getReachableOrOne().getAddress(); if (self.shouldUsePortUnification() || self.isSslQuorum()) { boolean allowInsecureConnection = self.shouldUsePortUnification(); if (self.getQuorumListenOnAllIPs()) { ss = new UnifiedServerSocket(self.getX509Util(), allowInsecureConnection, port, backlog); } else { - ss = new UnifiedServerSocket( - self.getX509Util(), - allowInsecureConnection, - port, - backlog, - self.getQuorumAddress().getAddress()); + ss = new UnifiedServerSocket(self.getX509Util(), allowInsecureConnection, port, backlog, address); } } else { if (self.getQuorumListenOnAllIPs()) { ss = new ServerSocket(port, backlog); } else { - ss = new ServerSocket(port, backlog, self.getQuorumAddress().getAddress()); + ss = new ServerSocket(port, backlog, address); } } thread = new Thread(this, "ObserverMaster"); diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumCnxManager.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumCnxManager.java index 467533308a8..42c6540f860 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumCnxManager.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumCnxManager.java @@ -20,6 +20,7 @@ import java.io.BufferedInputStream; import java.io.BufferedOutputStream; +import java.io.Closeable; import java.io.DataInputStream; import java.io.DataOutputStream; import java.io.IOException; @@ -35,18 +36,25 @@ import java.util.Collections; import java.util.Enumeration; import java.util.HashSet; +import java.util.List; import java.util.Map; +import java.util.NoSuchElementException; import java.util.Set; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.SynchronousQueue; import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; -import java.util.NoSuchElementException; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; +import java.util.stream.Collectors; +import org.apache.zookeeper.common.NetUtils; import org.apache.zookeeper.common.X509Exception; import org.apache.zookeeper.server.ExitCode; import org.apache.zookeeper.server.quorum.QuorumPeerConfig.ConfigException; @@ -332,7 +340,8 @@ public void testInitiateConnection(long sid) throws Exception { LOG.debug("Opening channel to server " + sid); Socket sock = new Socket(); setSockOpts(sock); - sock.connect(self.getVotingView().get(sid).electionAddr, cnxTO); + InetSocketAddress address = self.getVotingView().get(sid).electionAddr.getReachableOrOne(); + sock.connect(address, cnxTO); initiateConnection(sock, sid); } @@ -415,7 +424,8 @@ private boolean startConnection(Socket sock, Long sid) // represents protocol version (in other words - message type) dout.writeLong(PROTOCOL_VERSION); dout.writeLong(self.getId()); - String addr = formatInetAddr(self.getElectionAddress()); + InetSocketAddress address = self.getElectionAddress().getReachableOrOne(); + String addr = formatInetAddr(address); byte[] addr_bytes = addr.getBytes(); dout.writeInt(addr_bytes.length); dout.write(addr_bytes); @@ -575,7 +585,7 @@ private void handleConnection(Socket sock, DataInputStream din) closeSocket(sock); if (electionAddr != null) { - connectOne(sid, electionAddr); + connectOne(sid, new MultipleAddresses(electionAddr)); } else { connectOne(sid); } @@ -638,7 +648,7 @@ public void toSend(Long sid, ByteBuffer b) { * @param sid server id * @return boolean success indication */ - synchronized private boolean connectOne(long sid, InetSocketAddress electionAddr){ + synchronized private boolean connectOne(long sid, MultipleAddresses electionAddr){ if (senderWorkerMap.get(sid) != null) { LOG.debug("There is a connection already for server " + sid); return true; @@ -647,20 +657,22 @@ synchronized private boolean connectOne(long sid, InetSocketAddress electionAddr Socket sock = null; try { LOG.debug("Opening channel to server " + sid); - if (self.isSslQuorum()) { - SSLSocket sslSock = self.getX509Util().createSSLSocket(); - setSockOpts(sslSock); - sslSock.connect(electionAddr, cnxTO); - sslSock.startHandshake(); - sock = sslSock; - LOG.info("SSL handshake complete with {} - {} - {}", sslSock.getRemoteSocketAddress(), sslSock.getSession().getProtocol(), sslSock.getSession().getCipherSuite()); - } else { - sock = new Socket(); - setSockOpts(sock); - sock.connect(electionAddr, cnxTO); + InetSocketAddress address = electionAddr.getReachableOrOne(); - } - LOG.debug("Connected to server " + sid); + if (self.isSslQuorum()) { + SSLSocket sslSock = self.getX509Util().createSSLSocket(); + setSockOpts(sslSock); + sslSock.connect(address, cnxTO); + sslSock.startHandshake(); + sock = sslSock; + LOG.info("SSL handshake complete with {} - {} - {}", sslSock.getRemoteSocketAddress(), + sslSock.getSession().getProtocol(), sslSock.getSession().getCipherSuite()); + } else { + sock = new Socket(); + setSockOpts(sock); + sock.connect(address, cnxTO); + } + LOG.debug("Connected to server " + sid); // Sends connection request asynchronously if the quorum // sasl authentication is enabled. This is required because // sasl server authentication process may take few seconds to @@ -847,131 +859,209 @@ private void resetConnectionThreadCount() { } /** - * Thread to listen on some port + * Thread to listen on some ports */ public class Listener extends ZooKeeperThread { - volatile ServerSocket ss = null; + List listenerHandlers; + private AtomicBoolean bindException; public Listener() { // During startup of thread, thread name will be overridden to // specific election address super("ListenerThread"); + bindException = new AtomicBoolean(false); } - /** - * Sleeps on accept(). - */ @Override public void run() { - int numRetries = 0; - InetSocketAddress addr; - Socket client = null; - Exception exitException = null; - while((!shutdown) && (numRetries < 3)){ - try { - if (self.shouldUsePortUnification()) { - LOG.info("Creating TLS-enabled quorum server socket"); - ss = new UnifiedServerSocket(self.getX509Util(), true); - } else if (self.isSslQuorum()) { - LOG.info("Creating TLS-only quorum server socket"); - ss = new UnifiedServerSocket(self.getX509Util(), false); - } else { - ss = new ServerSocket(); - } + if(!shutdown) { + Set addresses; - ss.setReuseAddress(true); + if (self.getQuorumListenOnAllIPs()) { + addresses = self.getElectionAddress().getWildcardAddresses(); + } else { + addresses = self.getElectionAddress().getAllAddresses(); + } - if (self.getQuorumListenOnAllIPs()) { - int port = self.getElectionAddress().getPort(); - addr = new InetSocketAddress(port); - } else { - // Resolve hostname for this server in case the - // underlying ip address has changed. - self.recreateSocketAddresses(self.getId()); - addr = self.getElectionAddress(); - } - LOG.info("My election bind port: " + addr.toString()); - setName(addr.toString()); - ss.bind(addr); - while (!shutdown) { + CountDownLatch latch = new CountDownLatch(addresses.size()); + listenerHandlers = addresses.stream().map(address -> + new ListenerHandler(address, self.shouldUsePortUnification(), self.isSslQuorum(), latch)) + .collect(Collectors.toList()); + + ExecutorService executor = Executors.newFixedThreadPool(addresses.size()); + listenerHandlers.forEach(executor::submit); + + try { + latch.await(); + } catch (InterruptedException ie) { + LOG.error("Interrupted while sleeping. Ignoring exception", ie); + } finally { + // Clean up for shutdown. + for (ListenerHandler handler : listenerHandlers) { try { - client = ss.accept(); - setSockOpts(client); - LOG.info("Received connection request " - + formatInetAddr((InetSocketAddress)client.getRemoteSocketAddress())); - // Receive and handle the connection request - // asynchronously if the quorum sasl authentication is - // enabled. This is required because sasl server - // authentication process may take few seconds to finish, - // this may delay next peer connection requests. - if (quorumSaslAuthEnabled) { - receiveConnectionAsync(client); - } else { - receiveConnection(client); - } - numRetries = 0; - } catch (SocketTimeoutException e) { - LOG.warn("The socket is listening for the election accepted " - + "and it timed out unexpectedly, but will retry." - + "see ZOOKEEPER-2836"); + handler.close(); + } catch (IOException ie) { + // Don't log an error for shutdown. + LOG.debug("Error closing server socket", ie); } } - } catch (IOException e) { - if (shutdown) { - break; - } - LOG.error("Exception while listening", e); - exitException = e; - numRetries++; - try { - ss.close(); - Thread.sleep(1000); - } catch (IOException ie) { - LOG.error("Error closing server socket", ie); - } catch (InterruptedException ie) { - LOG.error("Interrupted while sleeping. " + - "Ignoring exception", ie); - } - closeSocket(client); } } + LOG.info("Leaving listener"); if (!shutdown) { LOG.error("As I'm leaving the listener thread, " + "I won't be able to participate in leader " - + "election any longer: " - + formatInetAddr(self.getElectionAddress())); - if (exitException instanceof BindException) { + + "election any longer: {}" + , self.getElectionAddress().getAllAddresses().stream().map(NetUtils::formatInetAddr) + .collect(Collectors.joining(","))); + if (bindException.get()) { // After leaving listener thread, the host cannot join the // quorum anymore, this is a severe error that we cannot // recover from, so we need to exit System.exit(ExitCode.UNABLE_TO_BIND_QUORUM_PORT.getValue()); } - } else if (ss != null) { - // Clean up for shutdown. - try { - ss.close(); - } catch (IOException ie) { - // Don't log an error for shutdown. - LOG.debug("Error closing server socket", ie); - } } } /** * Halts this listener thread. */ - void halt(){ - try{ - LOG.debug("Trying to close listener: " + ss); - if(ss != null) { - LOG.debug("Closing listener: " - + QuorumCnxManager.this.mySid); - ss.close(); + void halt() { + LOG.debug("Trying to close listeners"); + if (listenerHandlers != null) { + LOG.debug("Closing listener: {}", QuorumCnxManager.this.mySid); + for (ListenerHandler handler : listenerHandlers) { + try { + handler.close(); + } catch (IOException e) { + LOG.warn("Exception when shutting down listener: ", e); + } } - } catch (IOException e){ - LOG.warn("Exception when shutting down listener: " + e); + } + } + + class ListenerHandler implements Runnable, Closeable { + private final static int ATTEMPTS_AMOUNT = 3; + + private ServerSocket serverSocket; + private InetSocketAddress address; + private boolean portUnification; + private boolean sslQuorum; + private CountDownLatch latch; + + ListenerHandler(InetSocketAddress address, boolean portUnification, boolean sslQuorum, + CountDownLatch latch) { + this.address = address; + this.portUnification = portUnification; + this.sslQuorum = sslQuorum; + this.latch = latch; + } + + /** + * Sleeps on acceptConnections(). + */ + @Override + public void run() { + try { + Thread.currentThread().setName("ListenerHandler-" + address); + acceptConnections(); + try { + close(); + } catch (IOException e) { + LOG.warn("Exception when shutting down listener: ", e); + } + } catch (Exception e) { + // Output of unexpected exception, should never happen + LOG.error("Unexpected error ", e); + } finally { + latch.countDown(); + } + } + + @Override + public synchronized void close() throws IOException { + if (serverSocket != null && !serverSocket.isClosed()) { + LOG.debug("Trying to close listeners: {}", serverSocket); + serverSocket.close(); + } + } + + /** + * Sleeps on accept(). + */ + private void acceptConnections() { + int numRetries = 0; + Socket client = null; + + while ((!shutdown) && (numRetries < ATTEMPTS_AMOUNT)) { + try { + serverSocket = createNewServerSocket(); + LOG.info("My election bind port: {}", address.toString()); + while (!shutdown) { + try { + client = serverSocket.accept(); + setSockOpts(client); + LOG.info("Received connection request {}", client.getRemoteSocketAddress()); + // Receive and handle the connection request + // asynchronously if the quorum sasl authentication is + // enabled. This is required because sasl server + // authentication process may take few seconds to finish, + // this may delay next peer connection requests. + if (quorumSaslAuthEnabled) { + receiveConnectionAsync(client); + } else { + receiveConnection(client); + } + numRetries = 0; + } catch (SocketTimeoutException e) { + LOG.warn("The socket is listening for the election accepted " + + "and it timed out unexpectedly, but will retry." + + "see ZOOKEEPER-2836"); + } + } + } catch (IOException e) { + if (shutdown) { + break; + } + + LOG.error("Exception while listening", e); + + if (e instanceof BindException) + bindException.set(true); + + numRetries++; + try { + close(); + Thread.sleep(1000); + } catch (IOException ie) { + LOG.error("Error closing server socket", ie); + } catch (InterruptedException ie) { + LOG.error("Interrupted while sleeping. Ignoring exception", ie); + } + closeSocket(client); + } + } + } + + private ServerSocket createNewServerSocket() throws IOException { + ServerSocket socket; + + if (portUnification) { + LOG.info("Creating TLS-enabled quorum server socket"); + socket = new UnifiedServerSocket(self.getX509Util(), true); + } else if (sslQuorum) { + LOG.info("Creating TLS-only quorum server socket"); + socket = new UnifiedServerSocket(self.getX509Util(), false); + } else { + socket = new ServerSocket(); + } + + socket.setReuseAddress(true); + socket.bind(address); + + return socket; } } } diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumPeer.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumPeer.java index 5a79a6230ed..dd933f699ac 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumPeer.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumPeer.java @@ -29,22 +29,24 @@ import java.net.DatagramSocket; import java.net.InetAddress; import java.net.InetSocketAddress; -import java.net.UnknownHostException; import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Collections; +import java.util.Comparator; import java.util.HashMap; import java.util.HashSet; +import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.Properties; import java.util.Set; import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Collectors; +import java.util.stream.IntStream; import java.util.concurrent.atomic.AtomicReference; import javax.security.sasl.SaslException; - import org.apache.zookeeper.KeeperException.BadArgumentsException; import org.apache.zookeeper.common.AtomicFileWritingIdiom; import org.apache.zookeeper.common.AtomicFileWritingIdiom.WriterStatement; @@ -133,11 +135,11 @@ public class QuorumPeer extends ZooKeeperThread implements QuorumStats.Provider private ZKDatabase zkDb; public static final class AddressTuple { - public final InetSocketAddress quorumAddr; - public final InetSocketAddress electionAddr; + public final MultipleAddresses quorumAddr; + public final MultipleAddresses electionAddr; public final InetSocketAddress clientAddr; - public AddressTuple(InetSocketAddress quorumAddr, InetSocketAddress electionAddr, InetSocketAddress clientAddr) { + public AddressTuple(MultipleAddresses quorumAddr, MultipleAddresses electionAddr, InetSocketAddress clientAddr) { this.quorumAddr = quorumAddr; this.electionAddr = electionAddr; this.clientAddr = clientAddr; @@ -155,9 +157,9 @@ public void setObserverMasterPort(int observerMasterPort) { } public static class QuorumServer { - public InetSocketAddress addr = null; + public MultipleAddresses addr = new MultipleAddresses(); - public InetSocketAddress electionAddr = null; + public MultipleAddresses electionAddr = new MultipleAddresses(); public InetSocketAddress clientAddr = null; @@ -197,37 +199,33 @@ public long getId() { * unmodified. */ public void recreateSocketAddresses() { - if (this.addr == null) { + if (this.addr.isEmpty()) { LOG.warn("Server address has not been initialized"); return; } - if (this.electionAddr == null) { + if (this.electionAddr.isEmpty()) { LOG.warn("Election address has not been initialized"); return; } - String host = this.addr.getHostString(); - InetAddress address = null; - try { - address = InetAddress.getByName(host); - } catch (UnknownHostException ex) { - LOG.warn("Failed to resolve address: {}", host, ex); - return; - } - LOG.debug("Resolved address for {}: {}", host, address); - int port = this.addr.getPort(); - this.addr = new InetSocketAddress(address, port); - port = this.electionAddr.getPort(); - this.electionAddr = new InetSocketAddress(address, port); - } - - private void setType(String s) throws ConfigException { - if (s.toLowerCase().equals("observer")) { - type = LearnerType.OBSERVER; - } else if (s.toLowerCase().equals("participant")) { - type = LearnerType.PARTICIPANT; - } else { - throw new ConfigException("Unrecognised peertype: " + s); + this.addr.recreateSocketAddresses(); + this.electionAddr.recreateSocketAddresses(); + } + + private LearnerType getType(String s) throws ConfigException { + LearnerType tempType; + + switch (s.toLowerCase()) { + case "observer": + tempType = LearnerType.OBSERVER; + break; + case "participant": + tempType = LearnerType.PARTICIPANT; + break; + default: + throw new ConfigException("Unrecognised peertype: " + s); } + + return tempType; } private static final String wrongFormat = " does not have the form server_config or server_config;client_config"+ @@ -236,12 +234,9 @@ private void setType(String s) throws ConfigException { public QuorumServer(long sid, String addressStr) throws ConfigException { // LOG.warn("sid = " + sid + " addressStr = " + addressStr); this.id = sid; + LearnerType newType = null; String serverClientParts[] = addressStr.split(";"); - String serverParts[] = ConfigUtils.getHostAndPort(serverClientParts[0]); - if ((serverClientParts.length > 2) || (serverParts.length < 3) - || (serverParts.length > 4)) { - throw new ConfigException(addressStr + wrongFormat); - } + String serverAddresses[] = serverClientParts[0].split(","); if (serverClientParts.length == 2) { //LOG.warn("ClientParts: " + serverClientParts[1]); @@ -261,31 +256,47 @@ public QuorumServer(long sid, String addressStr) throws ConfigException { } } - // server_config should be either host:port:port or host:port:port:type - try { - addr = new InetSocketAddress(serverParts[0], - Integer.parseInt(serverParts[1])); - } catch (NumberFormatException e) { - throw new ConfigException("Address unresolved: " + serverParts[0] + ":" + serverParts[1]); - } - try { - electionAddr = new InetSocketAddress(serverParts[0], - Integer.parseInt(serverParts[2])); - } catch (NumberFormatException e) { - throw new ConfigException("Address unresolved: " + serverParts[0] + ":" + serverParts[2]); - } + for(String serverAddress : serverAddresses) { + String serverParts[] = ConfigUtils.getHostAndPort(serverAddress); + if ((serverClientParts.length > 2) || (serverParts.length < 3) + || (serverParts.length > 4)) { + throw new ConfigException(addressStr + wrongFormat); + } + + // server_config should be either host:port:port or host:port:port:type + InetSocketAddress tempAddress; + InetSocketAddress tempElectionAddress; + try { + tempAddress = new InetSocketAddress(serverParts[0], Integer.parseInt(serverParts[1])); + addr.addAddress(tempAddress); + } catch (NumberFormatException e) { + throw new ConfigException("Address unresolved: " + serverParts[0] + ":" + serverParts[1]); + } + try { + tempElectionAddress = new InetSocketAddress(serverParts[0], Integer.parseInt(serverParts[2])); + electionAddr.addAddress(tempElectionAddress); + } catch (NumberFormatException e) { + throw new ConfigException("Address unresolved: " + serverParts[0] + ":" + serverParts[2]); + } - if(addr.getPort() == electionAddr.getPort()) { + if(tempAddress.getPort() == tempElectionAddress.getPort()) { throw new ConfigException( "Client and election port must be different! Please update the configuration file on server." + sid); - } + } - if (serverParts.length == 4) { - setType(serverParts[3]); - } + if (serverParts.length == 4) { + LearnerType tempType = getType(serverParts[3]); + if (newType == null) + newType = tempType; - this.hostname = serverParts[0]; - + if (newType != tempType) + throw new ConfigException("Multiple addresses should have similar roles: " + type + " vs " + tempType); + } + + this.hostname = serverParts[0]; + } + if(newType != null) + type = newType; setMyAddrs(); } @@ -297,8 +308,10 @@ public QuorumServer(long id, InetSocketAddress addr, public QuorumServer(long id, InetSocketAddress addr, InetSocketAddress electionAddr, InetSocketAddress clientAddr, LearnerType type) { this.id = id; - this.addr = addr; - this.electionAddr = electionAddr; + if(addr != null) + this.addr.addAddress(addr); + if(electionAddr != null) + this.electionAddr.addAddress(electionAddr); this.type = type; this.clientAddr = clientAddr; @@ -306,10 +319,10 @@ public QuorumServer(long id, InetSocketAddress addr, } private void setMyAddrs() { - this.myAddrs = new ArrayList(); - this.myAddrs.add(this.addr); + this.myAddrs = new ArrayList<>(); + this.myAddrs.addAll(this.addr.getAllAddresses()); this.myAddrs.add(this.clientAddr); - this.myAddrs.add(this.electionAddr); + this.myAddrs.addAll(this.electionAddr.getAllAddresses()); this.myAddrs = excludedSpecialAddresses(this.myAddrs); } @@ -325,25 +338,29 @@ private static String delimitedHostString(InetSocketAddress addr) public String toString(){ StringWriter sw = new StringWriter(); - //addr should never be null, but just to make sure - if (addr !=null) { - sw.append(delimitedHostString(addr)); - sw.append(":"); - sw.append(String.valueOf(addr.getPort())); + + List addrList = new LinkedList<>(addr.getAllAddresses()); + List electionAddrList = new LinkedList<>(electionAddr.getAllAddresses()); + + if(addrList.size() > 0 && electionAddrList.size() > 0) { + addrList.sort(Comparator.comparing(InetSocketAddress::getHostString)); + electionAddrList.sort(Comparator.comparing(InetSocketAddress::getHostString)); + sw.append(IntStream.range(0, addrList.size()).mapToObj(i -> String.format("%s:%d:%d", + delimitedHostString(addrList.get(i)), addrList.get(i).getPort(), electionAddrList.get(i).getPort())) + .collect(Collectors.joining(","))); } - if (electionAddr!=null){ - sw.append(":"); - sw.append(String.valueOf(electionAddr.getPort())); - } + if (type == LearnerType.OBSERVER) sw.append(":observer"); - else if (type == LearnerType.PARTICIPANT) sw.append(":participant"); + else if (type == LearnerType.PARTICIPANT) sw.append(":participant"); + if (clientAddr!=null && !isClientAddrFromStatic){ sw.append(";"); sw.append(delimitedHostString(clientAddr)); sw.append(":"); sw.append(String.valueOf(clientAddr.getPort())); } - return sw.toString(); + + return sw.toString(); } public int hashCode() { @@ -361,18 +378,16 @@ private boolean checkAddressesEqual(InetSocketAddress addr1, InetSocketAddress a public boolean equals(Object o){ if (!(o instanceof QuorumServer)) return false; QuorumServer qs = (QuorumServer)o; - if ((qs.id != id) || (qs.type != type)) return false; - if (!checkAddressesEqual(addr, qs.addr)) return false; - if (!checkAddressesEqual(electionAddr, qs.electionAddr)) return false; - if (!checkAddressesEqual(clientAddr, qs.clientAddr)) return false; - return true; + if ((qs.id != id) || (qs.type != type)) return false; + if (!addr.equals(qs.addr)) return false; + if (!electionAddr.equals(qs.electionAddr)) return false; + return checkAddressesEqual(clientAddr, qs.clientAddr); } public void checkAddressDuplicate(QuorumServer s) throws BadArgumentsException { - List otherAddrs = new ArrayList(); - otherAddrs.add(s.addr); + List otherAddrs = new ArrayList<>(s.addr.getAllAddresses()); otherAddrs.add(s.clientAddr); - otherAddrs.add(s.electionAddr); + otherAddrs.addAll(s.electionAddr.getAllAddresses()); otherAddrs = excludedSpecialAddresses(otherAddrs); for (InetSocketAddress my: this.myAddrs) { @@ -799,11 +814,11 @@ private AddressTuple getAddrs(){ } } - public InetSocketAddress getQuorumAddress(){ + public MultipleAddresses getQuorumAddress(){ return getAddrs().quorumAddr; } - public InetSocketAddress getElectionAddress(){ + public MultipleAddresses getElectionAddress(){ return getAddrs().electionAddr; } @@ -812,7 +827,7 @@ public InetSocketAddress getClientAddress(){ return (addrs == null) ? null : addrs.clientAddr; } - private void setAddrs(InetSocketAddress quorumAddr, InetSocketAddress electionAddr, InetSocketAddress clientAddr){ + private void setAddrs(MultipleAddresses quorumAddr, MultipleAddresses electionAddr, InetSocketAddress clientAddr){ synchronized (QV_LOCK) { myAddrs.set(new AddressTuple(quorumAddr, electionAddr, clientAddr)); QV_LOCK.notifyAll(); @@ -1571,7 +1586,7 @@ public QuorumVerifier getLastSeenQuorumVerifier(){ return lastSeenQuorumVerifier; } } - + public synchronized void restartLeaderElection(QuorumVerifier qvOLD, QuorumVerifier qvNEW){ if (qvOLD == null || !qvOLD.equals(qvNEW)) { LOG.warn("Restarting Leader Election"); @@ -2039,7 +2054,8 @@ private void updateObserverMasterList() { observerMasters.clear(); StringBuilder sb = new StringBuilder(); for (QuorumServer server : quorumVerifier.getVotingMembers().values()) { - InetSocketAddress addr = new InetSocketAddress(server.addr.getAddress(), observerMasterPort); + InetAddress address = server.addr.getReachableOrOne().getAddress(); + InetSocketAddress addr = new InetSocketAddress(address, observerMasterPort); observerMasters.add(new QuorumServer(server.id, addr)); sb.append(addr).append(","); } @@ -2077,9 +2093,11 @@ QuorumServer validateLearnerMaster(String desiredMaster) { } for (QuorumServer server : observerMasters) { if (sid == null) { - String serverAddr = server.addr.getAddress().getHostAddress() + ':' + server.addr.getPort(); - if (serverAddr.startsWith(desiredMaster)) { - return server; + for(InetSocketAddress address : server.addr.getAllAddresses()) { + String serverAddr = address.getAddress().getHostAddress() + ':' + address.getPort(); + if (serverAddr.startsWith(desiredMaster)) { + return server; + } } } else { if (sid.equals(server.id)) { @@ -2144,38 +2162,38 @@ private boolean updateVote(long designatedLeader, long zxid){ * Updates leader election info to avoid inconsistencies when * a new server tries to join the ensemble. * - * Here is the inconsistency scenario we try to solve by updating the peer + * Here is the inconsistency scenario we try to solve by updating the peer * epoch after following leader: * * Let's say we have an ensemble with 3 servers z1, z2 and z3. * - * 1. z1, z2 were following z3 with peerEpoch to be 0xb8, the new epoch is + * 1. z1, z2 were following z3 with peerEpoch to be 0xb8, the new epoch is * 0xb9, aka current accepted epoch on disk. * 2. z2 get restarted, which will use 0xb9 as it's peer epoch when loading * the current accept epoch from disk. - * 3. z2 received notification from z1 and z3, which is following z3 with + * 3. z2 received notification from z1 and z3, which is following z3 with * epoch 0xb8, so it started following z3 again with peer epoch 0xb8. - * 4. before z2 successfully connected to z3, z3 get restarted with new + * 4. before z2 successfully connected to z3, z3 get restarted with new * epoch 0xb9. - * 5. z2 will retry around a few round (default 5s) before giving up, + * 5. z2 will retry around a few round (default 5s) before giving up, * meanwhile it will report z3 as leader. * 6. z1 restarted, and looking with peer epoch 0xb9. * 7. z1 voted z3, and z3 was elected as leader again with peer epoch 0xb9. - * 8. z2 successfully connected to z3 before giving up, but with peer + * 8. z2 successfully connected to z3 before giving up, but with peer * epoch 0xb8. - * 9. z1 get restarted, looking for leader with peer epoch 0xba, but cannot - * join, because z2 is reporting peer epoch 0xb8, while z3 is reporting + * 9. z1 get restarted, looking for leader with peer epoch 0xba, but cannot + * join, because z2 is reporting peer epoch 0xb8, while z3 is reporting * 0xb9. * - * By updating the election vote after actually following leader, we can + * By updating the election vote after actually following leader, we can * avoid this kind of stuck happened. * - * Btw, the zxid and electionEpoch could be inconsistent because of the same - * reason, it's better to update these as well after syncing with leader, but - * that required protocol change which is non trivial. This problem is worked - * around by skipping comparing the zxid and electionEpoch when counting for + * Btw, the zxid and electionEpoch could be inconsistent because of the same + * reason, it's better to update these as well after syncing with leader, but + * that required protocol change which is non trivial. This problem is worked + * around by skipping comparing the zxid and electionEpoch when counting for * votes for out of election servers during looking for leader. - * + * * {@see https://issues.apache.org/jira/browse/ZOOKEEPER-1732} */ protected void updateElectionVote(long newEpoch) { diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumZooKeeperServer.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumZooKeeperServer.java index 566c945ba03..952c3b38699 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumZooKeeperServer.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumZooKeeperServer.java @@ -20,6 +20,8 @@ import java.io.IOException; import java.io.PrintWriter; import java.nio.ByteBuffer; +import java.util.Objects; +import java.util.stream.Collectors; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; @@ -173,9 +175,11 @@ public void dumpConf(PrintWriter pwriter) { pwriter.print("electionAlg="); pwriter.println(self.getElectionType()); pwriter.print("electionPort="); - pwriter.println(self.getElectionAddress().getPort()); + pwriter.println(self.getElectionAddress().getAllPorts() + .stream().map(Objects::toString).collect(Collectors.joining(","))); pwriter.print("quorumPort="); - pwriter.println(self.getQuorumAddress().getPort()); + pwriter.println(self.getQuorumAddress().getAllPorts() + .stream().map(Objects::toString).collect(Collectors.joining(","))); pwriter.print("peerType="); pwriter.println(self.getLearnerType().ordinal()); pwriter.println("membership: "); diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/ReadOnlyZooKeeperServer.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/ReadOnlyZooKeeperServer.java index 5c086b7cfec..4083ff2371f 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/ReadOnlyZooKeeperServer.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/ReadOnlyZooKeeperServer.java @@ -19,6 +19,8 @@ package org.apache.zookeeper.server.quorum; import java.io.PrintWriter; +import java.util.Objects; +import java.util.stream.Collectors; import org.apache.zookeeper.jmx.MBeanRegistry; import org.apache.zookeeper.server.DataTreeBean; @@ -166,9 +168,11 @@ public void dumpConf(PrintWriter pwriter) { pwriter.print("electionAlg="); pwriter.println(self.getElectionType()); pwriter.print("electionPort="); - pwriter.println(self.getElectionAddress().getPort()); + pwriter.println(self.getElectionAddress().getAllPorts() + .stream().map(Objects::toString).collect(Collectors.joining(","))); pwriter.print("quorumPort="); - pwriter.println(self.getQuorumAddress().getPort()); + pwriter.println(self.getQuorumAddress().getAllPorts() + .stream().map(Objects::toString).collect(Collectors.joining(","))); pwriter.print("peerType="); pwriter.println(self.getLearnerType().ordinal()); } diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/RemotePeerBean.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/RemotePeerBean.java index 285f11a8593..c6e0185a093 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/RemotePeerBean.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/RemotePeerBean.java @@ -20,6 +20,8 @@ import org.apache.zookeeper.jmx.ZKMBeanInfo; +import java.util.stream.Collectors; + /** * A remote peer bean only provides limited information about the remote peer, * and the peer cannot be managed remotely. @@ -45,11 +47,15 @@ public boolean isHidden() { } public String getQuorumAddress() { - return peer.addr.getHostString()+":"+peer.addr.getPort(); + return peer.addr.getAllAddresses().stream() + .map(address -> String.format("%s:%d", address.getHostString(), address.getPort())) + .collect(Collectors.joining(",")); } public String getElectionAddress() { - return peer.electionAddr.getHostString() + ":" + peer.electionAddr.getPort(); + return peer.electionAddr.getAllAddresses().stream() + .map(address -> String.format("%s:%d", address.getHostString(), address.getPort())) + .collect(Collectors.joining(",")); } public String getClientAddress() { diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/LearnerTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/LearnerTest.java index 85295f84114..c0a675f0ce5 100644 --- a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/LearnerTest.java +++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/LearnerTest.java @@ -98,7 +98,7 @@ protected void sockConnect(Socket sock, InetSocketAddress addr, int timeout) } } - @Test(expected=IOException.class) + @Test(expected = IOException.class) public void connectionRetryTimeoutTest() throws Exception { Learner learner = new TimeoutLearner(); learner.self = new QuorumPeer(); @@ -110,7 +110,7 @@ public void connectionRetryTimeoutTest() throws Exception { InetSocketAddress addr = new InetSocketAddress(1111); // we expect this to throw an IOException since we're faking socket connect errors every time - learner.connectToLeader(addr, ""); + learner.connectToLeader(new MultipleAddresses(addr), ""); } @Test public void connectionInitLimitTimeoutTest() throws Exception { @@ -130,7 +130,7 @@ public void connectionInitLimitTimeoutTest() throws Exception { // we expect this to throw an IOException since we're faking socket connect errors every time try { - learner.connectToLeader(addr, ""); + learner.connectToLeader(new MultipleAddresses(addr), ""); Assert.fail("should have thrown IOException!"); } catch (IOException e) { //good, wanted to see that, let's make sure we ran out of time diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/MultipleAddressesTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/MultipleAddressesTest.java new file mode 100644 index 00000000000..b1ea4d0bf0e --- /dev/null +++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/MultipleAddressesTest.java @@ -0,0 +1,152 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

    + * http://www.apache.org/licenses/LICENSE-2.0 + *

    + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zookeeper.server.quorum; + +import org.apache.commons.collections.CollectionUtils; +import org.apache.zookeeper.PortAssignment; +import org.junit.Assert; +import org.junit.Test; + +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.net.NoRouteToHostException; +import java.net.UnknownHostException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +public class MultipleAddressesTest { + + public final static int PORTS_AMOUNT = 10; + + @Test + public void testIsEmpty() { + MultipleAddresses multipleAddresses = new MultipleAddresses(); + Assert.assertTrue(multipleAddresses.isEmpty()); + + multipleAddresses.addAddress(new InetSocketAddress(22)); + Assert.assertFalse(multipleAddresses.isEmpty()); + } + + @Test + public void testGetAllAddresses() { + List addresses = getAddressList(); + MultipleAddresses multipleAddresses = new MultipleAddresses(addresses); + + Assert.assertTrue(CollectionUtils.isEqualCollection(addresses, multipleAddresses.getAllAddresses())); + + multipleAddresses.addAddress(addresses.get(1)); + Assert.assertTrue(CollectionUtils.isEqualCollection(addresses, multipleAddresses.getAllAddresses())); + } + + @Test + public void testGetAllPorts() { + List ports = getPortList(); + MultipleAddresses multipleAddresses = new MultipleAddresses(getAddressList(ports)); + + Assert.assertTrue(CollectionUtils.isEqualCollection(ports, multipleAddresses.getAllPorts())); + + multipleAddresses.addAddress(new InetSocketAddress("localhost", ports.get(ports.size() - 1))); + Assert.assertTrue(CollectionUtils.isEqualCollection(ports, multipleAddresses.getAllPorts())); + } + + @Test + public void testGetWildcardAddresses() { + List ports = getPortList(); + List addresses = getAddressList(ports); + MultipleAddresses multipleAddresses = new MultipleAddresses(addresses); + List allAddresses = ports.stream().map(InetSocketAddress::new).collect(Collectors.toList()); + + Assert.assertTrue(CollectionUtils.isEqualCollection(allAddresses, multipleAddresses.getWildcardAddresses())); + + multipleAddresses.addAddress(new InetSocketAddress("localhost", ports.get(ports.size() - 1))); + Assert.assertTrue(CollectionUtils.isEqualCollection(allAddresses, multipleAddresses.getWildcardAddresses())); + } + + @Test + public void testGetValidAddress() throws NoRouteToHostException { + List addresses = getAddressList(); + MultipleAddresses multipleAddresses = new MultipleAddresses(addresses); + + Assert.assertTrue(addresses.contains(multipleAddresses.getReachableAddress())); + } + + @Test(expected = NoRouteToHostException.class) + public void testGetValidAddressWithNotValid() throws NoRouteToHostException { + MultipleAddresses multipleAddresses = new MultipleAddresses(new InetSocketAddress("10.0.0.1", 22)); + multipleAddresses.getReachableAddress(); + } + + @Test + public void testRecreateSocketAddresses() throws UnknownHostException { + List searchedAddresses = Arrays.stream(InetAddress.getAllByName("google.com")) + .map(addr -> new InetSocketAddress(addr, 222)).collect(Collectors.toList()); + + MultipleAddresses multipleAddresses = new MultipleAddresses(searchedAddresses.get(searchedAddresses.size() - 1)); + List addresses = new ArrayList<>(multipleAddresses.getAllAddresses()); + + Assert.assertEquals(1, addresses.size()); + Assert.assertEquals(searchedAddresses.get(searchedAddresses.size() - 1), addresses.get(0)); + + multipleAddresses.recreateSocketAddresses(); + + addresses = new ArrayList<>(multipleAddresses.getAllAddresses()); + Assert.assertEquals(1, addresses.size()); + Assert.assertEquals(searchedAddresses.get(0), addresses.get(0)); + } + + @Test + public void testRecreateSocketAddressesWithWrongAddresses() { + InetSocketAddress address = new InetSocketAddress("locahost", 222); + MultipleAddresses multipleAddresses = new MultipleAddresses(address); + multipleAddresses.recreateSocketAddresses(); + + Assert.assertEquals(address, multipleAddresses.getOne()); + } + + @Test + public void testEquals() { + List addresses = getAddressList(); + + MultipleAddresses multipleAddresses = new MultipleAddresses(addresses); + MultipleAddresses multipleAddressesEquals = new MultipleAddresses(addresses); + + Assert.assertEquals(multipleAddresses, multipleAddressesEquals); + + MultipleAddresses multipleAddressesNotEquals = new MultipleAddresses(getAddressList()); + + Assert.assertNotEquals(multipleAddresses, multipleAddressesNotEquals); + } + + public List getPortList() { + return IntStream.range(0, PORTS_AMOUNT).mapToObj(i -> PortAssignment.unique()).collect(Collectors.toList()); + } + + public List getAddressList() { + return getAddressList(getPortList()); + } + + public List getAddressList(List ports) { + return IntStream.range(0, ports.size()) + .mapToObj(i -> new InetSocketAddress("127.0.0." + i, ports.get(i))).collect(Collectors.toList()); + } + +} diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/QuorumPeerMainTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/QuorumPeerMainTest.java index 947e7a505b7..b6fecd208e9 100644 --- a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/QuorumPeerMainTest.java +++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/QuorumPeerMainTest.java @@ -384,7 +384,6 @@ public void testElectionFraud() throws IOException, InterruptedException { Assert.assertTrue("All servers should join the quorum", servers.mt[falseLeader].main.quorumPeer.follower != null); // to keep the quorum peer running and force it to go into the looking state, we kill leader election - // and close the connection to the leader servers.mt[falseLeader].main.quorumPeer.electionAlg.shutdown(); servers.mt[falseLeader].main.quorumPeer.follower.getSocket().close(); diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/ReconfigFailureCasesTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/ReconfigFailureCasesTest.java index a0b9760ebb6..de0322abc38 100644 --- a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/ReconfigFailureCasesTest.java +++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/ReconfigFailureCasesTest.java @@ -79,8 +79,8 @@ public void testIncrementalReconfigInvokedOnHiearchicalQS() throws Exception { for (int i = 1; i <= 5; i++) { members.add("server." + i + "=127.0.0.1:" - + qu.getPeer(i).peer.getQuorumAddress().getPort() + ":" - + qu.getPeer(i).peer.getElectionAddress().getPort() + ";" + + qu.getPeer(i).peer.getQuorumAddress().getAllPorts().get(0)+ ":" + + qu.getPeer(i).peer.getElectionAddress().getAllPorts().get(0) + ";" + "127.0.0.1:" + qu.getPeer(i).peer.getClientPort()); } diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/test/CnxManagerTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/test/CnxManagerTest.java index 3cee7ba4be2..396a8df19e4 100644 --- a/zookeeper-server/src/test/java/org/apache/zookeeper/test/CnxManagerTest.java +++ b/zookeeper-server/src/test/java/org/apache/zookeeper/test/CnxManagerTest.java @@ -30,7 +30,7 @@ import java.util.Date; import java.util.HashMap; import java.util.Map; -import java.util.Random; +import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; import java.net.Socket; @@ -186,14 +186,13 @@ public void testCnxManager() throws Exception { @Test public void testCnxManagerTimeout() throws Exception { - Random rand = new Random(); - byte b = (byte) rand.nextInt(); + int address = ThreadLocalRandom.current().nextInt(1, 255); int deadPort = PortAssignment.unique(); - String deadAddress = "10.1.1." + b; + String deadAddress = "10.1.1." + address; LOG.info("This is the dead address I'm trying: " + deadAddress); - peers.put(Long.valueOf(2), + peers.put(2L, new QuorumServer(2, new InetSocketAddress(deadAddress, deadPort), new InetSocketAddress(deadAddress, PortAssignment.unique()), @@ -213,7 +212,7 @@ public void testCnxManagerTimeout() throws Exception { cnxManager.toSend(2L, createMsg(ServerState.LOOKING.ordinal(), 1, -1, 1)); long end = Time.currentElapsedTime(); - if((end - begin) > 6000) Assert.fail("Waited more than necessary"); + if((end - begin) > 10000) Assert.fail("Waited more than necessary"); cnxManager.halt(); Assert.assertFalse(cnxManager.listener.isAlive()); } @@ -236,19 +235,19 @@ public void testCnxManagerSpinLock() throws Exception { } else { LOG.error("Null listener when initializing cnx manager"); } - - int port = peers.get(peer.getId()).electionAddr.getPort(); - LOG.info("Election port: " + port); + InetSocketAddress address = peers.get(peer.getId()).electionAddr.getReachableOrOne(); + LOG.info("Election port: " + address.getPort()); Thread.sleep(1000); SocketChannel sc = SocketChannel.open(); - sc.socket().connect(peers.get(1L).electionAddr, 5000); + sc.socket().connect(address, 5000); + + InetSocketAddress otherAddr = peers.get(2L).electionAddr.getReachableOrOne(); - InetSocketAddress otherAddr = peers.get(new Long(2)).electionAddr; DataOutputStream dout = new DataOutputStream(sc.socket().getOutputStream()); dout.writeLong(QuorumCnxManager.PROTOCOL_VERSION); - dout.writeLong(new Long(2)); + dout.writeLong(2L); String addr = otherAddr.getHostString()+ ":" + otherAddr.getPort(); byte[] addr_bytes = addr.getBytes(); dout.writeInt(addr_bytes.length); @@ -301,13 +300,14 @@ public void testCnxManagerNPE() throws Exception { } else { LOG.error("Null listener when initializing cnx manager"); } - int port = peers.get(peer.getId()).electionAddr.getPort(); - LOG.info("Election port: " + port); + + InetSocketAddress address = peers.get(peer.getId()).electionAddr.getReachableOrOne(); + LOG.info("Election port: " + address.getPort()); Thread.sleep(1000); SocketChannel sc = SocketChannel.open(); - sc.socket().connect(peers.get(1L).electionAddr, 5000); + sc.socket().connect(address, 5000); /* * Write id (3.4.6 protocol). This previously caused a NPE in @@ -348,12 +348,13 @@ public void testSocketTimeout() throws Exception { } else { LOG.error("Null listener when initializing cnx manager"); } - int port = peers.get(peer.getId()).electionAddr.getPort(); - LOG.info("Election port: " + port); + + InetSocketAddress address = peers.get(peer.getId()).electionAddr.getReachableOrOne(); + LOG.info("Election port: " + address.getPort()); Thread.sleep(1000); Socket sock = new Socket(); - sock.connect(peers.get(1L).electionAddr, 5000); + sock.connect(address, 5000); long begin = Time.currentElapsedTime(); // Read without sending data. Verify timeout. cnxManager.receiveConnection(sock); diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/test/QuorumUtil.java b/zookeeper-server/src/test/java/org/apache/zookeeper/test/QuorumUtil.java index 314171d873a..5a8cea81bc6 100644 --- a/zookeeper-server/src/test/java/org/apache/zookeeper/test/QuorumUtil.java +++ b/zookeeper-server/src/test/java/org/apache/zookeeper/test/QuorumUtil.java @@ -148,7 +148,7 @@ public void startAll() throws IOException { LOG.info("Checking ports " + hostPort); for (String hp : hostPort.split(",")) { - Assert.assertTrue("waiting for server up", ClientBase.waitForServerUp(hp, + Assert.assertTrue("waiting for server " + hp + " up", ClientBase.waitForServerUp(hp, ClientBase.CONNECTION_TIMEOUT)); LOG.info(hp + " is accepting client connections"); } diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/test/ReconfigExceptionTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/test/ReconfigExceptionTest.java index 5eda4b0f429..a564dcdfc91 100644 --- a/zookeeper-server/src/test/java/org/apache/zookeeper/test/ReconfigExceptionTest.java +++ b/zookeeper-server/src/test/java/org/apache/zookeeper/test/ReconfigExceptionTest.java @@ -204,8 +204,8 @@ private boolean reconfigPort() throws KeeperException, InterruptedException { leaderId++; int followerId = leaderId == 1 ? 2 : 1; joiningServers.add("server." + followerId + "=localhost:" - + qu.getPeer(followerId).peer.getQuorumAddress().getPort() /*quorum port*/ - + ":" + qu.getPeer(followerId).peer.getElectionAddress().getPort() /*election port*/ + + qu.getPeer(followerId).peer.getQuorumAddress().getAllPorts().get(0) /*quorum port*/ + + ":" + qu.getPeer(followerId).peer.getElectionAddress().getAllPorts().get(0) /*election port*/ + ":participant;localhost:" + PortAssignment.unique()/* new client port */); zkAdmin.reconfigure(joiningServers, null, null, -1, new Stat()); return true; diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/test/ReconfigMisconfigTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/test/ReconfigMisconfigTest.java index 1694fcf2a8a..e0e963a8eaf 100644 --- a/zookeeper-server/src/test/java/org/apache/zookeeper/test/ReconfigMisconfigTest.java +++ b/zookeeper-server/src/test/java/org/apache/zookeeper/test/ReconfigMisconfigTest.java @@ -119,8 +119,8 @@ private boolean reconfigPort() throws KeeperException, InterruptedException { leaderId++; int followerId = leaderId == 1 ? 2 : 1; joiningServers.add("server." + followerId + "=localhost:" - + qu.getPeer(followerId).peer.getQuorumAddress().getPort() /*quorum port*/ - + ":" + qu.getPeer(followerId).peer.getElectionAddress().getPort() /*election port*/ + + qu.getPeer(followerId).peer.getQuorumAddress().getAllPorts().get(0) /*quorum port*/ + + ":" + qu.getPeer(followerId).peer.getElectionAddress().getAllPorts().get(0) /*election port*/ + ":participant;localhost:" + PortAssignment.unique()/* new client port */); zkAdmin.reconfigure(joiningServers, null, null, -1, new Stat()); return true; diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/test/ReconfigTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/test/ReconfigTest.java index 2015fe59a81..03c14e4ab09 100644 --- a/zookeeper-server/src/test/java/org/apache/zookeeper/test/ReconfigTest.java +++ b/zookeeper-server/src/test/java/org/apache/zookeeper/test/ReconfigTest.java @@ -143,12 +143,13 @@ public static String testServerHasConfig(ZooKeeper zk, String configStr = new String(config); if (joiningServers != null) { for (String joiner : joiningServers) { - Assert.assertTrue(configStr.contains(joiner)); + Assert.assertTrue("Config:<" + configStr + ">\n" + joiner, configStr.contains(joiner)); } } if (leavingServers != null) { for (String leaving : leavingServers) - Assert.assertFalse(configStr.contains("server.".concat(leaving))); + Assert.assertFalse("Config:<" + configStr + ">\n" + leaving, + configStr.contains("server.".concat(leaving))); } return configStr; @@ -293,11 +294,10 @@ public void testRemoveAddOne() throws Exception { joiningServers.add("server." + leavingIndex + "=localhost:" - + qu.getPeer(leavingIndex).peer.getQuorumAddress() - .getPort() + + qu.getPeer(leavingIndex).peer.getQuorumAddress().getAllPorts().get(0) + ":" - + qu.getPeer(leavingIndex).peer.getElectionAddress() - .getPort() + ":participant;localhost:" + + qu.getPeer(leavingIndex).peer.getElectionAddress().getAllPorts().get(0) + + ":participant;localhost:" + qu.getPeer(leavingIndex).peer.getClientPort()); String configStr = reconfig(zkAdmin1, null, leavingServers, null, -1); @@ -370,17 +370,17 @@ public void testRemoveAddTwo() throws Exception { // remember these servers so we can add them back later joiningServers.add("server." + leavingIndex1 + "=localhost:" - + qu.getPeer(leavingIndex1).peer.getQuorumAddress().getPort() + + qu.getPeer(leavingIndex1).peer.getQuorumAddress().getAllPorts().get(0) + ":" - + qu.getPeer(leavingIndex1).peer.getElectionAddress().getPort() + + qu.getPeer(leavingIndex1).peer.getElectionAddress().getAllPorts().get(0) + ":participant;localhost:" + qu.getPeer(leavingIndex1).peer.getClientPort()); // this server will be added back as an observer joiningServers.add("server." + leavingIndex2 + "=localhost:" - + qu.getPeer(leavingIndex2).peer.getQuorumAddress().getPort() + + qu.getPeer(leavingIndex2).peer.getQuorumAddress().getAllPorts().get(0) + ":" - + qu.getPeer(leavingIndex2).peer.getElectionAddress().getPort() + + qu.getPeer(leavingIndex2).peer.getElectionAddress().getAllPorts().get(0) + ":observer;localhost:" + qu.getPeer(leavingIndex2).peer.getClientPort()); @@ -548,11 +548,10 @@ public void testRoleChange() throws Exception { joiningServers.add("server." + changingIndex + "=localhost:" - + qu.getPeer(changingIndex).peer.getQuorumAddress() - .getPort() + + qu.getPeer(changingIndex).peer.getQuorumAddress().getAllPorts().get(0) + ":" - + qu.getPeer(changingIndex).peer.getElectionAddress() - .getPort() + ":" + newRole + ";localhost:" + + qu.getPeer(changingIndex).peer.getElectionAddress().getAllPorts().get(0) + + ":" + newRole + ";localhost:" + qu.getPeer(changingIndex).peer.getClientPort()); reconfig(zkAdmin1, joiningServers, null, null, -1); @@ -599,8 +598,8 @@ public void testPortChange() throws Exception { // modify follower's client port - int quorumPort = qu.getPeer(followerIndex).peer.getQuorumAddress().getPort(); - int electionPort = qu.getPeer(followerIndex).peer.getElectionAddress().getPort(); + int quorumPort = qu.getPeer(followerIndex).peer.getQuorumAddress().getAllPorts().get(0); + int electionPort = qu.getPeer(followerIndex).peer.getElectionAddress().getAllPorts().get(0); int oldClientPort = qu.getPeer(followerIndex).peer.getClientPort(); int newClientPort = PortAssignment.unique(); joiningServers.add("server." + followerIndex + "=localhost:" + quorumPort @@ -668,7 +667,7 @@ ClientBase.CONNECTION_TIMEOUT, new Watcher() { joiningServers.add("server." + leaderIndex + "=localhost:" + newQuorumPort + ":" - + qu.getPeer(leaderIndex).peer.getElectionAddress().getPort() + + qu.getPeer(leaderIndex).peer.getElectionAddress().getAllPorts().get(0) + ":participant;localhost:" + qu.getPeer(leaderIndex).peer.getClientPort()); @@ -676,8 +675,7 @@ ClientBase.CONNECTION_TIMEOUT, new Watcher() { testNormalOperation(zkArr[followerIndex], zkArr[leaderIndex]); - Assert.assertTrue(qu.getPeer(leaderIndex).peer.getQuorumAddress() - .getPort() == newQuorumPort); + Assert.assertEquals((int) qu.getPeer(leaderIndex).peer.getQuorumAddress().getAllPorts().get(0), newQuorumPort); joiningServers.clear(); @@ -685,7 +683,7 @@ ClientBase.CONNECTION_TIMEOUT, new Watcher() { for (int i = 1; i <= 3; i++) { joiningServers.add("server." + i + "=localhost:" - + qu.getPeer(i).peer.getQuorumAddress().getPort() + ":" + + qu.getPeer(i).peer.getQuorumAddress().getAllPorts().get(0) + ":" + PortAssignment.unique() + ":participant;localhost:" + qu.getPeer(i).peer.getClientPort()); } @@ -731,8 +729,8 @@ private void testPortChangeToBlockedPort(boolean testLeader) throws Exception { int reconfigIndex = testLeader ? followerIndex : leaderIndex; // modify server's client port - int quorumPort = qu.getPeer(serverIndex).peer.getQuorumAddress().getPort(); - int electionPort = qu.getPeer(serverIndex).peer.getElectionAddress().getPort(); + int quorumPort = qu.getPeer(serverIndex).peer.getQuorumAddress().getAllPorts().get(0); + int electionPort = qu.getPeer(serverIndex).peer.getElectionAddress().getAllPorts().get(0); int oldClientPort = qu.getPeer(serverIndex).peer.getClientPort(); int newClientPort = PortAssignment.unique(); @@ -829,8 +827,8 @@ public void testQuorumSystemChange() throws Exception { for (int i = 1; i <= 5; i++) { members.add("server." + i + "=127.0.0.1:" - + qu.getPeer(i).peer.getQuorumAddress().getPort() + ":" - + qu.getPeer(i).peer.getElectionAddress().getPort() + ";" + + qu.getPeer(i).peer.getQuorumAddress().getAllPorts().get(0) + ":" + + qu.getPeer(i).peer.getElectionAddress().getAllPorts().get(0) + ";" + "127.0.0.1:" + qu.getPeer(i).peer.getClientPort()); } @@ -861,8 +859,8 @@ public void testQuorumSystemChange() throws Exception { members.clear(); for (int i = 1; i <= 3; i++) { members.add("server." + i + "=127.0.0.1:" - + qu.getPeer(i).peer.getQuorumAddress().getPort() + ":" - + qu.getPeer(i).peer.getElectionAddress().getPort() + ";" + + qu.getPeer(i).peer.getQuorumAddress().getAllPorts().get(0) + ":" + + qu.getPeer(i).peer.getElectionAddress().getAllPorts().get(0) + ";" + "127.0.0.1:" + qu.getPeer(i).peer.getClientPort()); } @@ -941,9 +939,9 @@ public void testJMXBeanAfterRemoveAddOne() throws Exception { // remember this server so we can add it back later joiningServers.add("server." + leavingIndex + "=127.0.0.1:" - + qu.getPeer(leavingIndex).peer.getQuorumAddress().getPort() + + qu.getPeer(leavingIndex).peer.getQuorumAddress().getAllPorts().get(0) + ":" - + qu.getPeer(leavingIndex).peer.getElectionAddress().getPort() + + qu.getPeer(leavingIndex).peer.getElectionAddress().getAllPorts().get(0) + ":participant;127.0.0.1:" + qu.getPeer(leavingIndex).peer.getClientPort()); @@ -1019,9 +1017,9 @@ public void testJMXBeanAfterRoleChange() throws Exception { // exactly as it is now, except for role change joiningServers.add("server." + changingIndex + "=127.0.0.1:" - + qu.getPeer(changingIndex).peer.getQuorumAddress().getPort() + + qu.getPeer(changingIndex).peer.getQuorumAddress().getAllPorts().get(0) + ":" - + qu.getPeer(changingIndex).peer.getElectionAddress().getPort() + + qu.getPeer(changingIndex).peer.getElectionAddress().getAllPorts().get(0) + ":" + newRole + ";127.0.0.1:" + qu.getPeer(changingIndex).peer.getClientPort()); @@ -1058,7 +1056,8 @@ private void assertLocalPeerMXBeanAttributes(QuorumPeer qp, qp.getClientAddress().getHostString() + ":" + qp.getClientAddress().getPort(), JMXEnv.ensureBeanAttribute(beanName, "ClientAddress")); Assert.assertEquals("Mismatches LearnerType!", - qp.getElectionAddress().getHostString() + ":" + qp.getElectionAddress().getPort(), + qp.getElectionAddress().getOne().getHostString() + ":" + + qp.getElectionAddress().getOne().getPort(), JMXEnv.ensureBeanAttribute(beanName, "ElectionAddress")); Assert.assertEquals("Mismatches PartOfEnsemble!", isPartOfEnsemble, JMXEnv.ensureBeanAttribute(beanName, "PartOfEnsemble")); @@ -1096,10 +1095,11 @@ private void assertRemotePeerMXBeanAttributes(QuorumServer qs, getNumericalAddrPort(qs.clientAddr.getHostString() + ":" + qs.clientAddr.getPort()), getAddrPortFromBean(beanName, "ClientAddress") ); Assert.assertEquals("Mismatches ElectionAddress!", - getNumericalAddrPort(qs.electionAddr.getHostString() + ":" + qs.electionAddr.getPort()), + getNumericalAddrPort(qs.electionAddr.getOne().getHostString() + ":" + + qs.electionAddr.getOne().getPort()), getAddrPortFromBean(beanName, "ElectionAddress") ); Assert.assertEquals("Mismatches QuorumAddress!", - getNumericalAddrPort(qs.addr.getHostString() + ":" + qs.addr.getPort()), + getNumericalAddrPort(qs.addr.getOne().getHostString() + ":" + qs.addr.getOne().getPort()), getAddrPortFromBean(beanName, "QuorumAddress") ); } }