Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -259,7 +259,7 @@ protected enum State {
private static volatile int maxBatchSize;

/**
* Starting size of read and write ByteArroyOuputBuffers. Default is 32 bytes.
* Starting size of read and write ByteArrayOutputBuffers. Default is 32 bytes.
* Flag not used for small transfers like connectResponses.
*/
public static final String INT_BUFFER_STARTING_SIZE_BYTES = "zookeeper.intBufferStartingSizeBytes";
Expand Down Expand Up @@ -793,19 +793,6 @@ public void startdata() throws IOException, InterruptedException {
}

public synchronized void startup() {
startupWithServerState(State.RUNNING);
}

public synchronized void startupWithoutServing() {
startupWithServerState(State.INITIAL);
}

public synchronized void startServing() {
setState(State.RUNNING);
notifyAll();
}

private void startupWithServerState(State state) {
if (sessionTracker == null) {
createSessionTracker();
}
Expand All @@ -820,7 +807,7 @@ private void startupWithServerState(State state) {

registerMetrics();

setState(state);
setState(State.RUNNING);

requestPathMetricsCollector.start();

Expand Down Expand Up @@ -1829,7 +1816,7 @@ private void processSasl(RequestRecord request, ServerCnxn cnxn, RequestHeader r
int error;
if (authHelper.isSaslAuthRequired()) {
LOG.warn(
"Closing client connection due to server requires client SASL authenticaiton,"
"Closing client connection due to server requires client SASL authentication,"
+ "but client SASL authentication has failed, or client is not configured with SASL "
+ "authentication.");
error = Code.SESSIONCLOSEDREQUIRESASLAUTH.intValue();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,12 +89,11 @@ public void logRequest(TxnHeader hdr, Record txn, TxnDigest digest) {
* @param hdr the txn header
* @param txn the txn
* @param digest the digest of txn
* @return a request moving through a chain of RequestProcessors
*/
public Request appendRequest(final TxnHeader hdr, final Record txn, final TxnDigest digest) throws IOException {
final Request request = buildRequestToProcess(hdr, txn, digest);
public void appendRequest(final TxnHeader hdr, final Record txn, final TxnDigest digest) throws IOException {
final Request request = new Request(hdr.getClientId(), hdr.getCxid(), hdr.getType(), hdr, txn, hdr.getZxid());
request.setTxnDigest(digest);
getZKDatabase().append(request);
return request;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -295,7 +295,7 @@ protected long nanoTime() {

/**
* Overridable helper method to simply call sock.connect(). This can be
* overriden in tests to fake connection success/failure for connectToLeader.
* overridden in tests to fake connection success/failure for connectToLeader.
*/
protected void sockConnect(Socket sock, InetSocketAddress addr, int timeout) throws IOException {
sock.connect(addr, timeout);
Expand Down Expand Up @@ -555,8 +555,7 @@ protected void syncWithLeader(long newLeaderZxid) throws Exception {
boolean syncSnapshot = false;
readPacket(qp);
Deque<Long> packetsCommitted = new ArrayDeque<>();
Deque<PacketInFlight> packetsNotCommitted = new ArrayDeque<>();
Deque<Request> requestsToAck = new ArrayDeque<>();
Deque<PacketInFlight> packetsNotLogged = new ArrayDeque<>();

synchronized (zk) {
if (qp.getType() == Leader.DIFF) {
Expand Down Expand Up @@ -645,11 +644,11 @@ protected void syncWithLeader(long newLeaderZxid) throws Exception {
self.setLastSeenQuorumVerifier(qv, true);
}

packetsNotCommitted.add(pif);
packetsNotLogged.add(pif);
break;
case Leader.COMMIT:
case Leader.COMMITANDACTIVATE:
pif = packetsNotCommitted.peekFirst();
pif = packetsNotLogged.peekFirst();
if (pif.hdr.getZxid() == qp.getZxid() && qp.getType() == Leader.COMMITANDACTIVATE) {
QuorumVerifier qv = self.configFromString(new String(((SetDataTxn) pif.rec).getData(), UTF_8));
boolean majorChange = self.processReconfig(
Expand All @@ -668,7 +667,7 @@ protected void syncWithLeader(long newLeaderZxid) throws Exception {
Long.toHexString(pif.hdr.getZxid()));
} else {
zk.processTxn(pif.hdr, pif.rec);
packetsNotCommitted.remove();
packetsNotLogged.remove();
}
} else {
packetsCommitted.add(qp.getZxid());
Expand Down Expand Up @@ -710,7 +709,7 @@ protected void syncWithLeader(long newLeaderZxid) throws Exception {
// Apply to db directly if we haven't taken the snapshot
zk.processTxn(packet.hdr, packet.rec);
} else {
packetsNotCommitted.add(packet);
packetsNotLogged.add(packet);
packetsCommitted.add(qp.getZxid());
}

Expand Down Expand Up @@ -753,29 +752,55 @@ protected void syncWithLeader(long newLeaderZxid) throws Exception {
isPreZAB1_0 = false;

// ZOOKEEPER-3911: make sure sync the uncommitted logs before commit them (ACK NEWLEADER).
sock.setSoTimeout(self.tickTime * self.syncLimit);
self.setSyncMode(QuorumPeer.SyncMode.NONE);
zk.startupWithoutServing();
if (zk instanceof FollowerZooKeeperServer) {
if (zk instanceof FollowerZooKeeperServer && !packetsCommitted.isEmpty()) {
long startTime = Time.currentElapsedTime();
FollowerZooKeeperServer fzk = (FollowerZooKeeperServer) zk;
for (PacketInFlight p : packetsNotCommitted) {
final Request request = fzk.appendRequest(p.hdr, p.rec, p.digest);
requestsToAck.add(request);

/*
* @see https://github.com/apache/zookeeper/pull/1848
* Persist and process the committed txns in "packetsNotLogged"
* according to "packetsCommitted", which have been committed by
* the leader. For these committed proposals, there is no need to
* reply ack.
*
* @see https://issues.apache.org/jira/browse/ZOOKEEPER-4394
* Keep the outstanding proposals in "packetsNotLogged" to avoid
* NullPointerException when the follower receives COMMIT packet(s)
* right after replying NEWLEADER ack.
*/
while (!packetsCommitted.isEmpty()) {
long zxid = packetsCommitted.removeFirst();
pif = packetsNotLogged.peekFirst();
if (pif == null) {
LOG.warn("Committing 0x{}, but got no proposal", Long.toHexString(zxid));
continue;
} else if (pif.hdr.getZxid() != zxid) {
LOG.warn("Committing 0x{}, but next proposal is 0x{}",
Long.toHexString(zxid), Long.toHexString(pif.hdr.getZxid()));
continue;
}
packetsNotLogged.removeFirst();
fzk.appendRequest(pif.hdr, pif.rec, pif.digest);
fzk.processTxn(pif.hdr, pif.rec);
}

// persist the txns to disk
// @see https://issues.apache.org/jira/browse/ZOOKEEPER-4646
// Make sure to persist the txns to disk before replying NEWLEADER ack.
fzk.getZKDatabase().commit();
LOG.info("{} txns have been persisted and it took {}ms",
packetsNotCommitted.size(), Time.currentElapsedTime() - startTime);
packetsNotCommitted.clear();
LOG.info("It took {}ms to persist and commit txns in packetsCommitted. "
+ "{} outstanding txns left in packetsNotLogged",
Time.currentElapsedTime() - startTime, packetsNotLogged.size());
}

// set the current epoch after all the tnxs are persisted
// @see https://issues.apache.org/jira/browse/ZOOKEEPER-4643
// @see https://issues.apache.org/jira/browse/ZOOKEEPER-4785
// Update current epoch after the committed txns are persisted
self.setCurrentEpoch(newEpoch);
LOG.info("Set the current epoch to {}", newEpoch);
sock.setSoTimeout(self.tickTime * self.syncLimit);
self.setSyncMode(QuorumPeer.SyncMode.NONE);

// send NEWLEADER ack after all the tnxs are persisted
// send NEWLEADER ack after the committed txns are persisted
writePacket(new QuorumPacket(Leader.ACK, newLeaderZxid, null, null), true);
LOG.info("Sent NEWLEADER ack to leader with zxid {}", Long.toHexString(newLeaderZxid));
break;
Expand All @@ -784,7 +809,7 @@ protected void syncWithLeader(long newLeaderZxid) throws Exception {
}
ack.setZxid(ZxidUtils.makeZxid(newEpoch, 0));
writePacket(ack, true);
zk.startServing();
zk.startup();
/*
* Update the election vote here to ensure that all members of the
* ensemble report the same vote to new servers that start up and
Expand All @@ -796,20 +821,11 @@ protected void syncWithLeader(long newLeaderZxid) throws Exception {

// We need to log the stuff that came in between the snapshot and the uptodate
if (zk instanceof FollowerZooKeeperServer) {
// reply ACK of PROPOSAL after ACK of NEWLEADER to avoid leader shutdown due to timeout
// on waiting for a quorum of followers
for (final Request request : requestsToAck) {
final QuorumPacket ackPacket = new QuorumPacket(Leader.ACK, request.getHdr().getZxid(), null, null);
writePacket(ackPacket, false);
}
writePacket(null, true);
requestsToAck.clear();

FollowerZooKeeperServer fzk = (FollowerZooKeeperServer) zk;
for (PacketInFlight p : packetsNotCommitted) {
for (PacketInFlight p : packetsNotLogged) {
fzk.logRequest(p.hdr, p.rec, p.digest);
}
LOG.info("{} txns have been logged asynchronously", packetsNotCommitted.size());
LOG.info("{} txns have been logged asynchronously", packetsNotLogged.size());

for (Long zxid : packetsCommitted) {
fzk.commit(zxid);
Expand All @@ -819,7 +835,7 @@ protected void syncWithLeader(long newLeaderZxid) throws Exception {
// Similar to follower, we need to log requests between the snapshot
// and UPTODATE
ObserverZooKeeperServer ozk = (ObserverZooKeeperServer) zk;
for (PacketInFlight p : packetsNotCommitted) {
for (PacketInFlight p : packetsNotLogged) {
Long zxid = packetsCommitted.peekFirst();
if (p.hdr.getZxid() != zxid) {
// log warning message if there is no matching commit
Expand Down
Loading