From 56d3d9b2ccdacfece335796859724ba3b9f9884e Mon Sep 17 00:00:00 2001 From: Ivan Andika Date: Sat, 22 Feb 2025 11:17:36 +0800 Subject: [PATCH 1/4] HDDS-11768. Extract SCM failover proxy provider logic --- .../hadoop/hdds/scm/ha/SCMNodeInfo.java | 12 +- ...ocationProtocolClientSideTranslatorPB.java | 2 +- ...SCMBlockLocationFailoverProxyProvider.java | 265 +------------- ...ontainerLocationFailoverProxyProvider.java | 269 +------------- .../proxy/SCMFailoverProxyProviderBase.java | 345 ++++++++++++++++++ ...SecurityProtocolFailoverProxyProvider.java | 267 +------------- ...ecretKeyProtocolFailoverProxyProvider.java | 263 +------------ .../SingleSecretKeyProtocolProxyProvider.java | 11 + 8 files changed, 391 insertions(+), 1043 deletions(-) create mode 100644 hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/proxy/SCMFailoverProxyProviderBase.java diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMNodeInfo.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMNodeInfo.java index 44289f35ff28..e1302fbcf205 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMNodeInfo.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMNodeInfo.java @@ -55,12 +55,12 @@ public class SCMNodeInfo { private static final Logger LOG = LoggerFactory.getLogger(SCMNodeInfo.class); - private String serviceId; - private String nodeId; - private String blockClientAddress; - private String scmClientAddress; - private String scmSecurityAddress; - private String scmDatanodeAddress; + private final String serviceId; + private final String nodeId; + private final String blockClientAddress; + private final String scmClientAddress; + private final String scmSecurityAddress; + private final String scmDatanodeAddress; /** * Build SCM Node information from configuration. 
diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/ScmBlockLocationProtocolClientSideTranslatorPB.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/ScmBlockLocationProtocolClientSideTranslatorPB.java index 00e8e3e0ad22..13a8846f0423 100644 --- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/ScmBlockLocationProtocolClientSideTranslatorPB.java +++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/ScmBlockLocationProtocolClientSideTranslatorPB.java @@ -98,7 +98,7 @@ public ScmBlockLocationProtocolClientSideTranslatorPB( this.failoverProxyProvider = proxyProvider; this.rpcProxy = (ScmBlockLocationProtocolPB) RetryProxy.create( ScmBlockLocationProtocolPB.class, failoverProxyProvider, - failoverProxyProvider.getSCMBlockLocationRetryPolicy()); + failoverProxyProvider.getRetryPolicy()); } /** diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/proxy/SCMBlockLocationFailoverProxyProvider.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/proxy/SCMBlockLocationFailoverProxyProvider.java index 70c31d05b291..6eed475c7033 100644 --- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/proxy/SCMBlockLocationFailoverProxyProvider.java +++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/proxy/SCMBlockLocationFailoverProxyProvider.java @@ -17,283 +17,34 @@ package org.apache.hadoop.hdds.scm.proxy; -import static org.apache.hadoop.ozone.OzoneConsts.SCM_DUMMY_SERVICE_ID; - -import com.google.common.annotations.VisibleForTesting; -import java.io.Closeable; -import java.io.IOException; -import java.net.InetSocketAddress; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Optional; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hdds.conf.ConfigurationException; import 
org.apache.hadoop.hdds.conf.ConfigurationSource; -import org.apache.hadoop.hdds.ratis.ServerNotLeaderException; -import org.apache.hadoop.hdds.scm.ha.SCMHAUtils; import org.apache.hadoop.hdds.scm.ha.SCMNodeInfo; import org.apache.hadoop.hdds.scm.protocolPB.ScmBlockLocationProtocolPB; -import org.apache.hadoop.hdds.utils.LegacyHadoopConfigurationSource; -import org.apache.hadoop.io.retry.FailoverProxyProvider; -import org.apache.hadoop.io.retry.RetryPolicies; -import org.apache.hadoop.io.retry.RetryPolicy; -import org.apache.hadoop.ipc.ProtobufRpcEngine; -import org.apache.hadoop.ipc.RPC; -import org.apache.hadoop.net.NetUtils; -import org.apache.hadoop.security.UserGroupInformation; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** * Failover proxy provider for SCM block location. */ -public class SCMBlockLocationFailoverProxyProvider implements - FailoverProxyProvider, Closeable { +public class SCMBlockLocationFailoverProxyProvider extends + SCMFailoverProxyProviderBase { public static final Logger LOG = LoggerFactory.getLogger(SCMBlockLocationFailoverProxyProvider.class); - private final SCMClientConfig scmClientConfig; - - private final Map> scmProxies; - private final Map scmProxyInfoMap; - private List scmNodeIds; - - // As SCM Client is shared across threads, performFailOver() - // updates the currentProxySCMNodeId based on the updateLeaderNodeId which is - // updated in shouldRetry(). When 2 or more threads run in parallel, the - // RetryInvocationHandler will check the expectedFailOverCount - // and not execute performFailOver() for one of them. So the other thread(s) - // shall not call performFailOver(), it will call getProxy() which uses - // currentProxySCMNodeId and returns the proxy. 
- private volatile String currentProxySCMNodeId; - private volatile int currentProxyIndex; - - private final ConfigurationSource conf; - private final long scmVersion; - - private String scmServiceId; - - private final int maxRetryCount; - private final long retryInterval; - - private final UserGroupInformation ugi; - - private String updatedLeaderNodeID = null; - public SCMBlockLocationFailoverProxyProvider(ConfigurationSource conf) { - this.conf = conf; - this.scmVersion = RPC.getProtocolVersion(ScmBlockLocationProtocolPB.class); - - try { - this.ugi = UserGroupInformation.getCurrentUser(); - } catch (IOException ex) { - LOG.error("Unable to fetch user credentials from UGI", ex); - throw new RuntimeException(ex); - } - - // Set some constant for non-HA. - if (scmServiceId == null) { - scmServiceId = SCM_DUMMY_SERVICE_ID; - } - this.scmProxies = new HashMap<>(); - this.scmProxyInfoMap = new HashMap<>(); - - loadConfigs(); - - this.currentProxyIndex = 0; - currentProxySCMNodeId = scmNodeIds.get(currentProxyIndex); - - scmClientConfig = conf.getObject(SCMClientConfig.class); - this.maxRetryCount = scmClientConfig.getRetryCount(); - this.retryInterval = scmClientConfig.getRetryInterval(); - + super(ScmBlockLocationProtocolPB.class, conf, null); LOG.info("Created block location fail-over proxy with {} nodes: {}", - scmNodeIds.size(), scmProxyInfoMap.values()); - } - - private synchronized void loadConfigs() { - - scmNodeIds = new ArrayList<>(); - List scmNodeInfoList = SCMNodeInfo.buildNodeInfo(conf); - - for (SCMNodeInfo scmNodeInfo : scmNodeInfoList) { - if (scmNodeInfo.getBlockClientAddress() == null) { - throw new ConfigurationException("SCM BlockClient Address could not " + - "be obtained from config. 
Config is not properly defined"); - } else { - InetSocketAddress scmBlockClientAddress = - NetUtils.createSocketAddr(scmNodeInfo.getBlockClientAddress()); - - scmServiceId = scmNodeInfo.getServiceId(); - String scmNodeId = scmNodeInfo.getNodeId(); - scmNodeIds.add(scmNodeId); - SCMProxyInfo scmProxyInfo = new SCMProxyInfo( - scmNodeInfo.getServiceId(), scmNodeInfo.getNodeId(), - scmBlockClientAddress); - scmProxyInfoMap.put(scmNodeId, scmProxyInfo); - } - } - } - - @VisibleForTesting - public synchronized void changeCurrentProxy(String nodeId) { - currentProxyIndex = scmNodeIds.indexOf(nodeId); - currentProxySCMNodeId = nodeId; - nextProxyIndex(); - } - - private synchronized String getCurrentProxySCMNodeId() { - return currentProxySCMNodeId; - } - - @Override - public synchronized ProxyInfo getProxy() { - String currentProxyNodeId = getCurrentProxySCMNodeId(); - ProxyInfo currentProxyInfo = - scmProxies.get(currentProxyNodeId); - if (currentProxyInfo == null) { - currentProxyInfo = createSCMProxy(currentProxyNodeId); - } - return currentProxyInfo; + getSCMNodeIds(), getSCMProxyInfoList()); } @Override - public synchronized void performFailover( - ScmBlockLocationProtocolPB newLeader) { - //If leader node id is set, use that or else move to next proxy index. 
- if (updatedLeaderNodeID != null) { - currentProxySCMNodeId = updatedLeaderNodeID; - } else { - nextProxyIndex(); - } - - } - - public synchronized void performFailoverToAssignedLeader(String newLeader, - Exception e) { - ServerNotLeaderException snle = - (ServerNotLeaderException) SCMHAUtils.getServerNotLeaderException(e); - if (snle != null && snle.getSuggestedLeader() != null) { - Optional matchedProxyInfo = - scmProxyInfoMap.values().stream().filter( - proxyInfo -> NetUtils.getHostPortString(proxyInfo.getAddress()) - .equals(snle.getSuggestedLeader())).findFirst(); - if (matchedProxyInfo.isPresent()) { - newLeader = matchedProxyInfo.get().getNodeId(); - LOG.debug("Performing failover to suggested leader {}, nodeId {}", - snle.getSuggestedLeader(), newLeader); - } else { - if (LOG.isDebugEnabled()) { - LOG.debug("Suggested leader {} does not match with any of the " + - "proxyInfo adress {}", snle.getSuggestedLeader(), - Arrays.toString(scmProxyInfoMap.values().toArray())); - } - } - } - assignLeaderToNode(newLeader); + protected Logger getLogger() { + return LOG; } @Override - public Class getInterface() { - return ScmBlockLocationProtocolPB.class; - } - - @Override - public synchronized void close() throws IOException { - for (ProxyInfo proxy : scmProxies.values()) { - ScmBlockLocationProtocolPB scmProxy = proxy.proxy; - if (scmProxy != null) { - RPC.stopProxy(scmProxy); - } - } - } - - private synchronized long getRetryInterval() { - // TODO add exponential backup - return retryInterval; - } - - private synchronized void nextProxyIndex() { - // round robin the next proxy - - currentProxyIndex = (getCurrentProxyIndex() + 1) % scmProxyInfoMap.size(); - currentProxySCMNodeId = scmNodeIds.get(currentProxyIndex); - } - - private synchronized void assignLeaderToNode(String newLeaderNodeId) { - if (!currentProxySCMNodeId.equals(newLeaderNodeId)) { - if (scmProxyInfoMap.containsKey(newLeaderNodeId)) { - updatedLeaderNodeID = newLeaderNodeId; - LOG.debug("Updated 
LeaderNodeID {}", updatedLeaderNodeID); - } else { - updatedLeaderNodeID = null; - } - } - } - - /** - * Creates proxy object. - */ - private ProxyInfo createSCMProxy(String nodeId) { - ProxyInfo proxyInfo; - SCMProxyInfo scmProxyInfo = scmProxyInfoMap.get(nodeId); - InetSocketAddress address = scmProxyInfo.getAddress(); - try { - ScmBlockLocationProtocolPB scmProxy = createSCMProxy(address); - // Create proxyInfo here, to make it work with all Hadoop versions. - proxyInfo = new ProxyInfo<>(scmProxy, scmProxyInfo.toString()); - scmProxies.put(nodeId, proxyInfo); - return proxyInfo; - } catch (IOException ioe) { - LOG.error("{} Failed to create RPC proxy to SCM at {}", - this.getClass().getSimpleName(), address, ioe); - throw new RuntimeException(ioe); - } - } - - private ScmBlockLocationProtocolPB createSCMProxy( - InetSocketAddress scmAddress) throws IOException { - Configuration hadoopConf = - LegacyHadoopConfigurationSource.asHadoopConfiguration(conf); - RPC.setProtocolEngine(hadoopConf, ScmBlockLocationProtocolPB.class, - ProtobufRpcEngine.class); - // FailoverOnNetworkException ensures that the IPC layer does not attempt - // retries on the same OM in case of connection exception. This retry - // policy essentially results in TRY_ONCE_THEN_FAIL. 
- RetryPolicy connectionRetryPolicy = RetryPolicies - .failoverOnNetworkException(0); - return RPC.getProtocolProxy(ScmBlockLocationProtocolPB.class, scmVersion, - scmAddress, ugi, hadoopConf, - NetUtils.getDefaultSocketFactory(hadoopConf), - (int) scmClientConfig.getRpcTimeOut(), - connectionRetryPolicy).getProxy(); - } - - public RetryPolicy getSCMBlockLocationRetryPolicy() { - return new RetryPolicy() { - @Override - public RetryAction shouldRetry(Exception e, int retry, - int failover, boolean b) { - if (SCMHAUtils.checkRetriableWithNoFailoverException(e)) { - setUpdatedLeaderNodeID(); - } else { - performFailoverToAssignedLeader(null, e); - } - return SCMHAUtils.getRetryAction(failover, retry, e, maxRetryCount, - getRetryInterval()); - } - }; - } - - public synchronized int getCurrentProxyIndex() { - return currentProxyIndex; - } - - public synchronized void setUpdatedLeaderNodeID() { - this.updatedLeaderNodeID = getCurrentProxySCMNodeId(); + protected String getProtocolAddress(SCMNodeInfo scmNodeInfo) { + return scmNodeInfo.getBlockClientAddress(); } } diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/proxy/SCMContainerLocationFailoverProxyProvider.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/proxy/SCMContainerLocationFailoverProxyProvider.java index 6e1d25bb0ffe..d0210128f0f0 100644 --- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/proxy/SCMContainerLocationFailoverProxyProvider.java +++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/proxy/SCMContainerLocationFailoverProxyProvider.java @@ -17,31 +17,9 @@ package org.apache.hadoop.hdds.scm.proxy; -import com.google.common.annotations.VisibleForTesting; -import java.io.Closeable; -import java.io.IOException; -import java.net.InetSocketAddress; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Optional; -import 
java.util.stream.Collectors; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hdds.conf.ConfigurationException; import org.apache.hadoop.hdds.conf.ConfigurationSource; -import org.apache.hadoop.hdds.ratis.ServerNotLeaderException; -import org.apache.hadoop.hdds.scm.ha.SCMHAUtils; import org.apache.hadoop.hdds.scm.ha.SCMNodeInfo; import org.apache.hadoop.hdds.scm.protocolPB.StorageContainerLocationProtocolPB; -import org.apache.hadoop.hdds.utils.LegacyHadoopConfigurationSource; -import org.apache.hadoop.io.retry.FailoverProxyProvider; -import org.apache.hadoop.io.retry.RetryPolicies; -import org.apache.hadoop.io.retry.RetryPolicy; -import org.apache.hadoop.ipc.ProtobufRpcEngine; -import org.apache.hadoop.ipc.RPC; -import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.security.UserGroupInformation; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -49,261 +27,28 @@ /** * Failover proxy provider for StorageContainerLocationProtocolPB. */ -public class SCMContainerLocationFailoverProxyProvider implements - FailoverProxyProvider, Closeable { +public class SCMContainerLocationFailoverProxyProvider extends + SCMFailoverProxyProviderBase { public static final Logger LOG = LoggerFactory.getLogger(SCMContainerLocationFailoverProxyProvider.class); - // scmNodeId -> ProxyInfo - private final Map> scmProxies; - // scmNodeId -> SCMProxyInfo - private final Map scmProxyInfoMap; - private List scmNodeIds; - - // As SCM Client is shared across threads, performFailOver() - // updates the currentProxySCMNodeId based on the updateLeaderNodeId which is - // updated in shouldRetry(). When 2 or more threads run in parallel, the - // RetryInvocationHandler will check the expectedFailOverCount - // and not execute performFailOver() for one of them. So the other thread(s) - // shall not call performFailOver(), it will call getProxy() which uses - // currentProxySCMNodeId and returns the proxy. 
- private volatile String currentProxySCMNodeId; - private volatile int currentProxyIndex; - - private final ConfigurationSource conf; - private final SCMClientConfig scmClientConfig; - private final long scmVersion; - - private String scmServiceId; - - private final int maxRetryCount; - private final long retryInterval; - - private final UserGroupInformation ugi; - - private String updatedLeaderNodeID = null; - /** * Construct SCMContainerLocationFailoverProxyProvider. * If userGroupInformation is not null, use the passed ugi, else obtain * from {@link UserGroupInformation#getCurrentUser()} - * @param conf - * @param userGroupInformation */ public SCMContainerLocationFailoverProxyProvider(ConfigurationSource conf, UserGroupInformation userGroupInformation) { - this.conf = conf; - - if (userGroupInformation == null) { - try { - this.ugi = UserGroupInformation.getCurrentUser(); - } catch (IOException ex) { - LOG.error("Unable to fetch user credentials from UGI", ex); - throw new RuntimeException(ex); - } - } else { - this.ugi = userGroupInformation; - } - this.scmVersion = RPC.getProtocolVersion( - StorageContainerLocationProtocolPB.class); - - this.scmProxies = new HashMap<>(); - this.scmProxyInfoMap = new HashMap<>(); - loadConfigs(); - - this.currentProxyIndex = 0; - currentProxySCMNodeId = scmNodeIds.get(currentProxyIndex); - scmClientConfig = conf.getObject(SCMClientConfig.class); - this.maxRetryCount = scmClientConfig.getRetryCount(); - this.retryInterval = scmClientConfig.getRetryInterval(); - } - - @VisibleForTesting - protected synchronized void loadConfigs() { - List scmNodeInfoList = SCMNodeInfo.buildNodeInfo(conf); - - scmNodeIds = new ArrayList<>(); - - for (SCMNodeInfo scmNodeInfo : scmNodeInfoList) { - if (scmNodeInfo.getScmClientAddress() == null) { - throw new ConfigurationException("SCM Client Address could not " + - "be obtained from config. 
Config is not properly defined"); - } else { - InetSocketAddress scmClientAddress = - NetUtils.createSocketAddr(scmNodeInfo.getScmClientAddress()); - - scmServiceId = scmNodeInfo.getServiceId(); - String scmNodeId = scmNodeInfo.getNodeId(); - - scmNodeIds.add(scmNodeId); - SCMProxyInfo scmProxyInfo = new SCMProxyInfo(scmServiceId, scmNodeId, - scmClientAddress); - scmProxyInfoMap.put(scmNodeId, scmProxyInfo); - } - } - } - - @VisibleForTesting - public synchronized String getCurrentProxySCMNodeId() { - return currentProxySCMNodeId; - } - - @VisibleForTesting - public synchronized void changeCurrentProxy(String nodeId) { - currentProxyIndex = scmNodeIds.indexOf(nodeId); - currentProxySCMNodeId = nodeId; - nextProxyIndex(); + super(StorageContainerLocationProtocolPB.class, conf, userGroupInformation); } @Override - public synchronized ProxyInfo getProxy() { - ProxyInfo currentProxyInfo = scmProxies.get(getCurrentProxySCMNodeId()); - if (currentProxyInfo == null) { - currentProxyInfo = createSCMProxy(getCurrentProxySCMNodeId()); - } - return currentProxyInfo; - } - - public synchronized List getProxies() { - for (SCMProxyInfo scmProxyInfo : scmProxyInfoMap.values()) { - if (scmProxies.get(scmProxyInfo.getNodeId()) == null) { - scmProxies.put(scmProxyInfo.getNodeId(), - createSCMProxy(scmProxyInfo.getNodeId())); - } - } - return scmProxies.values().stream() - .map(proxyInfo -> proxyInfo.proxy).collect(Collectors.toList()); + protected Logger getLogger() { + return LOG; } @Override - public synchronized void performFailover( - StorageContainerLocationProtocolPB newLeader) { - if (updatedLeaderNodeID != null) { - currentProxySCMNodeId = updatedLeaderNodeID; - } else { - nextProxyIndex(); - } - LOG.debug("Failing over to next proxy. 
{}", getCurrentProxySCMNodeId()); - } - - public synchronized void performFailoverToAssignedLeader(String newLeader, - Exception e) { - ServerNotLeaderException snle = - (ServerNotLeaderException) SCMHAUtils.getServerNotLeaderException(e); - if (snle != null && snle.getSuggestedLeader() != null) { - Optional matchedProxyInfo = - scmProxyInfoMap.values().stream().filter( - proxyInfo -> NetUtils.getHostPortString(proxyInfo.getAddress()) - .equals(snle.getSuggestedLeader())).findFirst(); - if (matchedProxyInfo.isPresent()) { - newLeader = matchedProxyInfo.get().getNodeId(); - LOG.debug("Performing failover to suggested leader {}, nodeId {}", - snle.getSuggestedLeader(), newLeader); - } else { - LOG.debug("Suggested leader {} does not match with any of the " + - "proxyInfo adress {}", snle.getSuggestedLeader(), - Arrays.toString(scmProxyInfoMap.values().toArray())); - } - } - assignLeaderToNode(newLeader); - } - - @Override - public Class getInterface() { - return StorageContainerLocationProtocolPB.class; - } - - @Override - public synchronized void close() throws IOException { - for (ProxyInfo - proxy : scmProxies.values()) { - StorageContainerLocationProtocolPB scmProxy = - proxy.proxy; - if (scmProxy != null) { - RPC.stopProxy(scmProxy); - } - } - } - - private long getRetryInterval() { - // TODO add exponential backup - return retryInterval; - } - - private synchronized void nextProxyIndex() { - // round robin the next proxy - currentProxyIndex = (currentProxyIndex + 1) % scmProxyInfoMap.size(); - currentProxySCMNodeId = scmNodeIds.get(currentProxyIndex); - } - - private synchronized void assignLeaderToNode(String newLeaderNodeId) { - if (!currentProxySCMNodeId.equals(newLeaderNodeId)) { - if (scmProxyInfoMap.containsKey(newLeaderNodeId)) { - updatedLeaderNodeID = newLeaderNodeId; - LOG.debug("Updated LeaderNodeID {}", updatedLeaderNodeID); - } else { - updatedLeaderNodeID = null; - } - } - } - - /** - * Creates proxy object. 
- */ - private ProxyInfo createSCMProxy(String nodeId) { - ProxyInfo proxyInfo; - SCMProxyInfo scmProxyInfo = scmProxyInfoMap.get(nodeId); - InetSocketAddress address = scmProxyInfo.getAddress(); - try { - StorageContainerLocationProtocolPB scmProxy = createSCMProxy(address); - // Create proxyInfo here, to make it work with all Hadoop versions. - proxyInfo = new ProxyInfo<>(scmProxy, scmProxyInfo.toString()); - scmProxies.put(nodeId, proxyInfo); - return proxyInfo; - } catch (IOException ioe) { - LOG.error("{} Failed to create RPC proxy to SCM at {}", - this.getClass().getSimpleName(), address, ioe); - throw new RuntimeException(ioe); - } - } - - - private StorageContainerLocationProtocolPB createSCMProxy( - InetSocketAddress scmAddress) throws IOException { - Configuration hadoopConf = - LegacyHadoopConfigurationSource.asHadoopConfiguration(conf); - RPC.setProtocolEngine(hadoopConf, StorageContainerLocationProtocolPB.class, - ProtobufRpcEngine.class); - // FailoverOnNetworkException ensures that the IPC layer does not attempt - // retries on the same OM in case of connection exception. This retry - // policy essentially results in TRY_ONCE_THEN_FAIL. 
- RetryPolicy connectionRetryPolicy = RetryPolicies - .failoverOnNetworkException(0); - return RPC.getProtocolProxy( - StorageContainerLocationProtocolPB.class, - scmVersion, scmAddress, ugi, - hadoopConf, NetUtils.getDefaultSocketFactory(hadoopConf), - (int)scmClientConfig.getRpcTimeOut(), connectionRetryPolicy).getProxy(); - } - - public RetryPolicy getRetryPolicy() { - return new RetryPolicy() { - @Override - public RetryAction shouldRetry(Exception e, int retry, - int failover, boolean b) { - if (SCMHAUtils.checkRetriableWithNoFailoverException(e)) { - setUpdatedLeaderNodeID(); - } else { - performFailoverToAssignedLeader(null, e); - } - return SCMHAUtils.getRetryAction(failover, retry, e, maxRetryCount, - getRetryInterval()); - } - }; - } - - public synchronized void setUpdatedLeaderNodeID() { - this.updatedLeaderNodeID = getCurrentProxySCMNodeId(); + protected String getProtocolAddress(SCMNodeInfo scmNodeInfo) { + return scmNodeInfo.getScmClientAddress(); } } diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/proxy/SCMFailoverProxyProviderBase.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/proxy/SCMFailoverProxyProviderBase.java new file mode 100644 index 000000000000..76315726957b --- /dev/null +++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/proxy/SCMFailoverProxyProviderBase.java @@ -0,0 +1,345 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdds.scm.proxy; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; +import java.io.IOException; +import java.net.InetSocketAddress; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.stream.Collectors; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdds.conf.ConfigurationException; +import org.apache.hadoop.hdds.conf.ConfigurationSource; +import org.apache.hadoop.hdds.ratis.ServerNotLeaderException; +import org.apache.hadoop.hdds.scm.ha.SCMHAUtils; +import org.apache.hadoop.hdds.scm.ha.SCMNodeInfo; +import org.apache.hadoop.hdds.utils.LegacyHadoopConfigurationSource; +import org.apache.hadoop.io.retry.FailoverProxyProvider; +import org.apache.hadoop.io.retry.RetryPolicies; +import org.apache.hadoop.io.retry.RetryPolicy; +import org.apache.hadoop.ipc.ProtobufRpcEngine; +import org.apache.hadoop.ipc.RPC; +import org.apache.hadoop.net.NetUtils; +import org.apache.hadoop.security.UserGroupInformation; +import org.slf4j.Logger; + +/** + * A failover proxy provider base abstract class. + * Provides common methods for failover proxy provider + * implementations. Failover proxy provider allows clients to configure + * multiple SCMs to connect to. In case of SCM failover, client can try + * connecting to another SCM node from the list of proxies. 
+ */ +public abstract class SCMFailoverProxyProviderBase implements FailoverProxyProvider { + + private final Logger logger; + + private final SCMClientConfig scmClientConfig; + + private final Class protocolClass; + + // scmNodeId -> ProxyInfo + private final Map> scmProxies; + // scmNodeId -> SCMProxyInfo + private final Map scmProxyInfoMap; + private List scmNodeIds; + + // As SCM Client is shared across threads, performFailOver() + // updates the currentProxySCMNodeId based on the updateLeaderNodeId which is + // updated in shouldRetry(). When 2 or more threads run in parallel, the + // RetryInvocationHandler will check the expectedFailOverCount + // and not execute performFailOver() for one of them. So the other thread(s) + // shall not call performFailOver(), it will call getProxy() which uses + // currentProxySCMNodeId and returns the proxy. + private volatile String currentProxySCMNodeId; + private volatile int currentProxyIndex; + + private final ConfigurationSource conf; + private final long scmVersion; + + private final int maxRetryCount; + private final long retryInterval; + + private final UserGroupInformation ugi; + + private String updatedLeaderNodeID = null; + + /** + * Construct SCMFailoverProxyProviderBase. 
+ * If userGroupInformation is not null, use the passed ugi, else obtain + * from {@link UserGroupInformation#getCurrentUser()} + */ + public SCMFailoverProxyProviderBase(Class protocol, ConfigurationSource conf, + UserGroupInformation userGroupInformation) { + this.logger = getLogger(); + Preconditions.checkNotNull(logger); + this.protocolClass = protocol; + this.conf = conf; + + if (userGroupInformation == null) { + try { + this.ugi = UserGroupInformation.getCurrentUser(); + } catch (IOException ex) { + logger.error("Unable to fetch user credentials from UGI", ex); + throw new RuntimeException(ex); + } + } else { + this.ugi = userGroupInformation; + } + this.scmVersion = RPC.getProtocolVersion(protocol); + + this.scmProxies = new HashMap<>(); + this.scmProxyInfoMap = new HashMap<>(); + loadConfigs(); + + this.currentProxyIndex = 0; + currentProxySCMNodeId = scmNodeIds.get(currentProxyIndex); + + scmClientConfig = conf.getObject(SCMClientConfig.class); + this.maxRetryCount = scmClientConfig.getRetryCount(); + this.retryInterval = scmClientConfig.getRetryInterval(); + + logger.info("Created fail-over proxy for protocol {} with {} nodes: {}", protocol.getSimpleName(), + scmNodeIds.size(), scmProxyInfoMap.values()); + } + + /** + * Get the logger implementation for the specific protocol's failover proxy provider. + */ + protected abstract Logger getLogger(); + + /** + * Get the specific protocol address from {@link SCMNodeInfo}. + * @param scmNodeInfo SCM node info which contains different protocols' address. + * @return protocol address. + */ + protected abstract String getProtocolAddress(SCMNodeInfo scmNodeInfo); + + /** + * Get the SCM node ID the current proxy is pointing to. + * This can be overridden with a single SCM node ID to disable SCM failover. + * See {@link SingleSecretKeyProtocolProxyProvider} + * @return current proxy's SCM Node ID. 
+ */ + protected synchronized String getCurrentProxySCMNodeId() { + return currentProxySCMNodeId; + } + + @VisibleForTesting + protected synchronized void loadConfigs() { + List scmNodeInfoList = SCMNodeInfo.buildNodeInfo(conf); + scmNodeIds = new ArrayList<>(); + + + for (SCMNodeInfo scmNodeInfo : scmNodeInfoList) { + String protocolAddress = getProtocolAddress(scmNodeInfo); + if (protocolAddress == null) { + throw new ConfigurationException(protocolClass.getSimpleName() + " SCM Address could not " + + "be obtained from config. Config is not properly defined"); + } else { + InetSocketAddress protocolAddr = NetUtils.createSocketAddr(protocolAddress); + + String scmServiceId = scmNodeInfo.getServiceId(); + String scmNodeId = scmNodeInfo.getNodeId(); + scmNodeIds.add(scmNodeId); + SCMProxyInfo scmProxyInfo = new SCMProxyInfo(scmServiceId, scmNodeId, protocolAddr); + scmProxyInfoMap.put(scmNodeId, scmProxyInfo); + } + } + } + + + @VisibleForTesting + public synchronized void changeCurrentProxy(String nodeId) { + currentProxyIndex = scmNodeIds.indexOf(nodeId); + currentProxySCMNodeId = nodeId; + nextProxyIndex(); + } + + @Override + public synchronized ProxyInfo getProxy() { + ProxyInfo currentProxyInfo = scmProxies.get(getCurrentProxySCMNodeId()); + if (currentProxyInfo == null) { + currentProxyInfo = createSCMProxy(getCurrentProxySCMNodeId()); + } + return currentProxyInfo; + } + + public synchronized List getProxies() { + for (SCMProxyInfo scmProxyInfo : scmProxyInfoMap.values()) { + if (scmProxies.get(scmProxyInfo.getNodeId()) == null) { + scmProxies.put(scmProxyInfo.getNodeId(), + createSCMProxy(scmProxyInfo.getNodeId())); + } + } + return scmProxies.values().stream() + .map(proxyInfo -> proxyInfo.proxy).collect(Collectors.toList()); + } + + @Override + public synchronized void performFailover(T newLeader) { + if (updatedLeaderNodeID != null) { + currentProxySCMNodeId = updatedLeaderNodeID; + } else { + nextProxyIndex(); + } + logger.debug("Failing over to next 
proxy. {}", getCurrentProxySCMNodeId()); + } + + public synchronized void performFailoverToAssignedLeader(String newLeader, + Exception e) { + ServerNotLeaderException snle = + (ServerNotLeaderException) SCMHAUtils.getServerNotLeaderException(e); + if (snle != null && snle.getSuggestedLeader() != null) { + Optional matchedProxyInfo = + scmProxyInfoMap.values().stream().filter( + proxyInfo -> NetUtils.getHostPortString(proxyInfo.getAddress()) + .equals(snle.getSuggestedLeader())).findFirst(); + if (matchedProxyInfo.isPresent()) { + newLeader = matchedProxyInfo.get().getNodeId(); + logger.debug("Performing failover to suggested leader {}, nodeId {}", + snle.getSuggestedLeader(), newLeader); + } else { + logger.debug("Suggested leader {} does not match with any of the " + + "proxyInfo address {}", snle.getSuggestedLeader(), + Arrays.toString(scmProxyInfoMap.values().toArray())); + } + } + assignLeaderToNode(newLeader); + } + + @Override + public Class getInterface() { + return protocolClass; + } + + public List getSCMNodeIds() { + return Collections.unmodifiableList(scmNodeIds); + } + + public Collection getSCMProxyInfoList() { + return Collections.unmodifiableCollection(scmProxyInfoMap.values()); + } + + @Override + public synchronized void close() throws IOException { + for (ProxyInfo proxy : scmProxies.values()) { + T scmProxy = proxy.proxy; + if (scmProxy != null) { + RPC.stopProxy(scmProxy); + } + } + } + + private long getRetryInterval() { + // TODO add exponential backoff + return retryInterval; + } + + private synchronized void nextProxyIndex() { + // round robin the next proxy + currentProxyIndex = (currentProxyIndex + 1) % scmProxyInfoMap.size(); + currentProxySCMNodeId = scmNodeIds.get(currentProxyIndex); + } + + private synchronized void assignLeaderToNode(String newLeaderNodeId) { + if (!currentProxySCMNodeId.equals(newLeaderNodeId)) { + if (scmProxyInfoMap.containsKey(newLeaderNodeId)) { + updatedLeaderNodeID = newLeaderNodeId; + logger.debug("Updated 
LeaderNodeID {}", updatedLeaderNodeID); + } else { + updatedLeaderNodeID = null; + } + } + } + + /** + * Creates proxy object. + */ + private ProxyInfo createSCMProxy(String nodeId) { + ProxyInfo proxyInfo; + SCMProxyInfo scmProxyInfo = scmProxyInfoMap.get(nodeId); + InetSocketAddress address = scmProxyInfo.getAddress(); + try { + T scmProxy = createSCMProxy(address); + // Create proxyInfo here, to make it work with all Hadoop versions. + proxyInfo = new ProxyInfo(scmProxy, scmProxyInfo.toString()); + scmProxies.put(nodeId, proxyInfo); + return proxyInfo; + } catch (IOException ioe) { + logger.error("{} Failed to create RPC proxy to SCM at {}", + this.getClass().getSimpleName(), address, ioe); + throw new RuntimeException(ioe); + } + } + + private T createSCMProxy(InetSocketAddress scmAddress) throws IOException { + Configuration hadoopConf = + LegacyHadoopConfigurationSource.asHadoopConfiguration(conf); + RPC.setProtocolEngine(hadoopConf, protocolClass, ProtobufRpcEngine.class); + // FailoverOnNetworkException ensures that the IPC layer does not attempt + // retries on the same OM in case of connection exception. This retry + // policy essentially results in TRY_ONCE_THEN_FAIL. 
+ RetryPolicy connectionRetryPolicy = RetryPolicies.failoverOnNetworkException(0); + return RPC.getProtocolProxy( + protocolClass, + scmVersion, scmAddress, ugi, + hadoopConf, NetUtils.getDefaultSocketFactory(hadoopConf), + (int)scmClientConfig.getRpcTimeOut(), connectionRetryPolicy).getProxy(); + } + + public RetryPolicy getRetryPolicy() { + return new RetryPolicy() { + @Override + public RetryAction shouldRetry(Exception e, int retry, + int failover, boolean b) { + if (logger.isDebugEnabled()) { + if (e.getCause() != null) { + logger.debug("RetryProxy: SCM Server {}: {}: {}", + getCurrentProxySCMNodeId(), + e.getCause().getClass().getSimpleName(), + e.getCause().getMessage()); + } else { + logger.debug("RetryProxy: SCM {}: {}", getCurrentProxySCMNodeId(), + e.getMessage()); + } + } + + if (SCMHAUtils.checkRetriableWithNoFailoverException(e)) { + setUpdatedLeaderNodeID(); + } else { + performFailoverToAssignedLeader(null, e); + } + return SCMHAUtils.getRetryAction(failover, retry, e, maxRetryCount, + getRetryInterval()); + } + }; + } + + public synchronized void setUpdatedLeaderNodeID() { + this.updatedLeaderNodeID = getCurrentProxySCMNodeId(); + } +} diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/proxy/SCMSecurityProtocolFailoverProxyProvider.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/proxy/SCMSecurityProtocolFailoverProxyProvider.java index c7b50af67af9..60e07b90b417 100644 --- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/proxy/SCMSecurityProtocolFailoverProxyProvider.java +++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/proxy/SCMSecurityProtocolFailoverProxyProvider.java @@ -17,30 +17,9 @@ package org.apache.hadoop.hdds.scm.proxy; -import com.google.common.base.Preconditions; -import java.io.Closeable; -import java.io.IOException; -import java.net.InetSocketAddress; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.HashMap; -import 
java.util.List; -import java.util.Map; -import java.util.Optional; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hdds.conf.ConfigurationException; import org.apache.hadoop.hdds.conf.ConfigurationSource; import org.apache.hadoop.hdds.protocolPB.SCMSecurityProtocolPB; -import org.apache.hadoop.hdds.ratis.ServerNotLeaderException; -import org.apache.hadoop.hdds.scm.ha.SCMHAUtils; import org.apache.hadoop.hdds.scm.ha.SCMNodeInfo; -import org.apache.hadoop.hdds.utils.LegacyHadoopConfigurationSource; -import org.apache.hadoop.io.retry.FailoverProxyProvider; -import org.apache.hadoop.io.retry.RetryPolicies; -import org.apache.hadoop.io.retry.RetryPolicy; -import org.apache.hadoop.ipc.ProtobufRpcEngine; -import org.apache.hadoop.ipc.RPC; -import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.security.UserGroupInformation; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -48,260 +27,26 @@ /** * Failover proxy provider for SCMSecurityProtocol server. */ -public class SCMSecurityProtocolFailoverProxyProvider implements - FailoverProxyProvider, Closeable { +public class SCMSecurityProtocolFailoverProxyProvider extends SCMFailoverProxyProviderBase { public static final Logger LOG = LoggerFactory.getLogger(SCMSecurityProtocolFailoverProxyProvider.class); - // scmNodeId -> ProxyInfo - private final Map> scmProxies; - - // scmNodeId -> SCMProxyInfo - private final Map scmProxyInfoMap; - - private List scmNodeIds; - - // As SCM Client is shared across threads, performFailOver() - // updates the currentProxySCMNodeId based on the updateLeaderNodeId which is - // updated in shouldRetry(). When 2 or more threads run in parallel, the - // RetryInvocationHandler will check the expectedFailOverCount - // and not execute performFailOver() for one of them. So the other thread(s) - // shall not call performFailOver(), it will call getProxy() which uses - // currentProxySCMNodeId and returns the proxy. 
- private volatile String currentProxySCMNodeId; - private volatile int currentProxyIndex; - - - private final ConfigurationSource conf; - private final SCMClientConfig scmClientConfig; - private final long scmVersion; - - private String scmServiceId; - - private final int maxRetryCount; - private final long retryInterval; - - private final UserGroupInformation ugi; - - private String updatedLeaderNodeID = null; - /** * Construct fail-over proxy provider for SCMSecurityProtocol Server. - * @param conf - * @param userGroupInformation */ public SCMSecurityProtocolFailoverProxyProvider(ConfigurationSource conf, UserGroupInformation userGroupInformation) { - Preconditions.checkNotNull(userGroupInformation); - this.ugi = userGroupInformation; - this.conf = conf; - this.scmVersion = RPC.getProtocolVersion(SCMSecurityProtocolPB.class); - - this.scmProxies = new HashMap<>(); - this.scmProxyInfoMap = new HashMap<>(); - loadConfigs(); - - this.currentProxyIndex = 0; - currentProxySCMNodeId = scmNodeIds.get(currentProxyIndex); - scmClientConfig = conf.getObject(SCMClientConfig.class); - this.maxRetryCount = scmClientConfig.getRetryCount(); - this.retryInterval = scmClientConfig.getRetryInterval(); - } - - protected synchronized void loadConfigs() { - List scmNodeInfoList = SCMNodeInfo.buildNodeInfo(conf); - scmNodeIds = new ArrayList<>(); - - for (SCMNodeInfo scmNodeInfo : scmNodeInfoList) { - if (scmNodeInfo.getScmSecurityAddress() == null) { - throw new ConfigurationException("SCM Client Address could not " + - "be obtained from config. 
Config is not properly defined"); - } else { - InetSocketAddress scmSecurityAddress = - NetUtils.createSocketAddr(scmNodeInfo.getScmSecurityAddress()); - - scmServiceId = scmNodeInfo.getServiceId(); - String scmNodeId = scmNodeInfo.getNodeId(); - - scmNodeIds.add(scmNodeId); - SCMProxyInfo scmProxyInfo = new SCMProxyInfo(scmServiceId, scmNodeId, - scmSecurityAddress); - scmProxyInfoMap.put(scmNodeId, scmProxyInfo); - } - } + super(SCMSecurityProtocolPB.class, conf, userGroupInformation); } @Override - public synchronized ProxyInfo getProxy() { - ProxyInfo currentProxyInfo = scmProxies.get(getCurrentProxySCMNodeId()); - if (currentProxyInfo == null) { - currentProxyInfo = createSCMProxy(getCurrentProxySCMNodeId()); - } - return currentProxyInfo; - } - - /** - * Creates proxy object. - */ - private ProxyInfo createSCMProxy(String nodeId) { - ProxyInfo proxyInfo; - SCMProxyInfo scmProxyInfo = scmProxyInfoMap.get(nodeId); - InetSocketAddress address = scmProxyInfo.getAddress(); - try { - SCMSecurityProtocolPB scmProxy = createSCMProxy(address); - // Create proxyInfo here, to make it work with all Hadoop versions. - proxyInfo = new ProxyInfo<>(scmProxy, scmProxyInfo.toString()); - scmProxies.put(nodeId, proxyInfo); - return proxyInfo; - } catch (IOException ioe) { - LOG.error("{} Failed to create RPC proxy to SCM at {}", - this.getClass().getSimpleName(), address, ioe); - throw new RuntimeException(ioe); - } + protected Logger getLogger() { + return LOG; } - private SCMSecurityProtocolPB createSCMProxy(InetSocketAddress scmAddress) - throws IOException { - Configuration hadoopConf = - LegacyHadoopConfigurationSource.asHadoopConfiguration(conf); - RPC.setProtocolEngine(hadoopConf, SCMSecurityProtocolPB.class, - ProtobufRpcEngine.class); - - // FailoverOnNetworkException ensures that the IPC layer does not attempt - // retries on the same SCM in case of connection exception. This retry - // policy essentially results in TRY_ONCE_THEN_FAIL. 
- - RetryPolicy connectionRetryPolicy = RetryPolicies - .failoverOnNetworkException(0); - - return RPC.getProtocolProxy(SCMSecurityProtocolPB.class, - scmVersion, scmAddress, ugi, - hadoopConf, NetUtils.getDefaultSocketFactory(hadoopConf), - (int)scmClientConfig.getRpcTimeOut(), connectionRetryPolicy).getProxy(); - } - - @Override - public synchronized void performFailover(SCMSecurityProtocolPB currentProxy) { - if (updatedLeaderNodeID != null) { - currentProxySCMNodeId = updatedLeaderNodeID; - } else { - nextProxyIndex(); - } - LOG.debug("Failing over to next proxy. {}", getCurrentProxySCMNodeId()); - } - - public synchronized void performFailoverToAssignedLeader(String newLeader, - Exception e) { - ServerNotLeaderException snle = - (ServerNotLeaderException) SCMHAUtils.getServerNotLeaderException(e); - if (snle != null && snle.getSuggestedLeader() != null) { - Optional< SCMProxyInfo > matchedProxyInfo = - scmProxyInfoMap.values().stream().filter( - proxyInfo -> NetUtils.getHostPortString(proxyInfo.getAddress()) - .equals(snle.getSuggestedLeader())).findFirst(); - if (matchedProxyInfo.isPresent()) { - newLeader = matchedProxyInfo.get().getNodeId(); - LOG.debug("Performing failover to suggested leader {}, nodeId {}", - snle.getSuggestedLeader(), newLeader); - } else { - if (LOG.isDebugEnabled()) { - LOG.debug("Suggested leader {} does not match with any of the " + - "proxyInfo adress {}", snle.getSuggestedLeader(), - Arrays.toString(scmProxyInfoMap.values().toArray())); - } - } - } - assignLeaderToNode(newLeader); - } - - - private synchronized void assignLeaderToNode(String newLeaderNodeId) { - if (!currentProxySCMNodeId.equals(newLeaderNodeId)) { - if (scmProxyInfoMap.containsKey(newLeaderNodeId)) { - updatedLeaderNodeID = newLeaderNodeId; - LOG.debug("Updated LeaderNodeID {}", updatedLeaderNodeID); - } else { - updatedLeaderNodeID = null; - } - } - } - - /** - * Update the proxy index to the next proxy in the list. 
- * @return the new proxy index - */ - private synchronized void nextProxyIndex() { - // round robin the next proxy - currentProxyIndex = (getCurrentProxyIndex() + 1) % scmProxyInfoMap.size(); - currentProxySCMNodeId = scmNodeIds.get(currentProxyIndex); - } - - public RetryPolicy getRetryPolicy() { - // Client will attempt up to maxFailovers number of failovers between - // available SCMs before throwing exception. - RetryPolicy retryPolicy = new RetryPolicy() { - @Override - public RetryAction shouldRetry(Exception exception, int retries, - int failovers, boolean isIdempotentOrAtMostOnce) - throws Exception { - - if (LOG.isDebugEnabled()) { - if (exception.getCause() != null) { - LOG.debug("RetryProxy: SCM Security Server {}: {}: {}", - getCurrentProxySCMNodeId(), - exception.getCause().getClass().getSimpleName(), - exception.getCause().getMessage()); - } else { - LOG.debug("RetryProxy: SCM {}: {}", getCurrentProxySCMNodeId(), - exception.getMessage()); - } - } - - if (SCMHAUtils.checkRetriableWithNoFailoverException(exception)) { - setUpdatedLeaderNodeID(); - } else { - performFailoverToAssignedLeader(null, exception); - } - return SCMHAUtils - .getRetryAction(failovers, retries, exception, maxRetryCount, - getRetryInterval()); - } - }; - - return retryPolicy; - } - - public synchronized void setUpdatedLeaderNodeID() { - this.updatedLeaderNodeID = getCurrentProxySCMNodeId(); - } - - @Override - public Class< SCMSecurityProtocolPB > getInterface() { - return SCMSecurityProtocolPB.class; - } - - @Override - public synchronized void close() throws IOException { - for (ProxyInfo proxyInfo : scmProxies.values()) { - SCMSecurityProtocolPB proxy = proxyInfo.proxy; - if (proxy != null) { - RPC.stopProxy(proxy); - } - } - } - - public synchronized String getCurrentProxySCMNodeId() { - return currentProxySCMNodeId; - } - - public synchronized int getCurrentProxyIndex() { - return currentProxyIndex; - } - - private long getRetryInterval() { - return retryInterval; + 
protected String getProtocolAddress(SCMNodeInfo scmNodeInfo) { + return scmNodeInfo.getScmSecurityAddress(); } } diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/proxy/SecretKeyProtocolFailoverProxyProvider.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/proxy/SecretKeyProtocolFailoverProxyProvider.java index 25f5b5524e40..98ff404cda4c 100644 --- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/proxy/SecretKeyProtocolFailoverProxyProvider.java +++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/proxy/SecretKeyProtocolFailoverProxyProvider.java @@ -17,30 +17,9 @@ package org.apache.hadoop.hdds.scm.proxy; -import com.google.common.base.Preconditions; -import java.io.Closeable; -import java.io.IOException; -import java.net.InetSocketAddress; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Optional; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hdds.conf.ConfigurationException; import org.apache.hadoop.hdds.conf.ConfigurationSource; import org.apache.hadoop.hdds.protocol.proto.SCMSecretKeyProtocolProtos.SCMSecretKeyProtocolService; -import org.apache.hadoop.hdds.ratis.ServerNotLeaderException; -import org.apache.hadoop.hdds.scm.ha.SCMHAUtils; import org.apache.hadoop.hdds.scm.ha.SCMNodeInfo; -import org.apache.hadoop.hdds.utils.LegacyHadoopConfigurationSource; -import org.apache.hadoop.io.retry.FailoverProxyProvider; -import org.apache.hadoop.io.retry.RetryPolicies; -import org.apache.hadoop.io.retry.RetryPolicy; -import org.apache.hadoop.ipc.ProtobufRpcEngine; -import org.apache.hadoop.ipc.RPC; -import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.security.UserGroupInformation; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -48,255 +27,27 @@ /** * Failover proxy provider for SCMSecretKeyProtocolService server. 
*/ -public class SecretKeyProtocolFailoverProxyProvider - implements - FailoverProxyProvider, Closeable { +public class SecretKeyProtocolFailoverProxyProvider< + T extends SCMSecretKeyProtocolService.BlockingInterface> extends SCMFailoverProxyProviderBase { public static final Logger LOG = LoggerFactory.getLogger(SecretKeyProtocolFailoverProxyProvider.class); - // scmNodeId -> ProxyInfo - private final Map> scmProxies; - - // scmNodeId -> SCMProxyInfo - private final Map scmProxyInfoMap; - - private List scmNodeIds; - - // As SCM Client is shared across threads, performFailOver() - // updates the currentProxySCMNodeId based on the updateLeaderNodeId which is - // updated in shouldRetry(). When 2 or more threads run in parallel, the - // RetryInvocationHandler will check the expectedFailOverCount - // and not execute performFailOver() for one of them. So the other thread(s) - // shall not call performFailOver(), it will call getProxy() which uses - // currentProxySCMNodeId and returns the proxy. - private volatile String currentProxySCMNodeId; - private volatile int currentProxyIndex; - - - private final ConfigurationSource conf; - private final SCMClientConfig scmClientConfig; - private final long scmVersion; - - private String scmServiceId; - - private final int maxRetryCount; - private final long retryInterval; - - private final UserGroupInformation ugi; - private final Class proxyClazz; - - private String updatedLeaderNodeID = null; - /** * Construct fail-over proxy provider for SCMSecurityProtocol Server. 
- * @param conf - * @param userGroupInformation */ public SecretKeyProtocolFailoverProxyProvider(ConfigurationSource conf, UserGroupInformation userGroupInformation, Class proxyClazz) { - Preconditions.checkNotNull(userGroupInformation); - this.ugi = userGroupInformation; - this.conf = conf; - this.proxyClazz = proxyClazz; - this.scmVersion = RPC.getProtocolVersion(proxyClazz); - - this.scmProxies = new HashMap<>(); - this.scmProxyInfoMap = new HashMap<>(); - loadConfigs(); - - this.currentProxyIndex = 0; - currentProxySCMNodeId = scmNodeIds.get(currentProxyIndex); - scmClientConfig = conf.getObject(SCMClientConfig.class); - this.maxRetryCount = scmClientConfig.getRetryCount(); - this.retryInterval = scmClientConfig.getRetryInterval(); - } - - protected synchronized void loadConfigs() { - List scmNodeInfoList = SCMNodeInfo.buildNodeInfo(conf); - scmNodeIds = new ArrayList<>(); - - for (SCMNodeInfo scmNodeInfo : scmNodeInfoList) { - if (scmNodeInfo.getScmSecurityAddress() == null) { - throw new ConfigurationException("SCM Client Address could not " + - "be obtained from config. Config is not properly defined"); - } else { - InetSocketAddress scmSecurityAddress = - NetUtils.createSocketAddr(scmNodeInfo.getScmSecurityAddress()); - - scmServiceId = scmNodeInfo.getServiceId(); - String scmNodeId = scmNodeInfo.getNodeId(); - - scmNodeIds.add(scmNodeId); - SCMProxyInfo scmProxyInfo = new SCMProxyInfo(scmServiceId, scmNodeId, - scmSecurityAddress); - scmProxyInfoMap.put(scmNodeId, scmProxyInfo); - } - } + super(proxyClazz, conf, userGroupInformation); } @Override - public synchronized ProxyInfo getProxy() { - ProxyInfo currentProxyInfo = scmProxies.get(getCurrentProxySCMNodeId()); - if (currentProxyInfo == null) { - currentProxyInfo = createSCMProxy(getCurrentProxySCMNodeId()); - } - return currentProxyInfo; - } - - /** - * Creates proxy object. 
- */ - private ProxyInfo createSCMProxy(String nodeId) { - ProxyInfo proxyInfo; - SCMProxyInfo scmProxyInfo = scmProxyInfoMap.get(nodeId); - InetSocketAddress address = scmProxyInfo.getAddress(); - try { - T scmProxy = createSCMProxy(address); - // Create proxyInfo here, to make it work with all Hadoop versions. - proxyInfo = new ProxyInfo(scmProxy, scmProxyInfo.toString()); - scmProxies.put(nodeId, proxyInfo); - return proxyInfo; - } catch (IOException ioe) { - LOG.error("{} Failed to create RPC proxy to SCM at {}", - this.getClass().getSimpleName(), address, ioe); - throw new RuntimeException(ioe); - } + protected Logger getLogger() { + return LOG; } - private T createSCMProxy(InetSocketAddress scmAddress) - throws IOException { - Configuration hadoopConf = - LegacyHadoopConfigurationSource.asHadoopConfiguration(conf); - RPC.setProtocolEngine(hadoopConf, proxyClazz, - ProtobufRpcEngine.class); - - // FailoverOnNetworkException ensures that the IPC layer does not attempt - // retries on the same SCM in case of connection exception. This retry - // policy essentially results in TRY_ONCE_THEN_FAIL. - - RetryPolicy connectionRetryPolicy = RetryPolicies - .failoverOnNetworkException(0); - - return RPC.getProtocolProxy(proxyClazz, - scmVersion, scmAddress, ugi, - hadoopConf, NetUtils.getDefaultSocketFactory(hadoopConf), - (int)scmClientConfig.getRpcTimeOut(), connectionRetryPolicy).getProxy(); - } - - @Override - public synchronized void performFailover(T currentProxy) { - if (updatedLeaderNodeID != null) { - currentProxySCMNodeId = updatedLeaderNodeID; - } else { - nextProxyIndex(); - } - LOG.debug("Failing over to next proxy. 
{}", getCurrentProxySCMNodeId()); - } - - public synchronized void performFailoverToAssignedLeader(String newLeader, - Exception e) { - ServerNotLeaderException snle = - (ServerNotLeaderException) SCMHAUtils.getServerNotLeaderException(e); - if (snle != null && snle.getSuggestedLeader() != null) { - Optional< SCMProxyInfo > matchedProxyInfo = - scmProxyInfoMap.values().stream().filter( - proxyInfo -> NetUtils.getHostPortString(proxyInfo.getAddress()) - .equals(snle.getSuggestedLeader())).findFirst(); - if (matchedProxyInfo.isPresent()) { - newLeader = matchedProxyInfo.get().getNodeId(); - LOG.debug("Performing failover to suggested leader {}, nodeId {}", - snle.getSuggestedLeader(), newLeader); - } else { - if (LOG.isDebugEnabled()) { - LOG.debug("Suggested leader {} does not match with any of the " + - "proxyInfo adress {}", snle.getSuggestedLeader(), - Arrays.toString(scmProxyInfoMap.values().toArray())); - } - } - } - assignLeaderToNode(newLeader); - } - - - private synchronized void assignLeaderToNode(String newLeaderNodeId) { - if (!currentProxySCMNodeId.equals(newLeaderNodeId)) { - if (scmProxyInfoMap.containsKey(newLeaderNodeId)) { - updatedLeaderNodeID = newLeaderNodeId; - LOG.debug("Updated LeaderNodeID {}", updatedLeaderNodeID); - } else { - updatedLeaderNodeID = null; - } - } - } - - /** - * Update the proxy index to the next proxy in the list. - * @return the new proxy index - */ - private synchronized void nextProxyIndex() { - // round robin the next proxy - currentProxyIndex = (getCurrentProxyIndex() + 1) % scmProxyInfoMap.size(); - currentProxySCMNodeId = scmNodeIds.get(currentProxyIndex); - } - - public RetryPolicy getRetryPolicy() { - // Client will attempt up to maxFailovers number of failovers between - // available SCMs before throwing exception. 
- - return (exception, retries, failovers, isIdempotentOrAtMostOnce) -> { - - if (LOG.isDebugEnabled()) { - if (exception.getCause() != null) { - LOG.debug("RetryProxy: SCM Security Server {}: {}: {}", - getCurrentProxySCMNodeId(), - exception.getCause().getClass().getSimpleName(), - exception.getCause().getMessage()); - } else { - LOG.debug("RetryProxy: SCM {}: {}", getCurrentProxySCMNodeId(), - exception.getMessage()); - } - } - - if (SCMHAUtils.checkRetriableWithNoFailoverException(exception)) { - setUpdatedLeaderNodeID(); - } else { - performFailoverToAssignedLeader(null, exception); - } - return SCMHAUtils - .getRetryAction(failovers, retries, exception, maxRetryCount, - getRetryInterval()); - }; - } - - public synchronized void setUpdatedLeaderNodeID() { - this.updatedLeaderNodeID = getCurrentProxySCMNodeId(); - } - - @Override - public Class getInterface() { - return proxyClazz; - } - - @Override - public synchronized void close() throws IOException { - for (ProxyInfo proxyInfo : scmProxies.values()) { - if (proxyInfo.proxy != null) { - RPC.stopProxy(proxyInfo.proxy); - } - } - } - - public synchronized String getCurrentProxySCMNodeId() { - return currentProxySCMNodeId; - } - - public synchronized int getCurrentProxyIndex() { - return currentProxyIndex; - } - - private long getRetryInterval() { - return retryInterval; + protected String getProtocolAddress(SCMNodeInfo scmNodeInfo) { + return scmNodeInfo.getScmSecurityAddress(); } } diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/proxy/SingleSecretKeyProtocolProxyProvider.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/proxy/SingleSecretKeyProtocolProxyProvider.java index c41e3627ea50..2cfa051a9364 100644 --- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/proxy/SingleSecretKeyProtocolProxyProvider.java +++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/proxy/SingleSecretKeyProtocolProxyProvider.java @@ -20,6 +20,8 @@ import 
org.apache.hadoop.hdds.conf.ConfigurationSource; import org.apache.hadoop.hdds.protocol.proto.SCMSecretKeyProtocolProtos.SCMSecretKeyProtocolService; import org.apache.hadoop.security.UserGroupInformation; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * Proxy provider for SCMSecretKeyProtocolService against a @@ -28,6 +30,10 @@ public class SingleSecretKeyProtocolProxyProvider extends SecretKeyProtocolFailoverProxyProvider { + + public static final Logger LOG = + LoggerFactory.getLogger(SingleSecretKeyProtocolProxyProvider.class); + private final String scmNodeId; public SingleSecretKeyProtocolProxyProvider( @@ -49,6 +55,11 @@ public synchronized void performFailover(T currentProxy) { // do nothing. } + @Override + protected Logger getLogger() { + return LOG; + } + @Override public synchronized void performFailoverToAssignedLeader(String newLeader, Exception e) { From e1ebb8aa39ccc0312721c3c7e211a439bd8ecb81 Mon Sep 17 00:00:00 2001 From: Ivan Andika Date: Sat, 22 Feb 2025 23:34:47 +0800 Subject: [PATCH 2/4] Typo and styling --- .../hadoop/hdds/scm/proxy/SCMFailoverProxyProviderBase.java | 2 +- .../scm/proxy/SecretKeyProtocolFailoverProxyProvider.java | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/proxy/SCMFailoverProxyProviderBase.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/proxy/SCMFailoverProxyProviderBase.java index 76315726957b..2ed84f6ef188 100644 --- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/proxy/SCMFailoverProxyProviderBase.java +++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/proxy/SCMFailoverProxyProviderBase.java @@ -50,7 +50,7 @@ * A failover proxy provider base abstract class. * Provides common methods for failover proxy provider * implementations. Failover proxy provider allows clients to configure - * multiple OMs to connect to. 
In case of SCM failover, client can try + * multiple SCMs to connect to. In case of SCM failover, client can try * connecting to another SCM node from the list of proxies. */ public abstract class SCMFailoverProxyProviderBase implements FailoverProxyProvider { diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/proxy/SecretKeyProtocolFailoverProxyProvider.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/proxy/SecretKeyProtocolFailoverProxyProvider.java index 98ff404cda4c..0359bb062cd8 100644 --- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/proxy/SecretKeyProtocolFailoverProxyProvider.java +++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/proxy/SecretKeyProtocolFailoverProxyProvider.java @@ -27,8 +27,8 @@ /** * Failover proxy provider for SCMSecretKeyProtocolService server. */ -public class SecretKeyProtocolFailoverProxyProvider< - T extends SCMSecretKeyProtocolService.BlockingInterface> extends SCMFailoverProxyProviderBase { +public class SecretKeyProtocolFailoverProxyProvider + extends SCMFailoverProxyProviderBase { public static final Logger LOG = LoggerFactory.getLogger(SecretKeyProtocolFailoverProxyProvider.class); From dfdc86e61af297ca22febdf07f372244e2635e79 Mon Sep 17 00:00:00 2001 From: Ivan Andika Date: Sat, 22 Feb 2025 23:50:03 +0800 Subject: [PATCH 3/4] Address comments --- .../hadoop/hdds/scm/ha/SCMNodeInfo.java | 2 ++ ...SCMBlockLocationFailoverProxyProvider.java | 2 -- .../proxy/SCMFailoverProxyProviderBase.java | 27 ++++++++----------- 3 files changed, 13 insertions(+), 18 deletions(-) diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMNodeInfo.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMNodeInfo.java index e1302fbcf205..d797bd0a11bd 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMNodeInfo.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMNodeInfo.java @@ 
-40,6 +40,7 @@ import java.util.ArrayList; import java.util.List; import java.util.OptionalInt; +import net.jcip.annotations.Immutable; import org.apache.hadoop.hdds.conf.ConfigurationException; import org.apache.hadoop.hdds.conf.ConfigurationSource; import org.apache.hadoop.ozone.ha.ConfUtils; @@ -52,6 +53,7 @@ * This class is used by SCM clients like OzoneManager, Client, Admin * commands to figure out SCM Node Information to make contact to SCM. */ +@Immutable public class SCMNodeInfo { private static final Logger LOG = LoggerFactory.getLogger(SCMNodeInfo.class); diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/proxy/SCMBlockLocationFailoverProxyProvider.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/proxy/SCMBlockLocationFailoverProxyProvider.java index 6eed475c7033..60e69997599e 100644 --- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/proxy/SCMBlockLocationFailoverProxyProvider.java +++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/proxy/SCMBlockLocationFailoverProxyProvider.java @@ -33,8 +33,6 @@ public class SCMBlockLocationFailoverProxyProvider extends public SCMBlockLocationFailoverProxyProvider(ConfigurationSource conf) { super(ScmBlockLocationProtocolPB.class, conf, null); - LOG.info("Created block location fail-over proxy with {} nodes: {}", - getSCMNodeIds(), getSCMProxyInfoList()); } @Override diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/proxy/SCMFailoverProxyProviderBase.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/proxy/SCMFailoverProxyProviderBase.java index 2ed84f6ef188..3bf204cc34f3 100644 --- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/proxy/SCMFailoverProxyProviderBase.java +++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/proxy/SCMFailoverProxyProviderBase.java @@ -18,7 +18,6 @@ package org.apache.hadoop.hdds.scm.proxy; import 
com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Preconditions; import java.io.IOException; import java.net.InetSocketAddress; import java.util.ArrayList; @@ -55,8 +54,6 @@ */ public abstract class SCMFailoverProxyProviderBase implements FailoverProxyProvider { - private final Logger logger; - private final SCMClientConfig scmClientConfig; private final Class protocolClass; @@ -94,8 +91,6 @@ public abstract class SCMFailoverProxyProviderBase implements FailoverProxyPr */ public SCMFailoverProxyProviderBase(Class protocol, ConfigurationSource conf, UserGroupInformation userGroupInformation) { - this.logger = getLogger(); - Preconditions.checkNotNull(logger); this.protocolClass = protocol; this.conf = conf; @@ -103,7 +98,7 @@ public SCMFailoverProxyProviderBase(Class protocol, ConfigurationSource conf, try { this.ugi = UserGroupInformation.getCurrentUser(); } catch (IOException ex) { - logger.error("Unable to fetch user credentials from UGI", ex); + getLogger().error("Unable to fetch user credentials from UGI", ex); throw new RuntimeException(ex); } } else { @@ -122,7 +117,7 @@ public SCMFailoverProxyProviderBase(Class protocol, ConfigurationSource conf, this.maxRetryCount = scmClientConfig.getRetryCount(); this.retryInterval = scmClientConfig.getRetryInterval(); - logger.info("Created fail-over proxy for protocol {} with {} nodes: {}", protocol.getSimpleName(), + getLogger().info("Created fail-over proxy for protocol {} with {} nodes: {}", protocol.getSimpleName(), scmNodeIds.size(), scmProxyInfoMap.values()); } @@ -206,7 +201,7 @@ public synchronized void performFailover(T newLeader) { } else { nextProxyIndex(); } - logger.debug("Failing over to next proxy. {}", getCurrentProxySCMNodeId()); + getLogger().debug("Failing over to next proxy. 
{}", getCurrentProxySCMNodeId()); } public synchronized void performFailoverToAssignedLeader(String newLeader, @@ -220,10 +215,10 @@ public synchronized void performFailoverToAssignedLeader(String newLeader, .equals(snle.getSuggestedLeader())).findFirst(); if (matchedProxyInfo.isPresent()) { newLeader = matchedProxyInfo.get().getNodeId(); - logger.debug("Performing failover to suggested leader {}, nodeId {}", + getLogger().debug("Performing failover to suggested leader {}, nodeId {}", snle.getSuggestedLeader(), newLeader); } else { - logger.debug("Suggested leader {} does not match with any of the " + + getLogger().debug("Suggested leader {} does not match with any of the " + "proxyInfo address {}", snle.getSuggestedLeader(), Arrays.toString(scmProxyInfoMap.values().toArray())); } @@ -269,7 +264,7 @@ private synchronized void assignLeaderToNode(String newLeaderNodeId) { if (!currentProxySCMNodeId.equals(newLeaderNodeId)) { if (scmProxyInfoMap.containsKey(newLeaderNodeId)) { updatedLeaderNodeID = newLeaderNodeId; - logger.debug("Updated LeaderNodeID {}", updatedLeaderNodeID); + getLogger().debug("Updated LeaderNodeID {}", updatedLeaderNodeID); } else { updatedLeaderNodeID = null; } @@ -286,11 +281,11 @@ private ProxyInfo createSCMProxy(String nodeId) { try { T scmProxy = createSCMProxy(address); // Create proxyInfo here, to make it work with all Hadoop versions. 
- proxyInfo = new ProxyInfo(scmProxy, scmProxyInfo.toString()); + proxyInfo = new ProxyInfo<>(scmProxy, scmProxyInfo.toString()); scmProxies.put(nodeId, proxyInfo); return proxyInfo; } catch (IOException ioe) { - logger.error("{} Failed to create RPC proxy to SCM at {}", + getLogger().error("{} Failed to create RPC proxy to SCM at {}", this.getClass().getSimpleName(), address, ioe); throw new RuntimeException(ioe); } @@ -316,14 +311,14 @@ public RetryPolicy getRetryPolicy() { @Override public RetryAction shouldRetry(Exception e, int retry, int failover, boolean b) { - if (logger.isDebugEnabled()) { + if (getLogger().isDebugEnabled()) { if (e.getCause() != null) { - logger.debug("RetryProxy: SCM Server {}: {}: {}", + getLogger().debug("RetryProxy: SCM Server {}: {}: {}", getCurrentProxySCMNodeId(), e.getCause().getClass().getSimpleName(), e.getCause().getMessage()); } else { - logger.debug("RetryProxy: SCM {}: {}", getCurrentProxySCMNodeId(), + getLogger().debug("RetryProxy: SCM {}: {}", getCurrentProxySCMNodeId(), e.getMessage()); } } From db126c9ca3330f4e723d0e20335ee040fa64a41d Mon Sep 17 00:00:00 2001 From: Ivan Andika Date: Sat, 22 Feb 2025 23:57:07 +0800 Subject: [PATCH 4/4] Fix another typo --- .../hadoop/hdds/scm/proxy/SCMFailoverProxyProviderBase.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/proxy/SCMFailoverProxyProviderBase.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/proxy/SCMFailoverProxyProviderBase.java index 3bf204cc34f3..504504e597cb 100644 --- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/proxy/SCMFailoverProxyProviderBase.java +++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/proxy/SCMFailoverProxyProviderBase.java @@ -296,7 +296,7 @@ private T createSCMProxy(InetSocketAddress scmAddress) throws IOException { LegacyHadoopConfigurationSource.asHadoopConfiguration(conf); 
RPC.setProtocolEngine(hadoopConf, protocolClass, ProtobufRpcEngine.class); // FailoverOnNetworkException ensures that the IPC layer does not attempt - // retries on the same OM in case of connection exception. This retry + // retries on the same SCM in case of connection exception. This retry // policy essentially results in TRY_ONCE_THEN_FAIL. RetryPolicy connectionRetryPolicy = RetryPolicies.failoverOnNetworkException(0); return RPC.getProtocolProxy(