diff --git a/.gitignore b/.gitignore index 3d372b4332e..0dbb05e1b1c 100644 --- a/.gitignore +++ b/.gitignore @@ -71,3 +71,6 @@ out/ # # Work around https://youtrack.jetbrains.com/issue/IDEA-116898 # gradle/wrapper/gradle-wrapper.properties + +# vscode +.settings \ No newline at end of file diff --git a/src/main/java/org/tikv/common/ConfigUtils.java b/src/main/java/org/tikv/common/ConfigUtils.java index a4963dc863f..de50e7c8702 100644 --- a/src/main/java/org/tikv/common/ConfigUtils.java +++ b/src/main/java/org/tikv/common/ConfigUtils.java @@ -28,6 +28,8 @@ public class ConfigUtils { public static final String TIKV_GRPC_SCAN_TIMEOUT = "tikv.grpc.scan_timeout_in_ms"; public static final String TIKV_GRPC_SCAN_BATCH_SIZE = "tikv.grpc.scan_batch_size"; public static final String TIKV_GRPC_MAX_FRAME_SIZE = "tikv.grpc.max_frame_size"; + public static final String TIKV_GRPC_KEEPALIVE_TIME = "tikv.grpc.keepalive_time"; + public static final String TIKV_GRPC_KEEPALIVE_TIMEOUT = "tikv.grpc.keepalive_timeout"; public static final String TIKV_INDEX_SCAN_BATCH_SIZE = "tikv.index.scan_batch_size"; public static final String TIKV_INDEX_SCAN_CONCURRENCY = "tikv.index.scan_concurrency"; @@ -120,4 +122,7 @@ public class ConfigUtils { public static final String LEADER = "LEADER"; public static final String FOLLOWER = "FOLLOWER"; public static final String LEADER_AND_FOLLOWER = "LEADER_AND_FOLLOWER"; + + public static final int DEF_TIKV_GRPC_KEEPALIVE_TIME = 10; + public static final int DEF_TIKV_GRPC_KEEPALIVE_TIMEOUT = 3; } diff --git a/src/main/java/org/tikv/common/TiConfiguration.java b/src/main/java/org/tikv/common/TiConfiguration.java index f1f02666c3e..15b814dc36b 100644 --- a/src/main/java/org/tikv/common/TiConfiguration.java +++ b/src/main/java/org/tikv/common/TiConfiguration.java @@ -88,6 +88,8 @@ private static void loadFromDefaultProperties() { setIfMissing(TIKV_IMPORTER_MAX_KV_BATCH_SIZE, DEF_TIKV_IMPORTER_MAX_KV_BATCH_SIZE); setIfMissing(TIKV_SCATTER_WAIT_SECONDS, DEF_TIKV_SCATTER_WAIT_SECONDS); setIfMissing(TIKV_RAWKV_DEFAULT_BACKOFF_IN_MS, DEF_TIKV_RAWKV_DEFAULT_BACKOFF_IN_MS); + setIfMissing(TIKV_GRPC_KEEPALIVE_TIME, DEF_TIKV_GRPC_KEEPALIVE_TIME); + setIfMissing(TIKV_GRPC_KEEPALIVE_TIMEOUT, DEF_TIKV_GRPC_KEEPALIVE_TIMEOUT); } public static void listAll() { @@ -291,6 +293,9 @@ private static ReplicaRead getReplicaRead(String key) { private boolean isTest = false; + private int keepaliveTime = getInt(TIKV_GRPC_KEEPALIVE_TIME); + private int keepaliveTimeout = getInt(TIKV_GRPC_KEEPALIVE_TIMEOUT); + public enum KVMode { TXN, RAW @@ -668,4 +673,20 @@ public boolean isTest() { public void setTest(boolean test) { isTest = test; } + + public int getKeepaliveTime() { + return keepaliveTime; + } + + public void setKeepaliveTime(int keepaliveTime) { + this.keepaliveTime = keepaliveTime; + } + + public int getKeepaliveTimeout() { + return keepaliveTimeout; + } + + public void setKeepaliveTimeout(int timeout) { + this.keepaliveTimeout = timeout; + } } diff --git a/src/main/java/org/tikv/common/TiSession.java b/src/main/java/org/tikv/common/TiSession.java index 12a6cefabd6..72000d33673 100644 --- a/src/main/java/org/tikv/common/TiSession.java +++ b/src/main/java/org/tikv/common/TiSession.java @@ -78,7 +78,9 @@ public class TiSession implements AutoCloseable { public TiSession(TiConfiguration conf) { this.conf = conf; - this.channelFactory = new ChannelFactory(conf.getMaxFrameSize()); + this.channelFactory = + new ChannelFactory( + conf.getMaxFrameSize(), conf.getKeepaliveTime(), conf.getKeepaliveTimeout()); this.client = PDClient.createRaw(conf, channelFactory); this.enableGrpcForward = conf.getEnableGrpcForward(); this.metricsServer = MetricsServer.getInstance(conf); diff --git a/src/main/java/org/tikv/common/util/ChannelFactory.java b/src/main/java/org/tikv/common/util/ChannelFactory.java index 9435e9d69d7..5433ad5acd3 100644 --- a/src/main/java/org/tikv/common/util/ChannelFactory.java +++ b/src/main/java/org/tikv/common/util/ChannelFactory.java @@ -25,10 +25,14 @@ public class ChannelFactory implements AutoCloseable { private final int maxFrameSize; + private final int keepaliveTime; + private final int keepaliveTimeout; private final ConcurrentHashMap connPool = new ConcurrentHashMap<>(); - public ChannelFactory(int maxFrameSize) { + public ChannelFactory(int maxFrameSize, int keepaliveTime, int keepaliveTimeout) { this.maxFrameSize = maxFrameSize; + this.keepaliveTime = keepaliveTime; + this.keepaliveTimeout = keepaliveTimeout; } public ManagedChannel getChannel(String addressStr, HostMapping hostMapping) { @@ -51,6 +55,9 @@ public ManagedChannel getChannel(String addressStr, HostMapping hostMapping) { // So a coarse grain lock is ok here return ManagedChannelBuilder.forAddress(mappedAddr.getHost(), mappedAddr.getPort()) .maxInboundMessageSize(maxFrameSize) + .keepAliveTime(keepaliveTime, TimeUnit.SECONDS) + .keepAliveTimeout(keepaliveTimeout, TimeUnit.SECONDS) + .keepAliveWithoutCalls(true) .usePlaintext(true) .idleTimeout(60, TimeUnit.SECONDS) .build();