testOMNodeIDList) {
this.omProxies = testOMProxies;
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ha/OMNodeDetails.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OMNodeDetails.java
similarity index 54%
rename from hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ha/OMNodeDetails.java
rename to hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OMNodeDetails.java
index 7156381ba2ed..d908b57f2729 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ha/OMNodeDetails.java
+++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OMNodeDetails.java
@@ -15,17 +15,23 @@
* the License.
*/
-package org.apache.hadoop.ozone.om.ha;
+package org.apache.hadoop.ozone.om.helpers;
import org.apache.commons.lang3.StringUtils;
-import org.apache.hadoop.hdds.server.http.HttpConfig;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.ozone.OmUtils;
+import org.apache.hadoop.ozone.ha.ConfUtils;
import org.apache.hadoop.hdds.NodeDetails;
import java.net.InetSocketAddress;
import static org.apache.hadoop.ozone.OzoneConsts.OZONE_DB_CHECKPOINT_REQUEST_FLUSH;
import static org.apache.hadoop.ozone.OzoneConsts.OZONE_OM_DB_CHECKPOINT_HTTP_ENDPOINT;
+import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_ADDRESS_KEY;
+import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_RATIS_PORT_DEFAULT;
+import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_RATIS_PORT_KEY;
/**
* This class stores OM node details.
@@ -43,6 +49,16 @@ private OMNodeDetails(String serviceId, String nodeId,
this.rpcPort = rpcPort;
}
+ /**
+ * Constructs OMNodeDetails object.
+ */
+ private OMNodeDetails(String serviceId, String nodeId, String hostAddr,
+ int rpcPort, int ratisPort, String httpAddress, String httpsAddress) {
+ super(serviceId, nodeId, hostAddr, rpcPort, ratisPort, httpAddress,
+ httpsAddress);
+ this.rpcPort = rpcPort;
+ }
+
@Override
public String toString() {
return "OMNodeDetails["
@@ -66,18 +82,35 @@ public int getRpcPort() {
public static class Builder {
private String omServiceId;
private String omNodeId;
+ private String hostAddress;
private InetSocketAddress rpcAddress;
private int rpcPort;
private int ratisPort;
private String httpAddr;
private String httpsAddr;
+ public Builder setHostAddress(String hostName) {
+ this.hostAddress = hostName;
+ return this;
+ }
+
public Builder setRpcAddress(InetSocketAddress rpcAddr) {
this.rpcAddress = rpcAddr;
this.rpcPort = rpcAddress.getPort();
return this;
}
+ public Builder setRatisAddress(InetSocketAddress ratisAddr) {
+ this.hostAddress = ratisAddr.getHostName();
+ this.ratisPort = ratisAddr.getPort();
+ return this;
+ }
+
+ public Builder setRpcPort(int port) {
+ this.rpcPort = port;
+ return this;
+ }
+
public Builder setRatisPort(int port) {
this.ratisPort = port;
return this;
@@ -104,13 +137,18 @@ public Builder setHttpsAddress(String httpsAddress) {
}
public OMNodeDetails build() {
- return new OMNodeDetails(omServiceId, omNodeId, rpcAddress, rpcPort,
- ratisPort, httpAddr, httpsAddr);
+ if (rpcAddress != null) {
+ return new OMNodeDetails(omServiceId, omNodeId, rpcAddress, rpcPort,
+ ratisPort, httpAddr, httpsAddr);
+ } else {
+ return new OMNodeDetails(omServiceId, omNodeId, hostAddress, rpcPort,
+ ratisPort, httpAddr, httpsAddr);
+ }
}
}
- public String getOMDBCheckpointEnpointUrl(HttpConfig.Policy httpPolicy) {
- if (httpPolicy.isHttpEnabled()) {
+ public String getOMDBCheckpointEnpointUrl(boolean isHttpPolicy) {
+ if (isHttpPolicy) {
if (StringUtils.isNotEmpty(getHttpAddress())) {
return "http://" + getHttpAddress() +
OZONE_OM_DB_CHECKPOINT_HTTP_ENDPOINT +
@@ -125,4 +163,40 @@ public String getOMDBCheckpointEnpointUrl(HttpConfig.Policy httpPolicy) {
}
return null;
}
+
+ public static OMNodeDetails getOMNodeDetailsFromConf(OzoneConfiguration conf,
+ String omServiceId, String omNodeId) {
+ String rpcAddrKey = ConfUtils.addKeySuffixes(OZONE_OM_ADDRESS_KEY,
+ omServiceId, omNodeId);
+ String rpcAddrStr = OmUtils.getOmRpcAddress(conf, rpcAddrKey);
+ if (rpcAddrStr == null || rpcAddrStr.isEmpty()) {
+ return null;
+ }
+
+ String ratisPortKey = ConfUtils.addKeySuffixes(OZONE_OM_RATIS_PORT_KEY,
+ omServiceId, omNodeId);
+ int ratisPort = conf.getInt(ratisPortKey, OZONE_OM_RATIS_PORT_DEFAULT);
+
+ InetSocketAddress omRpcAddress = null;
+ try {
+ omRpcAddress = NetUtils.createSocketAddr(rpcAddrStr);
+ } catch (Exception e) {
+ throw new IllegalArgumentException("Couldn't create socket address" +
+ " for OM " + omNodeId + " at " + rpcAddrStr, e);
+ }
+
+ String httpAddr = OmUtils.getHttpAddressForOMPeerNode(conf,
+ omServiceId, omNodeId, omRpcAddress.getHostName());
+ String httpsAddr = OmUtils.getHttpsAddressForOMPeerNode(conf,
+ omServiceId, omNodeId, omRpcAddress.getHostName());
+
+ return new OMNodeDetails.Builder()
+ .setOMNodeId(omNodeId)
+ .setRpcAddress(omRpcAddress)
+ .setRatisPort(ratisPort)
+ .setHttpAddress(httpAddr)
+ .setHttpsAddress(httpsAddr)
+ .setOMServiceId(omServiceId)
+ .build();
+ }
}
diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocol/OMInterServiceProtocol.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocol/OMInterServiceProtocol.java
new file mode 100644
index 000000000000..165cd0332366
--- /dev/null
+++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocol/OMInterServiceProtocol.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.om.protocol;
+
+import java.io.Closeable;
+import java.io.IOException;
+import org.apache.hadoop.ozone.om.OMConfigKeys;
+import org.apache.hadoop.ozone.om.helpers.OMNodeDetails;
+import org.apache.hadoop.security.KerberosInfo;
+
+/**
+ * Protocol for inter OM communication.
+ */
+@KerberosInfo(
+ serverPrincipal = OMConfigKeys.OZONE_OM_KERBEROS_PRINCIPAL_KEY)
+public interface OMInterServiceProtocol extends Closeable {
+
+ /**
+ * Bootstrap OM by adding to existing OM Ratis ring.
+ */
+ void bootstrap(OMNodeDetails newOMNode) throws IOException;
+}
diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/Hadoop3OmTransport.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/Hadoop3OmTransport.java
index ea0e534582cd..32d4f54b1f6b 100644
--- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/Hadoop3OmTransport.java
+++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/Hadoop3OmTransport.java
@@ -64,7 +64,7 @@ public Hadoop3OmTransport(ConfigurationSource conf,
ProtobufRpcEngine.class);
this.omFailoverProxyProvider = new OMFailoverProxyProvider(conf, ugi,
- omServiceId);
+ omServiceId, OzoneManagerProtocolPB.class);
int maxFailovers = conf.getInt(
OzoneConfigKeys.OZONE_CLIENT_FAILOVER_MAX_ATTEMPTS_KEY,
diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/OMInterServiceProtocolClientSideImpl.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/OMInterServiceProtocolClientSideImpl.java
new file mode 100644
index 000000000000..a81a19bfd679
--- /dev/null
+++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/OMInterServiceProtocolClientSideImpl.java
@@ -0,0 +1,121 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.om.protocolPB;
+
+import com.google.protobuf.RpcController;
+import com.google.protobuf.ServiceException;
+import java.io.IOException;
+import org.apache.hadoop.hdds.conf.ConfigurationSource;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.io.retry.RetryProxy;
+import org.apache.hadoop.ipc.ProtobufHelper;
+import org.apache.hadoop.ipc.ProtobufRpcEngine;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.ozone.OzoneConfigKeys;
+import org.apache.hadoop.ozone.om.exceptions.OMLeaderNotReadyException;
+import org.apache.hadoop.ozone.om.exceptions.OMNotLeaderException;
+import org.apache.hadoop.ozone.om.ha.OMFailoverProxyProvider;
+import org.apache.hadoop.ozone.om.helpers.OMNodeDetails;
+import org.apache.hadoop.ozone.om.protocol.OMInterServiceProtocol;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerInterServiceProtocolProtos.BootstrapOMRequest;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerInterServiceProtocolProtos.BootstrapOMResponse;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerInterServiceProtocolProtos.ErrorCode;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Protocol implementation for Inter OM communication.
+ */
+public class OMInterServiceProtocolClientSideImpl implements
+ OMInterServiceProtocol {
+
+ /**
+ * RpcController is not used and hence is set to null.
+ */
+ private static final RpcController NULL_RPC_CONTROLLER = null;
+
+ private static final Logger LOG =
+ LoggerFactory.getLogger(OMInterServiceProtocolClientSideImpl.class);
+
+ private final OMFailoverProxyProvider omFailoverProxyProvider;
+
+ private final OMInterServiceProtocolPB rpcProxy;
+
+ public OMInterServiceProtocolClientSideImpl(ConfigurationSource conf,
+ UserGroupInformation ugi, String omServiceId) throws IOException {
+
+ RPC.setProtocolEngine(OzoneConfiguration.of(conf),
+ OMInterServiceProtocolPB.class, ProtobufRpcEngine.class);
+
+ this.omFailoverProxyProvider = new OMFailoverProxyProvider(conf, ugi,
+ omServiceId, OMInterServiceProtocolPB.class);
+
+ int maxFailovers = conf.getInt(
+ OzoneConfigKeys.OZONE_CLIENT_FAILOVER_MAX_ATTEMPTS_KEY,
+ OzoneConfigKeys.OZONE_CLIENT_FAILOVER_MAX_ATTEMPTS_DEFAULT);
+
+ this.rpcProxy = (OMInterServiceProtocolPB) RetryProxy.create(
+ OMInterServiceProtocolPB.class, omFailoverProxyProvider,
+ omFailoverProxyProvider.getRetryPolicy(maxFailovers));
+ }
+
+ @Override
+ public void bootstrap(OMNodeDetails newOMNode) throws IOException {
+ BootstrapOMRequest bootstrapOMRequest = BootstrapOMRequest.newBuilder()
+ .setNodeId(newOMNode.getNodeId())
+ .setHostAddress(newOMNode.getHostAddress())
+ .setRatisPort(newOMNode.getRatisPort())
+ .build();
+
+ BootstrapOMResponse response;
+ try {
+ response = rpcProxy.bootstrap(NULL_RPC_CONTROLLER, bootstrapOMRequest);
+ } catch (ServiceException e) {
+ OMNotLeaderException notLeaderException =
+ OMFailoverProxyProvider.getNotLeaderException(e);
+ if (notLeaderException != null) {
+ throwException(ErrorCode.LEADER_UNDETERMINED,
+ notLeaderException.getMessage());
+ }
+
+ OMLeaderNotReadyException leaderNotReadyException =
+ OMFailoverProxyProvider.getLeaderNotReadyException(e);
+ if (leaderNotReadyException != null) {
+ throwException(ErrorCode.LEADER_NOT_READY,
+ leaderNotReadyException.getMessage());
+ }
+ throw ProtobufHelper.getRemoteException(e);
+ }
+
+ if (!response.getSuccess()) {
+ throwException(response.getErrorCode(), response.getErrorMsg());
+ }
+ }
+
+ private void throwException(ErrorCode errorCode, String errorMsg)
+ throws IOException {
+ throw new IOException("Failed to Bootstrap OM. Error Code: " + errorCode +
+ ", Error Message: " + errorMsg);
+ }
+
+ @Override
+ public void close() throws IOException {
+ omFailoverProxyProvider.close();
+ }
+}
diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/OMInterServiceProtocolPB.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/OMInterServiceProtocolPB.java
new file mode 100644
index 000000000000..90f773be814e
--- /dev/null
+++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/OMInterServiceProtocolPB.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.om.protocolPB;
+
+import org.apache.hadoop.hdds.annotation.InterfaceAudience;
+import org.apache.hadoop.ozone.om.OMConfigKeys;
+import org.apache.hadoop.ipc.ProtocolInfo;
+import org.apache.hadoop.ozone.protocol.proto
+ .OzoneManagerInterServiceProtocolProtos.OzoneManagerInterService;
+import org.apache.hadoop.security.KerberosInfo;
+
+/**
+ * Protocol used for communication between OMs.
+ */
+@ProtocolInfo(protocolName =
+ "org.apache.hadoop.ozone.om.protocol.OMInterServiceProtocol",
+ protocolVersion = 1)
+@KerberosInfo(
+ serverPrincipal = OMConfigKeys.OZONE_OM_KERBEROS_PRINCIPAL_KEY)
+@InterfaceAudience.Private
+public interface OMInterServiceProtocolPB
+ extends OzoneManagerInterService.BlockingInterface {
+}
diff --git a/hadoop-ozone/common/src/test/java/org/apache/hadoop/ozone/om/ha/TestOMFailoverProxyProvider.java b/hadoop-ozone/common/src/test/java/org/apache/hadoop/ozone/om/ha/TestOMFailoverProxyProvider.java
index 21589174de39..12b0d408654c 100644
--- a/hadoop-ozone/common/src/test/java/org/apache/hadoop/ozone/om/ha/TestOMFailoverProxyProvider.java
+++ b/hadoop-ozone/common/src/test/java/org/apache/hadoop/ozone/om/ha/TestOMFailoverProxyProvider.java
@@ -26,6 +26,7 @@
import org.apache.hadoop.io.Text;
import org.apache.hadoop.ozone.ha.ConfUtils;
+import org.apache.hadoop.ozone.om.protocolPB.OzoneManagerProtocolPB;
import org.junit.Assert;
import org.junit.Test;
import org.junit.Before;
@@ -68,7 +69,8 @@ public void init() throws Exception {
config.set(ConfUtils.addKeySuffixes(OZONE_OM_NODES_KEY, OM_SERVICE_ID),
allNodeIds.toString());
provider = new OMFailoverProxyProvider(config,
- UserGroupInformation.getCurrentUser(), OM_SERVICE_ID);
+ UserGroupInformation.getCurrentUser(), OM_SERVICE_ID,
+ OzoneManagerProtocolPB.class);
}
/**
@@ -184,7 +186,8 @@ public void testCanonicalTokenServiceName() throws IOException {
ozoneConf.set(ConfUtils.addKeySuffixes(OZONE_OM_NODES_KEY, OM_SERVICE_ID),
allNodeIds.toString());
OMFailoverProxyProvider prov = new OMFailoverProxyProvider(ozoneConf,
- UserGroupInformation.getCurrentUser(), OM_SERVICE_ID);
+ UserGroupInformation.getCurrentUser(), OM_SERVICE_ID,
+ OzoneManagerProtocolPB.class);
Text dtService = prov.getCurrentProxyDelegationToken();
diff --git a/hadoop-ozone/fault-injection-test/mini-chaos-tests/src/test/java/org/apache/hadoop/ozone/MiniOzoneChaosCluster.java b/hadoop-ozone/fault-injection-test/mini-chaos-tests/src/test/java/org/apache/hadoop/ozone/MiniOzoneChaosCluster.java
index 1773b6d290ce..728ded031e44 100644
--- a/hadoop-ozone/fault-injection-test/mini-chaos-tests/src/test/java/org/apache/hadoop/ozone/MiniOzoneChaosCluster.java
+++ b/hadoop-ozone/fault-injection-test/mini-chaos-tests/src/test/java/org/apache/hadoop/ozone/MiniOzoneChaosCluster.java
@@ -100,11 +100,14 @@ public static FailureService of(String serviceName) {
}
}
+ @SuppressWarnings("parameternumber")
public MiniOzoneChaosCluster(OzoneConfiguration conf,
List ozoneManagers, List scms,
List hddsDatanodes, String omServiceID,
- String scmServiceId, Set> clazzes) {
- super(conf, ozoneManagers, scms, hddsDatanodes, omServiceID, scmServiceId);
+ String scmServiceId, String clusterPath,
+ Set> clazzes) {
+ super(conf, ozoneManagers, scms, hddsDatanodes, omServiceID, scmServiceId,
+ clusterPath);
this.numDatanodes = getHddsDatanodes().size();
this.numOzoneManagers = ozoneManagers.size();
this.numStorageContainerManagers = scms.size();
@@ -327,7 +330,7 @@ public MiniOzoneChaosCluster build() throws IOException {
MiniOzoneChaosCluster cluster =
new MiniOzoneChaosCluster(conf, omList, scmList, hddsDatanodes,
- omServiceId, scmServiceId, clazzes);
+ omServiceId, scmServiceId, path, clazzes);
if (startDataNodes) {
cluster.startHddsDatanodes();
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneCluster.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneCluster.java
index 309fb9cbe48d..0263fcfd7fae 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneCluster.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneCluster.java
@@ -75,6 +75,11 @@ static Builder newHABuilder(OzoneConfiguration conf) {
*/
OzoneConfiguration getConf();
+ /**
+ * Set the configuration for the MiniOzoneCluster.
+ */
+ void setConf(OzoneConfiguration newConf);
+
/**
* Waits for the cluster to be ready, this call blocks till all the
* configured {@link HddsDatanodeService} registers with
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java
index 738c8d8f8cf9..edda93971c2c 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java
@@ -110,7 +110,7 @@ public class MiniOzoneClusterImpl implements MiniOzoneCluster {
private static final Logger LOG =
LoggerFactory.getLogger(MiniOzoneClusterImpl.class);
- private final OzoneConfiguration conf;
+ private OzoneConfiguration conf;
private StorageContainerManager scm;
private OzoneManager ozoneManager;
private final List hddsDatanodes;
@@ -189,6 +189,11 @@ public OzoneConfiguration getConf() {
return conf;
}
+ @Override
+ public void setConf(OzoneConfiguration newConf) {
+ this.conf = newConf;
+ }
+
@Override
public String getOMServiceId() {
// Non-HA cluster doesn't have OM Service Id.
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneHAClusterImpl.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneHAClusterImpl.java
index 82b31e4f9472..af62ed08faec 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneHAClusterImpl.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneHAClusterImpl.java
@@ -34,6 +34,7 @@
import org.apache.hadoop.ozone.ha.ConfUtils;
import org.apache.hadoop.ozone.om.OMConfigKeys;
import org.apache.hadoop.ozone.om.OzoneManager;
+import org.apache.hadoop.ozone.om.ratis.OzoneManagerRatisServer;
import org.apache.hadoop.ozone.recon.ReconServer;
import org.apache.hadoop.security.authentication.client.AuthenticationException;
import org.apache.ozone.test.GenericTestUtils;
@@ -68,11 +69,13 @@ public class MiniOzoneHAClusterImpl extends MiniOzoneClusterImpl {
private final OMHAService omhaService;
private final SCMHAService scmhaService;
+ private final String clusterMetaPath;
+
private int waitForClusterToBeReadyTimeout = 120000; // 2 min
private static final Random RANDOM = new Random();
private static final int RATIS_RPC_TIMEOUT = 1000; // 1 second
- public static final int NODE_FAILURE_TIMEOUT = 2000; // 2 seconds
+ private static final int NODE_FAILURE_TIMEOUT = 2000; // 2 seconds
/**
* Creates a new MiniOzoneCluster.
@@ -89,12 +92,14 @@ public MiniOzoneHAClusterImpl(
List hddsDatanodes,
String omServiceId,
String scmServiceId,
+ String clusterPath,
ReconServer reconServer) {
super(conf, hddsDatanodes, reconServer);
omhaService =
new OMHAService(activeOMList, inactiveOMList, omServiceId);
scmhaService =
new SCMHAService(activeSCMList, inactiveSCMList, scmServiceId);
+ this.clusterMetaPath = clusterPath;
}
/**
@@ -107,9 +112,10 @@ protected MiniOzoneHAClusterImpl(
List scmList,
List hddsDatanodes,
String omServiceId,
- String scmServiceId) {
+ String scmServiceId,
+ String clusterPath) {
this(conf, omList, null, scmList, null, hddsDatanodes,
- omServiceId, scmServiceId, null);
+ omServiceId, scmServiceId, clusterPath, null);
}
@Override
@@ -181,7 +187,7 @@ public StorageContainerManager getStorageContainerManager(int index) {
*/
public OzoneManager getOMLeader() {
OzoneManager res = null;
- for (OzoneManager ozoneManager : this.omhaService.getServices()) {
+ for (OzoneManager ozoneManager : this.omhaService.getActiveServices()) {
if (ozoneManager.isLeaderReady()) {
if (res != null) {
// Found more than one leader
@@ -239,7 +245,7 @@ public void shutdownStorageContainerManager(StorageContainerManager scm) {
LOG.info("Shutting down StorageContainerManager " + scm.getScmId());
scm.stop();
- scmhaService.removeInstance(scm);
+ scmhaService.deactivate(scm);
}
public void restartStorageContainerManager(
@@ -251,7 +257,7 @@ public void restartStorageContainerManager(
shutdownStorageContainerManager(scm);
scm.join();
scm = TestUtils.getScmSimple(scmConf);
- scmhaService.addInstance(scm);
+ scmhaService.activate(scm);
scm.start();
if (waitForSCM) {
waitForClusterToBeReady();
@@ -305,13 +311,17 @@ public void stop() {
}
public void stopOzoneManager(int index) {
- omhaService.getServices().get(index).stop();
- omhaService.getServices().get(index).join();
+ OzoneManager om = omhaService.getServices().get(index);
+ om.stop();
+ om.join();
+ omhaService.deactivate(om);
}
public void stopOzoneManager(String omNodeId) {
- omhaService.getServiceById(omNodeId).stop();
- omhaService.getServiceById(omNodeId).join();
+ OzoneManager om = omhaService.getServiceById(omNodeId);
+ om.stop();
+ om.join();
+ omhaService.deactivate(om);
}
/**
@@ -356,7 +366,12 @@ public MiniOzoneCluster build() throws IOException {
numOfActiveOMs = numOfOMs;
}
- // If num of ActiveOMs is not set, set it to numOfOMs.
+ // If num of SCMs it not set, set it to 1.
+ if (numOfSCMs == 0) {
+ numOfSCMs = 1;
+ }
+
+ // If num of ActiveSCMs is not set, set it to numOfSCMs.
if (numOfActiveSCMs == ACTIVE_SCMS_NOT_SET) {
numOfActiveSCMs = numOfSCMs;
}
@@ -383,7 +398,7 @@ public MiniOzoneCluster build() throws IOException {
MiniOzoneHAClusterImpl cluster = new MiniOzoneHAClusterImpl(conf,
activeOMs, inactiveOMs, activeSCMs, inactiveSCMs,
- hddsDatanodes, omServiceId, scmServiceId, reconServer);
+ hddsDatanodes, omServiceId, scmServiceId, path, reconServer);
if (startDataNodes) {
cluster.startHddsDatanodes();
@@ -430,7 +445,7 @@ protected List createOMService() throws IOException,
List omList = Lists.newArrayList();
int retryCount = 0;
- int basePort = 10000;
+ int basePort;
while (true) {
try {
@@ -466,7 +481,7 @@ protected List createOMService() throws IOException,
if (i <= numOfActiveOMs) {
om.start();
activeOMs.add(om);
- LOG.info("Started OzoneManager RPC server at {}",
+ LOG.info("Started OzoneManager {} RPC server at {}", nodeId,
om.getOmRpcServerAddr());
} else {
inactiveOMs.add(om);
@@ -642,13 +657,14 @@ private void initOMHAConfig(int basePort) throws IOException {
conf.set(OMConfigKeys.OZONE_OM_INTERNAL_SERVICE_ID, omServiceId);
String omNodesKey = ConfUtils.addKeySuffixes(
OMConfigKeys.OZONE_OM_NODES_KEY, omServiceId);
- StringBuilder omNodesKeyValue = new StringBuilder();
+ List omNodeIds = new ArrayList<>();
int port = basePort;
for (int i = 1; i <= numOfOMs; i++, port+=6) {
String omNodeId = OM_NODE_ID_PREFIX + i;
- omNodesKeyValue.append(",").append(omNodeId);
+ omNodeIds.add(omNodeId);
+
String omAddrKey = ConfUtils.addKeySuffixes(
OMConfigKeys.OZONE_OM_ADDRESS_KEY, omServiceId, omNodeId);
String omHttpAddrKey = ConfUtils.addKeySuffixes(
@@ -664,8 +680,160 @@ private void initOMHAConfig(int basePort) throws IOException {
conf.setInt(omRatisPortKey, port + 4);
}
- conf.set(omNodesKey, omNodesKeyValue.substring(1));
+ conf.set(omNodesKey, String.join(",", omNodeIds));
+ }
+ }
+
+ /**
+ * Bootstrap new OM and add to existing OM HA service ring.
+ * @return new OM nodeId
+ */
+ public void bootstrapOzoneManager(String omNodeId) throws Exception {
+
+ int basePort;
+ int retryCount = 0;
+
+ OzoneManager om = null;
+
+ long leaderSnapshotIndex = getOMLeader().getRatisSnapshotIndex();
+
+ while (true) {
+ try {
+ basePort = 10000 + RANDOM.nextInt(1000) * 4;
+ OzoneConfiguration newConf = addNewOMToConfig(getOMServiceId(),
+ omNodeId, basePort);
+
+ om = bootstrapNewOM(omNodeId);
+
+ LOG.info("Bootstrapped OzoneManager {} RPC server at {}", omNodeId,
+ om.getOmRpcServerAddr());
+
+ // Add new OMs to cluster's in memory map and update existing OMs conf.
+ setConf(newConf);
+
+ omhaService.addInstance(om, true);
+ break;
+ } catch (IOException e) {
+ // Existing OM config could have been updated with new conf. Reset it.
+ for (OzoneManager existingOM : omhaService.getServices()) {
+ existingOM.setConfiguration(getConf());
+ }
+ if (e instanceof BindException ||
+ e.getCause() instanceof BindException) {
+ ++retryCount;
+ LOG.info("MiniOzoneHACluster port conflicts, retried {} times",
+ retryCount);
+ } else {
+ throw e;
+ }
+ }
+ }
+
+ waitForBootstrappedNodeToBeReady(om, leaderSnapshotIndex);
+ waitForConfigUpdateOnAllOMs(omNodeId);
+ }
+
+ /**
+ * Set the configs for new OMs.
+ */
+ private OzoneConfiguration addNewOMToConfig(String omServiceId,
+ String omNodeId, int basePort) {
+ OzoneConfiguration newConf = getConf();
+ String omNodesKey = ConfUtils.addKeySuffixes(
+ OMConfigKeys.OZONE_OM_NODES_KEY, omServiceId);
+ StringBuilder omNodesKeyValue = new StringBuilder();
+ omNodesKeyValue.append(newConf.get(omNodesKey))
+ .append(",").append(omNodeId);
+
+ String omAddrKey = ConfUtils.addKeySuffixes(
+ OMConfigKeys.OZONE_OM_ADDRESS_KEY, omServiceId, omNodeId);
+ String omHttpAddrKey = ConfUtils.addKeySuffixes(
+ OMConfigKeys.OZONE_OM_HTTP_ADDRESS_KEY, omServiceId, omNodeId);
+ String omHttpsAddrKey = ConfUtils.addKeySuffixes(
+ OMConfigKeys.OZONE_OM_HTTPS_ADDRESS_KEY, omServiceId, omNodeId);
+ String omRatisPortKey = ConfUtils.addKeySuffixes(
+ OMConfigKeys.OZONE_OM_RATIS_PORT_KEY, omServiceId, omNodeId);
+
+ newConf.set(omAddrKey, "127.0.0.1:" + basePort);
+ newConf.set(omHttpAddrKey, "127.0.0.1:" + (basePort + 2));
+ newConf.set(omHttpsAddrKey, "127.0.0.1:" + (basePort + 3));
+ newConf.setInt(omRatisPortKey, basePort + 4);
+
+ newConf.set(omNodesKey, omNodesKeyValue.toString());
+
+ return newConf;
+ }
+
+ /**
+ * Start a new OM in Bootstrap mode. Configs for the new OM must already be
+ * set.
+ */
+ private OzoneManager bootstrapNewOM(String nodeId)
+ throws IOException, AuthenticationException {
+ OzoneConfiguration config = new OzoneConfiguration(getConf());
+ config.set(OMConfigKeys.OZONE_OM_NODE_ID_KEY, nodeId);
+ // Set the OM rpc and http(s) address to null so that the cluster picks
+ // up the address set with service ID and node ID
+ config.set(OMConfigKeys.OZONE_OM_ADDRESS_KEY, "");
+ config.set(OMConfigKeys.OZONE_OM_HTTP_ADDRESS_KEY, "");
+ config.set(OMConfigKeys.OZONE_OM_HTTPS_ADDRESS_KEY, "");
+
+ // Set metadata/DB dir base path
+ String metaDirPath = clusterMetaPath + "/" + nodeId;
+ config.set(OZONE_METADATA_DIRS, metaDirPath);
+
+ // Update existing OMs config
+ for (OzoneManager existingOM : omhaService.getServices()) {
+ existingOM.setConfiguration(config);
}
+
+ OzoneManager.omInit(config);
+ OzoneManager om = OzoneManager.createOm(config,
+ OzoneManager.StartupOption.BOOTSTRAP);
+ om.start();
+ return om;
+
+ }
+
+ /**
+ * Wait for AddOM command to execute on all OMs.
+ */
+ private void waitForBootstrappedNodeToBeReady(OzoneManager newOM,
+ long leaderSnapshotIndex) throws Exception {
+ // Wait for bootstrapped nodes to catch up with others
+ GenericTestUtils.waitFor(() -> {
+ try {
+ if (newOM.getRatisSnapshotIndex() >= leaderSnapshotIndex) {
+ return true;
+ }
+ } catch (IOException e) {
+ return false;
+ }
+ return false;
+ }, 1000, waitForClusterToBeReadyTimeout);
+ }
+
+ private void waitForConfigUpdateOnAllOMs(String newOMNodeId)
+ throws Exception {
+ OzoneManager newOMNode = omhaService.getServiceById(newOMNodeId);
+ OzoneManagerRatisServer newOMRatisServer = newOMNode.getOmRatisServer();
+ GenericTestUtils.waitFor(() -> {
+ // Each existing active OM should contain the new OM in its peerList.
+ // Also, the new OM should contain each existing active OM in it's OM
+ // peer list and RatisServer peerList.
+ for (OzoneManager om : omhaService.getActiveServices()) {
+ if (!om.doesPeerExist(newOMNodeId)) {
+ return false;
+ }
+ if (!newOMNode.doesPeerExist(om.getOMNodeId())) {
+ return false;
+ }
+ if (!newOMRatisServer.doesPeerExist(om.getOMNodeId())) {
+ return false;
+ }
+ }
+ return true;
+ }, 1000, waitForClusterToBeReadyTimeout);
}
/**
@@ -683,11 +851,15 @@ static class MiniOzoneHAService {
private List activeServices;
private List inactiveServices;
+ // Function to extract the Id from service
+ private Function serviceIdProvider;
+
MiniOzoneHAService(String name, List activeList,
List inactiveList, String serviceId,
Function idProvider) {
this.serviceName = name;
this.serviceMap = Maps.newHashMap();
+ this.serviceIdProvider = idProvider;
if (activeList != null) {
for (Type service : activeList) {
this.serviceMap.put(idProvider.apply(service), service);
@@ -717,12 +889,30 @@ public List getServices() {
return services;
}
+ public List getActiveServices() {
+ return activeServices;
+ }
+
public boolean removeInstance(Type t) {
return services.remove(t);
}
- public boolean addInstance(Type t) {
- return services.add(t);
+ public void addInstance(Type t, boolean isActive) {
+ services.add(t);
+ serviceMap.put(serviceIdProvider.apply(t), t);
+ if (isActive) {
+ activeServices.add(t);
+ }
+ }
+
+ public void activate(Type t) {
+ activeServices.add(t);
+ inactiveServices.remove(t);
+ }
+
+ public void deactivate(Type t) {
+ activeServices.remove(t);
+ inactiveServices.add(t);
}
public boolean isServiceActive(String id) {
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneOMHAClusterImpl.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneOMHAClusterImpl.java
index 30a934540cd1..2f2b77341481 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneOMHAClusterImpl.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneOMHAClusterImpl.java
@@ -50,9 +50,10 @@ private MiniOzoneOMHAClusterImpl(
StorageContainerManager scm,
List hddsDatanodes,
String omServiceId,
+ String metaPath,
ReconServer reconServer) {
super(conf, activeOMList, inactiveOMList, Collections.singletonList(scm),
- null, hddsDatanodes, omServiceId, null, reconServer);
+ null, hddsDatanodes, omServiceId, null, metaPath, reconServer);
}
/**
@@ -104,7 +105,7 @@ public MiniOzoneCluster build() throws IOException {
MiniOzoneClusterImpl cluster = new MiniOzoneOMHAClusterImpl(conf,
getActiveOMs(), getInactiveOMs(), scm, hddsDatanodes,
- omServiceId, reconServer);
+ omServiceId, path, reconServer);
if (startDataNodes) {
cluster.startHddsDatanodes();
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerBootstrap.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerBootstrap.java
new file mode 100644
index 000000000000..a71b2ea1a238
--- /dev/null
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerBootstrap.java
@@ -0,0 +1,216 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.om;
+
+import java.io.File;
+import java.io.FileFilter;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+import org.apache.commons.lang3.RandomStringUtils;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdfs.server.common.Storage;
+import org.apache.hadoop.ozone.MiniOzoneCluster;
+import org.apache.hadoop.ozone.MiniOzoneHAClusterImpl;
+import org.apache.hadoop.ozone.client.ObjectStore;
+import org.apache.hadoop.ozone.client.OzoneBucket;
+import org.apache.hadoop.ozone.client.OzoneClientFactory;
+import org.apache.hadoop.ozone.client.OzoneVolume;
+import org.apache.hadoop.ozone.om.ratis.OzoneManagerRatisServer;
+import org.apache.ozone.test.GenericTestUtils;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.rules.Timeout;
+
+import static org.apache.hadoop.ozone.OzoneConsts.SCM_DUMMY_SERVICE_ID;
+import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_RATIS_SERVER_REQUEST_TIMEOUT_DEFAULT;
+import static org.apache.hadoop.ozone.om.TestOzoneManagerHA.createKey;
+
+/**
+ * Test for OM bootstrap process.
+ */
+public class TestOzoneManagerBootstrap {
+
+ @Rule
+ public ExpectedException exception = ExpectedException.none();
+
+ @Rule
+ public Timeout timeout = new Timeout(500_000);
+
+ private MiniOzoneHAClusterImpl cluster = null;
+ private ObjectStore objectStore;
+ private OzoneConfiguration conf;
+ private final String clusterId = UUID.randomUUID().toString();
+ private final String scmId = UUID.randomUUID().toString();
+
+ private static final int NUM_INITIAL_OMS = 3;
+
+ private static final String OM_SERVICE_ID = "om-bootstrap";
+ private static final String VOLUME_NAME;
+ private static final String BUCKET_NAME;
+
+ private long lastTransactionIndex;
+
+ static {
+ VOLUME_NAME = "volume" + RandomStringUtils.randomNumeric(5);
+ BUCKET_NAME = "bucket" + RandomStringUtils.randomNumeric(5);
+ }
+
+ private void setupCluster() throws Exception {
+ setupCluster(NUM_INITIAL_OMS);
+ }
+
+ private void setupCluster(int numInitialOMs) throws Exception {
+ conf = new OzoneConfiguration();
+ cluster = (MiniOzoneHAClusterImpl) MiniOzoneCluster.newHABuilder(conf)
+ .setClusterId(clusterId)
+ .setScmId(scmId)
+ .setSCMServiceId(SCM_DUMMY_SERVICE_ID)
+ .setOMServiceId(OM_SERVICE_ID)
+ .setNumOfOzoneManagers(numInitialOMs)
+ .build();
+ cluster.waitForClusterToBeReady();
+ objectStore = OzoneClientFactory.getRpcClient(OM_SERVICE_ID, conf)
+ .getObjectStore();
+
+ // Perform some transactions
+ objectStore.createVolume(VOLUME_NAME);
+ OzoneVolume volume = objectStore.getVolume(VOLUME_NAME);
+ volume.createBucket(BUCKET_NAME);
+ OzoneBucket bucket = volume.getBucket(BUCKET_NAME);
+ createKey(bucket);
+
+ lastTransactionIndex = cluster.getOMLeader().getOmRatisServer()
+ .getOmStateMachine().getLastAppliedTermIndex().getIndex();
+ }
+
+ @After
+ public void shutdown() throws Exception {
+ if (cluster != null) {
+ cluster.shutdown();
+ }
+ }
+
+ private void assertNewOMExistsInPeerList(String nodeId) throws Exception {
+ // Check that new peer exists in all OMs peers list and also in their Ratis
+ // server's peer list
+ for (OzoneManager om : cluster.getOzoneManagersList()) {
+ Assert.assertTrue("New OM node " + nodeId + " not present in Peer list " +
+ "of OM " + om.getOMNodeId(), om.doesPeerExist(nodeId));
+ Assert.assertTrue("New OM node " + nodeId + " not present in Peer list " +
+ "of OM " + om.getOMNodeId() + " RatisServer",
+ om.getOmRatisServer().doesPeerExist(nodeId));
+ Assert.assertTrue("New OM node " + nodeId + " not present in " +
+ "OM " + om.getOMNodeId() + "RatisServer's RaftConf",
+ om.getOmRatisServer().getCurrentPeersFromRaftConf().contains(nodeId));
+ }
+
+ OzoneManager newOM = cluster.getOzoneManager(nodeId);
+ GenericTestUtils.waitFor(() ->
+ newOM.getOmRatisServer().getLastAppliedTermIndex().getIndex()
+ >= lastTransactionIndex, 100, 100000);
+
+ // Check Ratis Dir for log files
+ File[] logFiles = getRatisLogFiles(newOM);
+ Assert.assertTrue("There are no ratis logs in new OM ",
+ logFiles.length > 0);
+ }
+
+ private File[] getRatisLogFiles(OzoneManager om) {
+ OzoneManagerRatisServer newOMRatisServer = om.getOmRatisServer();
+ File ratisDir = new File(newOMRatisServer.getRatisStorageDir(),
+ newOMRatisServer.getRaftGroupId().getUuid().toString());
+ File ratisLogDir = new File(ratisDir, Storage.STORAGE_DIR_CURRENT);
+ return ratisLogDir.listFiles(new FileFilter() {
+ @Override
+ public boolean accept(File pathname) {
+ return pathname.getName().startsWith("log");
+ }
+ });
+ }
+
+ private List testBootstrapOMs(int numNewOMs) throws Exception {
+ List newOMNodeIds = new ArrayList<>(numNewOMs);
+ for (int i = 1; i <= numNewOMs; i++) {
+ String nodeId = "omNode-bootstrap-" + i;
+ cluster.bootstrapOzoneManager(nodeId);
+ assertNewOMExistsInPeerList(nodeId);
+ newOMNodeIds.add(nodeId);
+ }
+ return newOMNodeIds;
+ }
+
+ /**
+ * Add 1 new OM to cluster.
+ * @throws Exception
+ */
+ @Test
+ public void testBootstrapOneNewOM() throws Exception {
+ setupCluster();
+ testBootstrapOMs(1);
+ }
+
+ /**
+ * Add 2 new OMs to cluster.
+ * @throws Exception
+ */
+ @Test
+ public void testBootstrapTwoNewOMs() throws Exception {
+ setupCluster();
+ testBootstrapOMs(2);
+ }
+
+ /**
+ * Add 2 new OMs to a 1 node OM cluster. Verify that one of the new OMs
+ * must becomes the leader by stopping the old OM.
+ */
+ @Test
+ public void testLeaderChangeToNewOM() throws Exception {
+ setupCluster(1);
+ OzoneManager oldOM = cluster.getOzoneManager();
+ List newOMNodeIds = testBootstrapOMs(2);
+
+ // Stop old OM
+ cluster.stopOzoneManager(oldOM.getOMNodeId());
+
+ // Wait for Leader Election timeout
+ Thread.sleep(OZONE_OM_RATIS_SERVER_REQUEST_TIMEOUT_DEFAULT
+ .toLong(TimeUnit.MILLISECONDS) * 3);
+
+ // Verify that one of the new OMs is the leader
+ GenericTestUtils.waitFor(() -> cluster.getOMLeader() != null, 500, 30000);
+ OzoneManager omLeader = cluster.getOMLeader();
+
+ Assert.assertTrue("New Bootstrapped OM not elected Leader even though " +
+ "other OMs are down", newOMNodeIds.contains(omLeader.getOMNodeId()));
+
+ // Perform some read and write operations with new OM leader
+ objectStore = OzoneClientFactory.getRpcClient(OM_SERVICE_ID, conf)
+ .getObjectStore();
+ OzoneVolume volume = objectStore.getVolume(VOLUME_NAME);
+ OzoneBucket bucket = volume.getBucket(BUCKET_NAME);
+ String key = createKey(bucket);
+
+ Assert.assertNotNull(bucket.getKey(key));
+ }
+}
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerConfiguration.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerConfiguration.java
index 2d96923e1f10..6e57445ee924 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerConfiguration.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerConfiguration.java
@@ -34,7 +34,7 @@
import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.hadoop.ozone.OzoneIllegalArgumentException;
import org.apache.hadoop.ozone.ha.ConfUtils;
-import org.apache.hadoop.ozone.om.ha.OMNodeDetails;
+import org.apache.hadoop.ozone.om.helpers.OMNodeDetails;
import org.apache.hadoop.ozone.om.ratis.OzoneManagerRatisServer;
import org.apache.ozone.test.GenericTestUtils;
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerHA.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerHA.java
index 6c8bac278c3e..ffc1643f6e1a 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerHA.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerHA.java
@@ -73,6 +73,7 @@ public abstract class TestOzoneManagerHA {
private OzoneConfiguration conf;
private String clusterId;
private String scmId;
+ private String omId;
private String omServiceId;
private static int numOfOMs = 3;
private static final int LOG_PURGE_GAP = 50;
@@ -137,6 +138,7 @@ public void init() throws Exception {
clusterId = UUID.randomUUID().toString();
scmId = UUID.randomUUID().toString();
omServiceId = "om-service-test1";
+ omId = UUID.randomUUID().toString();
conf.setBoolean(OZONE_ACL_ENABLED, true);
conf.set(OzoneConfigKeys.OZONE_ADMINISTRATORS,
OZONE_ADMINISTRATORS_WILDCARD);
@@ -168,6 +170,7 @@ public void init() throws Exception {
.setClusterId(clusterId)
.setScmId(scmId)
.setOMServiceId(omServiceId)
+ .setOmId(omId)
.setNumOfOzoneManagers(numOfOMs)
.build();
cluster.waitForClusterToBeReady();
diff --git a/hadoop-ozone/interface-client/src/main/proto/OmInterServiceProtocol.proto b/hadoop-ozone/interface-client/src/main/proto/OmInterServiceProtocol.proto
new file mode 100644
index 000000000000..49ef56357726
--- /dev/null
+++ b/hadoop-ozone/interface-client/src/main/proto/OmInterServiceProtocol.proto
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * These .proto interfaces are private and unstable.
+ * Please see http://wiki.apache.org/hadoop/Compatibility
+ * for what changes are allowed for a *unstable* .proto interface.
+ */
+
+syntax = "proto2";
+option java_package = "org.apache.hadoop.ozone.protocol.proto";
+option java_outer_classname = "OzoneManagerInterServiceProtocolProtos";
+option java_generic_services = true;
+option java_generate_equals_and_hash = true;
+package hadoop.ozone;
+
+/**
+This file contains the protocol for communication between Ozone Managers in
+an HA setup.
+*/
+
+message BootstrapOMRequest {
+ required string nodeId = 1;
+ required string hostAddress = 2;
+ required uint32 ratisPort = 3;
+}
+
+message BootstrapOMResponse {
+ required bool success = 1;
+ optional ErrorCode errorCode = 2;
+ optional string errorMsg = 3;
+}
+
+enum ErrorCode {
+ RATIS_NOT_ENABLED = 1;
+ LEADER_UNDETERMINED = 2;
+ LEADER_NOT_READY = 3;
+ RATIS_BOOTSTRAP_ERROR = 4;
+ UNDEFINED_ERROR = 5;
+}
+
+/**
+ The OM service for OM-OM communication.
+*/
+service OzoneManagerInterService {
+ // RPC request from new OM to existing OM ring to bootstrap itself
+ rpc bootstrap(BootstrapOMRequest)
+ returns(BootstrapOMResponse);
+}
\ No newline at end of file
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMPolicyProvider.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMPolicyProvider.java
index 18a288f3cea4..f8f265e3c26f 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMPolicyProvider.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMPolicyProvider.java
@@ -21,12 +21,14 @@
import org.apache.hadoop.hdds.annotation.InterfaceAudience.Private;
import org.apache.hadoop.hdds.annotation.InterfaceStability;
import org.apache.hadoop.hdds.annotation.InterfaceStability.Unstable;
+import org.apache.hadoop.ozone.om.protocol.OMInterServiceProtocol;
import org.apache.hadoop.ozone.om.protocol.OzoneManagerProtocol;
import org.apache.hadoop.security.authorize.PolicyProvider;
import org.apache.hadoop.security.authorize.Service;
import java.util.concurrent.atomic.AtomicReference;
+import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_SECURITY_ADMIN_PROTOCOL_ACL;
import static org.apache.hadoop.ozone.om.OMConfigKeys
.OZONE_OM_SECURITY_CLIENT_PROTOCOL_ACL;
@@ -56,6 +58,8 @@ public static OMPolicyProvider getInstance() {
new Service[]{
new Service(OZONE_OM_SECURITY_CLIENT_PROTOCOL_ACL,
OzoneManagerProtocol.class),
+ new Service(OZONE_OM_SECURITY_ADMIN_PROTOCOL_ACL,
+ OMInterServiceProtocol.class)
};
@SuppressFBWarnings("EI_EXPOSE_REP")
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMStarterInterface.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMStarterInterface.java
index 1ffdaa206bd7..bafeec337ecf 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMStarterInterface.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMStarterInterface.java
@@ -30,6 +30,8 @@ void start(OzoneConfiguration conf) throws IOException,
AuthenticationException;
boolean init(OzoneConfiguration conf) throws IOException,
AuthenticationException;
+ void bootstrap(OzoneConfiguration conf) throws IOException,
+ AuthenticationException;
void startAndCancelPrepare(OzoneConfiguration conf) throws IOException,
AuthenticationException;
}
\ No newline at end of file
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
index 82eb2f8f7b7a..393632e2880a 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
@@ -97,6 +97,7 @@
import org.apache.hadoop.ozone.OzoneAcl;
import org.apache.hadoop.ozone.OzoneConfigKeys;
import org.apache.hadoop.ozone.OzoneConsts;
+import org.apache.hadoop.ozone.OzoneIllegalArgumentException;
import org.apache.hadoop.ozone.OzoneSecurityUtil;
import org.apache.hadoop.ozone.audit.AuditAction;
import org.apache.hadoop.ozone.audit.AuditEventStatus;
@@ -111,7 +112,7 @@
import org.apache.hadoop.ozone.om.exceptions.OMLeaderNotReadyException;
import org.apache.hadoop.ozone.om.exceptions.OMNotLeaderException;
import org.apache.hadoop.ozone.om.ha.OMHANodeDetails;
-import org.apache.hadoop.ozone.om.ha.OMNodeDetails;
+import org.apache.hadoop.ozone.om.helpers.OMNodeDetails;
import org.apache.hadoop.ozone.om.helpers.DBUpdates;
import org.apache.hadoop.ozone.om.helpers.OmBucketArgs;
import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
@@ -133,7 +134,10 @@
import org.apache.hadoop.ozone.om.helpers.S3SecretValue;
import org.apache.hadoop.ozone.om.helpers.ServiceInfo;
import org.apache.hadoop.ozone.om.helpers.ServiceInfoEx;
+import org.apache.hadoop.ozone.om.protocol.OMInterServiceProtocol;
+import org.apache.hadoop.ozone.om.protocolPB.OMInterServiceProtocolClientSideImpl;
import org.apache.hadoop.ozone.om.protocol.OzoneManagerProtocol;
+import org.apache.hadoop.ozone.om.protocolPB.OMInterServiceProtocolPB;
import org.apache.hadoop.ozone.om.protocolPB.OzoneManagerProtocolPB;
import org.apache.hadoop.ozone.common.ha.ratis.RatisSnapshotInfo;
import org.apache.hadoop.hdds.security.OzoneSecurityException;
@@ -150,6 +154,7 @@
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRoleInfo;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OzoneAclInfo;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.ServicePort;
+import org.apache.hadoop.ozone.protocolPB.OMInterServiceProtocolServerSideImpl;
import org.apache.hadoop.ozone.storage.proto.OzoneManagerStorageProtos.PersistedUserVolumeInfo;
import org.apache.hadoop.ozone.protocolPB.OzoneManagerProtocolServerSideTranslatorPB;
import org.apache.hadoop.ozone.security.OzoneBlockTokenSecretManager;
@@ -246,12 +251,11 @@
import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.TOKEN_ERROR_OTHER;
import static org.apache.hadoop.ozone.om.lock.OzoneManagerLock.Resource.VOLUME_LOCK;
import static org.apache.hadoop.ozone.om.ratis.OzoneManagerRatisServer.RaftServerStatus.LEADER_AND_READY;
-import static org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OzoneManagerService.newReflectiveBlockingService;
+import static org.apache.hadoop.ozone.protocol.proto.OzoneManagerInterServiceProtocolProtos.OzoneManagerInterService;
+import static org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OzoneManagerService;
import static org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.PrepareStatusResponse.PrepareStatus;
-
import org.apache.ratis.proto.RaftProtos.RaftPeerRole;
import org.apache.ratis.server.protocol.TermIndex;
-import org.apache.ratis.util.ExitUtils;
import org.apache.ratis.util.FileUtils;
import org.apache.ratis.util.LifeCycle;
import org.bouncycastle.pkcs.PKCS10CertificationRequest;
@@ -263,7 +267,8 @@
*/
@InterfaceAudience.LimitedPrivate({"HDFS", "CBLOCK", "OZONE", "HBASE"})
public final class OzoneManager extends ServiceRuntimeInfoImpl
- implements OzoneManagerProtocol, OMMXBean, Auditor {
+ implements OzoneManagerProtocol, OMInterServiceProtocol,
+ OMMXBean, Auditor {
public static final Logger LOG =
LoggerFactory.getLogger(OzoneManager.class);
@@ -280,7 +285,7 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
private List caCertPemList = new ArrayList<>();
private static boolean testSecureOmFlag = false;
private final Text omRpcAddressTxt;
- private final OzoneConfiguration configuration;
+ private OzoneConfiguration configuration;
private RPC.Server omRpcServer;
private InetSocketAddress omRpcAddress;
private String omId;
@@ -328,7 +333,7 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
private OzoneManagerRatisServer omRatisServer;
private OzoneManagerSnapshotProvider omSnapshotProvider;
private OMNodeDetails omNodeDetails;
- private List peerNodes;
+ private Map peerNodesMap;
private File omRatisSnapshotDir;
private final RatisSnapshotInfo omRatisSnapshotInfo;
private final Map ratisMetricsMap =
@@ -358,12 +363,23 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
private OzoneManagerPrepareState prepareState;
+ /**
+ * OM Startup mode.
+ */
+ public enum StartupOption {
+ REGUALR,
+ BOOTSTRAP
+ }
+
private enum State {
INITIALIZED,
+ BOOTSTRAPPING,
RUNNING,
STOPPED
}
+ private boolean isBootstrapping = false;
+
// Used in MiniOzoneCluster testing
private State omState;
private Thread emptier;
@@ -371,16 +387,16 @@ private enum State {
private static final int MSECS_PER_MINUTE = 60 * 1000;
@SuppressWarnings("methodlength")
- private OzoneManager(OzoneConfiguration conf) throws IOException,
- AuthenticationException {
+ private OzoneManager(OzoneConfiguration conf, StartupOption startupOption)
+ throws IOException, AuthenticationException {
super(OzoneVersionInfo.OZONE_VERSION_INFO);
Preconditions.checkNotNull(conf);
- configuration = conf;
+ setConfiguration(conf);
// Load HA related configurations
OMHANodeDetails omhaNodeDetails =
OMHANodeDetails.loadOMHAConfig(configuration);
- this.peerNodes = omhaNodeDetails.getPeerNodeDetails();
+ this.peerNodesMap = omhaNodeDetails.getPeerNodesMap();
this.omNodeDetails = omhaNodeDetails.getLocalNodeDetails();
omStorage = new OMStorage(conf);
@@ -389,6 +405,17 @@ private OzoneManager(OzoneConfiguration conf) throws IOException,
versionManager = new OMLayoutVersionManager(omStorage.getLayoutVersion());
upgradeFinalizer = new OMUpgradeFinalizer(versionManager);
+ exitManager = new ExitManager();
+
+ // In case of single OM Node Service there will be no OM Node ID
+ // specified, set it to value from om storage
+ if (this.omNodeDetails.getNodeId() == null) {
+ this.omNodeDetails = OMHANodeDetails.getOMNodeDetails(conf,
+ omNodeDetails.getServiceId(),
+ omStorage.getOmId(), omNodeDetails.getRpcAddress(),
+ omNodeDetails.getRatisPort());
+ }
+
loginOMUserIfSecurityEnabled(conf);
this.allowListAllVolumes = conf.getBoolean(OZONE_OM_VOLUME_LISTALL_ALLOWED,
@@ -478,40 +505,14 @@ private OzoneManager(OzoneConfiguration conf) throws IOException,
// Create special volume s3v which is required for S3G.
addS3GVolumeToDB();
- this.omRatisSnapshotInfo = new RatisSnapshotInfo();
-
- if (isRatisEnabled) {
- // Create Ratis storage dir
- String omRatisDirectory =
- OzoneManagerRatisServer.getOMRatisDirectory(configuration);
- if (omRatisDirectory == null || omRatisDirectory.isEmpty()) {
- throw new IllegalArgumentException(HddsConfigKeys.OZONE_METADATA_DIRS +
- " must be defined.");
- }
- OmUtils.createOMDir(omRatisDirectory);
-
- // Create Ratis snapshot dir
- omRatisSnapshotDir = OmUtils.createOMDir(
- OzoneManagerRatisServer.getOMRatisSnapshotDirectory(configuration));
-
- // Before starting ratis server, check if previous installation has
- // snapshot directory in Ratis storage directory. if yes, move it to
- // new snapshot directory.
-
- File snapshotDir = new File(omRatisDirectory, OM_RATIS_SNAPSHOT_DIR);
-
- if (snapshotDir.isDirectory()) {
- FileUtils.moveDirectory(snapshotDir.toPath(),
- omRatisSnapshotDir.toPath());
- }
-
- if (peerNodes != null && !peerNodes.isEmpty()) {
- this.omSnapshotProvider = new OzoneManagerSnapshotProvider(
- configuration, omRatisSnapshotDir, peerNodes);
- }
+ if (startupOption == StartupOption.BOOTSTRAP) {
+ isBootstrapping = true;
}
- initializeRatisServer();
+ this.omRatisSnapshotInfo = new RatisSnapshotInfo();
+
+ initializeRatisDirs(conf);
+ initializeRatisServer(isBootstrapping);
metrics = OMMetrics.create();
omClientProtocolMetrics = ProtocolMessageMetrics
@@ -531,6 +532,24 @@ private OzoneManager(OzoneConfiguration conf) throws IOException,
omState = State.INITIALIZED;
}
+ /**
+ * Constructs OM instance based on the configuration.
+ *
+ * @param conf OzoneConfiguration
+ * @return OM instance
+ * @throws IOException, AuthenticationException in case OM instance
+ * creation fails.
+ */
+ public static OzoneManager createOm(OzoneConfiguration conf)
+ throws IOException, AuthenticationException {
+ return new OzoneManager(conf, StartupOption.REGUALR);
+ }
+
+ public static OzoneManager createOm(OzoneConfiguration conf,
+ StartupOption startupOption) throws IOException, AuthenticationException {
+ return new OzoneManager(conf, startupOption);
+ }
+
private void logVersionMismatch(OzoneConfiguration conf, ScmInfo scmInfo) {
List scmNodeInfoList = SCMNodeInfo.buildNodeInfo(conf);
StringBuilder scmBlockAddressBuilder = new StringBuilder("");
@@ -692,6 +711,13 @@ public void close() throws IOException {
stop();
}
+ public void shutdown(Exception ex) throws IOException {
+ if (omState != State.STOPPED) {
+ stop();
+ exitManager.exitSystem(1, ex.getLocalizedMessage(), ex, LOG);
+ }
+ }
+
/**
* Class which schedule saving metrics to a file.
*/
@@ -901,22 +927,57 @@ private static StorageContainerLocationProtocol getScmContainerClient(
}
/**
- * Starts an RPC server, if configured.
+ * Creates a new instance of rpc server. If an earlier instance is already
+ * running then returns the same.
+ */
+ private RPC.Server getRpcServer(OzoneConfiguration conf) throws IOException {
+ if (isOmRpcServerRunning) {
+ return omRpcServer;
+ }
+
+ InetSocketAddress omNodeRpcAddr = OmUtils.getOmAddress(conf);
+
+ final int handlerCount = conf.getInt(OZONE_OM_HANDLER_COUNT_KEY,
+ OZONE_OM_HANDLER_COUNT_DEFAULT);
+ RPC.setProtocolEngine(configuration, OzoneManagerProtocolPB.class,
+ ProtobufRpcEngine.class);
+
+ this.omServerProtocol = new OzoneManagerProtocolServerSideTranslatorPB(
+ this, omRatisServer, omClientProtocolMetrics, isRatisEnabled,
+ getLastTrxnIndexForNonRatis());
+ BlockingService omService =
+ OzoneManagerService.newReflectiveBlockingService(omServerProtocol);
+
+ OMInterServiceProtocolServerSideImpl omInterServerProtocol =
+ new OMInterServiceProtocolServerSideImpl(omRatisServer,
+ isRatisEnabled);
+ BlockingService omInterService =
+ OzoneManagerInterService.newReflectiveBlockingService(
+ omInterServerProtocol);
+
+ return startRpcServer(configuration, omNodeRpcAddr, omService,
+ omInterService, handlerCount);
+ }
+
+ /**
*
- * @param conf configuration
- * @param addr configured address of RPC server
- * @param protocol RPC protocol provided by RPC server
- * @param instance RPC protocol implementation instance
+ * @param conf configuration
+ * @param addr configured address of RPC server
+ * @param clientProtocolService RPC protocol for client communication
+ * (OzoneManagerProtocolPB impl)
+ * @param interOMProtocolService RPC protocol for inter OM communication
+ * (OMInterServiceProtocolPB impl)
* @param handlerCount RPC server handler count
* @return RPC server
* @throws IOException if there is an I/O error while creating RPC server
*/
private RPC.Server startRpcServer(OzoneConfiguration conf,
- InetSocketAddress addr, Class> protocol, BlockingService instance,
- int handlerCount) throws IOException {
+ InetSocketAddress addr, BlockingService clientProtocolService,
+ BlockingService interOMProtocolService, int handlerCount)
+ throws IOException {
RPC.Server rpcServer = new RPC.Builder(conf)
- .setProtocol(protocol)
- .setInstance(instance)
+ .setProtocol(OzoneManagerProtocolPB.class)
+ .setInstance(clientProtocolService)
.setBindAddress(addr.getHostString())
.setPort(addr.getPort())
.setNumHandlers(handlerCount)
@@ -924,7 +985,8 @@ private RPC.Server startRpcServer(OzoneConfiguration conf,
.setSecretManager(delegationTokenMgr)
.build();
- HddsServerUtil.addPBProtocol(conf, protocol, instance, rpcServer);
+ HddsServerUtil.addPBProtocol(conf, OMInterServiceProtocolPB.class,
+ interOMProtocolService, rpcServer);
if (conf.getBoolean(CommonConfigurationKeys.HADOOP_SECURITY_AUTHORIZATION,
false)) {
@@ -941,19 +1003,6 @@ private static boolean isOzoneSecurityEnabled() {
return securityEnabled;
}
- /**
- * Constructs OM instance based on the configuration.
- *
- * @param conf OzoneConfiguration
- * @return OM instance
- * @throws IOException, AuthenticationException in case OM instance
- * creation fails.
- */
- public static OzoneManager createOm(OzoneConfiguration conf)
- throws IOException, AuthenticationException {
- return new OzoneManager(conf);
- }
-
/**
* Logs in the OM user if security is enabled in the configuration.
*
@@ -1067,6 +1116,39 @@ public static void initializeSecurity(OzoneConfiguration conf,
}
}
+ private void initializeRatisDirs(OzoneConfiguration conf) throws IOException {
+ if (isRatisEnabled) {
+ // Create Ratis storage dir
+ String omRatisDirectory =
+ OzoneManagerRatisUtils.getOMRatisDirectory(conf);
+ if (omRatisDirectory == null || omRatisDirectory.isEmpty()) {
+ throw new IllegalArgumentException(HddsConfigKeys.OZONE_METADATA_DIRS +
+ " must be defined.");
+ }
+ OmUtils.createOMDir(omRatisDirectory);
+
+ // Create Ratis snapshot dir
+ omRatisSnapshotDir = OmUtils.createOMDir(
+ OzoneManagerRatisUtils.getOMRatisSnapshotDirectory(conf));
+
+ // Before starting ratis server, check if previous installation has
+ // snapshot directory in Ratis storage directory. if yes, move it to
+ // new snapshot directory.
+
+ File snapshotDir = new File(omRatisDirectory, OM_RATIS_SNAPSHOT_DIR);
+
+ if (snapshotDir.isDirectory()) {
+ FileUtils.moveDirectory(snapshotDir.toPath(),
+ omRatisSnapshotDir.toPath());
+ }
+
+ if (peerNodesMap != null && !peerNodesMap.isEmpty()) {
+ this.omSnapshotProvider = new OzoneManagerSnapshotProvider(
+ configuration, omRatisSnapshotDir, peerNodesMap);
+ }
+ }
+ }
+
/**
* Builds a message for logging startup information about an RPC server.
*
@@ -1223,6 +1305,11 @@ public void start() throws IOException {
startJVMPauseMonitor();
setStartTime();
+ if (isBootstrapping) {
+ omState = State.BOOTSTRAPPING;
+ bootstrap(omNodeDetails);
+ }
+
omState = State.RUNNING;
}
@@ -1261,7 +1348,7 @@ public void restart() throws IOException {
metricsTimer = new Timer();
metricsTimer.schedule(scheduleOMMetricsWriteTask, 0, period);
- initializeRatisServer();
+ initializeRatisServer(false);
if (omRatisServer != null) {
omRatisServer.start();
}
@@ -1286,6 +1373,113 @@ public void restart() throws IOException {
omState = State.RUNNING;
}
+ @Override
+ public void bootstrap(OMNodeDetails newOMNode) throws IOException {
+ // Create InterOmServiceProtocol client to send request to other OMs
+ if (isRatisEnabled) {
+ try (OMInterServiceProtocolClientSideImpl omInterServiceProtocol =
+ new OMInterServiceProtocolClientSideImpl(configuration,
+ getRemoteUser(), getOMServiceId())) {
+
+ omInterServiceProtocol.bootstrap(omNodeDetails);
+
+ LOG.info("Successfully bootstrapped OM {} and joined the Ratis group " +
+ "{}", getOMNodeId(), omRatisServer.getRaftGroup());
+ }
+ } else {
+ throw new IOException("OzoneManager can be bootstrapped only when ratis" +
+ " is enabled and there is atleast one OzoneManager to bootstrap" +
+ " from.");
+ }
+ }
+
+ /**
+ * When OMStateMachine receives a configuration change update, it calls
+ * this function to update the peers list, if required.
+ */
+ public void updatePeerList(List omNodeIds) {
+ List ratisServerPeerIdsList = omRatisServer.getPeerIds();
+ for (String omNodeId : omNodeIds) {
+ // Check if the OM NodeID is already present in the peer list or its
+ // the local NodeID.
+ if (!peerNodesMap.containsKey(omNodeId) && !isCurrentNode(omNodeId)) {
+ addOMNodeToPeers(omNodeId);
+ } else {
+ // Check if the OMNodeID is present in the RatisServer's peer list
+ if (!ratisServerPeerIdsList.contains(omNodeId)) {
+ // This can happen on a bootstrapping OM. The peer information
+ // would be present in OzoneManager but OMRatisServer peer list
+ // would not have the peers list. OMRatisServer peer list of
+ // bootstrapping node should be updated after it gets the RaftConf
+ // through Ratis.
+ if (isCurrentNode(omNodeId)) {
+ // OM Ratis server has the current node also in the peer list as
+ // this is the Raft Group peers list. Hence, add the current node
+ // also to Ratis peers list if not present.
+ omRatisServer.addRaftPeer(omNodeDetails);
+ } else {
+ omRatisServer.addRaftPeer(peerNodesMap.get(omNodeId));
+ }
+ }
+ }
+ }
+ }
+
+ /**
+ * Check if the given nodeId is the current nodeId.
+ */
+ private boolean isCurrentNode(String omNodeID) {
+ return getOMNodeId().equals(omNodeID);
+ }
+
+ /**
+ * Add an OM Node to the peers list. This call comes from OMStateMachine
+ * after a SetConfiguration request has been successfully executed by the
+ * Ratis server.
+ */
+ public void addOMNodeToPeers(String newOMNodeId) {
+ OMNodeDetails newOMNodeDetails = OMNodeDetails.getOMNodeDetailsFromConf(
+ getConfiguration(), getOMServiceId(), newOMNodeId);
+ if (newOMNodeDetails == null) {
+ // Load new configuration object to read in new peer information
+ setConfiguration(new OzoneConfiguration());
+ newOMNodeDetails = OMNodeDetails.getOMNodeDetailsFromConf(
+ getConfiguration(), getOMServiceId(), newOMNodeId);
+
+ if (newOMNodeDetails == null) {
+ // If new node information is not present in the newly loaded
+ // configuration also, throw an exception
+ throw new OzoneIllegalArgumentException("There is no OM configuration "
+ + "for node ID " + newOMNodeId + " in ozone-site.xml.");
+ }
+ }
+
+ if (omSnapshotProvider == null) {
+ omSnapshotProvider = new OzoneManagerSnapshotProvider(
+ configuration, omRatisSnapshotDir, peerNodesMap);
+ } else {
+ omSnapshotProvider.addNewPeerNode(newOMNodeDetails);
+ }
+ omRatisServer.addRaftPeer(newOMNodeDetails);
+ peerNodesMap.put(newOMNodeId, newOMNodeDetails);
+ LOG.info("Added OM {} to the Peer list.", newOMNodeId);
+ }
+
+ /**
+ * Check if the input nodeId exists in the peers list.
+ * @return true if the nodeId is self or it exists in peer node list,
+ * false otherwise.
+ */
+ @VisibleForTesting
+ public boolean doesPeerExist(String omNodeId) {
+ if (getOMNodeId().equals(omNodeId)) {
+ return true;
+ }
+ if (peerNodesMap != null && !peerNodesMap.isEmpty()) {
+ return peerNodesMap.containsKey(omNodeId);
+ }
+ return false;
+ }
/**
* Starts a Trash Emptier thread that does an fs.trashRoots and performs
@@ -1325,43 +1519,25 @@ public FileSystem run() throws IOException {
}
/**
- * Creates a new instance of rpc server. If an earlier instance is already
- * running then returns the same.
+ * Creates an instance of ratis server.
*/
- private RPC.Server getRpcServer(OzoneConfiguration conf) throws IOException {
- if (isOmRpcServerRunning) {
- return omRpcServer;
- }
-
- InetSocketAddress omNodeRpcAddr = OmUtils.getOmAddress(conf);
-
- final int handlerCount = conf.getInt(OZONE_OM_HANDLER_COUNT_KEY,
- OZONE_OM_HANDLER_COUNT_DEFAULT);
- RPC.setProtocolEngine(configuration, OzoneManagerProtocolPB.class,
- ProtobufRpcEngine.class);
- this.omServerProtocol = new OzoneManagerProtocolServerSideTranslatorPB(
- this, omRatisServer, omClientProtocolMetrics, isRatisEnabled,
- getLastTrxnIndexForNonRatis());
-
- BlockingService omService = newReflectiveBlockingService(omServerProtocol);
-
- return startRpcServer(configuration, omNodeRpcAddr,
- OzoneManagerProtocolPB.class, omService,
- handlerCount);
- }
-
/**
* Creates an instance of ratis server.
+ * @param shouldBootstrap If OM is started in Bootstrap mode, then Ratis
+ * server will be initialized without adding self to
+ * Ratis group
+ * @throws IOException
*/
- private void initializeRatisServer() throws IOException {
+ private void initializeRatisServer(boolean shouldBootstrap)
+ throws IOException {
if (isRatisEnabled) {
if (omRatisServer == null) {
// This needs to be done before initializing Ratis.
RatisDropwizardExports.
registerRatisMetricReporters(ratisMetricsMap);
omRatisServer = OzoneManagerRatisServer.newOMRatisServer(
- configuration, this, omNodeDetails, peerNodes,
- secConfig, certClient);
+ configuration, this, omNodeDetails, peerNodesMap,
+ secConfig, certClient, shouldBootstrap);
}
LOG.info("OzoneManager Ratis server initialized at port {}",
omRatisServer.getServerPort());
@@ -1422,7 +1598,9 @@ public long getRatisSnapshotIndex() throws IOException {
* Stop service.
*/
public void stop() {
+ LOG.info("Stopping Ozone Manager");
try {
+ omState = State.STOPPED;
// Cancel the metrics timer and set to null.
if (metricsTimer != null) {
metricsTimer.cancel();
@@ -2747,7 +2925,7 @@ public List getServiceList() throws IOException {
.build());
}
- for (OMNodeDetails peerNode : peerNodes) {
+ for (OMNodeDetails peerNode : peerNodesMap.values()) {
ServiceInfo.Builder peerOmServiceInfoBuilder = ServiceInfo.newBuilder()
.setNodeType(HddsProtos.NodeType.OM)
.setHostname(peerNode.getHostName())
@@ -3546,7 +3724,7 @@ File replaceOMDBWithCheckpoint(long lastAppliedIndex, File oldDB,
} catch (IOException ex) {
String errorMsg = "Failed to reset to original DB. OM is in an " +
"inconsistent state.";
- ExitUtils.terminate(1, errorMsg, ex, LOG);
+ exitManager.exitSystem(1, errorMsg, ex, LOG);
}
throw e;
}
@@ -3594,6 +3772,11 @@ public OzoneConfiguration getConfiguration() {
return configuration;
}
+ @VisibleForTesting
+ public void setConfiguration(OzoneConfiguration conf) {
+ this.configuration = conf;
+ }
+
public static void setTestSecureOmFlag(boolean testSecureOmFlag) {
OzoneManager.testSecureOmFlag = testSecureOmFlag;
}
@@ -3608,7 +3791,7 @@ public String getOMServiceId() {
@VisibleForTesting
public List getPeerNodes() {
- return peerNodes;
+ return new ArrayList<>(peerNodesMap.values());
}
@VisibleForTesting
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManagerStarter.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManagerStarter.java
index 39069a85b804..e087fdbe32bc 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManagerStarter.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManagerStarter.java
@@ -123,6 +123,26 @@ public void startOmUpgrade() throws Exception {
}
}
+ /**
+ * This function implements a sub-command to allow the OM to be bootstrapped
+ * initialized from the command line. After OM is initialized, it will
+ * contact the leader OM to add itself to the ring. Once the leader OM
+ * responds back affirmatively, bootstrap step is complete and the OM is
+ * functional.
+ */
+ @CommandLine.Command(name = "--bootstrap",
+ customSynopsis = "ozone om [global options] --bootstrap",
+ hidden = false,
+ description = "Initialize if not already initialized and Bootstrap " +
+ "the Ozone Manager",
+ mixinStandardHelpOptions = true,
+ versionProvider = HddsVersionProvider.class)
+ public void bootstrapOM()
+ throws Exception {
+ commonInit();
+ receiver.bootstrap(conf);
+ }
+
/**
* This function should be called by each command to ensure the configuration
* is set and print the startup banner message.
@@ -165,6 +185,21 @@ public boolean init(OzoneConfiguration conf) throws IOException,
return OzoneManager.omInit(conf);
}
+ @Override
+ public void bootstrap(OzoneConfiguration conf)
+ throws IOException, AuthenticationException {
+ // Initialize the Ozone Manager, if not already initialized.
+ boolean initialize = OzoneManager.omInit(conf);
+ if (!initialize) {
+ throw new IOException("OM Init failed.");
+ }
+ // Bootstrap the OM
+ OzoneManager om = OzoneManager.createOm(conf,
+ OzoneManager.StartupOption.BOOTSTRAP);
+ om.start();
+ om.join();
+ }
+
@Override
public void startAndCancelPrepare(OzoneConfiguration conf)
throws IOException, AuthenticationException {
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ha/OMHANodeDetails.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ha/OMHANodeDetails.java
index dd44868a56f1..63fdf2b6b819 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ha/OMHANodeDetails.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ha/OMHANodeDetails.java
@@ -18,6 +18,8 @@
package org.apache.hadoop.ozone.om.ha;
import com.google.common.base.Preconditions;
+import java.util.HashMap;
+import java.util.Map;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.ozone.OmUtils;
@@ -25,6 +27,7 @@
import org.apache.hadoop.ozone.OzoneIllegalArgumentException;
import org.apache.hadoop.ozone.ha.ConfUtils;
import org.apache.hadoop.ozone.om.OMConfigKeys;
+import org.apache.hadoop.ozone.om.helpers.OMNodeDetails;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -71,8 +74,12 @@ public OMNodeDetails getLocalNodeDetails() {
return localNodeDetails;
}
- public List< OMNodeDetails > getPeerNodeDetails() {
- return peerNodeDetails;
+ public Map getPeerNodesMap() {
+ Map peerNodesMap = new HashMap<>();
+ for (OMNodeDetails peeNode : peerNodeDetails) {
+ peerNodesMap.put(peeNode.getNodeId(), peeNode);
+ }
+ return peerNodesMap;
}
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerRatisServer.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerRatisServer.java
index 551a2847dc11..c187d6d5358d 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerRatisServer.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerRatisServer.java
@@ -18,25 +18,30 @@
package org.apache.hadoop.ozone.om.ratis;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.protobuf.InvalidProtocolBufferException;
+import com.google.protobuf.ServiceException;
+
import java.io.File;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.charset.StandardCharsets;
-import java.nio.file.Paths;
import java.security.cert.X509Certificate;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.UUID;
+import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.TimeUnit;
-import com.google.common.base.Preconditions;
import org.apache.hadoop.hdds.conf.ConfigurationSource;
import org.apache.hadoop.hdds.conf.StorageUnit;
import org.apache.hadoop.hdds.security.x509.SecurityConfig;
import org.apache.hadoop.hdds.security.x509.certificate.client.CertificateClient;
-import org.apache.hadoop.hdds.server.ServerUtils;
import org.apache.hadoop.hdds.tracing.TracingUtil;
import org.apache.hadoop.hdds.utils.HAUtils;
import org.apache.hadoop.ipc.ProtobufRpcEngine.Server;
@@ -45,17 +50,14 @@
import org.apache.hadoop.ozone.om.exceptions.OMException;
import org.apache.hadoop.ozone.om.exceptions.OMLeaderNotReadyException;
import org.apache.hadoop.ozone.om.exceptions.OMNotLeaderException;
-import org.apache.hadoop.ozone.om.ha.OMNodeDetails;
+import org.apache.hadoop.ozone.om.helpers.OMNodeDetails;
import org.apache.hadoop.ozone.om.helpers.OMRatisHelper;
+import org.apache.hadoop.ozone.om.ratis.utils.OzoneManagerRatisUtils;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRequest;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMResponse;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Status;
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Strings;
-import com.google.protobuf.InvalidProtocolBufferException;
-import com.google.protobuf.ServiceException;
import org.apache.ratis.RaftConfigKeys;
import org.apache.ratis.conf.Parameters;
import org.apache.ratis.conf.RaftProperties;
@@ -63,6 +65,7 @@
import org.apache.ratis.grpc.GrpcTlsConfig;
import org.apache.ratis.netty.NettyConfigKeys;
import org.apache.ratis.protocol.ClientId;
+import org.apache.ratis.protocol.SetConfigurationRequest;
import org.apache.ratis.protocol.exceptions.LeaderNotReadyException;
import org.apache.ratis.protocol.exceptions.NotLeaderException;
import org.apache.ratis.protocol.exceptions.StateMachineException;
@@ -85,12 +88,9 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import static org.apache.hadoop.hdds.HddsConfigKeys.OZONE_METADATA_DIRS;
import static org.apache.hadoop.ipc.RpcConstants.DUMMY_CLIENT_ID;
import static org.apache.hadoop.ipc.RpcConstants.INVALID_CALL_ID;
-import static org.apache.hadoop.ozone.OzoneConsts.OM_RATIS_SNAPSHOT_DIR;
import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_HA_PREFIX;
-import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_RATIS_SNAPSHOT_DIR;
/**
* Creates a Ratis server endpoint for OM.
@@ -105,9 +105,130 @@ public final class OzoneManagerRatisServer {
private final RaftGroupId raftGroupId;
private final RaftGroup raftGroup;
private final RaftPeerId raftPeerId;
+ private final List raftPeers;
private final OzoneManager ozoneManager;
private final OzoneManagerStateMachine omStateMachine;
+ private final String ratisStorageDir;
+
+ private final ClientId clientId = ClientId.randomId();
+ private static final AtomicLong CALL_ID_COUNTER = new AtomicLong();
+
+ private static long nextCallId() {
+ return CALL_ID_COUNTER.getAndIncrement() & Long.MAX_VALUE;
+ }
+
+ /**
+ * Returns an OM Ratis server.
+ * @param conf configuration
+ * @param om the OM instance starting the ratis server
+ * @param raftGroupIdStr raft group id string
+ * @param localRaftPeerId raft peer id of this Ratis server
+ * @param addr address of the ratis server
+ * @param peers peer nodes in the raft ring
+ * @throws IOException
+ */
+ @SuppressWarnings({"parameternumber", "java:S107"})
+ private OzoneManagerRatisServer(ConfigurationSource conf, OzoneManager om,
+ String raftGroupIdStr, RaftPeerId localRaftPeerId,
+ InetSocketAddress addr, List peers, boolean isBootstrapping,
+ SecurityConfig secConfig, CertificateClient certClient)
+ throws IOException {
+ this.ozoneManager = om;
+ this.omRatisAddress = addr;
+ this.port = addr.getPort();
+ this.ratisStorageDir = OzoneManagerRatisUtils.getOMRatisDirectory(conf);
+ RaftProperties serverProperties = newRaftProperties(conf);
+
+ this.raftPeerId = localRaftPeerId;
+ this.raftGroupId = RaftGroupId.valueOf(
+ getRaftGroupIdFromOmServiceId(raftGroupIdStr));
+ this.raftPeers = Lists.newArrayList();
+ this.raftPeers.addAll(peers);
+ this.raftGroup = RaftGroup.valueOf(raftGroupId, peers);
+
+ if (isBootstrapping) {
+ LOG.info("OM started in Bootstrap mode. Instantiating OM Ratis server " +
+ "with groupID: {}", raftGroupIdStr);
+ } else {
+ StringBuilder raftPeersStr = new StringBuilder();
+ for (RaftPeer peer : peers) {
+ raftPeersStr.append(", ").append(peer.getAddress());
+ }
+ LOG.info("Instantiating OM Ratis server with groupID: {} and peers: {}",
+ raftGroupIdStr, raftPeersStr.toString().substring(2));
+ }
+ this.omStateMachine = getStateMachine(conf);
+
+ Parameters parameters = createServerTlsParameters(secConfig, certClient);
+ this.server = RaftServer.newBuilder()
+ .setServerId(this.raftPeerId)
+ .setGroup(this.raftGroup)
+ .setProperties(serverProperties)
+ .setParameters(parameters)
+ .setStateMachine(omStateMachine)
+ .build();
+ }
+
+ /**
+ * Creates an instance of OzoneManagerRatisServer.
+ */
+ public static OzoneManagerRatisServer newOMRatisServer(
+ ConfigurationSource ozoneConf, OzoneManager omProtocol,
+ OMNodeDetails omNodeDetails, Map peerNodes,
+ SecurityConfig secConfig, CertificateClient certClient,
+ boolean isBootstrapping) throws IOException {
+
+ // RaftGroupId is the omServiceId
+ String omServiceId = omNodeDetails.getServiceId();
+
+ String omNodeId = omNodeDetails.getNodeId();
+ RaftPeerId localRaftPeerId = RaftPeerId.getRaftPeerId(omNodeId);
+
+ InetSocketAddress ratisAddr = new InetSocketAddress(
+ omNodeDetails.getInetAddress(), omNodeDetails.getRatisPort());
+
+ RaftPeer localRaftPeer = RaftPeer.newBuilder()
+ .setId(localRaftPeerId)
+ .setAddress(ratisAddr)
+ .build();
+
+ // If OM is started in bootstrap mode, do not add peers to the RaftGroup.
+ // Raft peers will be added after SetConfiguration transaction is
+ // committed by leader and propagated to followers.
+ List raftPeers = new ArrayList<>();
+ if (!isBootstrapping) {
+ // On regular startup, add all OMs to Ratis ring
+ raftPeers.add(localRaftPeer);
+
+ for (Map.Entry peerInfo : peerNodes.entrySet()) {
+ String peerNodeId = peerInfo.getKey();
+ OMNodeDetails peerNode = peerInfo.getValue();
+ RaftPeerId raftPeerId = RaftPeerId.valueOf(peerNodeId);
+ RaftPeer raftPeer;
+ if (peerNode.isHostUnresolved()) {
+ raftPeer = RaftPeer.newBuilder()
+ .setId(raftPeerId)
+ .setAddress(peerNode.getRatisHostPortStr())
+ .build();
+ } else {
+ InetSocketAddress peerRatisAddr = new InetSocketAddress(
+ peerNode.getInetAddress(), peerNode.getRatisPort());
+ raftPeer = RaftPeer.newBuilder()
+ .setId(raftPeerId)
+ .setAddress(peerRatisAddr)
+ .build();
+ }
+
+ // Add other OM nodes belonging to the same OM service to the Ratis ring
+ raftPeers.add(raftPeer);
+ }
+ }
+
+ return new OzoneManagerRatisServer(ozoneConf, omProtocol, omServiceId,
+ localRaftPeerId, ratisAddr, raftPeers, isBootstrapping, secConfig,
+ certClient);
+ }
/**
* Submit request to Ratis server.
@@ -165,6 +286,93 @@ private RaftClientReply submitRequestToRatis(
}
}
+ /**
+ * Add new OM to the Ratis ring.
+ */
+ public void addOMToRatisRing(OMNodeDetails newOMNode) throws IOException {
+
+ Preconditions.checkNotNull(newOMNode);
+
+ String newOMNodeId = newOMNode.getNodeId();
+ RaftPeerId newOMRaftPeerId = RaftPeerId.valueOf(newOMNodeId);
+ InetSocketAddress newOMRatisAddr = new InetSocketAddress(
+ newOMNode.getHostAddress(), newOMNode.getRatisPort());
+ RaftPeer newRaftPeer = RaftPeer.newBuilder()
+ .setId(newOMRaftPeerId)
+ .setAddress(newOMRatisAddr)
+ .build();
+
+ LOG.info("{}: Submitting SetConfiguration request to Ratis server to add" +
+ " new OM peer {} to the Ratis group {}", ozoneManager.getOMNodeId(),
+ newRaftPeer, raftGroup);
+
+ List newPeersList = new ArrayList<>();
+ newPeersList.addAll(raftPeers);
+ newPeersList.add(newRaftPeer);
+
+ checkLeaderStatus();
+ SetConfigurationRequest request = new SetConfigurationRequest(clientId,
+ server.getId(), raftGroupId, nextCallId(), newPeersList);
+
+ try {
+ RaftClientReply raftClientReply = server.setConfiguration(request);
+ if (raftClientReply.isSuccess()) {
+ LOG.info("Added OM {} to Ratis group {}.", newOMNodeId, raftGroupId);
+ } else {
+ LOG.error("Failed to add OM {} to Ratis group {}. Ratis " +
+ "SetConfiguration reply: {}", newOMNodeId, raftGroupId,
+ raftClientReply);
+ throw new IOException("Failed to add OM " + newOMNodeId + " to Ratis " +
+ "ring.");
+ }
+ } catch (IOException e) {
+ LOG.error("Failed to update Ratis configuration and add OM {} to " +
+ "Ratis group {}", newOMNodeId, raftGroupId, e);
+ throw e;
+ }
+ }
+
+ /**
+ * Return a list of peer NodeIds.
+ */
+ public List getPeerIds() {
+ List peerIds = new ArrayList<>();
+ for (RaftPeer raftPeer : raftPeers) {
+ peerIds.add(raftPeer.getId().toString());
+ }
+ return peerIds;
+ }
+
+ /**
+ * Check if the input peerId exists in the peers list.
+ * @return true if the nodeId is self or it exists in peer node list,
+ * false otherwise.
+ */
+ @VisibleForTesting
+ public boolean doesPeerExist(String peerId) {
+ for (RaftPeer raftPeer : raftPeers) {
+ if (raftPeer.getId().toString().equals(peerId)) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ /**
+ * Add given node to list of RaftPeers.
+ */
+ public void addRaftPeer(OMNodeDetails omNodeDetails) {
+ InetSocketAddress newOMRatisAddr = new InetSocketAddress(
+ omNodeDetails.getHostAddress(), omNodeDetails.getRatisPort());
+
+ raftPeers.add(RaftPeer.newBuilder()
+ .setId(RaftPeerId.valueOf(omNodeDetails.getNodeId()))
+ .setAddress(newOMRatisAddr)
+ .build());
+
+ LOG.info("Added OM {} to Ratis Peers list.", omNodeDetails.getNodeId());
+ }
+
/**
* Create Write RaftClient request from OMRequest.
* @param omRequest
@@ -273,106 +481,6 @@ private OzoneManagerProtocolProtos.Status exceptionToResponseStatus(
}
}
-
- /**
- * Returns an OM Ratis server.
- * @param conf configuration
- * @param om the OM instance starting the ratis server
- * @param raftGroupIdStr raft group id string
- * @param localRaftPeerId raft peer id of this Ratis server
- * @param addr address of the ratis server
- * @param raftPeers peer nodes in the raft ring
- * @throws IOException
- */
- @SuppressWarnings({"parameternumber", "java:S107"})
- private OzoneManagerRatisServer(ConfigurationSource conf,
- OzoneManager om,
- String raftGroupIdStr, RaftPeerId localRaftPeerId,
- InetSocketAddress addr, List raftPeers,
- SecurityConfig secConfig, CertificateClient certClient)
- throws IOException {
- this.ozoneManager = om;
- this.omRatisAddress = addr;
- this.port = addr.getPort();
- RaftProperties serverProperties = newRaftProperties(conf);
-
- this.raftPeerId = localRaftPeerId;
- this.raftGroupId = RaftGroupId.valueOf(
- getRaftGroupIdFromOmServiceId(raftGroupIdStr));
- this.raftGroup = RaftGroup.valueOf(raftGroupId, raftPeers);
-
- StringBuilder raftPeersStr = new StringBuilder();
- for (RaftPeer peer : raftPeers) {
- raftPeersStr.append(", ").append(peer.getAddress());
- }
- LOG.info("Instantiating OM Ratis server with GroupID: {} and " +
- "Raft Peers: {}", raftGroupIdStr, raftPeersStr.toString().substring(2));
-
- this.omStateMachine = getStateMachine(conf);
-
- Parameters parameters = createServerTlsParameters(secConfig, certClient);
- this.server = RaftServer.newBuilder()
- .setServerId(this.raftPeerId)
- .setGroup(this.raftGroup)
- .setProperties(serverProperties)
- .setParameters(parameters)
- .setStateMachine(omStateMachine)
- .build();
- }
-
- /**
- * Creates an instance of OzoneManagerRatisServer.
- */
- public static OzoneManagerRatisServer newOMRatisServer(
- ConfigurationSource ozoneConf, OzoneManager omProtocol,
- OMNodeDetails omNodeDetails, List peerNodes,
- SecurityConfig secConfig, CertificateClient certClient)
- throws IOException {
-
- // RaftGroupId is the omServiceId
- String omServiceId = omNodeDetails.getServiceId();
-
- String omNodeId = omNodeDetails.getNodeId();
- RaftPeerId localRaftPeerId = RaftPeerId.getRaftPeerId(omNodeId);
-
- InetSocketAddress ratisAddr = new InetSocketAddress(
- omNodeDetails.getInetAddress(), omNodeDetails.getRatisPort());
-
- RaftPeer localRaftPeer = RaftPeer.newBuilder()
- .setId(localRaftPeerId)
- .setAddress(ratisAddr)
- .build();
-
- List raftPeers = new ArrayList<>();
- // Add this Ratis server to the Ratis ring
- raftPeers.add(localRaftPeer);
-
- for (OMNodeDetails peerInfo : peerNodes) {
- String peerNodeId = peerInfo.getNodeId();
- RaftPeerId raftPeerId = RaftPeerId.valueOf(peerNodeId);
- RaftPeer raftPeer;
- if (peerInfo.isHostUnresolved()) {
- raftPeer = RaftPeer.newBuilder()
- .setId(raftPeerId)
- .setAddress(peerInfo.getRatisHostPortStr())
- .build();
- } else {
- InetSocketAddress peerRatisAddr = new InetSocketAddress(
- peerInfo.getInetAddress(), peerInfo.getRatisPort());
- raftPeer = RaftPeer.newBuilder()
- .setId(raftPeerId)
- .setAddress(peerRatisAddr)
- .build();
- }
-
- // Add other OM nodes belonging to the same OM service to the Ratis ring
- raftPeers.add(raftPeer);
- }
-
- return new OzoneManagerRatisServer(ozoneConf, omProtocol, omServiceId,
- localRaftPeerId, ratisAddr, raftPeers, secConfig, certClient);
- }
-
public RaftGroup getRaftGroup() {
return this.raftGroup;
}
@@ -439,9 +547,8 @@ private RaftProperties newRaftProperties(ConfigurationSource conf) {
}
// Set Ratis storage directory
- String storageDir = OzoneManagerRatisServer.getOMRatisDirectory(conf);
RaftServerConfigKeys.setStorageDir(properties,
- Collections.singletonList(new File(storageDir)));
+ Collections.singletonList(new File(ratisStorageDir)));
// Disable the pre vote feature in Ratis
RaftServerConfigKeys.LeaderElection.setPreVote(properties, false);
@@ -616,6 +723,24 @@ public RaftServerStatus checkLeaderStatus() {
return RaftServerStatus.NOT_LEADER;
}
+ /**
+ * Get list of peer NodeIds from Ratis.
+ * @return List of Peer NodeId's.
+ */
+ @VisibleForTesting
+ public List getCurrentPeersFromRaftConf() throws IOException {
+ try {
+ Collection currentPeers =
+ server.getDivision(raftGroupId).getRaftConf().getCurrentPeers();
+ List currentPeerList = new ArrayList<>();
+ currentPeers.forEach(e -> currentPeerList.add(e.getId().toString()));
+ return currentPeerList;
+ } catch (IOException e) {
+ // In this case we return not a leader.
+ throw new IOException("Failed to get peer information from Ratis.", e);
+ }
+ }
+
public int getServerPort() {
return port;
}
@@ -634,30 +759,8 @@ private UUID getRaftGroupIdFromOmServiceId(String omServiceId) {
return UUID.nameUUIDFromBytes(omServiceId.getBytes(StandardCharsets.UTF_8));
}
- /**
- * Get the local directory where ratis logs will be stored.
- */
- public static String getOMRatisDirectory(ConfigurationSource conf) {
- String storageDir = conf.get(OMConfigKeys.OZONE_OM_RATIS_STORAGE_DIR);
-
- if (Strings.isNullOrEmpty(storageDir)) {
- storageDir = ServerUtils.getDefaultRatisDirectory(conf);
- }
- return storageDir;
- }
-
- public static String getOMRatisSnapshotDirectory(ConfigurationSource conf) {
- String snapshotDir = conf.get(OZONE_OM_RATIS_SNAPSHOT_DIR);
-
- // If ratis snapshot directory is not set, fall back to ozone.metadata.dir.
- if (Strings.isNullOrEmpty(snapshotDir)) {
- LOG.warn("{} is not configured. Falling back to {} config",
- OZONE_OM_RATIS_SNAPSHOT_DIR, OZONE_METADATA_DIRS);
- File metaDirPath = ServerUtils.getOzoneMetaDirPath(conf);
- snapshotDir = Paths.get(metaDirPath.getPath(),
- OM_RATIS_SNAPSHOT_DIR).toString();
- }
- return snapshotDir;
+ public String getRatisStorageDir() {
+ return ratisStorageDir;
}
public TermIndex getLastAppliedTermIndex() {
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerStateMachine.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerStateMachine.java
index e51d40db439f..1b7dc467dc7c 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerStateMachine.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerStateMachine.java
@@ -53,6 +53,7 @@
import org.apache.ratis.protocol.RaftClientRequest;
import org.apache.ratis.protocol.RaftGroupId;
import org.apache.ratis.protocol.RaftPeerId;
+import org.apache.ratis.protocol.exceptions.RaftException;
import org.apache.ratis.protocol.exceptions.StateMachineException;
import org.apache.ratis.server.RaftServer;
import org.apache.ratis.server.protocol.TermIndex;
@@ -175,6 +176,26 @@ public void notifyTermIndexUpdated(long currentTerm, long index) {
computeAndUpdateLastAppliedIndex(index, currentTerm, null, false);
}
+ /**
+ * Called to notify state machine about configuration changes.
+ * Configurations changes include addition of newly bootstrapped OM.
+ */
+ @Override
+ public void notifyConfigurationChanged(long term, long index,
+ RaftProtos.RaftConfigurationProto newRaftConfiguration) {
+ List newPeers =
+ newRaftConfiguration.getPeersList();
+ LOG.info("Received Configuration change notification from Ratis. New Peer" +
+ " list:\n{}", newPeers);
+
+ List newPeerIds = new ArrayList<>();
+ for (RaftProtos.RaftPeerProto raftPeerProto : newPeers) {
+ newPeerIds.add(RaftPeerId.valueOf(raftPeerProto.getId()).toString());
+ }
+ // Check and update the peer list in OzoneManager
+ ozoneManager.updatePeerList(newPeerIds);
+ }
+
/**
* Validate/pre-process the incoming update request in the state machine.
* @return the content to be written to the log entry. Null means the request
@@ -444,6 +465,15 @@ public String toStateMachineLogEntryString(StateMachineLogEntryProto proto) {
return OMRatisHelper.smProtoToString(proto);
}
+ @Override
+ public void close() throws IOException {
+ // OM should be shutdown as the StateMachine has shutdown.
+ LOG.info("StateMachine has shutdown. Shutdown OzoneManager if not " +
+ "already shutdown.");
+ ozoneManager.shutdown(new RaftException("RaftServer called shutdown on " +
+ "StateMachine"));
+ }
+
/**
* Handle the RaftClientRequest and return TransactionContext object.
* @param raftClientRequest
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/utils/OzoneManagerRatisUtils.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/utils/OzoneManagerRatisUtils.java
index d4822cd45940..e655b2887cfb 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/utils/OzoneManagerRatisUtils.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/utils/OzoneManagerRatisUtils.java
@@ -18,12 +18,22 @@
package org.apache.hadoop.ozone.om.ratis.utils;
import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
+import com.google.protobuf.ServiceException;
+import java.io.File;
+import java.nio.file.Paths;
+import org.apache.hadoop.hdds.conf.ConfigurationSource;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.server.ServerUtils;
import org.apache.hadoop.hdds.utils.HAUtils;
+import org.apache.hadoop.ozone.om.OMConfigKeys;
import org.apache.hadoop.ozone.om.OzoneManager;
import org.apache.hadoop.ozone.om.codec.OMDBDefinition;
import org.apache.hadoop.ozone.om.exceptions.OMException;
import org.apache.hadoop.hdds.utils.TransactionInfo;
+import org.apache.hadoop.ozone.om.exceptions.OMLeaderNotReadyException;
+import org.apache.hadoop.ozone.om.exceptions.OMNotLeaderException;
+import org.apache.hadoop.ozone.om.ratis.OzoneManagerRatisServer.RaftServerStatus;
import org.apache.hadoop.ozone.om.request.bucket.OMBucketCreateRequest;
import org.apache.hadoop.ozone.om.request.bucket.OMBucketDeleteRequest;
import org.apache.hadoop.ozone.om.request.bucket.OMBucketSetPropertyRequest;
@@ -87,11 +97,17 @@
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OzoneObj.ObjectType;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Status;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Type;
+import org.apache.ratis.protocol.RaftPeerId;
import org.rocksdb.RocksDBException;
import java.io.IOException;
import java.nio.file.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import static org.apache.hadoop.hdds.HddsConfigKeys.OZONE_METADATA_DIRS;
+import static org.apache.hadoop.ozone.OzoneConsts.OM_RATIS_SNAPSHOT_DIR;
+import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_RATIS_SNAPSHOT_DIR;
/**
* Utility class used by OzoneManager HA.
@@ -102,6 +118,9 @@ public final class OzoneManagerRatisUtils {
// upgrade HDDS-3698 story reaches consensus.
private static boolean isBucketFSOptimized = false;
+ private static final Logger LOG = LoggerFactory
+ .getLogger(OzoneManagerRatisUtils.class);
+
private OzoneManagerRatisUtils() {
}
@@ -349,4 +368,73 @@ public static boolean isBucketFSOptimized() {
return isBucketFSOptimized;
}
+ /**
+ * Get the local directory where ratis logs will be stored.
+ */
+ public static String getOMRatisDirectory(ConfigurationSource conf) {
+ String storageDir = conf.get(OMConfigKeys.OZONE_OM_RATIS_STORAGE_DIR);
+
+ if (Strings.isNullOrEmpty(storageDir)) {
+ storageDir = ServerUtils.getDefaultRatisDirectory(conf);
+ }
+ return storageDir;
+ }
+
+ /**
+ * Get the local directory where ratis snapshots will be stored.
+ */
+ public static String getOMRatisSnapshotDirectory(ConfigurationSource conf) {
+ String snapshotDir = conf.get(OZONE_OM_RATIS_SNAPSHOT_DIR);
+
+ // If ratis snapshot directory is not set, fall back to ozone.metadata.dir.
+ if (Strings.isNullOrEmpty(snapshotDir)) {
+ LOG.warn("{} is not configured. Falling back to {} config",
+ OZONE_OM_RATIS_SNAPSHOT_DIR, OZONE_METADATA_DIRS);
+ File metaDirPath = ServerUtils.getOzoneMetaDirPath(conf);
+ snapshotDir = Paths.get(metaDirPath.getPath(),
+ OM_RATIS_SNAPSHOT_DIR).toString();
+ }
+ return snapshotDir;
+ }
+
+ public static void checkLeaderStatus(RaftServerStatus raftServerStatus,
+ RaftPeerId raftPeerId) throws ServiceException {
+ switch (raftServerStatus) {
+ case LEADER_AND_READY: return;
+
+ case LEADER_AND_NOT_READY: throw createLeaderNotReadyException(raftPeerId);
+
+ case NOT_LEADER: throw createNotLeaderException(raftPeerId);
+
+ default: throw new IllegalStateException(
+ "Unknown Ratis Server state: " + raftServerStatus);
+ }
+ }
+
+ private static ServiceException createNotLeaderException(
+ RaftPeerId raftPeerId) {
+
+ // TODO: Set suggest leaderID. Right now, client is not using suggest
+ // leaderID. Need to fix this.
+
+ OMNotLeaderException notLeaderException =
+ new OMNotLeaderException(raftPeerId);
+
+ LOG.debug(notLeaderException.getMessage());
+
+ return new ServiceException(notLeaderException);
+ }
+
+ private static ServiceException createLeaderNotReadyException(
+ RaftPeerId raftPeerId) {
+
+ OMLeaderNotReadyException leaderNotReadyException =
+ new OMLeaderNotReadyException(raftPeerId.toString() + " is Leader " +
+ "but not ready to process request yet.");
+
+ LOG.debug(leaderNotReadyException.getMessage());
+
+ return new ServiceException(leaderNotReadyException);
+ }
+
}
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/OzoneManagerSnapshotProvider.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/OzoneManagerSnapshotProvider.java
index 097d1421d5c7..b24ca4b08fde 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/OzoneManagerSnapshotProvider.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/OzoneManagerSnapshotProvider.java
@@ -26,7 +26,6 @@
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.HashMap;
-import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
@@ -37,7 +36,7 @@
import org.apache.hadoop.hdds.utils.db.DBCheckpoint;
import org.apache.hadoop.hdds.utils.db.RocksDBCheckpoint;
import org.apache.hadoop.hdfs.web.URLConnectionFactory;
-import org.apache.hadoop.ozone.om.ha.OMNodeDetails;
+import org.apache.hadoop.ozone.om.helpers.OMNodeDetails;
import static java.net.HttpURLConnection.HTTP_CREATED;
import static java.net.HttpURLConnection.HTTP_OK;
@@ -72,15 +71,13 @@ public class OzoneManagerSnapshotProvider {
private static final String OM_SNAPSHOT_DB = "om.snapshot.db";
public OzoneManagerSnapshotProvider(MutableConfigurationSource conf,
- File omRatisSnapshotDir, List peerNodes) {
+ File omRatisSnapshotDir, Map peerNodeDetails) {
LOG.info("Initializing OM Snapshot Provider");
this.omSnapshotDir = omRatisSnapshotDir;
this.peerNodesMap = new HashMap<>();
- for (OMNodeDetails peerNode : peerNodes) {
- this.peerNodesMap.put(peerNode.getNodeId(), peerNode);
- }
+ peerNodesMap.putAll(peerNodeDetails);
this.httpPolicy = HttpConfig.getHttpPolicy(conf);
this.spnegoEnabled = conf.get(OZONE_OM_HTTP_AUTH_TYPE, "simple")
@@ -120,7 +117,7 @@ public DBCheckpoint getOzoneManagerDBSnapshot(String leaderOMNodeID)
File targetFile = new File(snapshotFileName + ".tar.gz");
String omCheckpointUrl = peerNodesMap.get(leaderOMNodeID)
- .getOMDBCheckpointEnpointUrl(httpPolicy);
+ .getOMDBCheckpointEnpointUrl(httpPolicy.isHttpEnabled());
LOG.info("Downloading latest checkpoint from Leader OM {}. Checkpoint " +
"URL: {}", leaderOMNodeID, omCheckpointUrl);
@@ -159,4 +156,11 @@ public void stop() {
connectionFactory.destroy();
}
}
+
+ /**
+ * When a new OM is bootstrapped, add it to the peerNode map.
+ */
+ public void addNewPeerNode(OMNodeDetails newOMNode) {
+ peerNodesMap.put(newOMNode.getNodeId(), newOMNode);
+ }
}
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OMInterServiceProtocolServerSideImpl.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OMInterServiceProtocolServerSideImpl.java
new file mode 100644
index 000000000000..d5ceb4e8b853
--- /dev/null
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OMInterServiceProtocolServerSideImpl.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.ozone.protocolPB;
+
+import com.google.protobuf.RpcController;
+import com.google.protobuf.ServiceException;
+import java.io.IOException;
+import org.apache.hadoop.ozone.om.helpers.OMNodeDetails;
+import org.apache.hadoop.ozone.om.protocolPB.OMInterServiceProtocolPB;
+import org.apache.hadoop.ozone.om.ratis.OzoneManagerRatisServer;
+import org.apache.hadoop.ozone.om.ratis.utils.OzoneManagerRatisUtils;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerInterServiceProtocolProtos.BootstrapOMRequest;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerInterServiceProtocolProtos.BootstrapOMResponse;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerInterServiceProtocolProtos.ErrorCode;
+
+/**
+ * This class is the server-side translator that forwards requests received on
+ * {@link OMInterServiceProtocolPB}
+ * to the OzoneManagerInterService server implementation.
+ */
+public class OMInterServiceProtocolServerSideImpl implements
+ OMInterServiceProtocolPB {
+
+ private final OzoneManagerRatisServer omRatisServer;
+ private final boolean isRatisEnabled;
+
+ public OMInterServiceProtocolServerSideImpl(
+ OzoneManagerRatisServer ratisServer, boolean enableRatis) {
+ this.omRatisServer = ratisServer;
+ this.isRatisEnabled = enableRatis;
+ }
+
+ @Override
+ public BootstrapOMResponse bootstrap(RpcController controller,
+ BootstrapOMRequest request) throws ServiceException {
+ if (request == null) {
+ return null;
+ }
+ if (!isRatisEnabled) {
+ return BootstrapOMResponse.newBuilder()
+ .setSuccess(false)
+ .setErrorCode(ErrorCode.RATIS_NOT_ENABLED)
+ .setErrorMsg("New OM node cannot be bootstrapped as Ratis " +
+ "is not enabled on existing OM")
+ .build();
+ }
+
+ checkLeaderStatus();
+
+ OMNodeDetails newOmNode = new OMNodeDetails.Builder()
+ .setOMNodeId(request.getNodeId())
+ .setHostAddress(request.getHostAddress())
+ .setRatisPort(request.getRatisPort())
+ .build();
+
+ try {
+ omRatisServer.addOMToRatisRing(newOmNode);
+ } catch (IOException ex) {
+ return BootstrapOMResponse.newBuilder()
+ .setSuccess(false)
+ .setErrorCode(ErrorCode.RATIS_BOOTSTRAP_ERROR)
+ .setErrorMsg(ex.getMessage())
+ .build();
+ }
+
+ return BootstrapOMResponse.newBuilder()
+ .setSuccess(true)
+ .build();
+ }
+
+ private void checkLeaderStatus() throws ServiceException {
+ OzoneManagerRatisUtils.checkLeaderStatus(omRatisServer.checkLeaderStatus(),
+ omRatisServer.getRaftPeerId());
+ }
+}
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerProtocolServerSideTranslatorPB.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerProtocolServerSideTranslatorPB.java
index d9dcece8addd..7a256343f15e 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerProtocolServerSideTranslatorPB.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerProtocolServerSideTranslatorPB.java
@@ -16,6 +16,8 @@
*/
package org.apache.hadoop.ozone.protocolPB;
+import static org.apache.hadoop.ozone.om.ratis.OzoneManagerRatisServer.RaftServerStatus.LEADER_AND_READY;
+import static org.apache.hadoop.ozone.om.ratis.OzoneManagerRatisServer.RaftServerStatus.NOT_LEADER;
import static org.apache.hadoop.ozone.om.ratis.utils.OzoneManagerRatisUtils.createClientRequest;
import static org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Type.PrepareStatus;
@@ -48,9 +50,6 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import static org.apache.hadoop.ozone.om.ratis.OzoneManagerRatisServer.RaftServerStatus.LEADER_AND_READY;
-import static org.apache.hadoop.ozone.om.ratis.OzoneManagerRatisServer.RaftServerStatus.NOT_LEADER;
-
/**
* This class is the server-side translator that forwards requests received on
* {@link OzoneManagerProtocolPB}
@@ -133,48 +132,21 @@ private OMResponse processRequest(OMRequest request) throws
if (OmUtils.isReadOnly(request)) {
return submitReadRequestToOM(request);
} else {
- raftServerStatus = omRatisServer.checkLeaderStatus();
- if (raftServerStatus == LEADER_AND_READY) {
- try {
- OMClientRequest omClientRequest = createClientRequest(request);
- request = omClientRequest.preExecute(ozoneManager);
- } catch (IOException ex) {
- // As some of the preExecute returns error. So handle here.
- return createErrorResponse(request, ex);
- }
- return submitRequestToRatis(request);
- } else {
- throw createLeaderErrorException(raftServerStatus);
+ checkLeaderStatus();
+ try {
+ OMClientRequest omClientRequest = createClientRequest(request);
+ request = omClientRequest.preExecute(ozoneManager);
+ } catch (IOException ex) {
+ // As some of the preExecute returns error. So handle here.
+ return createErrorResponse(request, ex);
}
+ return submitRequestToRatis(request);
}
} else {
return submitRequestDirectlyToOM(request);
}
}
- /**
- * Create OMResponse from the specified OMRequest and exception.
- *
- * @param omRequest
- * @param exception
- * @return OMResponse
- */
- private OMResponse createErrorResponse(
- OMRequest omRequest, IOException exception) {
- // Added all write command types here, because in future if any of the
- // preExecute is changed to return IOException, we can return the error
- // OMResponse to the client.
- OMResponse.Builder omResponse = OMResponse.newBuilder()
- .setStatus(OzoneManagerRatisUtils.exceptionToResponseStatus(exception))
- .setCmdType(omRequest.getCmdType())
- .setTraceID(omRequest.getTraceID())
- .setSuccess(false);
- if (exception.getMessage() != null) {
- omResponse.setMessage(exception.getMessage());
- }
- return omResponse.build();
- }
-
/**
* Submits request to OM's Ratis server.
*/
@@ -264,6 +236,34 @@ private OMResponse submitRequestDirectlyToOM(OMRequest request) {
return omClientResponse.getOMResponse();
}
+ private void checkLeaderStatus() throws ServiceException {
+ OzoneManagerRatisUtils.checkLeaderStatus(omRatisServer.checkLeaderStatus(),
+ omRatisServer.getRaftPeerId());
+ }
+
+ /**
+ * Create OMResponse from the specified OMRequest and exception.
+ *
+ * @param omRequest
+ * @param exception
+ * @return OMResponse
+ */
+ private OMResponse createErrorResponse(
+ OMRequest omRequest, IOException exception) {
+ // Added all write command types here, because in future if any of the
+ // preExecute is changed to return IOException, we can return the error
+ // OMResponse to the client.
+ OMResponse.Builder omResponse = OMResponse.newBuilder()
+ .setStatus(OzoneManagerRatisUtils.exceptionToResponseStatus(exception))
+ .setCmdType(omRequest.getCmdType())
+ .setTraceID(omRequest.getTraceID())
+ .setSuccess(false);
+ if (exception.getMessage() != null) {
+ omResponse.setMessage(exception.getMessage());
+ }
+ return omResponse.build();
+ }
+
public void stop() {
if (!isRatisEnabled) {
ozoneManagerDoubleBuffer.stop();
diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerStarter.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerStarter.java
index b64edce5ec1b..e140fd50dcf4 100644
--- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerStarter.java
+++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerStarter.java
@@ -177,6 +177,12 @@ public boolean init(OzoneConfiguration conf) throws IOException,
return initStatus;
}
+ @Override
+ public void bootstrap(OzoneConfiguration conf) throws IOException,
+ AuthenticationException {
+ //TODO: Add test for bootstrap
+ }
+
@Override
public void startAndCancelPrepare(OzoneConfiguration conf)
throws IOException, AuthenticationException {
diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/failover/TestOMFailovers.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/failover/TestOMFailovers.java
index 84b207ef5c8d..7d7c310f6858 100644
--- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/failover/TestOMFailovers.java
+++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/failover/TestOMFailovers.java
@@ -118,7 +118,7 @@ private final class MockFailoverProxyProvider
private MockFailoverProxyProvider(ConfigurationSource configuration)
throws IOException {
- super(configuration, null, null);
+ super(configuration, null, null, OzoneManagerProtocolPB.class);
}
@Override
diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/ratis/TestOzoneManagerRatisServer.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/ratis/TestOzoneManagerRatisServer.java
index 07455edbc899..2b2b75af240f 100644
--- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/ratis/TestOzoneManagerRatisServer.java
+++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/ratis/TestOzoneManagerRatisServer.java
@@ -36,7 +36,7 @@
import org.apache.hadoop.ozone.common.ha.ratis.RatisSnapshotInfo;
import org.apache.hadoop.ozone.om.OMConfigKeys;
import org.apache.hadoop.ozone.om.OMMetadataManager;
-import org.apache.hadoop.ozone.om.ha.OMNodeDetails;
+import org.apache.hadoop.ozone.om.helpers.OMNodeDetails;
import org.apache.hadoop.ozone.om.OmMetadataManagerImpl;
import org.apache.hadoop.ozone.om.OzoneManager;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
@@ -115,7 +115,7 @@ public void init() throws Exception {
secConfig = new SecurityConfig(conf);
certClient = new OMCertificateClient(secConfig);
omRatisServer = OzoneManagerRatisServer.newOMRatisServer(conf, ozoneManager,
- omNodeDetails, Collections.emptyList(), secConfig, certClient);
+ omNodeDetails, Collections.emptyMap(), secConfig, certClient, false);
omRatisServer.start();
}
@@ -155,7 +155,7 @@ public void testLoadSnapshotInfoOnStart() throws Exception {
// Start new Ratis server. It should pick up and load the new SnapshotInfo
omRatisServer = OzoneManagerRatisServer.newOMRatisServer(conf, ozoneManager,
- omNodeDetails, Collections.emptyList(), secConfig, certClient);
+ omNodeDetails, Collections.emptyMap(), secConfig, certClient, false);
omRatisServer.start();
TermIndex lastAppliedTermIndex =
omRatisServer.getLastAppliedTermIndex();
@@ -225,7 +225,7 @@ public void verifyRaftGroupIdGenerationWithCustomOmServiceId() throws
omRatisServer.stop();
OzoneManagerRatisServer newOmRatisServer = OzoneManagerRatisServer
.newOMRatisServer(newConf, ozoneManager, nodeDetails,
- Collections.emptyList(), secConfig, certClient);
+ Collections.emptyMap(), secConfig, certClient, false);
newOmRatisServer.start();
UUID uuid = UUID.nameUUIDFromBytes(customOmServiceId.getBytes(UTF_8));
diff --git a/hadoop-ozone/ozonefs-hadoop2/src/main/java/org/apache/hadoop/fs/ozone/Hadoop27RpcTransport.java b/hadoop-ozone/ozonefs-hadoop2/src/main/java/org/apache/hadoop/fs/ozone/Hadoop27RpcTransport.java
index b2a35704a8fb..c0de6d632544 100644
--- a/hadoop-ozone/ozonefs-hadoop2/src/main/java/org/apache/hadoop/fs/ozone/Hadoop27RpcTransport.java
+++ b/hadoop-ozone/ozonefs-hadoop2/src/main/java/org/apache/hadoop/fs/ozone/Hadoop27RpcTransport.java
@@ -57,7 +57,7 @@ public Hadoop27RpcTransport(ConfigurationSource conf,
ProtobufRpcEngine.class);
this.omFailoverProxyProvider = new OMFailoverProxyProvider(conf, ugi,
- omServiceId);
+ omServiceId, OzoneManagerProtocolPB.class);
int maxFailovers = conf.getInt(
OzoneConfigKeys.OZONE_CLIENT_FAILOVER_MAX_ATTEMPTS_KEY,