Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ public class XceiverClientGrpc extends XceiverClientSpi {
private final long timeout;
private SecurityConfig secConfig;
private final boolean topologyAwareRead;
private X509Certificate caCert;
private List<X509Certificate> caCerts;
// Cache the DN which returned the GetBlock command so that the ReadChunk
// command can be sent to the same DN.
private Map<DatanodeBlockID, DatanodeDetails> getBlockDNcache;
Expand All @@ -93,10 +93,10 @@ public class XceiverClientGrpc extends XceiverClientSpi {
*
* @param pipeline - Pipeline that defines the machines.
* @param config -- Ozone Config
* @param caCert - SCM ca certificate.
* @param caCerts - SCM ca certificate.
*/
public XceiverClientGrpc(Pipeline pipeline, ConfigurationSource config,
X509Certificate caCert) {
List<X509Certificate> caCerts) {
super();
Preconditions.checkNotNull(pipeline);
Preconditions.checkNotNull(config);
Expand All @@ -114,7 +114,7 @@ public XceiverClientGrpc(Pipeline pipeline, ConfigurationSource config,
this.topologyAwareRead = config.getBoolean(
OzoneConfigKeys.OZONE_NETWORK_TOPOLOGY_AWARE_READ_KEY,
OzoneConfigKeys.OZONE_NETWORK_TOPOLOGY_AWARE_READ_DEFAULT);
this.caCert = caCert;
this.caCerts = caCerts;
this.getBlockDNcache = new ConcurrentHashMap<>();
}

Expand Down Expand Up @@ -179,8 +179,8 @@ private synchronized void connectToDatanode(DatanodeDetails dn)
.intercept(new GrpcClientInterceptor());
if (secConfig.isGrpcTlsEnabled()) {
SslContextBuilder sslContextBuilder = GrpcSslContexts.forClient();
if (caCert != null) {
sslContextBuilder.trustManager(caCert);
if (caCerts != null) {
sslContextBuilder.trustManager(caCerts);
}
if (secConfig.useTestCert()) {
channelBuilder.overrideAuthority("localhost");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@

import java.io.Closeable;
import java.io.IOException;
import java.security.cert.CertificateException;
import java.security.cert.X509Certificate;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;

Expand All @@ -31,8 +31,6 @@
import org.apache.hadoop.hdds.conf.ConfigurationSource;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.hdds.security.exception.SCMSecurityException;
import org.apache.hadoop.hdds.security.x509.certificate.utils.CertificateCodec;
import org.apache.hadoop.ozone.OzoneConfigKeys;
import org.apache.hadoop.ozone.OzoneSecurityUtil;
import org.apache.hadoop.security.UserGroupInformation;
Expand Down Expand Up @@ -68,7 +66,7 @@ public class XceiverClientManager implements Closeable, XceiverClientFactory {
//TODO : change this to SCM configuration class
private final ConfigurationSource conf;
private final Cache<String, XceiverClientSpi> clientCache;
private X509Certificate caCert;
private List<X509Certificate> caCerts;

private static XceiverClientMetrics metrics;
private boolean isSecurityEnabled;
Expand All @@ -86,20 +84,15 @@ public XceiverClientManager(ConfigurationSource conf) throws IOException {

public XceiverClientManager(ConfigurationSource conf,
ScmClientConfig clientConf,
String caCertPem) throws IOException {
List<X509Certificate> caCerts) throws IOException {
Preconditions.checkNotNull(clientConf);
Preconditions.checkNotNull(conf);
long staleThresholdMs = clientConf.getStaleThreshold(MILLISECONDS);
this.conf = conf;
this.isSecurityEnabled = OzoneSecurityUtil.isSecurityEnabled(conf);
if (isSecurityEnabled) {
Preconditions.checkNotNull(caCertPem);
try {
this.caCert = CertificateCodec.getX509Cert(caCertPem);
} catch (CertificateException ex) {
throw new SCMSecurityException("Error: Fail to get SCM CA certificate",
ex);
}
Preconditions.checkNotNull(caCerts);
this.caCerts = caCerts;
}

this.clientCache = CacheBuilder.newBuilder()
Expand Down Expand Up @@ -232,10 +225,10 @@ public XceiverClientSpi call() throws Exception {
switch (type) {
case RATIS:
client = XceiverClientRatis.newXceiverClientRatis(pipeline, conf,
caCert);
caCerts);
break;
case STAND_ALONE:
client = new XceiverClientGrpc(pipeline, conf, caCert);
client = new XceiverClientGrpc(pipeline, conf, caCerts);
break;
case CHAINED:
default:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,13 +81,13 @@ public static XceiverClientRatis newXceiverClientRatis(

public static XceiverClientRatis newXceiverClientRatis(
org.apache.hadoop.hdds.scm.pipeline.Pipeline pipeline,
ConfigurationSource ozoneConf, X509Certificate caCert) {
ConfigurationSource ozoneConf, List<X509Certificate> caCerts) {
final String rpcType = ozoneConf
.get(ScmConfigKeys.DFS_CONTAINER_RATIS_RPC_TYPE_KEY,
ScmConfigKeys.DFS_CONTAINER_RATIS_RPC_TYPE_DEFAULT);
final RetryPolicy retryPolicy = RatisHelper.createRetryPolicy(ozoneConf);
final GrpcTlsConfig tlsConfig = RatisHelper.createTlsClientConfig(new
SecurityConfig(ozoneConf), caCert);
SecurityConfig(ozoneConf), caCerts);
return new XceiverClientRatis(pipeline,
SupportedRpcType.valueOfIgnoreCase(rpcType),
retryPolicy, tlsConfig, ozoneConf);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -294,11 +294,11 @@ private static Map<String, String> getDatanodeRatisPrefixProps(
// For External gRPC client to server with gRPC TLS.
// No mTLS for external client as SCM CA does not issued certificates for them
public static GrpcTlsConfig createTlsClientConfig(SecurityConfig conf,
X509Certificate caCert) {
List<X509Certificate> caCerts) {
GrpcTlsConfig tlsConfig = null;
if (conf.isSecurityEnabled() && conf.isGrpcTlsEnabled()) {
tlsConfig = new GrpcTlsConfig(null, null,
caCert, false);
caCerts, false);
}
return tlsConfig;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -173,4 +173,15 @@ public static OzoneConfiguration removeSelfId(
}
return conf;
}

/**
 * Get the SCM node id list for the SCM service id resolved from the
 * given configuration.
 *
 * <p>Convenience overload: looks up the service id via
 * {@code getScmServiceId(configuration)} and delegates to the
 * two-argument {@code getSCMNodeIds} overload.
 *
 * @param configuration configuration source holding the SCM HA settings
 * @return list of node ids for the resolved SCM service id.
 */
public static Collection<String> getSCMNodeIds(
    ConfigurationSource configuration) {
  String scmServiceId = getScmServiceId(configuration);
  return getSCMNodeIds(configuration, scmServiceId);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ public enum ErrorCode {
DEFAULT,
MISSING_BLOCK_TOKEN,
BLOCK_TOKEN_VERIFICATION_FAILED,
GET_ROOT_CA_CERT_FAILED
GET_ROOT_CA_CERT_FAILED,
NOT_A_PRIMARY_SCM
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ public final class OzoneConsts {
public static final String SCM_ID = "scmUuid";
public static final String CLUSTER_ID_PREFIX = "CID-";
public static final String SCM_CERT_SERIAL_ID = "scmCertSerialId";
public static final String PRIMARY_SCM_NODE_ID = "primaryScmNodeId";

public static final String OZONE_SIMPLE_ROOT_USER = "root";
public static final String OZONE_SIMPLE_HDFS_USER = "hdfs";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
import java.net.InetAddress;
import java.net.NetworkInterface;
import java.nio.file.Path;
import java.security.cert.CertificateException;
import java.security.cert.X509Certificate;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Enumeration;
Expand All @@ -39,6 +41,8 @@
import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_HTTP_SECURITY_ENABLED_KEY;
import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_SECURITY_ENABLED_DEFAULT;
import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_SECURITY_ENABLED_KEY;

import org.apache.hadoop.hdds.security.x509.certificate.utils.CertificateCodec;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -120,4 +124,25 @@ public static List<InetAddress> getValidInetsForCurrentHost()

return hostIps;
}

/**
 * Convert a list of PEM encoded certificate strings into
 * {@link X509Certificate} instances.
 *
 * @param pemEncodedCerts PEM encoded certificates to decode
 * @return decoded certificates, in the same order as the input
 * @throws IOException if any entry cannot be parsed as an X509 certificate
 */
public static List<X509Certificate> convertToX509(
    List<String> pemEncodedCerts) throws IOException {
  List<X509Certificate> certificates =
      new ArrayList<>(pemEncodedCerts.size());
  for (String pemEncodedCert : pemEncodedCerts) {
    try {
      certificates.add(
          CertificateCodec.getX509Certificate(pemEncodedCert));
    } catch (CertificateException ex) {
      // Log for diagnosis, then surface as IOException to the caller.
      LOG.error("Error while converting to X509 format", ex);
      throw new IOException(ex);
    }
  }
  return certificates;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -355,6 +355,12 @@ private void getSCMSignedCert(OzoneConfiguration config) {
dnCertClient.storeCertificate(pemEncodedCert, true);
dnCertClient.storeCertificate(response.getX509CACertificate(), true,
true);

// Store Root CA certificate.
if (response.hasX509RootCACertificate()) {
dnCertClient.storeRootCACertificate(
response.getX509RootCACertificate(), true);
}
String dnCertSerialId = getX509Certificate(pemEncodedCert).
getSerialNumber().toString();
datanodeDetails.setCertSerialId(dnCertSerialId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.io.File;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.security.cert.X509Certificate;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
Expand Down Expand Up @@ -59,6 +60,7 @@
import org.apache.hadoop.hdds.security.x509.SecurityConfig;
import org.apache.hadoop.hdds.security.x509.certificate.client.CertificateClient;
import org.apache.hadoop.hdds.tracing.TracingUtil;
import org.apache.hadoop.hdds.utils.HAUtils;
import org.apache.hadoop.hdds.utils.HddsServerUtil;
import org.apache.hadoop.ozone.OzoneConfigKeys;
import org.apache.hadoop.ozone.OzoneConsts;
Expand Down Expand Up @@ -461,19 +463,21 @@ public static XceiverServerRatis newXceiverServerRatis(
// DN Ratis server act as both SSL client and server and we must pass TLS
// configuration for both.
private static Parameters createTlsParameters(SecurityConfig conf,
CertificateClient caClient) {
CertificateClient caClient) throws IOException {
Parameters parameters = new Parameters();

if (conf.isSecurityEnabled() && conf.isGrpcTlsEnabled()) {
List<X509Certificate> caList = HAUtils.buildCAX509List(caClient,
conf.getConfiguration());
GrpcTlsConfig serverConfig = new GrpcTlsConfig(
caClient.getPrivateKey(), caClient.getCertificate(),
caClient.getCACertificate(), true);
caList, true);
GrpcConfigKeys.Server.setTlsConf(parameters, serverConfig);
GrpcConfigKeys.Admin.setTlsConf(parameters, serverConfig);

GrpcTlsConfig clientConfig = new GrpcTlsConfig(
caClient.getPrivateKey(), caClient.getCertificate(),
caClient.getCACertificate(), false);
caList, false);
GrpcConfigKeys.Client.setTlsConf(parameters, clientConfig);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.hadoop.ozone.container.ozoneimpl;

import java.io.IOException;
import java.security.cert.X509Certificate;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Iterator;
Expand All @@ -40,6 +41,7 @@
import org.apache.hadoop.hdds.security.token.BlockTokenVerifier;
import org.apache.hadoop.hdds.security.x509.SecurityConfig;
import org.apache.hadoop.hdds.security.x509.certificate.client.CertificateClient;
import org.apache.hadoop.hdds.utils.HAUtils;
import org.apache.hadoop.ozone.container.common.helpers.ContainerMetrics;
import org.apache.hadoop.ozone.container.common.impl.ContainerSet;
import org.apache.hadoop.ozone.container.common.impl.HddsDispatcher;
Expand Down Expand Up @@ -167,8 +169,14 @@ public OzoneContainer(
blockDeletingService =
new BlockDeletingService(this, svcInterval.toMillis(), serviceTimeout,
TimeUnit.MILLISECONDS, config);
tlsClientConfig = RatisHelper.createTlsClientConfig(
secConf, certClient != null ? certClient.getCACertificate() : null);

List< X509Certificate > x509Certificates = null;
if (certClient != null) {
x509Certificates = HAUtils.buildCAX509List(certClient, conf);
}

tlsClientConfig = RatisHelper.createTlsClientConfig(secConf,
x509Certificates);

isStarted = new AtomicBoolean(false);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -265,12 +265,16 @@ public SCMGetCertResponseProto getDataNodeCertificateChain(
*/
@Override
public String getCACertificate() throws IOException {
return getCACert().getX509Certificate();
}


public SCMGetCertResponseProto getCACert() throws IOException {
SCMGetCACertificateRequestProto protoIns = SCMGetCACertificateRequestProto
.getDefaultInstance();
return submitRequest(Type.GetCACertificate,
builder -> builder.setGetCACertificateRequest(protoIns))
.getGetCertResponseProto().getX509Certificate();

.getGetCertResponseProto();
}

/**
Expand Down Expand Up @@ -301,7 +305,7 @@ public List<String> listCertificate(HddsProtos.NodeType role,
public String getRootCACertificate() throws IOException {
SCMGetCACertificateRequestProto protoIns = SCMGetCACertificateRequestProto
.getDefaultInstance();
return submitRequest(Type.GetCACertificate,
return submitRequest(Type.GetRootCACertificate,
builder -> builder.setGetCACertificateRequest(protoIns))
.getGetCertResponseProto().getX509RootCACertificate();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.hadoop.hdds.scm.protocolPB.ScmBlockLocationProtocolPB;
import org.apache.hadoop.hdds.utils.LegacyHadoopConfigurationSource;
import org.apache.hadoop.io.retry.FailoverProxyProvider;
import org.apache.hadoop.io.retry.RetryPolicies;
import org.apache.hadoop.io.retry.RetryPolicy;
import org.apache.hadoop.io.retry.RetryPolicy.RetryAction;
import org.apache.hadoop.ipc.ProtobufRpcEngine;
Expand Down Expand Up @@ -70,11 +71,20 @@ public class SCMBlockLocationFailoverProxyProvider implements
private final int maxRetryCount;
private final long retryInterval;

private final UserGroupInformation ugi;


public SCMBlockLocationFailoverProxyProvider(ConfigurationSource conf) {
this.conf = conf;
this.scmVersion = RPC.getProtocolVersion(ScmBlockLocationProtocolPB.class);

try {
this.ugi = UserGroupInformation.getCurrentUser();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why does this file need to change? The logic is unchanged.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is to capture the UGI at the time the FailoverProxyProvider is created; otherwise the UGI would be resolved at proxy-creation time, which might not be the correct one.
Without this change, we shall see this error.

om1_1        | 2021-03-23 05:59:25,420 [IPC Server handler 7 on default port 9862] WARN ipc.Client: Exception encountered while connecting to the server
om1_1        | javax.security.sasl.SaslException: GSS initiate failed [Caused by GSSException: No valid credentials provided (Mechanism level: Failed to find any Kerberos tgt)]
om1_1        | 	at jdk.security.jgss/com.sun.security.sasl.gsskerb.GssKrb5Client.evaluateChallenge(GssKrb5Client.java:211)
om1_1        | 	at org.apache.hadoop.security.SaslRpcClient.saslConnect(SaslRpcClient.java:408)
om1_1        | 	at org.apache.hadoop.ipc.Client$Connection.setupSaslConnection(Client.java:622)
om1_1        | 	at org.apache.hadoop.ipc.Client$Connection.access$2300(Client.java:413)
om1_1        | 	at org.apache.hadoop.ipc.Client$Connection$2.run(Client.java:822)
om1_1        | 	at org.apache.hadoop.ipc.Client$Connection$2.run(Client.java:818)
om1_1        | 	at java.base/java.security.AccessController.doPrivileged(Native Method)
om1_1        | 	at java.base/javax.security.auth.Subject.doAs(Subject.java:423)
om1_1        | 	at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1762)
om1_1        | 	at org.apache.hadoop.ipc.Client$Connection.setupIOstreams(Client.java:818)
om1_1        | 	at org.apache.hadoop.ipc.Client$Connection.access$3800(Client.java:413)
om1_1        | 	at org.apache.hadoop.ipc.Client.getConnection(Client.java:1636)
om1_1        | 	at org.apache.hadoop.ipc.Client.call(Client.java:1452)
om1_1        | 	at org.apache.hadoop.ipc.Client.call(Client.java:1405)
om1_1        | 	at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:233)
om1_1        | 	at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:118)
om1_1        | 	at com.sun.proxy.$Proxy32.send(Unknown Source)
om1_1        | 	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
om1_1        | 	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
om1_1        | 	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
om1_1        | 	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
om1_1        | 	at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:422)
om1_1        | 	at org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invokeMethod(RetryInvocationHandler.java:165)
om1_1        | 	at org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invoke(RetryInvocationHandler.java:157)
om1_1        | 	at org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invokeOnce(RetryInvocationHandler.java:95)
om1_1        | 	at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:359)
om1_1        | 	at com.sun.proxy.$Proxy32.send(Unknown Source)
om1_1        | 	at org.apache.hadoop.hdds.scm.protocolPB.ScmBlockLocationProtocolClientSideTranslatorPB.submitRequest(ScmBlockLocationProtocolClientSideTranslatorPB.java:118)
om1_1        | 	at org.apache.hadoop.hdds.scm.protocolPB.ScmBlockLocationProtocolClientSideTranslatorPB.allocateBlock(ScmBlockLocationProtocolClientSideTranslatorPB.java:172)
om1_1        | 	at org.apache.hadoop.ozone.om.request.key.OMKeyRequest.allocateBlock(OMKeyRequest.java:128)
om1_1        | 	at org.apache.hadoop.ozone.om.request.key.OMKeyCreateRequest.preExecute(OMKeyCreateRequest.java:151)
om1_1        | 	at org.apache.hadoop.ozone.protocolPB.OzoneManagerProtocolServerSideTranslatorPB.processRequest(OzoneManagerProtocolServerSideTranslatorPB.java:139)
om1_1        | 	at org.apache.hadoop.hdds.server.OzoneProtocolMessageDispatcher.processRequest(OzoneProtocolMessageDispatcher.java:87)
om1_1        | 	at org.apache.hadoop.ozone.protocolPB.OzoneManagerProtocolServerSideTranslatorPB.submitRequest(OzoneManagerProtocolServerSideTranslatorPB.java:122)
om1_1        | 	at org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos$OzoneManagerService$2.callBlockingMethod(OzoneManagerProtocolProtos.java)
om1_1        | 	at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:528)
om1_1        | 	at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:1086)
om1_1        | 	at org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:1029)
om1_1        | 	at org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:957)
om1_1        | 	at java.base/java.security.AccessController.doPrivileged(Native Method)
om1_1        | 	at java.base/javax.security.auth.Subject.doAs(Subject.java:423)
om1_1        | 	at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1762)
om1_1        | 	at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2957)
om1_1        | Caused by: GSSException: No valid credentials provided (Mechanism level: Failed to find any Kerberos tgt)
om1_1        | 	at java.security.jgss/sun.security.jgss.krb5.Krb5InitCredential.getInstance(Krb5InitCredential.java:162)
om1_1        | 	at java.security.jgss/sun.security.jgss.krb5.Krb5MechFactory.getCredentialElement(Krb5MechFactory.java:126)
om1_1        | 	at java.security.jgss/sun.security.jgss.krb5.Krb5MechFactory.getMechanismContext(Krb5MechFactory.java:193)
om1_1        | 	at java.security.jgss/sun.security.jgss.GSSManagerImpl.getMechanismContext(GSSManagerImpl.java:218)
om1_1        | 	at java.security.jgss/sun.security.jgss.GSSContextImpl.initSecContext(GSSContextImpl.java:230)
om1_1        | 	at java.security.jgss/sun.security.jgss.GSSContextImpl.initSecContext(GSSContextImpl.java:196)
om1_1        | 	at jdk.security.jgss/com.sun.security.sasl.gsskerb.GssKrb5Client.evaluateChallenge(GssKrb5Client.java:192)
om1_1        | 	... 42 more

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you enable the Debug log of UGI class to see which UGI is used if we don't cache the UGI at the time of provider creation?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the clarification!

} catch (IOException ex) {
LOG.error("Unable to fetch user credentials from UGI", ex);
throw new RuntimeException(ex);
}

// Set some constant for non-HA.
if (scmServiceId == null) {
scmServiceId = SCM_DUMMY_SERVICE_ID;
Expand Down Expand Up @@ -230,10 +240,16 @@ private ScmBlockLocationProtocolPB createSCMProxy(
LegacyHadoopConfigurationSource.asHadoopConfiguration(conf);
RPC.setProtocolEngine(hadoopConf, ScmBlockLocationProtocolPB.class,
ProtobufRpcEngine.class);
return RPC.getProxy(ScmBlockLocationProtocolPB.class, scmVersion,
scmAddress, UserGroupInformation.getCurrentUser(), hadoopConf,
// FailoverOnNetworkException ensures that the IPC layer does not attempt
// retries on the same OM in case of connection exception. This retry
// policy essentially results in TRY_ONCE_THEN_FAIL.
RetryPolicy connectionRetryPolicy = RetryPolicies
.failoverOnNetworkException(0);
return RPC.getProtocolProxy(ScmBlockLocationProtocolPB.class, scmVersion,
scmAddress, ugi, hadoopConf,
NetUtils.getDefaultSocketFactory(hadoopConf),
(int)conf.getObject(SCMClientConfig.class).getRpcTimeOut());
(int)conf.getObject(SCMClientConfig.class).getRpcTimeOut(),
connectionRetryPolicy).getProxy();
}

public RetryPolicy getSCMBlockLocationRetryPolicy(String newLeader) {
Expand Down
Loading