diff --git a/zookeeper-docs/src/main/resources/markdown/zookeeperInternals.md b/zookeeper-docs/src/main/resources/markdown/zookeeperInternals.md index 171f33ddb4c..d27ba0c9cfc 100644 --- a/zookeeper-docs/src/main/resources/markdown/zookeeperInternals.md +++ b/zookeeper-docs/src/main/resources/markdown/zookeeperInternals.md @@ -275,7 +275,7 @@ The [consistency](https://jepsen.io/consistency) guarantees of ZooKeeper lie bet Write operations in ZooKeeper are *linearizable*. In other words, each `write` will appear to take effect atomically at some point between when the client issues the request and receives the corresponding response. This means that the writes performed by all the clients in ZooKeeper can be totally ordered in such a way that respects the real-time ordering of these writes. However, merely stating that write operations are linearizable is meaningless unless we also talk about read operations. -Read operations in ZooKeeper are *not linearizable* since they can return potentially stale data. This is because a `read` in ZooKeeper is not a quorum operation and a server will respond immediately to a client that is performing a `read`. ZooKeeper does this because it prioritizes performance over consistency for the read use case. However, reads in ZooKeeper are *sequentially consistent*, because `read` operations will appear to take effect in some sequential order that furthermore respects the order of each client's operations. A common pattern to work around this is to issue a `sync` before issuing a `read`. This too does **not** strictly guarantee up-to-date data because `sync` is [not currently a quorum operation](https://issues.apache.org/jira/browse/ZOOKEEPER-1675). To illustrate, consider a scenario where two servers simultaneously think they are the leader, something that could occur if the TCP connection timeout is smaller than `syncLimit * tickTime`. Note that this is [unlikely](https://www.amazon.com/ZooKeeper-Distributed-Coordination-Flavio-Junqueira/dp/1449361307) to occur in practice, but should be kept in mind nevertheless when discussing strict theoretical guarantees. Under this scenario, it is possible that the `sync` is served by the “leader” with stale data, thereby allowing the following `read` to be stale as well. The stronger guarantee of linearizability is provided if an actual quorum operation (e.g., a `write`) is performed before a `read`. +Read operations in ZooKeeper are *not linearizable* since they can return potentially stale data. This is because a `read` in ZooKeeper is not a quorum operation and a server will respond immediately to a client that is performing a `read`. ZooKeeper does this because it prioritizes performance over consistency for the read use case. However, reads in ZooKeeper are *sequentially consistent*, because `read` operations will appear to take effect in some sequential order that furthermore respects the order of each client's operations. A common pattern to work around this is to issue a `sync` before issuing a `read`. This does **not work prior 3.10.0** because `sync` is [not a quorum operation before](https://issues.apache.org/jira/browse/ZOOKEEPER-1675). To illustrate, consider a scenario where two servers simultaneously think they are the leader, something that could occur if the TCP connection timeout is smaller than `syncLimit * tickTime`. Note that this is [unlikely](https://www.amazon.com/ZooKeeper-Distributed-Coordination-Flavio-Junqueira/dp/1449361307) to occur in practice, but should be kept in mind nevertheless when discussing strict theoretical guarantees. Under this scenario, it is possible that the `sync` is served by the “leader” with stale data, thereby allowing the following `read` to be stale as well. Prior to 3.10.0, the stronger guarantee of linearizability is provided if an actual quorum operation (e.g., a `write`) is performed before a `read`. Overall, the consistency guarantees of ZooKeeper are formally captured by the notion of [ordered sequential consistency](http://webee.technion.ac.il/people/idish/ftp/OSC-IPL17.pdf) or `OSC(U)` to be exact, which lies between sequential consistency and linearizability. diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/CommitProcessor.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/CommitProcessor.java index 3be43c37e30..1674845eee7 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/CommitProcessor.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/CommitProcessor.java @@ -149,17 +149,9 @@ public class CommitProcessor extends ZooKeeperCriticalThread implements RequestP */ private static volatile int maxCommitBatchSize; - /** - * This flag indicates whether we need to wait for a response to come back from the - * leader or we just let the sync operation flow through like a read. The flag will - * be false if the CommitProcessor is in a Leader pipeline. - */ - boolean matchSyncs; - - public CommitProcessor(RequestProcessor nextProcessor, String id, boolean matchSyncs, ZooKeeperServerListener listener) { + public CommitProcessor(RequestProcessor nextProcessor, String id, ZooKeeperServerListener listener) { super("CommitProcessor:" + id, listener); this.nextProcessor = nextProcessor; - this.matchSyncs = matchSyncs; } private boolean isProcessingRequest() { @@ -182,9 +174,8 @@ protected boolean needCommit(Request request) { case OpCode.multi: case OpCode.setACL: case OpCode.check: - return true; case OpCode.sync: - return matchSyncs; + return true; case OpCode.createSession: case OpCode.closeSession: return !request.isLocalSession(); diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/FollowerZooKeeperServer.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/FollowerZooKeeperServer.java index b6766199988..f21195b09d1 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/FollowerZooKeeperServer.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/FollowerZooKeeperServer.java @@ -69,7 +69,7 @@ public Follower getFollower() { @Override protected void setupRequestProcessors() { RequestProcessor finalProcessor = new FinalRequestProcessor(this); - commitProcessor = new CommitProcessor(finalProcessor, Long.toString(getServerId()), true, getZooKeeperServerListener()); + commitProcessor = new CommitProcessor(finalProcessor, Long.toString(getServerId()), getZooKeeperServerListener()); commitProcessor.start(); firstProcessor = new FollowerRequestProcessor(this, commitProcessor); ((FollowerRequestProcessor) firstProcessor).start(); @@ -125,9 +125,8 @@ public synchronized void sync() { } Request r = pendingSyncs.remove(); - if (r instanceof LearnerSyncRequest) { - LearnerSyncRequest lsr = (LearnerSyncRequest) r; - lsr.fh.queuePacket(new QuorumPacket(Leader.SYNC, 0, null, null)); + if (r.getOwner() instanceof LearnerHandler) { + ((LearnerHandler) r.getOwner()).queuePacket(new QuorumPacket(Leader.SYNC, 0, null, null)); } commitProcessor.commit(r); } diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Leader.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Leader.java index d4281871135..882e69d072b 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Leader.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Leader.java @@ -33,7 +33,6 @@ import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Collections; -import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; import java.util.LinkedList; @@ -50,6 +49,7 @@ import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.stream.Collectors; import javax.security.sasl.SaslException; @@ -86,10 +86,17 @@ public class Leader extends LearnerMaster { LOG.info("TCP NoDelay set to: {}", nodelay); } + // This must be an invalid session id. + static final long PING_SESSION_END = 0; + + private static final int PING_PAYLOAD_TYPE_QUORUM_SYNC = 1; + public static class Proposal extends SyncedLearnerTracker { private QuorumPacket packet; protected Request request; + public long pingXid = -1; + public final List pendingSyncs = new ArrayList<>(); public Proposal() { } @@ -115,9 +122,18 @@ public long getZxid() { return packet.getZxid(); } + boolean isRead() { + return pingXid > 0; + } + + boolean isWrite() { + return !isRead(); + } + @Override public String toString() { - return packet.getType() + ", " + packet.getZxid() + ", " + request; + return packet.getType() + ", 0x" + Long.toHexString(packet.getZxid()) + ", " + request + + ", " + pingXid + ", " + pendingSyncs; } } @@ -194,6 +210,14 @@ public List getLearners() { // list of followers that are ready to follow (i.e synced with the leader) private final HashSet forwardingFollowers = new HashSet<>(); + volatile int followersProtocolVersion = ProtocolVersion.CURRENT; + + void recalculateFollowersVersion() { + followersProtocolVersion = forwardingFollowers.stream() + .mapToInt(LearnerHandler::getVersion) + .min() + .orElse(ProtocolVersion.CURRENT); + } /** * Returns a copy of the current forwarding follower snapshot @@ -219,6 +243,7 @@ public List getNonVotingFollowers() { void addForwardingFollower(LearnerHandler lh) { synchronized (forwardingFollowers) { forwardingFollowers.add(lh); + recalculateFollowersVersion(); /* * Any changes on forwardiongFollowers could possible affect the need of Oracle. * */ @@ -261,11 +286,11 @@ public void resetObserverConnectionStats() { } } - // Pending sync requests. Must access under 'this' lock. - private final Map> pendingSyncs = new HashMap<>(); + long quorumPingXid = 0; + private final AtomicInteger pendingSyncs = new AtomicInteger(); public synchronized int getNumPendingSyncs() { - return pendingSyncs.size(); + return pendingSyncs.get(); } //Follower counter @@ -293,6 +318,7 @@ public void addLearnerHandler(LearnerHandler learner) { public void removeLearnerHandler(LearnerHandler peer) { synchronized (forwardingFollowers) { forwardingFollowers.remove(peer); + recalculateFollowersVersion(); } synchronized (learners) { learners.remove(peer); @@ -698,10 +724,7 @@ void lead() throws IOException, InterruptedException { } } - newLeaderProposal.addQuorumVerifier(self.getQuorumVerifier()); - if (self.getLastSeenQuorumVerifier().getVersion() > self.getQuorumVerifier().getVersion()) { - newLeaderProposal.addQuorumVerifier(self.getLastSeenQuorumVerifier()); - } + setupQuorumTracker(newLeaderProposal); // We have to get at least a majority of servers in sync with // us. We do this by waiting for the NEWLEADER packet to get @@ -789,11 +812,7 @@ void lead() throws IOException, InterruptedException { // track synced learners to make sure we still have a // quorum of current (and potentially next pending) view. SyncedLearnerTracker syncedAckSet = new SyncedLearnerTracker(); - syncedAckSet.addQuorumVerifier(self.getQuorumVerifier()); - if (self.getLastSeenQuorumVerifier() != null - && self.getLastSeenQuorumVerifier().getVersion() > self.getQuorumVerifier().getVersion()) { - syncedAckSet.addQuorumVerifier(self.getLastSeenQuorumVerifier()); - } + setupQuorumTracker(syncedAckSet); syncedAckSet.addAck(self.getMyId()); @@ -968,6 +987,13 @@ && recreateSocketAddresses(newQVAcksetPair.getQuorumVerifier().getVotingMembers( * @return True if committed, otherwise false. **/ public synchronized boolean tryToCommit(Proposal p, long zxid, SocketAddress followerAddr) { + // in order to be committed, a proposal must be accepted by a quorum. + // + // getting a quorum from all necessary configurations. + if (!p.hasAllQuorums()) { + return false; + } + // make sure that ops are committed in order. With reconfigurations it is now possible // that different operations wait for different sets of acks, and we still want to enforce // that they are committed in order. Currently we only permit one outstanding reconfiguration @@ -975,15 +1001,16 @@ public synchronized boolean tryToCommit(Proposal p, long zxid, SocketAddress fol // pending all wait for a quorum of old and new config, so it's not possible to get enough acks // for an operation without getting enough acks for preceding ops. But in the future if multiple // concurrent reconfigs are allowed, this can happen. - if (outstandingProposals.containsKey(zxid - 1)) { - return false; - } - - // in order to be committed, a proposal must be accepted by a quorum. - // - // getting a quorum from all necessary configurations. - if (!p.hasAllQuorums()) { - return false; + Proposal previous = outstandingProposals.get(zxid - 1); + if (previous != null) { + if (previous.isWrite()) { + return false; + } + // It is possible in case of downgrading, leader probably will never get enough acks. + // + // These lines are probably should be reverted in new major version. + outstandingProposals.remove(zxid - 1); + commitQuorumSync(previous); } // commit proposals in order @@ -1033,11 +1060,7 @@ public synchronized boolean tryToCommit(Proposal p, long zxid, SocketAddress fol inform(p); } zk.commitProcessor.commit(p.request); - if (pendingSyncs.containsKey(zxid)) { - for (LearnerSyncRequest r : pendingSyncs.remove(zxid)) { - sendSync(r); - } - } + commitPendingSyncs(p); return true; } @@ -1059,6 +1082,9 @@ public synchronized void processAck(long sid, long zxid, SocketAddress followerA if (LOG.isTraceEnabled()) { LOG.trace("Ack zxid: 0x{}", Long.toHexString(zxid)); for (Proposal p : outstandingProposals.values()) { + if (p.isRead()) { + continue; + } long packetZxid = p.packet.getZxid(); LOG.trace("outstanding proposal: 0x{}", Long.toHexString(packetZxid)); } @@ -1090,6 +1116,12 @@ public synchronized void processAck(long sid, long zxid, SocketAddress followerA if (p == null) { LOG.warn("Trying to commit future proposal: zxid 0x{} from {}", Long.toHexString(zxid), followerAddr); return; + } else if (p.isRead()) { + // The write proposal with same zxid is completed, dated write ack should be filtered out by + // above `lastCommitted >= zxid`. + LOG.error("Receive ack for quorum read proposal: lastCommitted {}, zxid 0x{}, proposal {}, follower {}", + Long.toHexString(lastCommitted), Long.toHexString(zxid), p, followerAddr); + return; } if (ackLoggingFrequency > 0 && (zxid % ackLoggingFrequency == 0)) { @@ -1121,6 +1153,64 @@ public synchronized void processAck(long sid, long zxid, SocketAddress followerA } } + void setupQuorumTracker(SyncedLearnerTracker tracker) { + QuorumVerifier verifier = self.getQuorumVerifier(); + QuorumVerifier lastSeen = self.getLastSeenQuorumVerifier(); + tracker.addQuorumVerifier(verifier); + if (lastSeen != null && lastSeen.getVersion() > verifier.getVersion()) { + tracker.addQuorumVerifier(lastSeen); + } + } + + private synchronized void commitQuorumSync(Proposal p) { + sendSync(p.request); + pendingSyncs.decrementAndGet(); + commitPendingSyncs(p); + } + + private synchronized void processQuorumSyncAck(long sid, long zxid, long xid) { + Proposal p = outstandingProposals.get(zxid); + if (p == null) { + LOG.debug("Receive dated quorum sync zxid 0x{}, xid {} with no ping", Long.toHexString(zxid), xid); + return; + } + if (p.isWrite()) { + // This is not possible as write proposal will get new zxid. + LOG.error("Receive quorum sync zxid 0x{}, xid {}, proposal {}", Long.toHexString(zxid), xid, p); + return; + } else if (xid < p.pingXid) { + // It is possible to issue two quorum sync with no write in between. + LOG.debug("Receive dated quorum sync zxid 0x{}, xid {}, ping {}", Long.toHexString(zxid), xid, p); + return; + } else if (xid > p.pingXid) { + // It is not possible as new syncs are either collapsed into old one or + // started after old one completed. + LOG.error("Receive quorum sync zxid 0x{}, xid {}, dated ping {}", Long.toHexString(zxid), xid, p); + return; + } + p.addAck(sid); + if (LOG.isDebugEnabled()) { + LOG.debug("Receive quorum sync zxid 0x{}, xid {}, ping {}, acks {}", Long.toHexString(zxid), xid, p, p.ackSetsToString()); + } + if (p.hasAllQuorums()) { + outstandingProposals.remove(zxid); + commitQuorumSync(p); + } + } + + @Override + void processPing(long sid, long zxid, byte[] payload) throws IOException { + ByteArrayInputStream bis = new ByteArrayInputStream(payload); + DataInputStream dis = new DataInputStream(bis); + int type = dis.readInt(); + if (type != PING_PAYLOAD_TYPE_QUORUM_SYNC) { + String msg = String.format("invalid ping payload type: %d", type); + throw new IOException(msg); + } + long xid = dis.readLong(); + processQuorumSyncAck(sid, zxid, xid); + } + static class ToBeAppliedRequestProcessor implements RequestProcessor { private final RequestProcessor next; @@ -1314,16 +1404,11 @@ public Proposal propose(Request request) throws XidRolloverException { Proposal p = new Proposal(request, pp); synchronized (this) { - p.addQuorumVerifier(self.getQuorumVerifier()); - if (request.getHdr().getType() == OpCode.reconfig) { self.setLastSeenQuorumVerifier(request.qv, true); } - if (self.getQuorumVerifier().getVersion() < self.getLastSeenQuorumVerifier().getVersion()) { - p.addQuorumVerifier(self.getLastSeenQuorumVerifier()); - } - + setupQuorumTracker(p); LOG.debug("Proposing:: {}", request); lastProposed = p.packet.getZxid(); @@ -1334,26 +1419,74 @@ public Proposal propose(Request request) throws XidRolloverException { return p; } + private static byte[] createQuorumSyncPingPayload(long xid) { + ByteArrayOutputStream stream = new ByteArrayOutputStream(); + DataOutputStream output = new DataOutputStream(stream); + try { + output.writeLong(PING_SESSION_END); + output.writeInt(PING_PAYLOAD_TYPE_QUORUM_SYNC); + output.writeLong(xid); + } catch (IOException ex) { + throw new RuntimeException(ex); + } + return stream.toByteArray(); + } + + private Proposal createQuorumSyncProposal(Request r) { + quorumPingXid += 1; + byte[] payload = createQuorumSyncPingPayload(quorumPingXid); + QuorumPacket packet = new QuorumPacket(Leader.PING, lastProposed, payload, null); + Proposal p = new Proposal(); + p.request = r; + p.packet = packet; + p.pingXid = quorumPingXid; + setupQuorumTracker(p); + p.addAck(self.getMyId()); + return p; + } + /** * Process sync requests * * @param r the request */ - - public synchronized void processSync(LearnerSyncRequest r) { - if (outstandingProposals.isEmpty()) { + public synchronized void processSync(Request r) { + Proposal p = outstandingProposals.get(lastProposed); + if (p != null) { + p.pendingSyncs.add(r); + pendingSyncs.incrementAndGet(); + } else if (followersProtocolVersion < ProtocolVersion.V3_10_0) { sendSync(r); } else { - pendingSyncs.computeIfAbsent(lastProposed, k -> new ArrayList<>()).add(r); + p = createQuorumSyncProposal(r); + if (p.hasAllQuorums()) { + // single server distributed mode. + sendSync(r); + return; + } + outstandingProposals.put(lastProposed, p); + pendingSyncs.incrementAndGet(); + sendPacket(p.packet); + } + } + + private synchronized void commitPendingSyncs(Proposal p) { + for (Request r : p.pendingSyncs) { + pendingSyncs.decrementAndGet(); + sendSync(r); } } /** * Sends a sync message to the appropriate server */ - public void sendSync(LearnerSyncRequest r) { - QuorumPacket qp = new QuorumPacket(Leader.SYNC, 0, null, null); - r.fh.queuePacket(qp); + public void sendSync(Request r) { + if (r.getOwner() instanceof LearnerHandler) { + QuorumPacket qp = new QuorumPacket(Leader.SYNC, 0, null, null); + ((LearnerHandler) r.getOwner()).queuePacket(qp); + } else { + zk.commitProcessor.commit(r); + } } /** diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LeaderZooKeeperServer.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LeaderZooKeeperServer.java index 799b8f96148..30a2cc11b2e 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LeaderZooKeeperServer.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LeaderZooKeeperServer.java @@ -66,7 +66,7 @@ public Leader getLeader() { protected void setupRequestProcessors() { RequestProcessor finalProcessor = new FinalRequestProcessor(this); RequestProcessor toBeAppliedProcessor = new Leader.ToBeAppliedRequestProcessor(finalProcessor, getLeader()); - commitProcessor = new CommitProcessor(toBeAppliedProcessor, Long.toString(getServerId()), false, getZooKeeperServerListener()); + commitProcessor = new CommitProcessor(toBeAppliedProcessor, Long.toString(getServerId()), getZooKeeperServerListener()); commitProcessor.start(); ProposalRequestProcessor proposalProcessor = new ProposalRequestProcessor(this, commitProcessor); proposalProcessor.initialize(); diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Learner.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Learner.java index 1ef99e50aae..df5013f6169 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Learner.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Learner.java @@ -104,7 +104,7 @@ public Socket getSocket() { protected InputArchive leaderIs; protected OutputArchive leaderOs; /** the protocol version of the leader */ - protected int leaderProtocolVersion = 0x01; + protected int leaderProtocolVersion = ProtocolVersion.ANCIENT; private static final int BUFFERED_MESSAGE_SIZE = 10; protected final MessageTracker messageTracker = new MessageTracker(BUFFERED_MESSAGE_SIZE); @@ -491,7 +491,7 @@ protected long registerWithLeader(int pktType) throws IOException { /* * Add sid to payload */ - LearnerInfo li = new LearnerInfo(self.getMyId(), 0x10000, self.getQuorumVerifier().getVersion()); + LearnerInfo li = new LearnerInfo(self.getMyId(), ProtocolVersion.CURRENT, self.getQuorumVerifier().getVersion()); ByteArrayOutputStream bsid = new ByteArrayOutputStream(); BinaryOutputArchive boa = BinaryOutputArchive.getArchive(bsid); boa.writeRecord(li, "LearnerInfo"); @@ -885,6 +885,12 @@ protected void ping(QuorumPacket qp) throws IOException { dos.writeLong(entry.getKey()); dos.writeInt(entry.getValue()); } + // Piggyback whatever leader/master sent + byte[] data = qp.getData(); + if (data != null && data.length != 0) { + dos.write(data); + } + QuorumPacket pingReply = new QuorumPacket(qp.getType(), qp.getZxid(), bos.toByteArray(), qp.getAuthinfo()); writePacket(pingReply, true); diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LearnerHandler.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LearnerHandler.java index 57947daa86c..ee42d41dcd2 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LearnerHandler.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LearnerHandler.java @@ -40,7 +40,6 @@ import javax.security.sasl.SaslException; import org.apache.jute.BinaryInputArchive; import org.apache.jute.BinaryOutputArchive; -import org.apache.zookeeper.ZooDefs.OpCode; import org.apache.zookeeper.common.Time; import org.apache.zookeeper.server.Request; import org.apache.zookeeper.server.RequestRecord; @@ -105,7 +104,7 @@ String getRemoteAddress() { return sock == null ? "" : sock.getRemoteSocketAddress().toString(); } - protected int version = 0x1; + protected int version = ProtocolVersion.ANCIENT; int getVersion() { return version; @@ -525,7 +524,7 @@ public void run() { long newEpoch = learnerMaster.getEpochToPropose(this.getSid(), lastAcceptedEpoch); long newLeaderZxid = ZxidUtils.makeZxid(newEpoch, 0); - if (this.getVersion() < 0x10000) { + if (this.getVersion() < ProtocolVersion.V3_4_0) { // we are going to have to extrapolate the epoch information long epoch = ZxidUtils.getEpochFromZxid(zxid); ss = new StateSummary(epoch, zxid); @@ -533,7 +532,7 @@ public void run() { learnerMaster.waitForEpochAck(this.getSid(), ss); } else { byte[] ver = new byte[4]; - ByteBuffer.wrap(ver).putInt(0x10000); + ByteBuffer.wrap(ver).putInt(ProtocolVersion.CURRENT); QuorumPacket newEpochPacket = new QuorumPacket(Leader.LEADERINFO, newLeaderZxid, ver, null); oa.writeRecord(newEpochPacket, "packet"); messageTracker.trackSent(Leader.LEADERINFO); @@ -597,7 +596,7 @@ public void run() { // the version of this quorumVerifier will be set by leader.lead() in case // the leader is just being established. waitForEpochAck makes sure that readyToStart is true if // we got here, so the version was set - if (getVersion() < 0x10000) { + if (getVersion() < ProtocolVersion.V3_4_0) { QuorumPacket newLeaderQP = new QuorumPacket(Leader.NEWLEADER, newLeaderZxid, null, null); oa.writeRecord(newLeaderQP, "packet"); } else { @@ -687,6 +686,13 @@ public void run() { DataInputStream dis = new DataInputStream(bis); while (dis.available() > 0) { long sess = dis.readLong(); + if (sess == Leader.PING_SESSION_END) { + int n = dis.available(); + byte[] payload = new byte[n]; + dis.read(payload); + learnerMaster.processPing(this.sid, qp.getZxid(), payload); + break; + } int to = dis.readInt(); learnerMaster.touch(sess, to); } @@ -701,12 +707,7 @@ public void run() { cxid = bb.getInt(); type = bb.getInt(); bb = bb.slice(); - Request si; - if (type == OpCode.sync) { - si = new LearnerSyncRequest(this, sessionId, cxid, type, RequestRecord.fromBytes(bb), qp.getAuthinfo()); - } else { - si = new Request(null, sessionId, cxid, type, RequestRecord.fromBytes(bb), qp.getAuthinfo()); - } + Request si = new Request(null, sessionId, cxid, type, RequestRecord.fromBytes(bb), qp.getAuthinfo()); si.setOwner(this); learnerMaster.submitLearnerRequest(si); requestsReceived.incrementAndGet(); diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LearnerMaster.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LearnerMaster.java index 9bf6032af68..8ac77f9f027 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LearnerMaster.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LearnerMaster.java @@ -188,6 +188,8 @@ public LearnerSyncThrottler getLearnerDiffSyncThrottler() { */ abstract void processAck(long sid, long zxid, SocketAddress localSocketAddress); + abstract void processPing(long sid, long zxid, byte[] payload) throws IOException; + /** * mark session as alive * @param sess session id diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/ObserverMaster.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/ObserverMaster.java index 2369eabe36c..710f282746a 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/ObserverMaster.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/ObserverMaster.java @@ -232,6 +232,10 @@ public void processAck(long sid, long zxid, SocketAddress localSocketAddress) { throw new RuntimeException("Observers shouldn't send ACKS ack = " + Long.toHexString(zxid)); } + @Override + public void processPing(long sid, long zxid, byte[] payload) throws IOException { + } + @Override public void touch(long sess, int to) { zks.getSessionTracker().touchSession(sess, to); diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/ObserverZooKeeperServer.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/ObserverZooKeeperServer.java index 74663a9c136..e76873e09f4 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/ObserverZooKeeperServer.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/ObserverZooKeeperServer.java @@ -90,7 +90,7 @@ protected void setupRequestProcessors() { // Observers to, for example, remove the disk sync requirements. // Currently, they behave almost exactly the same as followers. RequestProcessor finalProcessor = new FinalRequestProcessor(this); - commitProcessor = new CommitProcessor(finalProcessor, Long.toString(getServerId()), true, getZooKeeperServerListener()); + commitProcessor = new CommitProcessor(finalProcessor, Long.toString(getServerId()), getZooKeeperServerListener()); commitProcessor.start(); firstProcessor = new ObserverRequestProcessor(this, commitProcessor); ((ObserverRequestProcessor) firstProcessor).start(); diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/ProposalRequestProcessor.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/ProposalRequestProcessor.java index c1e2fe16e43..af47829ca8d 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/ProposalRequestProcessor.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/ProposalRequestProcessor.java @@ -18,6 +18,7 @@ package org.apache.zookeeper.server.quorum; +import org.apache.zookeeper.ZooDefs; import org.apache.zookeeper.server.Request; import org.apache.zookeeper.server.RequestProcessor; import org.apache.zookeeper.server.ServerMetrics; @@ -66,15 +67,13 @@ public void initialize() { } public void processRequest(Request request) throws RequestProcessorException { - /* In the following IF-THEN-ELSE block, we process syncs on the leader. - * If the sync is coming from a follower, then the follower - * handler adds it to syncHandler. Otherwise, if it is a client of - * the leader that issued the sync command, then syncHandler won't - * contain the handler. In this case, we add it to syncHandler, and - * call processRequest on the next processor. - */ - if (request instanceof LearnerSyncRequest) { - zks.getLeader().processSync((LearnerSyncRequest) request); + if (request.type == ZooDefs.OpCode.sync) { + if (!request.isFromLearner()) { + // Submit to commit processor first since no-quorum-sync could + // commit sync immediately without a consensus. + nextProcessor.processRequest(request); + } + zks.getLeader().processSync(request); } else { if (shouldForwardToNextProcessor(request)) { nextProcessor.processRequest(request); diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LearnerSyncRequest.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/ProtocolVersion.java similarity index 57% rename from zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LearnerSyncRequest.java rename to zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/ProtocolVersion.java index 6892d3dd8a7..3ec072a9a53 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LearnerSyncRequest.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/ProtocolVersion.java @@ -18,18 +18,28 @@ package org.apache.zookeeper.server.quorum; -import java.util.List; -import org.apache.zookeeper.data.Id; -import org.apache.zookeeper.server.Request; -import org.apache.zookeeper.server.RequestRecord; +public class ProtocolVersion { + private ProtocolVersion() {} -public class LearnerSyncRequest extends Request { + /** + * Pre ZAB 1.0. + */ + public static final int ANCIENT = 1; - LearnerHandler fh; - public LearnerSyncRequest( - LearnerHandler fh, long sessionId, int xid, int type, RequestRecord request, List authInfo) { - super(null, sessionId, xid, type, request, authInfo); - this.fh = fh; - } + /** + * ZAB 1.0. + */ + public static final int V3_4_0 = 0x10000; + /** + * Protocol changes: + * * Learner will piggyback whatever data leader attached in {@link Leader#PING} after session data. + * This way, leader is free to enhance {@link Leader#PING} in future without agreement from learner. + */ + public static final int V3_10_0 = 0x20000; + + /** + * Point to the newest coding version. + */ + public static final int CURRENT = V3_10_0; } diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/CommitProcessorConcurrencyTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/CommitProcessorConcurrencyTest.java index 5f5449d240e..b3e11637bae 100644 --- a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/CommitProcessorConcurrencyTest.java +++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/CommitProcessorConcurrencyTest.java @@ -109,7 +109,7 @@ public void processRequest(Request request) throws RequestProcessorException { public void shutdown() { } - }, "0", false, new ZooKeeperServerListener() { + }, "0", new ZooKeeperServerListener() { @Override public void notifyStopping(String threadName, int errorCode) { diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/CommitProcessorMetricsTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/CommitProcessorMetricsTest.java index f4ba976de3a..9487f313f2c 100644 --- a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/CommitProcessorMetricsTest.java +++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/CommitProcessorMetricsTest.java @@ -79,7 +79,7 @@ private class TestCommitProcessor extends CommitProcessor { int numWorkerThreads; public TestCommitProcessor(RequestProcessor finalProcessor, int numWorkerThreads) { - super(finalProcessor, "1", true, null); + super(finalProcessor, "1", null); this.numWorkerThreads = numWorkerThreads; } diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/CommitProcessorTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/CommitProcessorTest.java index 2377dc77fae..cddfccb6803 100644 --- a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/CommitProcessorTest.java +++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/CommitProcessorTest.java @@ -300,7 +300,7 @@ protected void setupRequestProcessors() { // ValidateProcessor is set up in a similar fashion to ToBeApplied // processor, so it can do pre/post validating of requests ValidateProcessor validateProcessor = new ValidateProcessor(finalProcessor); - commitProcessor = new CommitProcessor(validateProcessor, "1", true, null); + commitProcessor = new CommitProcessor(validateProcessor, "1", null); validateProcessor.setCommitProcessor(commitProcessor); commitProcessor.start(); MockProposalRequestProcessor proposalProcessor = new MockProposalRequestProcessor(commitProcessor); diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/DIFFSyncTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/DIFFSyncTest.java index a9be09f3973..83c9d24577e 100644 --- a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/DIFFSyncTest.java +++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/DIFFSyncTest.java @@ -252,7 +252,7 @@ protected Follower makeFollower(FileTxnSnapLog logFactory) throws IOException { @Override protected void setupRequestProcessors() { RequestProcessor finalProcessor = new FinalRequestProcessor(this); - commitProcessor = new MockCommitProcessor(finalProcessor, Long.toString(getServerId()), true, getZooKeeperServerListener()); + commitProcessor = new MockCommitProcessor(finalProcessor, Long.toString(getServerId()), getZooKeeperServerListener()); commitProcessor.start(); firstProcessor = new FollowerRequestProcessor(this, commitProcessor); @@ -279,9 +279,9 @@ public void processRequest(final Request request) { private static class MockCommitProcessor extends CommitProcessor { public MockCommitProcessor(final RequestProcessor nextProcessor, final String id, - final boolean matchSyncs, final ZooKeeperServerListener listener) { + final ZooKeeperServerListener listener) { - super(nextProcessor, id, matchSyncs, listener); + super(nextProcessor, id, listener); } @Override diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/RaceConditionTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/RaceConditionTest.java index 5216eb70324..f6160b8a88d 100644 --- a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/RaceConditionTest.java +++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/RaceConditionTest.java @@ -194,7 +194,7 @@ protected void setupRequestProcessors() { */ RequestProcessor finalProcessor = new FinalRequestProcessor(this); RequestProcessor toBeAppliedProcessor = new Leader.ToBeAppliedRequestProcessor(finalProcessor, getLeader()); - commitProcessor = new CommitProcessor(toBeAppliedProcessor, Long.toString(getServerId()), false, getZooKeeperServerListener()); + commitProcessor = new CommitProcessor(toBeAppliedProcessor, Long.toString(getServerId()), getZooKeeperServerListener()); commitProcessor.start(); ProposalRequestProcessor proposalProcessor = new MockProposalRequestProcessor(this, commitProcessor); proposalProcessor.initialize(); diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/Zab1_0Test.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/Zab1_0Test.java index d374062e293..c4c6786da11 100644 --- a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/Zab1_0Test.java +++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/Zab1_0Test.java @@ -516,7 +516,7 @@ public void converseWithLeader(InputArchive ia, OutputArchive oa, Leader l, long assertEquals(1, l.self.getCurrentEpoch()); /* we test a normal run. everything should work out well. */ - LearnerInfo li = new LearnerInfo(1, 0x10000, 0); + LearnerInfo li = new LearnerInfo(1, ProtocolVersion.CURRENT, 0); byte[] liBytes = RequestRecord.fromRecord(li).readBytes(); QuorumPacket qp = new QuorumPacket(Leader.FOLLOWERINFO, 1, liBytes, null); oa.writeRecord(qp, null); @@ -524,7 +524,7 @@ public void converseWithLeader(InputArchive ia, OutputArchive oa, Leader l, long readPacketSkippingPing(ia, qp); assertEquals(Leader.LEADERINFO, qp.getType()); assertEquals(ZxidUtils.makeZxid(2, 0), qp.getZxid()); - assertEquals(ByteBuffer.wrap(qp.getData()).getInt(), 0x10000); + assertEquals(ByteBuffer.wrap(qp.getData()).getInt(), ProtocolVersion.CURRENT); assertEquals(2, l.self.getAcceptedEpoch()); assertEquals(1, l.self.getCurrentEpoch()); @@ -594,14 +594,14 @@ public void converseWithFollower(InputArchive ia, OutputArchive oa, Follower f) assertEquals(qp.getZxid(), 0); LearnerInfo learnInfo = new LearnerInfo(); ByteBufferInputStream.byteBuffer2Record(ByteBuffer.wrap(qp.getData()), learnInfo); - assertEquals(learnInfo.getProtocolVersion(), 0x10000); + assertEquals(learnInfo.getProtocolVersion(), ProtocolVersion.CURRENT); assertEquals(learnInfo.getServerid(), 0); // We are simulating an established leader, so the epoch is 1 qp.setType(Leader.LEADERINFO); qp.setZxid(ZxidUtils.makeZxid(1, 0)); byte[] protoBytes = new byte[4]; - ByteBuffer.wrap(protoBytes).putInt(0x10000); + ByteBuffer.wrap(protoBytes).putInt(ProtocolVersion.CURRENT); qp.setData(protoBytes); oa.writeRecord(qp, null); @@ -728,14 +728,14 @@ public void converseWithFollower(InputArchive ia, OutputArchive oa, Follower f) assertEquals(qp.getZxid(), 0); LearnerInfo learnInfo = new LearnerInfo(); ByteBufferInputStream.byteBuffer2Record(ByteBuffer.wrap(qp.getData()), learnInfo); - assertEquals(learnInfo.getProtocolVersion(), 0x10000); + assertEquals(learnInfo.getProtocolVersion(), ProtocolVersion.CURRENT); assertEquals(learnInfo.getServerid(), 0); // We are simulating an established leader, so the epoch is 1 qp.setType(Leader.LEADERINFO); qp.setZxid(ZxidUtils.makeZxid(1, 0)); byte[] protoBytes = new byte[4]; - ByteBuffer.wrap(protoBytes).putInt(0x10000); + ByteBuffer.wrap(protoBytes).putInt(ProtocolVersion.CURRENT); qp.setData(protoBytes); oa.writeRecord(qp, null); @@ -835,14 +835,14 @@ public void converseWithFollower(InputArchive ia, OutputArchive oa, Follower f) assertEquals(qp.getZxid(), 0); LearnerInfo learnInfo = new LearnerInfo(); ByteBufferInputStream.byteBuffer2Record(ByteBuffer.wrap(qp.getData()), learnInfo); - assertEquals(learnInfo.getProtocolVersion(), 0x10000); + assertEquals(learnInfo.getProtocolVersion(), ProtocolVersion.CURRENT); assertEquals(learnInfo.getServerid(), 0); // We are simulating an established leader, so the epoch is 1 qp.setType(Leader.LEADERINFO); qp.setZxid(ZxidUtils.makeZxid(1, 0)); byte[] protoBytes = new byte[4]; - ByteBuffer.wrap(protoBytes).putInt(0x10000); + ByteBuffer.wrap(protoBytes).putInt(ProtocolVersion.CURRENT); qp.setData(protoBytes); oa.writeRecord(qp, null); @@ -951,7 +951,7 @@ public void converseWithLeader(InputArchive ia, OutputArchive oa, Leader l) thro assertEquals(0, l.self.getCurrentEpoch()); /* we test a normal run. everything should work out well. */ - LearnerInfo li = new LearnerInfo(1, 0x10000, 0); + LearnerInfo li = new LearnerInfo(1, ProtocolVersion.CURRENT, 0); byte[] liBytes = RequestRecord.fromRecord(li).readBytes(); QuorumPacket qp = new QuorumPacket(Leader.FOLLOWERINFO, 0, liBytes, null); oa.writeRecord(qp, null); @@ -959,7 +959,7 @@ public void converseWithLeader(InputArchive ia, OutputArchive oa, Leader l) thro readPacketSkippingPing(ia, qp); assertEquals(Leader.LEADERINFO, qp.getType()); assertEquals(ZxidUtils.makeZxid(1, 0), qp.getZxid()); - assertEquals(ByteBuffer.wrap(qp.getData()).getInt(), 0x10000); + assertEquals(ByteBuffer.wrap(qp.getData()).getInt(), ProtocolVersion.CURRENT); assertEquals(1, l.self.getAcceptedEpoch()); assertEquals(0, l.self.getCurrentEpoch()); @@ -991,7 +991,7 @@ public void converseWithLeader(InputArchive ia, OutputArchive oa, Leader l) thro assertEquals(0, l.self.getAcceptedEpoch()); assertEquals(0, l.self.getCurrentEpoch()); - LearnerInfo li = new LearnerInfo(1, 0x10000, 0); + LearnerInfo li = new LearnerInfo(1, ProtocolVersion.CURRENT, 0); byte[] liBytes = RequestRecord.fromRecord(li).readBytes(); QuorumPacket qp = new QuorumPacket(Leader.FOLLOWERINFO, 0, liBytes, null); oa.writeRecord(qp, null); @@ -999,7 +999,7 @@ public void converseWithLeader(InputArchive ia, OutputArchive oa, Leader l) thro readPacketSkippingPing(ia, qp); assertEquals(Leader.LEADERINFO, qp.getType()); assertEquals(ZxidUtils.makeZxid(1, 0), qp.getZxid()); - assertEquals(ByteBuffer.wrap(qp.getData()).getInt(), 0x10000); + assertEquals(ByteBuffer.wrap(qp.getData()).getInt(), ProtocolVersion.CURRENT); assertEquals(1, l.self.getAcceptedEpoch()); assertEquals(0, l.self.getCurrentEpoch()); @@ -1081,14 +1081,14 @@ public void converseWithObserver(InputArchive ia, OutputArchive oa, Observer o) assertEquals(qp.getZxid(), 0); LearnerInfo learnInfo = new LearnerInfo(); ByteBufferInputStream.byteBuffer2Record(ByteBuffer.wrap(qp.getData()), learnInfo); - assertEquals(learnInfo.getProtocolVersion(), 0x10000); + assertEquals(learnInfo.getProtocolVersion(), ProtocolVersion.CURRENT); assertEquals(learnInfo.getServerid(), 0); // We are simulating an established leader, so the epoch is 1 qp.setType(Leader.LEADERINFO); qp.setZxid(ZxidUtils.makeZxid(1, 0)); byte[] protoBytes = new byte[4]; - ByteBuffer.wrap(protoBytes).putInt(0x10000); + ByteBuffer.wrap(protoBytes).putInt(ProtocolVersion.CURRENT); qp.setData(protoBytes); oa.writeRecord(qp, null); @@ -1193,7 +1193,7 @@ public void testLeaderBehind(@TempDir File testData) throws Exception { testLeaderConversation(new LeaderConversation() { public void converseWithLeader(InputArchive ia, OutputArchive oa, Leader l) throws IOException { /* we test a normal run. everything should work out well. */ - LearnerInfo li = new LearnerInfo(1, 0x10000, 0); + LearnerInfo li = new LearnerInfo(1, ProtocolVersion.CURRENT, 0); byte[] liBytes = RequestRecord.fromRecord(li).readBytes(); /* we are going to say we last acked epoch 20 */ QuorumPacket qp = new QuorumPacket(Leader.FOLLOWERINFO, ZxidUtils.makeZxid(20, 0), liBytes, null); @@ -1201,7 +1201,7 @@ public void converseWithLeader(InputArchive ia, OutputArchive oa, Leader l) thro readPacketSkippingPing(ia, qp); assertEquals(Leader.LEADERINFO, qp.getType()); assertEquals(ZxidUtils.makeZxid(21, 0), qp.getZxid()); - assertEquals(ByteBuffer.wrap(qp.getData()).getInt(), 0x10000); + assertEquals(ByteBuffer.wrap(qp.getData()).getInt(), ProtocolVersion.CURRENT); qp = new QuorumPacket(Leader.ACKEPOCH, 0, new byte[4], null); oa.writeRecord(qp, null); readPacketSkippingPing(ia, qp); @@ -1230,14 +1230,14 @@ public void testAbandonBeforeACKEpoch(@TempDir File testData) throws Exception { testLeaderConversation(new LeaderConversation() { public void converseWithLeader(InputArchive ia, OutputArchive oa, Leader l) throws IOException, InterruptedException { /* we test a normal run. everything should work out well. */ - LearnerInfo li = new LearnerInfo(1, 0x10000, 0); + LearnerInfo li = new LearnerInfo(1, ProtocolVersion.CURRENT, 0); byte[] liBytes = RequestRecord.fromRecord(li).readBytes(); QuorumPacket qp = new QuorumPacket(Leader.FOLLOWERINFO, 0, liBytes, null); oa.writeRecord(qp, null); readPacketSkippingPing(ia, qp); assertEquals(Leader.LEADERINFO, qp.getType()); assertEquals(ZxidUtils.makeZxid(1, 0), qp.getZxid()); - assertEquals(ByteBuffer.wrap(qp.getData()).getInt(), 0x10000); + assertEquals(ByteBuffer.wrap(qp.getData()).getInt(), ProtocolVersion.CURRENT); Thread.sleep(l.self.getInitLimit() * l.self.getTickTime() + 5000); // The leader didn't get a quorum of acks - make sure that leader's current epoch is not advanced diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/test/QuorumSyncTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/test/QuorumSyncTest.java new file mode 100644 index 00000000000..7514d3e0cfe --- /dev/null +++ b/zookeeper-server/src/test/java/org/apache/zookeeper/test/QuorumSyncTest.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zookeeper.test; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import org.apache.zookeeper.CreateMode; +import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.ZooDefs; +import org.apache.zookeeper.ZooKeeper; +import org.apache.zookeeper.server.quorum.Leader; +import org.apache.zookeeper.server.quorum.LearnerHandler; +import org.apache.zookeeper.server.quorum.QuorumPeer; +import org.junit.jupiter.api.Test; + +public class QuorumSyncTest extends QuorumBase { + @Test + public void testReadAfterSync() throws Exception { + int leaderPort = getLeaderClientPort(); + + ZooKeeper leaderReader = createClient("127.0.0.1:" + leaderPort); + ZooKeeper followerWriter = createClient(getPeersMatching(QuorumPeer.ServerState.FOLLOWING)); + + followerWriter.create("/test", "test0".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); + + // given: dying leader + Leader leader = getLeaderQuorumPeer().leader; + for (LearnerHandler f : leader.getForwardingFollowers()) { + f.getSocket().shutdownInput(); + } + + // and: write succeed in new epoch + while (true) { + try { + followerWriter.setData("/test", "test1".getBytes(), -1); + break; + } catch (KeeperException.ConnectionLossException ignored) { + } + } + + while (true) { + try { + // when: sync succeed + leaderReader.sync("/"); + + // then: read up-to-date data + byte[] test1 = leaderReader.getData("/test", null, null); + assertEquals("test1", new String(test1)); + break; + } catch (Exception ignored) { + } + } + } +} diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/test/ReconfigTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/test/ReconfigTest.java index 1615be17242..e607c4a3de7 100644 --- a/zookeeper-server/src/test/java/org/apache/zookeeper/test/ReconfigTest.java +++ b/zookeeper-server/src/test/java/org/apache/zookeeper/test/ReconfigTest.java @@ -140,17 +140,10 @@ public static String testServerHasConfig( ZooKeeper zk, List joiningServers, List leavingServers) throws KeeperException, InterruptedException { - boolean testNodeExists = false; byte[] config = null; for (int j = 0; j < 30; j++) { try { - if (!testNodeExists) { - createZNode(zk, "/dummy", "dummy"); - testNodeExists = true; - } - // Use setData instead of sync API to force a view update. - // Check ZOOKEEPER-2137 for details. - zk.setData("/dummy", "dummy".getBytes(), -1); + zk.sync("/"); config = zk.getConfig(false, new Stat()); break; } catch (KeeperException.ConnectionLossException e) { @@ -189,15 +182,12 @@ public static void testNormalOperation(ZooKeeper writer, ZooKeeper reader, boole try { if (createNodes) { createZNode(writer, "/test", "test"); - createZNode(reader, "/dummy", "dummy"); createNodes = false; } String data = "test" + j; writer.setData("/test", data.getBytes(), -1); - // Use setData instead of sync API to force a view update. - // Check ZOOKEEPER-2137 for details. - reader.setData("/dummy", "dummy".getBytes(), -1); + reader.sync("/"); byte[] res = reader.getData("/test", null, new Stat()); assertEquals(data, new String(res)); break;