From e657248bed9ae90b3c8ec28b447a411e134214a6 Mon Sep 17 00:00:00 2001 From: Shashikant Banerjee Date: Wed, 14 Apr 2021 10:37:30 +0530 Subject: [PATCH 01/10] HDDS-5103. Fix Install Snapshot Mechanism in SCMStateMachine. --- .../hdds/scm/ha/InterSCMGrpcClient.java | 10 +--- .../scm/ha/InterSCMGrpcProtocolService.java | 1 + .../hadoop/hdds/scm/ha/SCMHAManagerImpl.java | 12 +++- .../hadoop/hdds/scm/ha/SCMNodeDetails.java | 4 ++ .../hdds/scm/ha/SCMSnapshotProvider.java | 36 ++++++++---- .../hadoop/hdds/scm/ha/SCMStateMachine.java | 13 ++++- .../scm/TestSCMInstallSnapshotWithHA.java | 55 +++---------------- 7 files changed, 61 insertions(+), 70 deletions(-) diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/InterSCMGrpcClient.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/InterSCMGrpcClient.java index 2c4b98fa2e04..02a26b2f88aa 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/InterSCMGrpcClient.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/InterSCMGrpcClient.java @@ -52,14 +52,8 @@ public class InterSCMGrpcClient implements SCMSnapshotDownloader{ private final InterSCMProtocolServiceGrpc.InterSCMProtocolServiceStub client; - private final long timeout; - - public InterSCMGrpcClient(final String host, final ConfigurationSource conf) { - Preconditions.checkNotNull(conf); - int port = conf.getInt(ScmConfigKeys.OZONE_SCM_GRPC_PORT_KEY, - ScmConfigKeys.OZONE_SCM_GRPC_PORT_DEFAULT); - timeout = - conf.getObject(SCMHAConfiguration.class).getGrpcDeadlineInterval(); + public InterSCMGrpcClient(final String host, final int port, + final long timeout) { NettyChannelBuilder channelBuilder = NettyChannelBuilder.forAddress(host, port).usePlaintext() .maxInboundMessageSize(OzoneConsts.OZONE_SCM_CHUNK_MAX_SIZE); diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/InterSCMGrpcProtocolService.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/InterSCMGrpcProtocolService.java index d92220ae53d5..b6f08a5fcdf2 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/InterSCMGrpcProtocolService.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/InterSCMGrpcProtocolService.java @@ -69,6 +69,7 @@ public void start() throws IOException { LOG.info("Ignore. already started."); return; } else { + LOG.info("Starting SCM Grpc Service at port {}", port); server.start(); } } diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAManagerImpl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAManagerImpl.java index dca469ac9b7f..e448a3b41f27 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAManagerImpl.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAManagerImpl.java @@ -101,7 +101,9 @@ public void start() throws IOException { if (ratisServer.getDivision().getGroup().getPeers().isEmpty()) { // this is a bootstrapped node // It will first try to add itself to existing ring - boolean success = HAUtils.addSCM(OzoneConfiguration.of(conf), + final SCMNodeDetails details = + scm.getSCMHANodeDetails().getLocalNodeDetails(); + final boolean success = HAUtils.addSCM(OzoneConfiguration.of(conf), new AddSCMRequest.Builder().setClusterId(scm.getClusterId()) .setScmId(scm.getScmId()) .setRatisAddr(scm.getSCMHANodeDetails().getLocalNodeDetails() @@ -109,6 +111,9 @@ public void start() throws IOException { .getRatisHostPortStr()).build(), scm.getSCMNodeId()); if (!success) { throw new IOException("Adding SCM to existing HA group failed"); + } else { + LOG.info("Successfully added SCM {} to group {}", details.getNodeId(), + ratisServer.getDivision().getGroup()); } } else { LOG.info(" scm role is {} peers {}", @@ -356,6 +361,11 @@ public void setExitManagerForTesting(ExitManager exitManagerForTesting) { this.exitManager = exitManagerForTesting; } + @VisibleForTesting + public void stopGrpcService() { + grpcServer.stop(); + } + @VisibleForTesting public static Logger getLogger() { return LOG; diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMNodeDetails.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMNodeDetails.java index 47b0a2335925..17901ecfde5c 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMNodeDetails.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMNodeDetails.java @@ -210,4 +210,8 @@ public InetSocketAddress getDatanodeProtocolServerAddress() { public String getDatanodeAddressKey() { return datanodeAddressKey; } + + public int getGrpcPort() { + return grpcPort; + } } diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMSnapshotProvider.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMSnapshotProvider.java index 093b810909dc..6314791201be 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMSnapshotProvider.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMSnapshotProvider.java @@ -32,6 +32,7 @@ import org.apache.hadoop.hdds.HddsConfigKeys; import org.apache.hadoop.hdds.HddsUtils; import org.apache.hadoop.hdds.conf.ConfigurationSource; +import org.apache.hadoop.hdds.scm.ScmConfigKeys; import org.apache.hadoop.hdds.utils.db.DBCheckpoint; import org.apache.hadoop.hdds.utils.db.RocksDBCheckpoint; @@ -53,16 +54,21 @@ public class SCMSnapshotProvider { private final File scmSnapshotDir; - private final ConfigurationSource conf; + private final int downloadClientPort; - private SCMSnapshotDownloader client; + private final long downloadClientTimeout; + + private SCMSnapshotDownloader downloadClient; private Map peerNodesMap; public SCMSnapshotProvider(ConfigurationSource conf, List peerNodes) { LOG.info("Initializing SCM Snapshot Provider"); - this.conf = conf; + downloadClientPort = conf.getInt(ScmConfigKeys.OZONE_SCM_GRPC_PORT_KEY, + ScmConfigKeys.OZONE_SCM_GRPC_PORT_DEFAULT); + downloadClientTimeout = + conf.getObject(SCMHAConfiguration.class).getGrpcDeadlineInterval(); // Create Ratis storage dir String scmRatisDirectory = SCMHAUtils.getSCMRatisDirectory(conf); @@ -81,7 +87,7 @@ public SCMSnapshotProvider(ConfigurationSource conf, this.peerNodesMap.put(peerNode.getNodeId(), peerNode); } } - this.client = null; + this.downloadClient = null; } @VisibleForTesting @@ -103,15 +109,21 @@ public DBCheckpoint getSCMDBSnapshot(String leaderSCMNodeID) .getAbsolutePath(); File targetFile = new File(snapshotFilePath + ".tar.gz"); - // the client instance will be initialized only when first install snapshot - // notification from ratis leader will be received. - if (client == null) { - client = new InterSCMGrpcClient( + // the downloadClient instance will be initialized only when first install + // snapshot notification from ratis leader will be received. + if (downloadClient == null) { + int port = peerNodesMap.get(leaderSCMNodeID).getGrpcPort(); + // if the leader grpc port details are not setup in the peer Map, + // fall back to default grpc port. + if (port == 0) { + port = downloadClientPort; + } + downloadClient = new InterSCMGrpcClient( peerNodesMap.get(leaderSCMNodeID).getInetAddress().getHostAddress(), - conf); + port, downloadClientTimeout); } try { - client.download(targetFile.toPath()).get(); + downloadClient.download(targetFile.toPath()).get(); } catch (InterruptedException | ExecutionException e) { LOG.error("Rocks DB checkpoint downloading failed", e); throw new IOException(e); @@ -137,8 +149,8 @@ public File getScmSnapshotDir() { } public void stop() throws Exception { - if (client != null) { - client.close(); + if (downloadClient != null) { + downloadClient.close(); } } } diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMStateMachine.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMStateMachine.java index 29dcf7583b70..981ed2172e27 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMStateMachine.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMStateMachine.java @@ -22,6 +22,7 @@ import java.util.EnumMap; import java.util.Map; import java.util.Collection; +import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; @@ -185,10 +186,16 @@ public void notifyNotLeader(Collection pendingEntries) { public CompletableFuture notifyInstallSnapshotFromLeader( RaftProtos.RoleInfoProto roleInfoProto, TermIndex firstTermIndexInLog) { - String leaderNodeId = RaftPeerId.valueOf(roleInfoProto.getFollowerInfo() - .getLeaderInfo().getId().getId()).toString(); + String leaderNode = roleInfoProto.getFollowerInfo() + .getLeaderInfo().getId().getAddress(); + Optional leaderDetails = + scm.getSCMHANodeDetails().getPeerNodeDetails().stream().filter( + p -> p.getRatisHostPortStr().equals(p.getRatisHostPortStr())) + .findFirst(); + Preconditions.checkNotNull(leaderDetails); + final String leaderNodeId = leaderDetails.get().getNodeId(); LOG.info("Received install snapshot notification from SCM leader: {} with " - + "term index: {}", leaderNodeId, firstTermIndexInLog); + + "term index: {}", leaderNode, firstTermIndexInLog); CompletableFuture future = CompletableFuture.supplyAsync( () -> scm.getScmHAManager().installSnapshotFromLeader(leaderNodeId), diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestSCMInstallSnapshotWithHA.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestSCMInstallSnapshotWithHA.java index e2513244571b..efe492302261 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestSCMInstallSnapshotWithHA.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestSCMInstallSnapshotWithHA.java @@ -26,6 +26,7 @@ import org.apache.hadoop.hdds.conf.OzoneConfiguration; import org.apache.hadoop.hdds.protocol.proto.HddsProtos; +import org.apache.hadoop.hdds.scm.ScmConfigKeys; import org.apache.hadoop.hdds.scm.container.ContainerInfo; import org.apache.hadoop.hdds.scm.ha.SCMHAConfiguration; import org.apache.hadoop.hdds.scm.ha.SCMHAManagerImpl; @@ -70,7 +71,7 @@ public class TestSCMInstallSnapshotWithHA { private int numOfSCMs = 3; private static final long SNAPSHOT_THRESHOLD = 5; - // private static final int LOG_PURGE_GAP = 5; + private static final int LOG_PURGE_GAP = 5; /** * Create a MiniOzoneCluster for testing. @@ -86,8 +87,8 @@ public void init() throws Exception { scmServiceId = "scm-service-test1"; SCMHAConfiguration scmhaConfiguration = conf.getObject(SCMHAConfiguration.class); - // scmhaConfiguration.setRaftLogPurgeEnabled(true); - // scmhaConfiguration.setRaftLogPurgeGap(LOG_PURGE_GAP); + scmhaConfiguration.setRaftLogPurgeEnabled(true); + scmhaConfiguration.setRaftLogPurgeGap(LOG_PURGE_GAP); scmhaConfiguration.setRatisSnapshotThreshold(SNAPSHOT_THRESHOLD); conf.setFromObject(scmhaConfiguration); @@ -113,18 +114,7 @@ public void shutdown() { } } - /** - * This test is disabled for now as there seems to be an issue with - * Ratis install Snapshot code. In ratis while a new node gets added, - * unless and until the node gets added to the voter list, the follower state - * is not updated with leader info. So, while an install snapshot notification - * is received in the leader, the leader info is not set and hence, out of - * ratis transfer using the same leader info doesn't work. - * - * TODO: Fix this - * */ @Test - @Disabled public void testInstallSnapshot() throws Exception { // Get the leader SCM StorageContainerManager leaderSCM = getLeader(cluster); @@ -137,20 +127,9 @@ public void testInstallSnapshot() throws Exception { // Do some transactions so that the log index increases List containers = writeToIncreaseLogIndex(leaderSCM, 200); - // Get the latest db checkpoint from the leader SCM. - TransactionInfo transactionInfo = - leaderSCM.getScmHAManager().asSCMHADBTransactionBuffer() - .getLatestTrxInfo(); - TermIndex leaderTermIndex = - TermIndex.valueOf(transactionInfo.getTerm(), - transactionInfo.getTransactionIndex()); - long leaderSnaphsotIndex = leaderTermIndex.getIndex(); - long leaderSnapshotTermIndex = leaderTermIndex.getTerm(); - - DBCheckpoint leaderDbCheckpoint = - leaderSCM.getScmMetadataStore().getStore().getCheckpoint(false); - - // Start the inactive + // Start the inactive SCM. Install Snapshot will happen as part + // of setConfiguration() call to ratis leader and the follower will catch + // up cluster.startInactiveSCM(followerId); // The recently started should be lagging behind the leader . @@ -158,23 +137,7 @@ public void testInstallSnapshot() throws Exception { follower.getScmHAManager().getRatisServer().getSCMStateMachine() .getLastAppliedTermIndex().getIndex(); assertTrue( - followerLastAppliedIndex < leaderSnaphsotIndex); - - SCMHAManagerImpl scmhaManager = - (SCMHAManagerImpl) (follower.getScmHAManager()); - // Install leader 's db checkpoint on the lagging . - scmhaManager.installCheckpoint(leaderNodeId, leaderDbCheckpoint); - - SCMStateMachine followerStateMachine = - follower.getScmHAManager().getRatisServer().getSCMStateMachine(); - // After the new checkpoint is installed, the follower - // lastAppliedIndex must >= the snapshot index of the checkpoint. It - // could be great than snapshot index if there is any conf entry from ratis. - followerLastAppliedIndex = followerStateMachine - .getLastAppliedTermIndex().getIndex(); - assertTrue(followerLastAppliedIndex >= leaderSnaphsotIndex); - assertTrue(followerStateMachine - .getLastAppliedTermIndex().getTerm() >= leaderSnapshotTermIndex); + followerLastAppliedIndex >= 200); // Verify that the follower 's DB contains the transactions which were // made while it was inactive. @@ -317,7 +280,7 @@ private List writeToIncreaseLogIndex( scm.getScmHAManager().getRatisServer().getSCMStateMachine(); long logIndex = scm.getScmHAManager().getRatisServer().getSCMStateMachine() .getLastAppliedTermIndex().getIndex(); - while (logIndex < targetLogIndex) { + while (logIndex <= targetLogIndex) { containers.add(scm.getContainerManager() .allocateContainer(HddsProtos.ReplicationType.RATIS, HddsProtos.ReplicationFactor.THREE, From 17d5caf5113e380d8b09be6d0523f8902804bef5 Mon Sep 17 00:00:00 2001 From: Shashikant Banerjee Date: Mon, 19 Apr 2021 16:53:10 +0530 Subject: [PATCH 02/10] updated ratis version and address unit test failures. --- .../hdds/scm/ha/InterSCMGrpcClient.java | 14 +++++-- .../hadoop/hdds/scm/ha/SCMHAManagerImpl.java | 8 ++-- .../hdds/scm/ha/SCMSnapshotDownloader.java | 2 +- .../hdds/scm/ha/SCMSnapshotProvider.java | 39 ++++++------------- .../hadoop/hdds/scm/ha/SCMStateMachine.java | 12 ++++-- .../scm/TestSCMInstallSnapshotWithHA.java | 3 -- pom.xml | 2 +- 7 files changed, 37 insertions(+), 43 deletions(-) diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/InterSCMGrpcClient.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/InterSCMGrpcClient.java index 02a26b2f88aa..b04e5658968f 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/InterSCMGrpcClient.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/InterSCMGrpcClient.java @@ -52,8 +52,16 @@ public class InterSCMGrpcClient implements SCMSnapshotDownloader{ private final InterSCMProtocolServiceGrpc.InterSCMProtocolServiceStub client; - public InterSCMGrpcClient(final String host, final int port, - final long timeout) { + public InterSCMGrpcClient(final String host, final int leaderPort, + final ConfigurationSource conf) { + // if the leader grpc port details are not setup in the peer Map, + // fall back to default grpc port. + final int port = leaderPort == 0 ? + conf.getInt(ScmConfigKeys.OZONE_SCM_GRPC_PORT_KEY, + ScmConfigKeys.OZONE_SCM_GRPC_PORT_DEFAULT) : + leaderPort; + final long timeout = + conf.getObject(SCMHAConfiguration.class).getGrpcDeadlineInterval(); NettyChannelBuilder channelBuilder = NettyChannelBuilder.forAddress(host, port).usePlaintext() .maxInboundMessageSize(OzoneConsts.OZONE_SCM_CHUNK_MAX_SIZE); @@ -89,7 +97,7 @@ public void shutdown() { } @Override - public void close() throws Exception { + public void close() { shutdown(); } diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAManagerImpl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAManagerImpl.java index e448a3b41f27..00aff5ec8f71 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAManagerImpl.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAManagerImpl.java @@ -101,19 +101,19 @@ public void start() throws IOException { if (ratisServer.getDivision().getGroup().getPeers().isEmpty()) { // this is a bootstrapped node // It will first try to add itself to existing ring - final SCMNodeDetails details = + final SCMNodeDetails nodeDetails = scm.getSCMHANodeDetails().getLocalNodeDetails(); final boolean success = HAUtils.addSCM(OzoneConfiguration.of(conf), new AddSCMRequest.Builder().setClusterId(scm.getClusterId()) .setScmId(scm.getScmId()) - .setRatisAddr(scm.getSCMHANodeDetails().getLocalNodeDetails() + .setRatisAddr(nodeDetails // TODO : Should we use IP instead of hostname?? .getRatisHostPortStr()).build(), scm.getSCMNodeId()); if (!success) { throw new IOException("Adding SCM to existing HA group failed"); } else { - LOG.info("Successfully added SCM {} to group {}", details.getNodeId(), - ratisServer.getDivision().getGroup()); + LOG.info("Successfully added SCM {} to group {}", + nodeDetails.getNodeId(), ratisServer.getDivision().getGroup()); } } else { LOG.info(" scm role is {} peers {}", diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMSnapshotDownloader.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMSnapshotDownloader.java index 7d5d3eb58c1e..7713a8cce52c 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMSnapshotDownloader.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMSnapshotDownloader.java @@ -39,5 +39,5 @@ public interface SCMSnapshotDownloader { */ CompletableFuture download(Path destination) throws IOException; - void close() throws Exception; + void close(); } \ No newline at end of file diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMSnapshotProvider.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMSnapshotProvider.java index 6314791201be..4ce92c3fc1f7 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMSnapshotProvider.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMSnapshotProvider.java @@ -54,21 +54,14 @@ public class SCMSnapshotProvider { private final File scmSnapshotDir; - private final int downloadClientPort; - - private final long downloadClientTimeout; - - private SCMSnapshotDownloader downloadClient; + private final ConfigurationSource conf; private Map peerNodesMap; public SCMSnapshotProvider(ConfigurationSource conf, List peerNodes) { LOG.info("Initializing SCM Snapshot Provider"); - downloadClientPort = conf.getInt(ScmConfigKeys.OZONE_SCM_GRPC_PORT_KEY, - ScmConfigKeys.OZONE_SCM_GRPC_PORT_DEFAULT); - downloadClientTimeout = - conf.getObject(SCMHAConfiguration.class).getGrpcDeadlineInterval(); + this.conf = conf; // Create Ratis storage dir String scmRatisDirectory = SCMHAUtils.getSCMRatisDirectory(conf); @@ -87,13 +80,13 @@ public SCMSnapshotProvider(ConfigurationSource conf, this.peerNodesMap.put(peerNode.getNodeId(), peerNode); } } - this.downloadClient = null; } @VisibleForTesting public void setPeerNodesMap(Map peerNodesMap) { this.peerNodesMap = peerNodesMap; } + /** * Download the latest checkpoint from SCM Leader . * @param leaderSCMNodeID leader SCM Node ID. @@ -109,24 +102,19 @@ public DBCheckpoint getSCMDBSnapshot(String leaderSCMNodeID) .getAbsolutePath(); File targetFile = new File(snapshotFilePath + ".tar.gz"); - // the downloadClient instance will be initialized only when first install - // snapshot notification from ratis leader will be received. - if (downloadClient == null) { - int port = peerNodesMap.get(leaderSCMNodeID).getGrpcPort(); - // if the leader grpc port details are not setup in the peer Map, - // fall back to default grpc port. - if (port == 0) { - port = downloadClientPort; - } - downloadClient = new InterSCMGrpcClient( - peerNodesMap.get(leaderSCMNodeID).getInetAddress().getHostAddress(), - port, downloadClientTimeout); - } + // the downloadClient instance will be craeted as and when install snapshot + // request is received. No caching of the client as it should be a very rare + int port = peerNodesMap.get(leaderSCMNodeID).getGrpcPort(); + SCMSnapshotDownloader downloadClient = new InterSCMGrpcClient( + peerNodesMap.get(leaderSCMNodeID).getInetAddress().getHostAddress(), + port, conf); try { downloadClient.download(targetFile.toPath()).get(); - } catch (InterruptedException | ExecutionException e) { + } catch (Exception e) { LOG.error("Rocks DB checkpoint downloading failed", e); throw new IOException(e); + } finally { + downloadClient.close(); } @@ -149,8 +137,5 @@ public File getScmSnapshotDir() { } public void stop() throws Exception { - if (downloadClient != null) { - downloadClient.close(); - } } } diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMStateMachine.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMStateMachine.java index 981ed2172e27..a2686a182bb7 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMStateMachine.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMStateMachine.java @@ -52,6 +52,7 @@ import org.apache.hadoop.hdds.protocol.proto.SCMRatisProtocol.RequestType; import org.apache.ratis.statemachine.impl.SimpleStateMachineStorage; import org.apache.ratis.util.ExitUtils; +import org.apache.ratis.util.JavaUtils; import org.apache.ratis.util.LifeCycle; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -185,17 +186,20 @@ public void notifyNotLeader(Collection pendingEntries) { @Override public CompletableFuture notifyInstallSnapshotFromLeader( RaftProtos.RoleInfoProto roleInfoProto, TermIndex firstTermIndexInLog) { - - String leaderNode = roleInfoProto.getFollowerInfo() + if (!roleInfoProto.getFollowerInfo().hasLeaderInfo()) { + return JavaUtils.completeExceptionally(new IOException("Failed " + + "notifyInstallSnapshotFromLeader due to missing leader info")); + } + String leaderAddress = roleInfoProto.getFollowerInfo() .getLeaderInfo().getId().getAddress(); Optional leaderDetails = scm.getSCMHANodeDetails().getPeerNodeDetails().stream().filter( - p -> p.getRatisHostPortStr().equals(p.getRatisHostPortStr())) + p -> p.getRatisHostPortStr().equals(leaderAddress)) .findFirst(); Preconditions.checkNotNull(leaderDetails); final String leaderNodeId = leaderDetails.get().getNodeId(); LOG.info("Received install snapshot notification from SCM leader: {} with " - + "term index: {}", leaderNode, firstTermIndexInLog); + + "term index: {}", leaderAddress, firstTermIndexInLog); CompletableFuture future = CompletableFuture.supplyAsync( () -> scm.getScmHAManager().installSnapshotFromLeader(leaderNodeId), diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestSCMInstallSnapshotWithHA.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestSCMInstallSnapshotWithHA.java index efe492302261..008bc1087924 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestSCMInstallSnapshotWithHA.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestSCMInstallSnapshotWithHA.java @@ -26,7 +26,6 @@ import org.apache.hadoop.hdds.conf.OzoneConfiguration; import org.apache.hadoop.hdds.protocol.proto.HddsProtos; -import org.apache.hadoop.hdds.scm.ScmConfigKeys; import org.apache.hadoop.hdds.scm.container.ContainerInfo; import org.apache.hadoop.hdds.scm.ha.SCMHAConfiguration; import org.apache.hadoop.hdds.scm.ha.SCMHAManagerImpl; @@ -51,7 +50,6 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Timeout; -import org.junit.jupiter.api.Disabled; import org.slf4j.Logger; import org.slf4j.event.Level; @@ -118,7 +116,6 @@ public void shutdown() { public void testInstallSnapshot() throws Exception { // Get the leader SCM StorageContainerManager leaderSCM = getLeader(cluster); - String leaderNodeId = leaderSCM.getScmNodeDetails().getNodeId(); Assert.assertNotNull(leaderSCM); // Find the inactive SCM String followerId = getInactiveSCM(cluster).getScmId(); diff --git a/pom.xml b/pom.xml index 4d13c35675b3..5c8e86a86321 100644 --- a/pom.xml +++ b/pom.xml @@ -79,7 +79,7 @@ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xs ${ozone.version} - 2.0.0 + 2.1.0-80f6e4b-SNAPSHOT 0.6.0 From 5ef2c185580471f1c6c48cb195cc7e1578cb3b32 Mon Sep 17 00:00:00 2001 From: Shashikant Banerjee Date: Mon, 19 Apr 2021 16:57:55 +0530 Subject: [PATCH 03/10] Removed stop() method in SCMSnapshotDownloader. --- .../java/org/apache/hadoop/hdds/scm/ha/SCMSnapshotProvider.java | 2 -- 1 file changed, 2 deletions(-) diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMSnapshotProvider.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMSnapshotProvider.java index 4ce92c3fc1f7..3c741714df71 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMSnapshotProvider.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMSnapshotProvider.java @@ -136,6 +136,4 @@ public File getScmSnapshotDir() { return scmSnapshotDir; } - public void stop() throws Exception { - } } From 991d1c4dedf4ef39220a4030aa9195c62287339b Mon Sep 17 00:00:00 2001 From: Shashikant Banerjee Date: Mon, 19 Apr 2021 17:07:05 +0530 Subject: [PATCH 04/10] Fixed checkstyle --- .../org/apache/hadoop/hdds/scm/ha/SCMSnapshotProvider.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMSnapshotProvider.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMSnapshotProvider.java index 3c741714df71..295a5a01624f 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMSnapshotProvider.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMSnapshotProvider.java @@ -32,7 +32,6 @@ import org.apache.hadoop.hdds.HddsConfigKeys; import org.apache.hadoop.hdds.HddsUtils; import org.apache.hadoop.hdds.conf.ConfigurationSource; -import org.apache.hadoop.hdds.scm.ScmConfigKeys; import org.apache.hadoop.hdds.utils.db.DBCheckpoint; import org.apache.hadoop.hdds.utils.db.RocksDBCheckpoint; @@ -110,7 +109,7 @@ public DBCheckpoint getSCMDBSnapshot(String leaderSCMNodeID) port, conf); try { downloadClient.download(targetFile.toPath()).get(); - } catch (Exception e) { + } catch (ExecutionException | InterruptedException e) { LOG.error("Rocks DB checkpoint downloading failed", e); throw new IOException(e); } finally { From 01f67b5d292083b3f3d9154334d31db32a514dcb Mon Sep 17 00:00:00 2001 From: Shashikant Banerjee Date: Tue, 20 Apr 2021 13:01:03 +0530 Subject: [PATCH 05/10] Addressed review comments. --- .../org/apache/hadoop/hdds/scm/ha/SCMSnapshotProvider.java | 2 +- .../java/org/apache/hadoop/hdds/scm/ha/SCMStateMachine.java | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMSnapshotProvider.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMSnapshotProvider.java index 295a5a01624f..e5bdfbe38849 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMSnapshotProvider.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMSnapshotProvider.java @@ -101,7 +101,7 @@ public DBCheckpoint getSCMDBSnapshot(String leaderSCMNodeID) .getAbsolutePath(); File targetFile = new File(snapshotFilePath + ".tar.gz"); - // the downloadClient instance will be craeted as and when install snapshot + // the downloadClient instance will be created as and when install snapshot // request is received. No caching of the client as it should be a very rare int port = peerNodesMap.get(leaderSCMNodeID).getGrpcPort(); SCMSnapshotDownloader downloadClient = new InterSCMGrpcClient( diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMStateMachine.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMStateMachine.java index a2686a182bb7..bef743b5ccb5 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMStateMachine.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMStateMachine.java @@ -187,7 +187,7 @@ public void notifyNotLeader(Collection pendingEntries) { public CompletableFuture notifyInstallSnapshotFromLeader( RaftProtos.RoleInfoProto roleInfoProto, TermIndex firstTermIndexInLog) { if (!roleInfoProto.getFollowerInfo().hasLeaderInfo()) { - return JavaUtils.completeExceptionally(new IOException("Failed " + + return JavaUtils.completeExceptionally(new IOException("Failed to " + "notifyInstallSnapshotFromLeader due to missing leader info")); } String leaderAddress = roleInfoProto.getFollowerInfo() @@ -196,7 +196,7 @@ public CompletableFuture notifyInstallSnapshotFromLeader( scm.getSCMHANodeDetails().getPeerNodeDetails().stream().filter( p -> p.getRatisHostPortStr().equals(leaderAddress)) .findFirst(); - Preconditions.checkNotNull(leaderDetails); + Preconditions.checkState(leaderDetails.isPresent()); final String leaderNodeId = leaderDetails.get().getNodeId(); LOG.info("Received install snapshot notification from SCM leader: {} with " + "term index: {}", leaderAddress, firstTermIndexInLog); From 16f1f50a327e7898fa7184b5ad5c1d9fb1508640 Mon Sep 17 00:00:00 2001 From: Shashikant Banerjee Date: Tue, 20 Apr 2021 13:09:30 +0530 Subject: [PATCH 06/10] Addressed review comments. --- .../org/apache/hadoop/hdds/scm/ha/InterSCMGrpcClient.java | 8 +------- .../apache/hadoop/hdds/scm/TestSCMInstallSnapshot.java | 4 +++- 2 files changed, 4 insertions(+), 8 deletions(-) diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/InterSCMGrpcClient.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/InterSCMGrpcClient.java index b04e5658968f..08dd307c1b2c 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/InterSCMGrpcClient.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/InterSCMGrpcClient.java @@ -22,7 +22,6 @@ import org.apache.hadoop.hdds.protocol.scm.proto.InterSCMProtocolProtos; import org.apache.hadoop.hdds.protocol.scm.proto.InterSCMProtocolProtos.CopyDBCheckpointResponseProto; import org.apache.hadoop.hdds.protocol.scm.proto.InterSCMProtocolServiceGrpc; -import org.apache.hadoop.hdds.scm.ScmConfigKeys; import org.apache.hadoop.ozone.OzoneConsts; import org.apache.ratis.thirdparty.io.grpc.ManagedChannel; import org.apache.ratis.thirdparty.io.grpc.netty.NettyChannelBuilder; @@ -54,12 +53,7 @@ public class InterSCMGrpcClient implements SCMSnapshotDownloader{ public InterSCMGrpcClient(final String host, final int leaderPort, final ConfigurationSource conf) { - // if the leader grpc port details are not setup in the peer Map, - // fall back to default grpc port. - final int port = leaderPort == 0 ? - conf.getInt(ScmConfigKeys.OZONE_SCM_GRPC_PORT_KEY, - ScmConfigKeys.OZONE_SCM_GRPC_PORT_DEFAULT) : - leaderPort; + final int port = leaderPort; final long timeout = conf.getObject(SCMHAConfiguration.class).getGrpcDeadlineInterval(); NettyChannelBuilder channelBuilder = diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/TestSCMInstallSnapshot.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/TestSCMInstallSnapshot.java index f3682edca3a0..288365e9d9ea 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/TestSCMInstallSnapshot.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/TestSCMInstallSnapshot.java @@ -100,7 +100,9 @@ private DBCheckpoint downloadSnapshot() throws Exception { RATIS, ONE, "Owner2").getPipelineID()); pipelineManager.openPipeline(ratisPipeline2.getId()); SCMNodeDetails scmNodeDetails = new SCMNodeDetails.Builder() - .setRpcAddress(new InetSocketAddress("0.0.0.0", 0)).setSCMNodeId("scm1") + .setRpcAddress(new InetSocketAddress("0.0.0.0", 0)) + .setGrpcPort(ScmConfigKeys.OZONE_SCM_GRPC_PORT_DEFAULT) + .setSCMNodeId("scm1") .build(); Map peerMap = new HashMap<>(); peerMap.put(scmNodeDetails.getNodeId(), scmNodeDetails); From e8e04e300ee04f9d81d8a3fc078b0c5c7934ac49 Mon Sep 17 00:00:00 2001 From: Shashikant Banerjee Date: Tue, 20 Apr 2021 18:16:47 +0530 Subject: [PATCH 07/10] Trigger CI check From 4964ca30812f1e422dd62d9319218f57dbcdd082 Mon Sep 17 00:00:00 2001 From: Shashikant Banerjee Date: Thu, 22 Apr 2021 14:48:29 +0530 Subject: [PATCH 08/10] Updatd to latest ratis snapshot. --- pom.xml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pom.xml b/pom.xml index 5c8e86a86321..7a54037741d4 100644 --- a/pom.xml +++ b/pom.xml @@ -79,10 +79,10 @@ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xs ${ozone.version} - 2.1.0-80f6e4b-SNAPSHOT + 2.1.0-43915d2-SNAPSHOT - 0.6.0 + 0.7.0-a398b19-SNAPSHOT apache.snapshots.https Apache Development Snapshot Repository From 07c097c7cc42c6ef96b6a56151389edf6cb65aad Mon Sep 17 00:00:00 2001 From: Shashikant Banerjee Date: Thu, 22 Apr 2021 16:32:44 +0530 Subject: [PATCH 09/10] Trigger CI check From 1862ed071496ebb8712bfad4a558fa6023126181 Mon Sep 17 00:00:00 2001 From: Shashikant Banerjee Date: Thu, 22 Apr 2021 17:11:23 +0530 Subject: [PATCH 10/10] Updated grpc version. --- pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pom.xml b/pom.xml index 7a54037741d4..2af3f605b014 100644 --- a/pom.xml +++ b/pom.xml @@ -183,7 +183,7 @@ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xs 1.33.0 1.5.0.Final - 4.1.51.Final + 4.1.63.Final 1.8