Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,24 @@ The following includes ThreadPool related parameters, which can be passed in thr
- whether to enable `Compare And Set`, set true if using `RawKVClient.compareAndSet` or `RawKVClient.putIfAbsent`
- default: false

### TLS

#### tikv.tls_enable
- whether to enable TLS
- default: false

#### tikv.trust_cert_collection
- Trusted certificates for verifying the remote endpoint's certificate, e.g. /home/tidb/ca.pem. The file should contain an X.509 certificate collection in PEM format.
- default: null

#### tikv.key_cert_chain
- an X.509 certificate chain file in PEM format, e.g. /home/tidb/client.pem.
- default: null

#### tikv.key_file
- a PKCS#8 private key file in PEM format. e.g. /home/tidb/client-key.pem.
- default: null

## Metrics

Client Java supports exporting metrics to Prometheus using poll mode and viewing on Grafana. The following steps shows how to enable this function.
Expand Down
6 changes: 6 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@
<log4j.version>1.2.17</log4j.version>
<slf4j.version>1.7.16</slf4j.version>
<grpc.version>1.24.0</grpc.version>
<netty.tcnative.version>2.0.25.Final</netty.tcnative.version>
<powermock.version>1.6.6</powermock.version>
<jackson.version>2.12.3</jackson.version>
<trove4j.version>3.0.1</trove4j.version>
Expand Down Expand Up @@ -133,6 +134,11 @@
<artifactId>grpc-services</artifactId>
<version>${grpc.version}</version>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-tcnative-boringssl-static</artifactId>
<version>${netty.tcnative.version}</version>
</dependency>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-testing</artifactId>
Expand Down
6 changes: 6 additions & 0 deletions src/main/java/org/tikv/common/ConfigUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,11 @@ public class ConfigUtils {

public static final String TIKV_RAWKV_DEFAULT_BACKOFF_IN_MS = "tikv.rawkv.default_backoff_in_ms";

public static final String TIKV_TLS_ENABLE = "tikv.tls_enable";
public static final String TIKV_TRUST_CERT_COLLECTION = "tikv.trust_cert_collection";
public static final String TIKV_KEY_CERT_CHAIN = "tikv.key_cert_chain";
public static final String TIKV_KEY_FILE = "tikv.key_file";

public static final String DEF_PD_ADDRESSES = "127.0.0.1:2379";
public static final String DEF_TIMEOUT = "200ms";
public static final String DEF_TIKV_GRPC_INGEST_TIMEOUT = "200s";
Expand Down Expand Up @@ -125,4 +130,5 @@ public class ConfigUtils {

public static final int DEF_TIKV_GRPC_KEEPALIVE_TIME = 10;
public static final int DEF_TIKV_GRPC_KEEPALIVE_TIMEOUT = 3;
public static final boolean DEF_TIKV_TLS_ENABLE = false;
}
38 changes: 38 additions & 0 deletions src/main/java/org/tikv/common/TiConfiguration.java
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ private static void loadFromDefaultProperties() {
setIfMissing(TIKV_RAWKV_DEFAULT_BACKOFF_IN_MS, DEF_TIKV_RAWKV_DEFAULT_BACKOFF_IN_MS);
setIfMissing(TIKV_GRPC_KEEPALIVE_TIME, DEF_TIKV_GRPC_KEEPALIVE_TIME);
setIfMissing(TIKV_GRPC_KEEPALIVE_TIMEOUT, DEF_TIKV_GRPC_KEEPALIVE_TIMEOUT);
setIfMissing(TIKV_TLS_ENABLE, DEF_TIKV_TLS_ENABLE);
}

public static void listAll() {
Expand Down Expand Up @@ -291,6 +292,11 @@ private static ReplicaRead getReplicaRead(String key) {

private int rawKVDefaultBackoffInMS = getInt(TIKV_RAWKV_DEFAULT_BACKOFF_IN_MS);

private boolean tlsEnable = getBoolean(TIKV_TLS_ENABLE);
private String trustCertCollectionFile = getOption(TIKV_TRUST_CERT_COLLECTION).orElse(null);
private String keyCertChainFile = getOption(TIKV_KEY_CERT_CHAIN).orElse(null);
private String keyFile = getOption(TIKV_KEY_FILE).orElse(null);

private boolean isTest = false;

private int keepaliveTime = getInt(TIKV_GRPC_KEEPALIVE_TIME);
Expand Down Expand Up @@ -689,4 +695,36 @@ public int getKeepaliveTimeout() {
public void setKeepaliveTimeout(int timeout) {
this.keepaliveTimeout = timeout;
}

public boolean isTlsEnable() {
return tlsEnable;
}

public void setTlsEnable(boolean tlsEnable) {
this.tlsEnable = tlsEnable;
}

public String getTrustCertCollectionFile() {
return trustCertCollectionFile;
}

public void setTrustCertCollectionFile(String trustCertCollectionFile) {
this.trustCertCollectionFile = trustCertCollectionFile;
}

public String getKeyCertChainFile() {
return keyCertChainFile;
}

public void setKeyCertChainFile(String keyCertChainFile) {
this.keyCertChainFile = keyCertChainFile;
}

public String getKeyFile() {
return keyFile;
}

public void setKeyFile(String keyFile) {
this.keyFile = keyFile;
}
}
13 changes: 11 additions & 2 deletions src/main/java/org/tikv/common/TiSession.java
Original file line number Diff line number Diff line change
Expand Up @@ -79,8 +79,17 @@ public class TiSession implements AutoCloseable {
public TiSession(TiConfiguration conf) {
this.conf = conf;
this.channelFactory =
new ChannelFactory(
conf.getMaxFrameSize(), conf.getKeepaliveTime(), conf.getKeepaliveTimeout());
conf.isTlsEnable()
? new ChannelFactory(
conf.getMaxFrameSize(),
conf.getKeepaliveTime(),
conf.getKeepaliveTimeout(),
conf.getTrustCertCollectionFile(),
conf.getKeyCertChainFile(),
conf.getKeyFile())
: new ChannelFactory(
conf.getMaxFrameSize(), conf.getKeepaliveTime(), conf.getKeepaliveTimeout());

this.client = PDClient.createRaw(conf, channelFactory);
this.enableGrpcForward = conf.getEnableGrpcForward();
this.metricsServer = MetricsServer.getInstance(conf);
Expand Down
68 changes: 59 additions & 9 deletions src/main/java/org/tikv/common/util/ChannelFactory.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,23 +16,60 @@
package org.tikv.common.util;

import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.netty.GrpcSslContexts;
import io.grpc.netty.NettyChannelBuilder;
import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.SslContextBuilder;
import java.io.File;
import java.net.URI;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import javax.net.ssl.SSLException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.tikv.common.HostMapping;
import org.tikv.common.pd.PDUtils;

public class ChannelFactory implements AutoCloseable {
private static final Logger logger = LoggerFactory.getLogger(ChannelFactory.class);

private final int maxFrameSize;
private final int keepaliveTime;
private final int keepaliveTimeout;
private final ConcurrentHashMap<String, ManagedChannel> connPool = new ConcurrentHashMap<>();
private final SslContextBuilder sslContextBuilder;

public ChannelFactory(int maxFrameSize, int keepaliveTime, int keepaliveTimeout) {
this.maxFrameSize = maxFrameSize;
this.keepaliveTime = keepaliveTime;
this.keepaliveTimeout = keepaliveTimeout;
this.sslContextBuilder = null;
}

public ChannelFactory(
int maxFrameSize,
int keepaliveTime,
int keepaliveTimeout,
String trustCertCollectionFilePath,
String keyCertChainFilePath,
String keyFilePath) {
this.maxFrameSize = maxFrameSize;
this.keepaliveTime = keepaliveTime;
this.keepaliveTimeout = keepaliveTimeout;
this.sslContextBuilder =
getSslContextBuilder(trustCertCollectionFilePath, keyCertChainFilePath, keyFilePath);
}

private SslContextBuilder getSslContextBuilder(
String trustCertCollectionFilePath, String keyCertChainFilePath, String keyFilePath) {
SslContextBuilder builder = GrpcSslContexts.forClient();
if (trustCertCollectionFilePath != null) {
builder.trustManager(new File(trustCertCollectionFilePath));
}
if (keyCertChainFilePath != null && keyFilePath != null) {
builder.keyManager(new File(keyCertChainFilePath), new File(keyFilePath));
}
return builder;
}

public ManagedChannel getChannel(String addressStr, HostMapping hostMapping) {
Expand All @@ -51,16 +88,29 @@ public ManagedChannel getChannel(String addressStr, HostMapping hostMapping) {
} catch (Exception e) {
throw new IllegalArgumentException("failed to get mapped address " + address, e);
}

// Channel should be lazy without actual connection until first call
// So a coarse grain lock is ok here
return ManagedChannelBuilder.forAddress(mappedAddr.getHost(), mappedAddr.getPort())
.maxInboundMessageSize(maxFrameSize)
.keepAliveTime(keepaliveTime, TimeUnit.SECONDS)
.keepAliveTimeout(keepaliveTimeout, TimeUnit.SECONDS)
.keepAliveWithoutCalls(true)
.usePlaintext(true)
.idleTimeout(60, TimeUnit.SECONDS)
.build();
NettyChannelBuilder builder =
NettyChannelBuilder.forAddress(mappedAddr.getHost(), mappedAddr.getPort())
.maxInboundMessageSize(maxFrameSize)
.keepAliveTime(keepaliveTime, TimeUnit.SECONDS)
.keepAliveTimeout(keepaliveTimeout, TimeUnit.SECONDS)
.keepAliveWithoutCalls(true)
.idleTimeout(60, TimeUnit.SECONDS);

if (sslContextBuilder == null) {
return builder.usePlaintext(true).build();
} else {
SslContext sslContext = null;
try {
sslContext = sslContextBuilder.build();
} catch (SSLException e) {
logger.error("create ssl context failed!", e);
return null;
}
return builder.sslContext(sslContext).build();
}
});
}

Expand Down