From 2f86f89cc68a4b10dbdf6fba759baeedf62de302 Mon Sep 17 00:00:00 2001 From: Sirius Date: Wed, 29 Mar 2023 11:12:42 +0800 Subject: [PATCH] ZOOKEEPER-4646: Committed txns may still be lost if followers crash after replying ACK of NEWLEADER but before writing txns to disk --- .../server/quorum/FollowerZooKeeperServer.java | 13 +++++++++++++ .../apache/zookeeper/server/quorum/Learner.java | 17 +++++++++++++++-- 2 files changed, 28 insertions(+), 2 deletions(-) diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/FollowerZooKeeperServer.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/FollowerZooKeeperServer.java index 82e56c390d8..5d7ad0b4ba3 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/FollowerZooKeeperServer.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/FollowerZooKeeperServer.java @@ -23,6 +23,7 @@ import java.util.concurrent.LinkedBlockingQueue; import javax.management.JMException; import org.apache.jute.Record; +import org.apache.zookeeper.common.Time; import org.apache.zookeeper.jmx.MBeanRegistry; import org.apache.zookeeper.metrics.MetricsContext; import org.apache.zookeeper.server.ExitCode; @@ -88,6 +89,18 @@ public void logRequest(TxnHeader hdr, Record txn, TxnDigest digest) { syncProcessor.processRequest(request); } + public Request logRequestBeforeAckNewleader(TxnHeader hdr, Record txn, TxnDigest digest) throws IOException { + Request request = new Request(hdr.getClientId(), hdr.getCxid(), hdr.getType(), hdr, txn, hdr.getZxid()); + request.setTxnDigest(digest); + if ((request.zxid & 0xffffffffL) != 0) { + pendingTxns.add(request); + } + long startProcessTime = Time.currentElapsedTime(); + getZKDatabase().append(request); + ServerMetrics.getMetrics().SYNC_PROCESS_TIME.add(Time.currentElapsedTime() - startProcessTime); + return request; + } + /** * When a COMMIT message is received, eventually this method is called, * which matches up the zxid from the COMMIT with (hopefully) the head of diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Learner.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Learner.java index d534b8f45e9..41250173a10 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Learner.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Learner.java @@ -556,6 +556,7 @@ protected void syncWithLeader(long newLeaderZxid) throws Exception { readPacket(qp); Deque packetsCommitted = new ArrayDeque<>(); Deque packetsNotCommitted = new ArrayDeque<>(); + Deque requestsToBeReplied = new ArrayDeque<>(); synchronized (zk) { if (qp.getType() == Leader.DIFF) { LOG.info("Getting a diff from the leader 0x{}", Long.toHexString(qp.getZxid())); @@ -750,16 +751,18 @@ protected void syncWithLeader(long newLeaderZxid) throws Exception { //Anything after this needs to go to the transaction log, not applied directly in memory isPreZAB1_0 = false; - // ZOOKEEPER-3911: make sure sync the uncommitted logs before commit them (ACK NEWLEADER). + // ZOOKEEPER-3911 & 4646: make sure sync the uncommitted logs before commit them (ACK NEWLEADER). sock.setSoTimeout(self.tickTime * self.syncLimit); self.setSyncMode(QuorumPeer.SyncMode.NONE); zk.startupWithoutServing(); if (zk instanceof FollowerZooKeeperServer) { FollowerZooKeeperServer fzk = (FollowerZooKeeperServer) zk; for (PacketInFlight p : packetsNotCommitted) { - fzk.logRequest(p.hdr, p.rec, p.digest); + requestsToBeReplied.add(fzk.logRequestBeforeAckNewleader(p.hdr, p.rec, p.digest)); } packetsNotCommitted.clear(); + // persist the transaction logs + fzk.getZKDatabase().commit(); } writePacket(new QuorumPacket(Leader.ACK, newLeaderZxid, null, null), true); @@ -781,6 +784,16 @@ protected void syncWithLeader(long newLeaderZxid) throws Exception { // We need to log the stuff that came in between the snapshot and the uptodate if (zk instanceof FollowerZooKeeperServer) { + // Reply queued ACKs that are generated before replying ACK of NEWLEADER + // ZOOKEEPER-4685: make sure to reply ACK of PROPOSAL after replying ACK of NEWLEADER. + for (Request si : requestsToBeReplied) { + QuorumPacket p = new QuorumPacket(Leader.ACK, si.getHdr().getZxid(), null, null); + si.logLatency(ServerMetrics.getMetrics().PROPOSAL_ACK_CREATION_LATENCY); + writePacket(p, false); + } + requestsToBeReplied.clear(); + writePacket(null, true); + FollowerZooKeeperServer fzk = (FollowerZooKeeperServer) zk; for (PacketInFlight p : packetsNotCommitted) { fzk.logRequest(p.hdr, p.rec, p.digest);