getOzoneManagersList() {
public OzoneManager getOMLeader() {
OzoneManager res = null;
for (OzoneManager ozoneManager : this.ozoneManagers) {
- if (ozoneManager.isLeader()) {
+ if (ozoneManager.isLeaderReady()) {
if (res != null) {
// Found more than one leader
// Return null, expect the caller to retry in a while
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestMiniOzoneHACluster.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestMiniOzoneHACluster.java
index 96121afed966..051eb94d582e 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestMiniOzoneHACluster.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestMiniOzoneHACluster.java
@@ -107,6 +107,6 @@ public void testGetOMLeader() throws InterruptedException, TimeoutException {
Assert.assertNotNull("Timed out waiting OM leader election to finish: "
+ "no leader or more than one leader.", ozoneManager);
Assert.assertTrue("Should have gotten the leader!",
- ozoneManager.get().isLeader());
+ ozoneManager.get().isLeaderReady());
}
}
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/recon/TestReconWithOzoneManagerHA.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/recon/TestReconWithOzoneManagerHA.java
index be146fc9d49f..2ff79de07500 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/recon/TestReconWithOzoneManagerHA.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/recon/TestReconWithOzoneManagerHA.java
@@ -103,7 +103,7 @@ public void testReconGetsSnapshotFromLeader() throws Exception {
Assert.assertNotNull("Timed out waiting OM leader election to finish: "
+ "no leader or more than one leader.", ozoneManager);
Assert.assertTrue("Should have gotten the leader!",
- ozoneManager.get().isLeader());
+ ozoneManager.get().isLeaderReady());
OzoneManagerServiceProviderImpl impl = (OzoneManagerServiceProviderImpl)
cluster.getReconServer().getOzoneManagerServiceProvider();
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyDeletingService.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyDeletingService.java
index 466a55f18300..418d7a88274b 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyDeletingService.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyDeletingService.java
@@ -129,7 +129,7 @@ private boolean shouldRun() {
// OzoneManager can be null for testing
return true;
}
- return ozoneManager.isLeader();
+ return ozoneManager.isLeaderReady();
}
private boolean isRatisEnabled() {
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
index 6229beffa265..cc5af0f11edd 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
@@ -234,6 +234,7 @@
import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.KEY_NOT_FOUND;
import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.TOKEN_ERROR_OTHER;
import static org.apache.hadoop.ozone.om.lock.OzoneManagerLock.Resource.VOLUME_LOCK;
+import static org.apache.hadoop.ozone.om.ratis.OzoneManagerRatisServer.RaftServerStatus.LEADER_AND_READY;
import static org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OzoneManagerService.newReflectiveBlockingService;
import org.apache.hadoop.util.Time;
@@ -3502,21 +3503,16 @@ public String getComponent() {
public long getMaxUserVolumeCount() {
return maxUserVolumeCount;
}
-
/**
- * Checks the Leader status of OM Ratis Server.
- * Note that this status has a small window of error. It should not be used
- * to determine the absolute leader status.
- * If it is the leader, the role status is cached till Ratis server
- * notifies of leader change. If it is not leader, the role information is
- * retrieved through by submitting a GroupInfoRequest to Ratis server.
- *
- * If ratis is not enabled, then it always returns true.
+ * Return true, if the current OM node is leader and in ready state to
+ * process the requests.
*
- * @return Return true if this node is the leader, false otherwsie.
+ * If ratis is not enabled, then it always returns true.
+ * @return
*/
- public boolean isLeader() {
- return isRatisEnabled ? omRatisServer.isLeader() : true;
+ public boolean isLeaderReady() {
+ return isRatisEnabled ?
+ omRatisServer.checkLeaderStatus() == LEADER_AND_READY : true;
}
/**
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerRatisServer.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerRatisServer.java
index 467764b2f6a0..0f9e20b8d9fa 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerRatisServer.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerRatisServer.java
@@ -26,13 +26,8 @@
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
-import java.util.Optional;
import java.util.UUID;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
import com.google.common.base.Preconditions;
import org.apache.hadoop.hdds.conf.ConfigurationSource;
@@ -59,11 +54,7 @@
import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.grpc.GrpcConfigKeys;
import org.apache.ratis.netty.NettyConfigKeys;
-import org.apache.ratis.proto.RaftProtos.RaftPeerRole;
-import org.apache.ratis.proto.RaftProtos.RoleInfoProto;
import org.apache.ratis.protocol.ClientId;
-import org.apache.ratis.protocol.GroupInfoReply;
-import org.apache.ratis.protocol.GroupInfoRequest;
import org.apache.ratis.protocol.exceptions.LeaderNotReadyException;
import org.apache.ratis.protocol.exceptions.NotLeaderException;
import org.apache.ratis.protocol.exceptions.StateMachineException;
@@ -78,8 +69,9 @@
import org.apache.ratis.rpc.SupportedRpcType;
import org.apache.ratis.server.RaftServer;
import org.apache.ratis.server.RaftServerConfigKeys;
+import org.apache.ratis.server.impl.RaftServerImpl;
+import org.apache.ratis.server.impl.RaftServerProxy;
import org.apache.ratis.server.protocol.TermIndex;
-import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
import org.apache.ratis.util.LifeCycle;
import org.apache.ratis.util.SizeInBytes;
import org.apache.ratis.util.StringUtils;
@@ -106,20 +98,6 @@ public final class OzoneManagerRatisServer {
private final OzoneManager ozoneManager;
private final OzoneManagerStateMachine omStateMachine;
- private final ClientId clientId = ClientId.randomId();
-
- private final ScheduledExecutorService scheduledRoleChecker;
- private long roleCheckInitialDelayMs = 1000; // 1 second default
- private long roleCheckIntervalMs;
- private ReentrantReadWriteLock roleCheckLock = new ReentrantReadWriteLock();
- private Optional cachedPeerRole = Optional.empty();
- private Optional cachedLeaderPeerId = Optional.empty();
-
- private static final AtomicLong CALL_ID_COUNTER = new AtomicLong();
-
- private static long nextCallId() {
- return CALL_ID_COUNTER.getAndIncrement() & Long.MAX_VALUE;
- }
/**
* Submit request to Ratis server.
@@ -301,20 +279,6 @@ private OzoneManagerRatisServer(ConfigurationSource conf,
.setProperties(serverProperties)
.setStateMachine(omStateMachine)
.build();
-
- // Run a scheduler to check and update the server role on the leader
- // periodically
- this.scheduledRoleChecker = Executors.newSingleThreadScheduledExecutor();
- this.scheduledRoleChecker.scheduleWithFixedDelay(new Runnable() {
- @Override
- public void run() {
- // Run this check only on the leader OM
- if (cachedPeerRole.isPresent() &&
- cachedPeerRole.get() == RaftPeerRole.LEADER) {
- updateServerRole();
- }
- }
- }, roleCheckInitialDelayMs, roleCheckIntervalMs, TimeUnit.MILLISECONDS);
}
/**
@@ -556,19 +520,6 @@ private RaftProperties newRaftProperties(ConfigurationSource conf) {
RaftServerConfigKeys.Rpc.setSlownessTimeout(properties,
nodeFailureTimeout);
- TimeUnit roleCheckIntervalUnit =
- OMConfigKeys.OZONE_OM_RATIS_SERVER_ROLE_CHECK_INTERVAL_DEFAULT
- .getUnit();
- long roleCheckIntervalDuration = conf.getTimeDuration(
- OMConfigKeys.OZONE_OM_RATIS_SERVER_ROLE_CHECK_INTERVAL_KEY,
- OMConfigKeys.OZONE_OM_RATIS_SERVER_ROLE_CHECK_INTERVAL_DEFAULT
- .getDuration(), nodeFailureTimeoutUnit);
- this.roleCheckIntervalMs = TimeDuration.valueOf(
- roleCheckIntervalDuration, roleCheckIntervalUnit)
- .toLong(TimeUnit.MILLISECONDS);
- this.roleCheckInitialDelayMs = leaderElectionMinTimeout
- .toLong(TimeUnit.MILLISECONDS);
-
// Set auto trigger snapshot. We don't need to configure auto trigger
// threshold in OM, as last applied index is flushed during double buffer
// flush automatically. (But added this property internally, so that this
@@ -591,107 +542,39 @@ private RaftProperties newRaftProperties(ConfigurationSource conf) {
}
/**
- * Check the cached leader status.
- * @return true if cached role is Leader, false otherwise.
- */
- private boolean checkCachedPeerRoleIsLeader() {
- this.roleCheckLock.readLock().lock();
- try {
- if (cachedPeerRole.isPresent() &&
- cachedPeerRole.get() == RaftPeerRole.LEADER) {
- return true;
- }
- return false;
- } finally {
- this.roleCheckLock.readLock().unlock();
- }
- }
-
- /**
- * Check if the current OM node is the leader node.
- * @return true if Leader, false otherwise.
- */
- public boolean isLeader() {
- if (checkCachedPeerRoleIsLeader()) {
- return true;
- }
-
- // Get the server role from ratis server and update the cached values.
- updateServerRole();
-
- // After updating the server role, check and return if leader or not.
- return checkCachedPeerRoleIsLeader();
- }
-
- /**
- * Get the suggested leader peer id.
- * @return RaftPeerId of the suggested leader node.
+ * Defines RaftServer Status.
*/
- public Optional getCachedLeaderPeerId() {
- this.roleCheckLock.readLock().lock();
- try {
- return cachedLeaderPeerId;
- } finally {
- this.roleCheckLock.readLock().unlock();
- }
+ public enum RaftServerStatus {
+ NOT_LEADER,
+ LEADER_AND_NOT_READY,
+ LEADER_AND_READY;
}
/**
- * Get the gorup info (peer role and leader peer id) from Ratis server and
- * update the OM server role.
+ * Check Leader status and return the state of the RaftServer.
+ *
+ * @return RaftServerStatus.
*/
- public void updateServerRole() {
+ public RaftServerStatus checkLeaderStatus() {
+ Preconditions.checkState(server instanceof RaftServerProxy);
+ RaftServerImpl serverImpl;
try {
- GroupInfoReply groupInfo = getGroupInfo();
- RoleInfoProto roleInfoProto = groupInfo.getRoleInfoProto();
- RaftPeerRole thisNodeRole = roleInfoProto.getRole();
-
- if (thisNodeRole.equals(RaftPeerRole.LEADER)) {
- setServerRole(thisNodeRole, raftPeerId);
-
- } else if (thisNodeRole.equals(RaftPeerRole.FOLLOWER)) {
- ByteString leaderNodeId = roleInfoProto.getFollowerInfo()
- .getLeaderInfo().getId().getId();
- // There may be a chance, here we get leaderNodeId as null. For
- // example, in 3 node OM Ratis, if 2 OM nodes are down, there will
- // be no leader.
- RaftPeerId leaderPeerId = null;
- if (leaderNodeId != null && !leaderNodeId.isEmpty()) {
- leaderPeerId = RaftPeerId.valueOf(leaderNodeId);
+ serverImpl = ((RaftServerProxy) server).getImpl(raftGroupId);
+ if (serverImpl != null) {
+ if (!serverImpl.isLeader()) {
+ return RaftServerStatus.NOT_LEADER;
+ } else if (serverImpl.isLeaderReady()) {
+ return RaftServerStatus.LEADER_AND_READY;
+ } else {
+ return RaftServerStatus.LEADER_AND_NOT_READY;
}
-
- setServerRole(thisNodeRole, leaderPeerId);
-
- } else {
- setServerRole(thisNodeRole, null);
-
}
- } catch (IOException e) {
- LOG.error("Failed to retrieve RaftPeerRole. Setting cached role to " +
- "{} and resetting leader info.", RaftPeerRole.UNRECOGNIZED, e);
- setServerRole(null, null);
- }
- }
-
- /**
- * Set the current server role and the leader peer id.
- */
- private void setServerRole(RaftPeerRole currentRole,
- RaftPeerId leaderPeerId) {
- this.roleCheckLock.writeLock().lock();
- try {
- this.cachedPeerRole = Optional.ofNullable(currentRole);
- this.cachedLeaderPeerId = Optional.ofNullable(leaderPeerId);
- } finally {
- this.roleCheckLock.writeLock().unlock();
+ } catch (IOException ioe) {
+ // In this case we return not a leader.
+ LOG.error("Fail to get RaftServer impl and therefore it's not clear " +
+ "whether it's leader. ", ioe);
}
- }
-
- private GroupInfoReply getGroupInfo() throws IOException {
- GroupInfoRequest groupInfoRequest = new GroupInfoRequest(clientId,
- raftPeerId, raftGroupId, nextCallId());
- GroupInfoReply groupInfo = server.getGroupInfo(groupInfoRequest);
- return groupInfo;
+ return RaftServerStatus.NOT_LEADER;
}
public int getServerPort() {
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerStateMachine.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerStateMachine.java
index aaf94e9b8c56..183fe461cbcd 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerStateMachine.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerStateMachine.java
@@ -380,7 +380,6 @@ public CompletableFuture notifyInstallSnapshotFromLeader(
@Override
public void notifyNotLeader(Collection pendingEntries)
throws IOException {
- omRatisServer.updateServerRole();
}
@Override
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerProtocolServerSideTranslatorPB.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerProtocolServerSideTranslatorPB.java
index 57b8fac6d0d1..c118307fd28d 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerProtocolServerSideTranslatorPB.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerProtocolServerSideTranslatorPB.java
@@ -17,7 +17,6 @@
package org.apache.hadoop.ozone.protocolPB;
import java.io.IOException;
-import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicLong;
@@ -26,10 +25,12 @@
import org.apache.hadoop.hdds.utils.ProtocolMessageMetrics;
import org.apache.hadoop.ozone.OmUtils;
import org.apache.hadoop.ozone.om.OzoneManager;
+import org.apache.hadoop.ozone.om.exceptions.OMLeaderNotReadyException;
import org.apache.hadoop.ozone.om.exceptions.OMNotLeaderException;
import org.apache.hadoop.ozone.om.protocolPB.OzoneManagerProtocolPB;
import org.apache.hadoop.ozone.om.ratis.OzoneManagerDoubleBuffer;
import org.apache.hadoop.ozone.om.ratis.OzoneManagerRatisServer;
+import org.apache.hadoop.ozone.om.ratis.OzoneManagerRatisServer.RaftServerStatus;
import org.apache.hadoop.ozone.om.ratis.utils.OzoneManagerRatisUtils;
import org.apache.hadoop.ozone.om.request.OMClientRequest;
import org.apache.hadoop.ozone.om.response.OMClientResponse;
@@ -44,6 +45,9 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import static org.apache.hadoop.ozone.om.ratis.OzoneManagerRatisServer.RaftServerStatus.LEADER_AND_READY;
+import static org.apache.hadoop.ozone.om.ratis.OzoneManagerRatisServer.RaftServerStatus.NOT_LEADER;
+
/**
* This class is the server-side translator that forwards requests received on
* {@link OzoneManagerProtocolPB}
@@ -121,13 +125,14 @@ public OMResponse submitRequest(RpcController controller,
private OMResponse processRequest(OMRequest request) throws
ServiceException {
-
+ RaftServerStatus raftServerStatus;
if (isRatisEnabled) {
// Check if the request is a read only request
if (OmUtils.isReadOnly(request)) {
return submitReadRequestToOM(request);
} else {
- if (omRatisServer.isLeader()) {
+ raftServerStatus = omRatisServer.checkLeaderStatus();
+ if (raftServerStatus == LEADER_AND_READY) {
try {
OMClientRequest omClientRequest =
OzoneManagerRatisUtils.createClientRequest(request);
@@ -138,12 +143,7 @@ private OMResponse processRequest(OMRequest request) throws
}
return submitRequestToRatis(request);
} else {
- // throw not leader exception. This is being done, so to avoid
- // unnecessary execution of preExecute on follower OM's. This
- // will be helpful in the case like where we we reduce the
- // chance of allocate blocks on follower OM's. Right now our
- // leader status is updated every 1 second.
- throw createNotLeaderException();
+ throw createLeaderErrorException(raftServerStatus);
}
}
} else {
@@ -185,25 +185,22 @@ private OMResponse submitRequestToRatis(OMRequest request)
private OMResponse submitReadRequestToOM(OMRequest request)
throws ServiceException {
// Check if this OM is the leader.
- if (omRatisServer.isLeader()) {
+ RaftServerStatus raftServerStatus = omRatisServer.checkLeaderStatus();
+ if (raftServerStatus == LEADER_AND_READY) {
return handler.handleReadRequest(request);
} else {
- throw createNotLeaderException();
+ throw createLeaderErrorException(raftServerStatus);
}
}
private ServiceException createNotLeaderException() {
RaftPeerId raftPeerId = omRatisServer.getRaftPeerId();
- Optional leaderRaftPeerId = omRatisServer
- .getCachedLeaderPeerId();
- OMNotLeaderException notLeaderException;
- if (leaderRaftPeerId.isPresent()) {
- notLeaderException = new OMNotLeaderException(
- raftPeerId, leaderRaftPeerId.get());
- } else {
- notLeaderException = new OMNotLeaderException(raftPeerId);
- }
+ // TODO: Set suggest leaderID. Right now, client is not using suggest
+ // leaderID. Need to fix this.
+
+ OMNotLeaderException notLeaderException =
+ new OMNotLeaderException(raftPeerId);
if (LOG.isDebugEnabled()) {
LOG.debug(notLeaderException.getMessage());
@@ -212,6 +209,26 @@ private ServiceException createNotLeaderException() {
return new ServiceException(notLeaderException);
}
+ private ServiceException createLeaderErrorException(
+ RaftServerStatus raftServerStatus) {
+ if (raftServerStatus == NOT_LEADER) {
+ return createNotLeaderException();
+ } else {
+ return createLeaderNotReadyException();
+ }
+ }
+
+
+ private ServiceException createLeaderNotReadyException() {
+ RaftPeerId raftPeerId = omRatisServer.getRaftPeerId();
+
+ OMLeaderNotReadyException leaderNotReadyException =
+ new OMLeaderNotReadyException(raftPeerId.toString() + " is Leader " +
+ "but not ready to process request");
+
+ return new ServiceException(leaderNotReadyException);
+ }
+
/**
* Submits request directly to OM.
*/