diff --git a/src/main/java/org/tikv/common/ConfigUtils.java b/src/main/java/org/tikv/common/ConfigUtils.java
index 8f1c3e9d361..940fe646f8a 100644
--- a/src/main/java/org/tikv/common/ConfigUtils.java
+++ b/src/main/java/org/tikv/common/ConfigUtils.java
@@ -92,6 +92,18 @@ public class ConfigUtils {
public static final String TIKV_KEY_CERT_CHAIN = "tikv.key_cert_chain";
public static final String TIKV_KEY_FILE = "tikv.key_file";
+ public static final String TiKV_CIRCUIT_BREAK_ENABLE = "tikv.circuit_break.enable";
+ public static final String TiKV_CIRCUIT_BREAK_AVAILABILITY_WINDOW_IN_SECONDS =
+ "tikv.circuit_break.trigger.availability.window_in_seconds";
+ public static final String TiKV_CIRCUIT_BREAK_AVAILABILITY_ERROR_THRESHOLD_PERCENTAGE =
+ "tikv.circuit_break.trigger.availability.error_threshold_percentage";
+ public static final String TiKV_CIRCUIT_BREAK_AVAILABILITY_REQUEST_VOLUMN_THRESHOLD =
+ "tikv.circuit_break.trigger.availability.request_volumn_threshold";
+ public static final String TiKV_CIRCUIT_BREAK_SLEEP_WINDOW_IN_SECONDS =
+ "tikv.circuit_break.trigger.sleep_window_in_seconds";
+ public static final String TiKV_CIRCUIT_BREAK_ATTEMPT_REQUEST_COUNT =
+ "tikv.circuit_break.trigger.attempt_request_count";
+
public static final String TIFLASH_ENABLE = "tiflash.enable";
public static final String DEF_PD_ADDRESSES = "127.0.0.1:2379";
public static final String DEF_TIMEOUT = "200ms";
@@ -161,4 +173,11 @@ public class ConfigUtils {
public static final int DEF_TIKV_GRPC_KEEPALIVE_TIMEOUT = 3;
public static final boolean DEF_TIKV_TLS_ENABLE = false;
public static final boolean DEF_TIFLASH_ENABLE = false;
+
+ public static final boolean DEF_TiKV_CIRCUIT_BREAK_ENABLE = false;
+ public static final int DEF_TiKV_CIRCUIT_BREAK_AVAILABILITY_WINDOW_IN_SECONDS = 60;
+ public static final int DEF_TiKV_CIRCUIT_BREAK_AVAILABILITY_ERROR_THRESHOLD_PERCENTAGE = 100;
+ public static final int DEF_TiKV_CIRCUIT_BREAK_AVAILABILITY_REQUST_VOLUMN_THRESHOLD = 10;
+ public static final int DEF_TiKV_CIRCUIT_BREAK_SLEEP_WINDOW_IN_SECONDS = 20;
+ public static final int DEF_TiKV_CIRCUIT_BREAK_ATTEMPT_REQUEST_COUNT = 10;
}
diff --git a/src/main/java/org/tikv/common/TiConfiguration.java b/src/main/java/org/tikv/common/TiConfiguration.java
index aad8df6d956..4cdb48a777f 100644
--- a/src/main/java/org/tikv/common/TiConfiguration.java
+++ b/src/main/java/org/tikv/common/TiConfiguration.java
@@ -130,6 +130,20 @@ private static void loadFromDefaultProperties() {
setIfMissing(TIKV_RAWKV_CLEAN_TIMEOUT_IN_MS, DEF_TIKV_RAWKV_CLEAN_TIMEOUT_IN_MS);
setIfMissing(TIKV_BO_REGION_MISS_BASE_IN_MS, DEF_TIKV_BO_REGION_MISS_BASE_IN_MS);
setIfMissing(TIKV_RAWKV_SCAN_SLOWLOG_IN_MS, DEF_TIKV_RAWKV_SCAN_SLOWLOG_IN_MS);
+ setIfMissing(TiKV_CIRCUIT_BREAK_ENABLE, DEF_TiKV_CIRCUIT_BREAK_ENABLE);
+ setIfMissing(
+ TiKV_CIRCUIT_BREAK_AVAILABILITY_WINDOW_IN_SECONDS,
+ DEF_TiKV_CIRCUIT_BREAK_AVAILABILITY_WINDOW_IN_SECONDS);
+ setIfMissing(
+ TiKV_CIRCUIT_BREAK_AVAILABILITY_ERROR_THRESHOLD_PERCENTAGE,
+ DEF_TiKV_CIRCUIT_BREAK_AVAILABILITY_ERROR_THRESHOLD_PERCENTAGE);
+ setIfMissing(
+ TiKV_CIRCUIT_BREAK_AVAILABILITY_REQUEST_VOLUMN_THRESHOLD,
+ DEF_TiKV_CIRCUIT_BREAK_AVAILABILITY_REQUST_VOLUMN_THRESHOLD);
+ setIfMissing(
+ TiKV_CIRCUIT_BREAK_SLEEP_WINDOW_IN_SECONDS, DEF_TiKV_CIRCUIT_BREAK_SLEEP_WINDOW_IN_SECONDS);
+ setIfMissing(
+ TiKV_CIRCUIT_BREAK_ATTEMPT_REQUEST_COUNT, DEF_TiKV_CIRCUIT_BREAK_ATTEMPT_REQUEST_COUNT);
}
public static void listAll() {
@@ -360,6 +374,16 @@ private static ReplicaRead getReplicaRead(String key) {
private int keepaliveTime = getInt(TIKV_GRPC_KEEPALIVE_TIME);
private int keepaliveTimeout = getInt(TIKV_GRPC_KEEPALIVE_TIMEOUT);
+ private boolean circuitBreakEnable = getBoolean(TiKV_CIRCUIT_BREAK_ENABLE);
+ private int circuitBreakAvailabilityWindowInSeconds =
+ getInt(TiKV_CIRCUIT_BREAK_AVAILABILITY_WINDOW_IN_SECONDS);
+ private int circuitBreakAvailabilityErrorThresholdPercentage =
+ getInt(TiKV_CIRCUIT_BREAK_AVAILABILITY_ERROR_THRESHOLD_PERCENTAGE);
+ private int circuitBreakAvailabilityRequestVolumnThreshold =
+ getInt(TiKV_CIRCUIT_BREAK_AVAILABILITY_REQUEST_VOLUMN_THRESHOLD);
+ private int circuitBreakSleepWindowInSeconds = getInt(TiKV_CIRCUIT_BREAK_SLEEP_WINDOW_IN_SECONDS);
+ private int circuitBreakAttemptRequestCount = getInt(TiKV_CIRCUIT_BREAK_ATTEMPT_REQUEST_COUNT);
+
public enum KVMode {
TXN,
RAW
@@ -881,4 +905,57 @@ public int getRawKVScanSlowLogInMS() {
public void setRawKVScanSlowLogInMS(int rawKVScanSlowLogInMS) {
this.rawKVScanSlowLogInMS = rawKVScanSlowLogInMS;
}
+
+ public boolean isCircuitBreakEnable() {
+ return circuitBreakEnable;
+ }
+
+ public void setCircuitBreakEnable(boolean circuitBreakEnable) {
+ this.circuitBreakEnable = circuitBreakEnable;
+ }
+
+ public int getCircuitBreakAvailabilityWindowInSeconds() {
+ return circuitBreakAvailabilityWindowInSeconds;
+ }
+
+ public void setCircuitBreakAvailabilityWindowInSeconds(
+ int circuitBreakAvailabilityWindowInSeconds) {
+ this.circuitBreakAvailabilityWindowInSeconds = circuitBreakAvailabilityWindowInSeconds;
+ }
+
+ public int getCircuitBreakAvailabilityErrorThresholdPercentage() {
+ return circuitBreakAvailabilityErrorThresholdPercentage;
+ }
+
+ public void setCircuitBreakAvailabilityErrorThresholdPercentage(
+ int circuitBreakAvailabilityErrorThresholdPercentage) {
+ this.circuitBreakAvailabilityErrorThresholdPercentage =
+ circuitBreakAvailabilityErrorThresholdPercentage;
+ }
+
+ public int getCircuitBreakAvailabilityRequestVolumnThreshold() {
+ return circuitBreakAvailabilityRequestVolumnThreshold;
+ }
+
+ public void setCircuitBreakAvailabilityRequestVolumnThreshold(
+ int circuitBreakAvailabilityRequestVolumnThreshold) {
+ this.circuitBreakAvailabilityRequestVolumnThreshold =
+ circuitBreakAvailabilityRequestVolumnThreshold;
+ }
+
+ public int getCircuitBreakSleepWindowInSeconds() {
+ return circuitBreakSleepWindowInSeconds;
+ }
+
+ public void setCircuitBreakSleepWindowInSeconds(int circuitBreakSleepWindowInSeconds) {
+ this.circuitBreakSleepWindowInSeconds = circuitBreakSleepWindowInSeconds;
+ }
+
+ public int getCircuitBreakAttemptRequestCount() {
+ return circuitBreakAttemptRequestCount;
+ }
+
+ public void setCircuitBreakAttemptRequestCount(int circuitBreakAttemptRequestCount) {
+ this.circuitBreakAttemptRequestCount = circuitBreakAttemptRequestCount;
+ }
}
diff --git a/src/main/java/org/tikv/common/TiSession.java b/src/main/java/org/tikv/common/TiSession.java
index f3f10c9ccef..d1171dbcf4f 100644
--- a/src/main/java/org/tikv/common/TiSession.java
+++ b/src/main/java/org/tikv/common/TiSession.java
@@ -43,6 +43,7 @@
import org.tikv.kvproto.ImportSstpb;
import org.tikv.kvproto.Metapb;
import org.tikv.raw.RawKVClient;
+import org.tikv.raw.SmartRawKVClient;
import org.tikv.txn.KVClient;
import org.tikv.txn.TxnKVClient;
@@ -126,6 +127,11 @@ public RawKVClient createRawClient() {
return new RawKVClient(this, this.getRegionStoreClientBuilder());
}
+ public SmartRawKVClient createSmartRawClient() {
+ RawKVClient rawKVClient = createRawClient();
+ return new SmartRawKVClient(rawKVClient, getConf());
+ }
+
public KVClient createKVClient() {
checkIsClosed();
diff --git a/src/main/java/org/tikv/common/exception/CircuitBreakerOpenException.java b/src/main/java/org/tikv/common/exception/CircuitBreakerOpenException.java
new file mode 100644
index 00000000000..08982575644
--- /dev/null
+++ b/src/main/java/org/tikv/common/exception/CircuitBreakerOpenException.java
@@ -0,0 +1,22 @@
+/*
+ * Copyright 2018 PingCAP, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.tikv.common.exception;
+
+public class CircuitBreakerOpenException extends RuntimeException {
+ public CircuitBreakerOpenException() {
+ super("Circuit Breaker Opened");
+ }
+}
diff --git a/src/main/java/org/tikv/raw/RawKVClient.java b/src/main/java/org/tikv/raw/RawKVClient.java
index 7ffd1ed0514..c0e507c6b5f 100644
--- a/src/main/java/org/tikv/raw/RawKVClient.java
+++ b/src/main/java/org/tikv/raw/RawKVClient.java
@@ -44,7 +44,7 @@
import org.tikv.common.util.*;
import org.tikv.kvproto.Kvrpcpb.KvPair;
-public class RawKVClient implements AutoCloseable {
+public class RawKVClient implements RawKVClientBase {
private final TiSession tiSession;
private final RegionStoreClientBuilder clientBuilder;
private final TiConfiguration conf;
@@ -56,15 +56,6 @@ public class RawKVClient implements AutoCloseable {
private final ExecutorService deleteRangeThreadPool;
private static final Logger logger = LoggerFactory.getLogger(RawKVClient.class);
- // https://www.github.com/pingcap/tidb/blob/master/store/tikv/rawkv.go
- private static final int MAX_RAW_SCAN_LIMIT = 10240;
- private static final int MAX_RAW_BATCH_LIMIT = 1024;
- private static final int RAW_BATCH_PUT_SIZE = 1024 * 1024; // 1 MB
- private static final int RAW_BATCH_GET_SIZE = 16 * 1024; // 16 K
- private static final int RAW_BATCH_DELETE_SIZE = 16 * 1024; // 16 K
- private static final int RAW_BATCH_SCAN_SIZE = 16;
- private static final int RAW_BATCH_PAIR_COUNT = 512;
-
public static final Histogram RAW_REQUEST_LATENCY =
Histogram.build()
.name("client_java_raw_requests_latency")
@@ -106,23 +97,12 @@ public RawKVClient(TiSession session, RegionStoreClientBuilder clientBuilder) {
@Override
public void close() {}
- /**
- * Put a raw key-value pair to TiKV
- *
- * @param key raw key
- * @param value raw value
- */
+ @Override
public void put(ByteString key, ByteString value) {
put(key, value, 0);
}
- /**
- * Put a raw key-value pair to TiKV
- *
- * @param key raw key
- * @param value raw value
- * @param ttl the ttl of the key (in seconds), 0 means the key will never be outdated
- */
+ @Override
public void put(ByteString key, ByteString value, long ttl) {
String label = "client_raw_put";
Histogram.Timer requestTimer = RAW_REQUEST_LATENCY.labels(label).startTimer();
@@ -159,31 +139,12 @@ public void put(ByteString key, ByteString value, long ttl) {
}
}
- /**
- * Put a key-value pair if it does not exist. This API is atomic.
- *
- *
To use this API, please enable `tikv.enable_atomic_for_cas`.
- *
- * @param key key
- * @param value value
- * @return a ByteString. returns Optional.EMPTY if the value is written successfully. returns the
- * previous key if the value already exists, and does not write to TiKV.
- */
+ @Override
public Optional putIfAbsent(ByteString key, ByteString value) {
return putIfAbsent(key, value, 0L);
}
- /**
- * Put a key-value pair with TTL if it does not exist. This API is atomic.
- *
- * To use this API, please enable `tikv.enable_atomic_for_cas`.
- *
- * @param key key
- * @param value value
- * @param ttl TTL of key (in seconds), 0 means the key will never be outdated.
- * @return a ByteString. returns Optional.EMPTY if the value is written successfully. returns the
- * previous key if the value already exists, and does not write to TiKV.
- */
+ @Override
public Optional putIfAbsent(ByteString key, ByteString value, long ttl) {
try {
compareAndSet(key, Optional.empty(), value, ttl);
@@ -193,28 +154,13 @@ public Optional putIfAbsent(ByteString key, ByteString value, long t
}
}
- /**
- * Put a key-value pair if the prevValue matched the value in TiKV. This API is atomic.
- *
- * To use this API, please enable `tikv.enable_atomic_for_cas`.
- *
- * @param key key
- * @param value value
- */
+ @Override
public void compareAndSet(ByteString key, Optional prevValue, ByteString value)
throws RawCASConflictException {
compareAndSet(key, prevValue, value, 0L);
}
- /**
- * pair if the prevValue matched the value in TiKV. This API is atomic.
- *
- * To use this API, please enable `tikv.enable_atomic_for_cas`.
- *
- * @param key key
- * @param value value
- * @param ttl TTL of key (in seconds), 0 means the key will never be outdated.
- */
+ @Override
public void compareAndSet(
ByteString key, Optional prevValue, ByteString value, long ttl)
throws RawCASConflictException {
@@ -258,21 +204,12 @@ public void compareAndSet(
}
}
- /**
- * Put a set of raw key-value pair to TiKV.
- *
- * @param kvPairs kvPairs
- */
+ @Override
public void batchPut(Map kvPairs) {
batchPut(kvPairs, 0);
}
- /**
- * Put a set of raw key-value pair to TiKV.
- *
- * @param kvPairs kvPairs
- * @param ttl the TTL of keys to be put (in seconds), 0 means the keys will never be outdated
- */
+ @Override
public void batchPut(Map kvPairs, long ttl) {
String label = "client_raw_batch_put";
Histogram.Timer requestTimer = RAW_REQUEST_LATENCY.labels(label).startTimer();
@@ -301,12 +238,7 @@ public void batchPut(Map kvPairs, long ttl) {
}
}
- /**
- * Get a raw key-value pair from TiKV if key exists
- *
- * @param key raw key
- * @return a ByteString value if key exists, ByteString.EMPTY if key does not exist
- */
+ @Override
public Optional get(ByteString key) {
String label = "client_raw_get";
Histogram.Timer requestTimer = RAW_REQUEST_LATENCY.labels(label).startTimer();
@@ -344,12 +276,7 @@ public Optional get(ByteString key) {
}
}
- /**
- * Get a list of raw key-value pair from TiKV if key exists
- *
- * @param keys list of raw key
- * @return a ByteString value if key exists, ByteString.EMPTY if key does not exist
- */
+ @Override
public List batchGet(List keys) {
String label = "client_raw_batch_get";
Histogram.Timer requestTimer = RAW_REQUEST_LATENCY.labels(label).startTimer();
@@ -379,11 +306,7 @@ public List batchGet(List keys) {
}
}
- /**
- * Delete a list of raw key-value pair from TiKV if key exists
- *
- * @param keys list of raw key
- */
+ @Override
public void batchDelete(List keys) {
String label = "client_raw_batch_delete";
Histogram.Timer requestTimer = RAW_REQUEST_LATENCY.labels(label).startTimer();
@@ -413,13 +336,7 @@ public void batchDelete(List keys) {
}
}
- /**
- * Get the TTL of a raw key from TiKV if key exists
- *
- * @param key raw key
- * @return a Long indicating the TTL of key ttl is a non-null long value indicating TTL if key
- * exists. - ttl=0 if the key will never be outdated. - ttl=null if the key does not exist
- */
+ @Override
public Optional getKeyTTL(ByteString key) {
String label = "client_raw_get_key_ttl";
Histogram.Timer requestTimer = RAW_REQUEST_LATENCY.labels(label).startTimer();
@@ -456,18 +373,7 @@ public Optional getKeyTTL(ByteString key) {
}
}
- /**
- * Create a new `batch scan` request with `keyOnly` option Once resolved this request will result
- * in a set of scanners over the given keys.
- *
- * WARNING: This method is experimental. The `each_limit` parameter does not work as expected.
- * It does not limit the number of results returned of each range, instead it limits the number of
- * results in each region of each range. As a result, you may get more than each_limit key-value
- * pairs for each range. But you should not miss any entries.
- *
- * @param ranges a list of ranges
- * @return a set of scanners for keys over the given keys.
- */
+ @Override
public List> batchScanKeys(
List> ranges, int eachLimit) {
return batchScan(
@@ -487,18 +393,7 @@ public List> batchScanKeys(
.collect(Collectors.toList());
}
- /**
- * Create a new `batch scan` request. Once resolved this request will result in a set of scanners
- * over the given keys.
- *
- * WARNING: This method is experimental. The `each_limit` parameter does not work as expected.
- * It does not limit the number of results returned of each range, instead it limits the number of
- * results in each region of each range. As a result, you may get more than each_limit key-value
- * pairs for each range. But you should not miss any entries.
- *
- * @param ranges a list of `ScanOption` for each range
- * @return a set of scanners over the given keys.
- */
+ @Override
public List> batchScan(List ranges) {
String label = "client_raw_batch_scan";
Histogram.Timer requestTimer = RAW_REQUEST_LATENCY.labels(label).startTimer();
@@ -549,27 +444,12 @@ public List> batchScan(List ranges) {
}
}
- /**
- * Scan raw key-value pairs from TiKV in range [startKey, endKey)
- *
- * @param startKey raw start key, inclusive
- * @param endKey raw end key, exclusive
- * @param limit limit of key-value pairs scanned, should be less than {@link #MAX_RAW_SCAN_LIMIT}
- * @return list of key-value pairs in range
- */
+ @Override
public List scan(ByteString startKey, ByteString endKey, int limit) {
return scan(startKey, endKey, limit, false);
}
- /**
- * Scan raw key-value pairs from TiKV in range [startKey, endKey)
- *
- * @param startKey raw start key, inclusive
- * @param endKey raw end key, exclusive
- * @param limit limit of key-value pairs scanned, should be less than {@link #MAX_RAW_SCAN_LIMIT}
- * @param keyOnly whether to scan in key-only mode
- * @return list of key-value pairs in range
- */
+ @Override
public List scan(ByteString startKey, ByteString endKey, int limit, boolean keyOnly) {
String label = "client_raw_scan";
Histogram.Timer requestTimer = RAW_REQUEST_LATENCY.labels(label).startTimer();
@@ -604,48 +484,22 @@ public List scan(ByteString startKey, ByteString endKey, int limit, bool
}
}
- /**
- * Scan raw key-value pairs from TiKV in range [startKey, ♾)
- *
- * @param startKey raw start key, inclusive
- * @param limit limit of key-value pairs scanned, should be less than {@link #MAX_RAW_SCAN_LIMIT}
- * @return list of key-value pairs in range
- */
+ @Override
public List scan(ByteString startKey, int limit) {
return scan(startKey, limit, false);
}
- /**
- * Scan raw key-value pairs from TiKV in range [startKey, ♾)
- *
- * @param startKey raw start key, inclusive
- * @param limit limit of key-value pairs scanned, should be less than {@link #MAX_RAW_SCAN_LIMIT}
- * @param keyOnly whether to scan in key-only mode
- * @return list of key-value pairs in range
- */
+ @Override
public List scan(ByteString startKey, int limit, boolean keyOnly) {
return scan(startKey, ByteString.EMPTY, limit, keyOnly);
}
- /**
- * Scan all raw key-value pairs from TiKV in range [startKey, endKey)
- *
- * @param startKey raw start key, inclusive
- * @param endKey raw end key, exclusive
- * @return list of key-value pairs in range
- */
+ @Override
public List scan(ByteString startKey, ByteString endKey) {
return scan(startKey, endKey, false);
}
- /**
- * Scan all raw key-value pairs from TiKV in range [startKey, endKey)
- *
- * @param startKey raw start key, inclusive
- * @param endKey raw end key, exclusive
- * @param keyOnly whether to scan in key-only mode
- * @return list of key-value pairs in range
- */
+ @Override
public List scan(ByteString startKey, ByteString endKey, boolean keyOnly) {
String label = "client_raw_scan_without_limit";
Histogram.Timer requestTimer = RAW_REQUEST_LATENCY.labels(label).startTimer();
@@ -701,31 +555,22 @@ private List scan(ScanOption scanOption) {
return scan(startKey, endKey, limit, keyOnly);
}
- /**
- * Scan keys with prefix
- *
- * @param prefixKey prefix key
- * @param limit limit of keys retrieved
- * @param keyOnly whether to scan in keyOnly mode
- * @return kvPairs with the specified prefix
- */
+ @Override
public List scanPrefix(ByteString prefixKey, int limit, boolean keyOnly) {
return scan(prefixKey, Key.toRawKey(prefixKey).nextPrefix().toByteString(), limit, keyOnly);
}
+ @Override
public List scanPrefix(ByteString prefixKey) {
return scan(prefixKey, Key.toRawKey(prefixKey).nextPrefix().toByteString());
}
+ @Override
public List scanPrefix(ByteString prefixKey, boolean keyOnly) {
return scan(prefixKey, Key.toRawKey(prefixKey).nextPrefix().toByteString(), keyOnly);
}
- /**
- * Delete a raw key-value pair from TiKV if key exists
- *
- * @param key raw key to be deleted
- */
+ @Override
public void delete(ByteString key) {
String label = "client_raw_delete";
Histogram.Timer requestTimer = RAW_REQUEST_LATENCY.labels(label).startTimer();
@@ -762,15 +607,7 @@ public void delete(ByteString key) {
}
}
- /**
- * Delete all raw key-value pairs in range [startKey, endKey) from TiKV
- *
- * Cautious, this API cannot be used concurrently, if multiple clients write keys into this
- * range along with deleteRange API, the result will be undefined.
- *
- * @param startKey raw start key to be deleted
- * @param endKey raw start key to be deleted
- */
+ @Override
public synchronized void deleteRange(ByteString startKey, ByteString endKey) {
String label = "client_raw_delete_range";
Histogram.Timer requestTimer = RAW_REQUEST_LATENCY.labels(label).startTimer();
@@ -789,14 +626,7 @@ public synchronized void deleteRange(ByteString startKey, ByteString endKey) {
}
}
- /**
- * Delete all raw key-value pairs with the prefix `key` from TiKV
- *
- *
Cautious, this API cannot be used concurrently, if multiple clients write keys into this
- * range along with deleteRange API, the result will be undefined.
- *
- * @param key prefix of keys to be deleted
- */
+ @Override
public synchronized void deletePrefix(ByteString key) {
ByteString endKey = Key.toRawKey(key).nextPrefix().toByteString();
deleteRange(key, endKey);
diff --git a/src/main/java/org/tikv/raw/RawKVClientBase.java b/src/main/java/org/tikv/raw/RawKVClientBase.java
new file mode 100644
index 00000000000..9c55f0afb40
--- /dev/null
+++ b/src/main/java/org/tikv/raw/RawKVClientBase.java
@@ -0,0 +1,272 @@
+/*
+ * Copyright 2021 PingCAP, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.tikv.raw;
+
+import com.google.protobuf.ByteString;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import org.tikv.common.util.Pair;
+import org.tikv.common.util.ScanOption;
+import org.tikv.kvproto.Kvrpcpb;
+
+public interface RawKVClientBase extends AutoCloseable {
+ // https://www.github.com/pingcap/tidb/blob/master/store/tikv/rawkv.go
+ int MAX_RAW_SCAN_LIMIT = 10240;
+ int MAX_RAW_BATCH_LIMIT = 1024;
+ int RAW_BATCH_PUT_SIZE = 1024 * 1024;
+ int RAW_BATCH_GET_SIZE = 16 * 1024;
+ int RAW_BATCH_DELETE_SIZE = 16 * 1024;
+
+ /**
+ * Put a raw key-value pair to TiKV
+ *
+ * @param key raw key
+ * @param value raw value
+ */
+ void put(ByteString key, ByteString value);
+
+ /**
+ * Put a raw key-value pair to TiKV
+ *
+ * @param key raw key
+ * @param value raw value
+ * @param ttl the ttl of the key (in seconds), 0 means the key will never be outdated
+ */
+ void put(ByteString key, ByteString value, long ttl);
+
+ /**
+ * Put a key-value pair if it does not exist. This API is atomic.
+ *
+ *
To use this API, please enable `tikv.enable_atomic_for_cas`.
+ *
+ * @param key key
+ * @param value value
+ * @return a ByteString. returns Optional.EMPTY if the value is written successfully. returns the
+ * previous key if the value already exists, and does not write to TiKV.
+ */
+ Optional putIfAbsent(ByteString key, ByteString value);
+
+ /**
+ * Put a key-value pair with TTL if it does not exist. This API is atomic.
+ *
+ * To use this API, please enable `tikv.enable_atomic_for_cas`.
+ *
+ * @param key key
+ * @param value value
+ * @param ttl TTL of key (in seconds), 0 means the key will never be outdated.
+ * @return a ByteString. returns Optional.EMPTY if the value is written successfully. returns the
+ * previous key if the value already exists, and does not write to TiKV.
+ */
+ Optional putIfAbsent(ByteString key, ByteString value, long ttl);
+
+ /**
+ * Put a key-value pair if the prevValue matched the value in TiKV. This API is atomic.
+ *
+ * To use this API, please enable `tikv.enable_atomic_for_cas`.
+ *
+ * @param key key
+ * @param value value
+ */
+ void compareAndSet(ByteString key, Optional prevValue, ByteString value);
+
+ /**
+ * pair if the prevValue matched the value in TiKV. This API is atomic.
+ *
+ * To use this API, please enable `tikv.enable_atomic_for_cas`.
+ *
+ * @param key key
+ * @param value value
+ * @param ttl TTL of key (in seconds), 0 means the key will never be outdated.
+ */
+ void compareAndSet(ByteString key, Optional prevValue, ByteString value, long ttl);
+
+ /**
+ * Put a set of raw key-value pair to TiKV.
+ *
+ * @param kvPairs kvPairs
+ */
+ void batchPut(Map kvPairs);
+
+ /**
+ * Put a set of raw key-value pair to TiKV.
+ *
+ * @param kvPairs kvPairs
+ * @param ttl the TTL of keys to be put (in seconds), 0 means the keys will never be outdated
+ */
+ void batchPut(Map kvPairs, long ttl);
+
+ /**
+ * Get a raw key-value pair from TiKV if key exists
+ *
+ * @param key raw key
+ * @return a ByteString value if key exists, ByteString.EMPTY if key does not exist
+ */
+ Optional get(ByteString key);
+
+ /**
+ * Get a list of raw key-value pair from TiKV if key exists
+ *
+ * @param keys list of raw key
+ * @return a ByteString value if key exists, ByteString.EMPTY if key does not exist
+ */
+ List batchGet(List keys);
+
+ /**
+ * Delete a list of raw key-value pair from TiKV if key exists
+ *
+ * @param keys list of raw key
+ */
+ void batchDelete(List keys);
+
+ /**
+ * Get the TTL of a raw key from TiKV if key exists
+ *
+ * @param key raw key
+ * @return a Long indicating the TTL of key ttl is a non-null long value indicating TTL if key
+ * exists. - ttl=0 if the key will never be outdated. - ttl=null if the key does not exist
+ */
+ Optional getKeyTTL(ByteString key);
+
+ /**
+ * Create a new `batch scan` request with `keyOnly` option Once resolved this request will result
+ * in a set of scanners over the given keys.
+ *
+ * WARNING: This method is experimental. The `each_limit` parameter does not work as expected.
+ * It does not limit the number of results returned of each range, instead it limits the number of
+ * results in each region of each range. As a result, you may get more than each_limit key-value
+ * pairs for each range. But you should not miss any entries.
+ *
+ * @param ranges a list of ranges
+ * @return a set of scanners for keys over the given keys.
+ */
+ List> batchScanKeys(List> ranges, int eachLimit);
+
+ /**
+ * Create a new `batch scan` request. Once resolved this request will result in a set of scanners
+ * over the given keys.
+ *
+ * WARNING: This method is experimental. The `each_limit` parameter does not work as expected.
+ * It does not limit the number of results returned of each range, instead it limits the number of
+ * results in each region of each range. As a result, you may get more than each_limit key-value
+ * pairs for each range. But you should not miss any entries.
+ *
+ * @param ranges a list of `ScanOption` for each range
+ * @return a set of scanners over the given keys.
+ */
+ List> batchScan(List ranges);
+
+ /**
+ * Scan raw key-value pairs from TiKV in range [startKey, endKey)
+ *
+ * @param startKey raw start key, inclusive
+ * @param endKey raw end key, exclusive
+ * @param limit limit of key-value pairs scanned, should be less than {@link #MAX_RAW_SCAN_LIMIT}
+ * @return list of key-value pairs in range
+ */
+ List scan(ByteString startKey, ByteString endKey, int limit);
+
+ /**
+ * Scan raw key-value pairs from TiKV in range [startKey, endKey)
+ *
+ * @param startKey raw start key, inclusive
+ * @param endKey raw end key, exclusive
+ * @param limit limit of key-value pairs scanned, should be less than {@link #MAX_RAW_SCAN_LIMIT}
+ * @param keyOnly whether to scan in key-only mode
+ * @return list of key-value pairs in range
+ */
+ List scan(ByteString startKey, ByteString endKey, int limit, boolean keyOnly);
+
+ /**
+ * Scan raw key-value pairs from TiKV in range [startKey, ♾)
+ *
+ * @param startKey raw start key, inclusive
+ * @param limit limit of key-value pairs scanned, should be less than {@link #MAX_RAW_SCAN_LIMIT}
+ * @return list of key-value pairs in range
+ */
+ List scan(ByteString startKey, int limit);
+
+ /**
+ * Scan raw key-value pairs from TiKV in range [startKey, ♾)
+ *
+ * @param startKey raw start key, inclusive
+ * @param limit limit of key-value pairs scanned, should be less than {@link #MAX_RAW_SCAN_LIMIT}
+ * @param keyOnly whether to scan in key-only mode
+ * @return list of key-value pairs in range
+ */
+ List scan(ByteString startKey, int limit, boolean keyOnly);
+
+ /**
+ * Scan all raw key-value pairs from TiKV in range [startKey, endKey)
+ *
+ * @param startKey raw start key, inclusive
+ * @param endKey raw end key, exclusive
+ * @return list of key-value pairs in range
+ */
+ List scan(ByteString startKey, ByteString endKey);
+
+ /**
+ * Scan all raw key-value pairs from TiKV in range [startKey, endKey)
+ *
+ * @param startKey raw start key, inclusive
+ * @param endKey raw end key, exclusive
+ * @param keyOnly whether to scan in key-only mode
+ * @return list of key-value pairs in range
+ */
+ List scan(ByteString startKey, ByteString endKey, boolean keyOnly);
+
+ /**
+ * Scan keys with prefix
+ *
+ * @param prefixKey prefix key
+ * @param limit limit of keys retrieved
+ * @param keyOnly whether to scan in keyOnly mode
+ * @return kvPairs with the specified prefix
+ */
+ List scanPrefix(ByteString prefixKey, int limit, boolean keyOnly);
+
+ List scanPrefix(ByteString prefixKey);
+
+ List scanPrefix(ByteString prefixKey, boolean keyOnly);
+
+ /**
+ * Delete a raw key-value pair from TiKV if key exists
+ *
+ * @param key raw key to be deleted
+ */
+ void delete(ByteString key);
+
+ /**
+ * Delete all raw key-value pairs in range [startKey, endKey) from TiKV
+ *
+ * Cautious, this API cannot be used concurrently, if multiple clients write keys into this
+ * range along with deleteRange API, the result will be undefined.
+ *
+ * @param startKey raw start key to be deleted
+ * @param endKey raw start key to be deleted
+ */
+ void deleteRange(ByteString startKey, ByteString endKey);
+
+ /**
+ * Delete all raw key-value pairs with the prefix `key` from TiKV
+ *
+ *
Cautious, this API cannot be used concurrently, if multiple clients write keys into this
+ * range along with deleteRange API, the result will be undefined.
+ *
+ * @param key prefix of keys to be deleted
+ */
+ void deletePrefix(ByteString key);
+}
diff --git a/src/main/java/org/tikv/raw/SmartRawKVClient.java b/src/main/java/org/tikv/raw/SmartRawKVClient.java
new file mode 100644
index 00000000000..33a8981e81b
--- /dev/null
+++ b/src/main/java/org/tikv/raw/SmartRawKVClient.java
@@ -0,0 +1,274 @@
+/*
+ * Copyright 2021 PingCAP, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.tikv.raw;
+
+import com.google.protobuf.ByteString;
+import io.prometheus.client.Counter;
+import io.prometheus.client.Histogram;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.tikv.common.TiConfiguration;
+import org.tikv.common.exception.CircuitBreakerOpenException;
+import org.tikv.common.util.Pair;
+import org.tikv.common.util.ScanOption;
+import org.tikv.kvproto.Kvrpcpb;
+import org.tikv.service.failsafe.CircuitBreaker;
+import org.tikv.service.failsafe.CircuitBreakerImpl;
+
+public class SmartRawKVClient implements RawKVClientBase {
+ private static final Logger logger = LoggerFactory.getLogger(SmartRawKVClient.class);
+
+ private static final Histogram REQUEST_LATENCY =
+ Histogram.build()
+ .name("client_java_smart_raw_requests_latency")
+ .help("client smart raw request latency.")
+ .labelNames("type")
+ .register();
+
+ private static final Counter REQUEST_SUCCESS =
+ Counter.build()
+ .name("client_java_smart_raw_requests_success")
+ .help("client smart raw request success.")
+ .labelNames("type")
+ .register();
+
+ private static final Counter REQUEST_FAILURE =
+ Counter.build()
+ .name("client_java_smart_raw_requests_failure")
+ .help("client smart raw request failure.")
+ .labelNames("type")
+ .register();
+
+ private static final Counter CIRCUIT_BREAKER_OPENED =
+ Counter.build()
+ .name("client_java_smart_raw_circuit_breaker_opened")
+ .help("client smart raw circuit breaker opened.")
+ .labelNames("type")
+ .register();
+
+ private final RawKVClientBase client;
+ private final CircuitBreaker circuitBreaker;
+
+ public SmartRawKVClient(RawKVClientBase client, TiConfiguration conf) {
+ this.client = client;
+ this.circuitBreaker = new CircuitBreakerImpl(conf);
+ }
+
+ @Override
+ public void put(ByteString key, ByteString value) {
+ callWithCircuitBreaker("put", () -> client.put(key, value));
+ }
+
+ @Override
+ public void put(ByteString key, ByteString value, long ttl) {
+ callWithCircuitBreaker("put", () -> client.put(key, value, ttl));
+ }
+
+ @Override
+ public Optional putIfAbsent(ByteString key, ByteString value) {
+ return callWithCircuitBreaker("putIfAbsent", () -> client.putIfAbsent(key, value));
+ }
+
+ @Override
+ public Optional putIfAbsent(ByteString key, ByteString value, long ttl) {
+ return callWithCircuitBreaker("putIfAbsent", () -> client.putIfAbsent(key, value, ttl));
+ }
+
+ @Override
+ public void compareAndSet(ByteString key, Optional prevValue, ByteString value) {
+ callWithCircuitBreaker("compareAndSet", () -> client.compareAndSet(key, prevValue, value));
+ }
+
+ @Override
+ public void compareAndSet(
+ ByteString key, Optional prevValue, ByteString value, long ttl) {
+ callWithCircuitBreaker("compareAndSet", () -> client.compareAndSet(key, prevValue, value, ttl));
+ }
+
+ @Override
+ public void batchPut(Map kvPairs) {
+ callWithCircuitBreaker("batchPut", () -> client.batchPut(kvPairs));
+ }
+
+ @Override
+ public void batchPut(Map kvPairs, long ttl) {
+ callWithCircuitBreaker("batchPut", () -> client.batchPut(kvPairs, ttl));
+ }
+
+ @Override
+ public Optional get(ByteString key) {
+ return callWithCircuitBreaker("get", () -> client.get(key));
+ }
+
+ @Override
+ public List batchGet(List keys) {
+ return callWithCircuitBreaker("batchGet", () -> client.batchGet(keys));
+ }
+
+ @Override
+ public void batchDelete(List keys) {
+ callWithCircuitBreaker("batchDelete", () -> client.batchDelete(keys));
+ }
+
+ @Override
+ public Optional getKeyTTL(ByteString key) {
+ return callWithCircuitBreaker("getKeyTTL", () -> client.getKeyTTL(key));
+ }
+
+ @Override
+ public List> batchScanKeys(
+ List> ranges, int eachLimit) {
+ return callWithCircuitBreaker("batchScanKeys", () -> client.batchScanKeys(ranges, eachLimit));
+ }
+
+ @Override
+ public List> batchScan(List ranges) {
+ return callWithCircuitBreaker("batchScan", () -> client.batchScan(ranges));
+ }
+
+ @Override
+ public List scan(ByteString startKey, ByteString endKey, int limit) {
+ return callWithCircuitBreaker("scan", () -> client.scan(startKey, endKey, limit));
+ }
+
+ @Override
+ public List scan(
+ ByteString startKey, ByteString endKey, int limit, boolean keyOnly) {
+ return callWithCircuitBreaker("scan", () -> client.scan(startKey, endKey, limit, keyOnly));
+ }
+
+ @Override
+ public List scan(ByteString startKey, int limit) {
+ return callWithCircuitBreaker("scan", () -> client.scan(startKey, limit));
+ }
+
+ @Override
+ public List scan(ByteString startKey, int limit, boolean keyOnly) {
+ return callWithCircuitBreaker("scan", () -> client.scan(startKey, limit, keyOnly));
+ }
+
+ @Override
+ public List scan(ByteString startKey, ByteString endKey) {
+ return callWithCircuitBreaker("scan", () -> client.scan(startKey, endKey));
+ }
+
+ @Override
+ public List scan(ByteString startKey, ByteString endKey, boolean keyOnly) {
+ return callWithCircuitBreaker("scan", () -> client.scan(startKey, endKey, keyOnly));
+ }
+
+ @Override
+ public List scanPrefix(ByteString prefixKey, int limit, boolean keyOnly) {
+ return callWithCircuitBreaker("scanPrefix", () -> client.scanPrefix(prefixKey, limit, keyOnly));
+ }
+
+ @Override
+ public List scanPrefix(ByteString prefixKey) {
+ return callWithCircuitBreaker("scanPrefix", () -> client.scanPrefix(prefixKey));
+ }
+
+ @Override
+ public List scanPrefix(ByteString prefixKey, boolean keyOnly) {
+ return callWithCircuitBreaker("scanPrefix", () -> client.scanPrefix(prefixKey, keyOnly));
+ }
+
+ @Override
+ public void delete(ByteString key) {
+ callWithCircuitBreaker("delete", () -> client.delete(key));
+ }
+
+ @Override
+ public void deleteRange(ByteString startKey, ByteString endKey) {
+ callWithCircuitBreaker("deleteRange", () -> client.deleteRange(startKey, endKey));
+ }
+
+ @Override
+ public void deletePrefix(ByteString key) {
+ callWithCircuitBreaker("deletePrefix", () -> client.deletePrefix(key));
+ }
+
+ T callWithCircuitBreaker(String funcName, Function1 func) {
+ Histogram.Timer requestTimer = REQUEST_LATENCY.labels(funcName).startTimer();
+ try {
+ T result = callWithCircuitBreaker0(funcName, func);
+ REQUEST_SUCCESS.labels(funcName).inc();
+ return result;
+ } catch (Exception e) {
+ REQUEST_FAILURE.labels(funcName).inc();
+ throw e;
+ } finally {
+ requestTimer.observeDuration();
+ }
+ }
+
+ private T callWithCircuitBreaker0(String funcName, Function1 func) {
+ if (circuitBreaker.allowRequest()) {
+ try {
+ T result = func.apply();
+ circuitBreaker.getMetrics().recordSuccess();
+ return result;
+ } catch (Exception e) {
+ circuitBreaker.getMetrics().recordFailure();
+ throw e;
+ }
+ } else if (circuitBreaker.attemptExecution()) {
+ logger.debug("attemptExecution");
+ try {
+ T result = func.apply();
+ circuitBreaker.getMetrics().recordSuccess();
+ circuitBreaker.recordAttemptSuccess();
+ logger.debug("markSuccess");
+ return result;
+ } catch (Exception e) {
+ circuitBreaker.getMetrics().recordFailure();
+ circuitBreaker.recordAttemptFailure();
+ logger.debug("markNonSuccess");
+ throw e;
+ }
+ } else {
+ logger.debug("Circuit Breaker Opened");
+ CIRCUIT_BREAKER_OPENED.labels(funcName).inc();
+ throw new CircuitBreakerOpenException();
+ }
+ }
+
+ private void callWithCircuitBreaker(String funcName, Function0 func) {
+ callWithCircuitBreaker(
+ funcName,
+ (Function1)
+ () -> {
+ func.apply();
+ return null;
+ });
+ }
+
+ @Override
+ public void close() throws Exception {
+ circuitBreaker.close();
+ client.close();
+ }
+
+ public interface Function1 {
+ T apply();
+ }
+
+ public interface Function0 {
+ void apply();
+ }
+}
diff --git a/src/main/java/org/tikv/service/failsafe/CircuitBreaker.java b/src/main/java/org/tikv/service/failsafe/CircuitBreaker.java
new file mode 100644
index 00000000000..e1c4f1e2bc4
--- /dev/null
+++ b/src/main/java/org/tikv/service/failsafe/CircuitBreaker.java
@@ -0,0 +1,60 @@
+/*
+ * Copyright 2021 PingCAP, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.tikv.service.failsafe;
+
+import java.io.Closeable;
+
+public interface CircuitBreaker extends Closeable {
+
+ enum Status {
+ CLOSED(0),
+ HALF_OPEN(1),
+ OPEN(2);
+
+ private final int value;
+
+ private Status(int value) {
+ this.value = value;
+ }
+
+ public int getValue() {
+ return value;
+ }
+ }
+
+ /**
+ * Every requests asks this if it is allowed to proceed or not. It is idempotent and does not
+ * modify any internal state.
+ *
+ * @return boolean whether a request should be permitted
+ */
+ boolean allowRequest();
+
+ /**
+ * Invoked at start of command execution to attempt an execution. This is non-idempotent - it may
+ * modify internal state.
+ */
+ boolean attemptExecution();
+
+ /** Invoked on successful executions as part of feedback mechanism when in a half-open state. */
+ void recordAttemptSuccess();
+
+ /** Invoked on unsuccessful executions as part of feedback mechanism when in a half-open state. */
+ void recordAttemptFailure();
+
+ /** Get the Circuit Breaker Metrics Object. */
+ CircuitBreakerMetrics getMetrics();
+}
diff --git a/src/main/java/org/tikv/service/failsafe/CircuitBreakerImpl.java b/src/main/java/org/tikv/service/failsafe/CircuitBreakerImpl.java
new file mode 100644
index 00000000000..7f2231bd7ab
--- /dev/null
+++ b/src/main/java/org/tikv/service/failsafe/CircuitBreakerImpl.java
@@ -0,0 +1,202 @@
+/*
+ * Copyright 2021 PingCAP, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.tikv.service.failsafe;
+
+import io.prometheus.client.Counter;
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.tikv.common.TiConfiguration;
+
+public class CircuitBreakerImpl implements CircuitBreaker {
+ private static final Logger logger = LoggerFactory.getLogger(CircuitBreakerImpl.class);
+
+ private static final Counter CIRCUIT_BREAKER_ATTEMPT_COUNTER =
+ Counter.build()
+ .name("client_java_circuit_breaker_attempt_counter")
+ .help("client circuit breaker attempt counter.")
+ .labelNames("type")
+ .register();
+
+ private final boolean enable;
+ private final int windowInSeconds;
+ private final int errorThresholdPercentage;
+ private final int requestVolumeThreshold;
+ private final int sleepWindowInSeconds;
+ private final int attemptRequestCount;
+
+ private final AtomicLong circuitOpened = new AtomicLong(-1);
+ private final AtomicReference status = new AtomicReference<>(Status.CLOSED);
+ private final AtomicLong attemptCount = new AtomicLong(0);
+ private final AtomicLong attemptSuccessCount = new AtomicLong(0);
+
+ private final CircuitBreakerMetrics metrics;
+
+ public CircuitBreakerImpl(TiConfiguration conf) {
+ this(
+ conf.isCircuitBreakEnable(),
+ conf.getCircuitBreakAvailabilityWindowInSeconds(),
+ conf.getCircuitBreakAvailabilityErrorThresholdPercentage(),
+ conf.getCircuitBreakAvailabilityRequestVolumnThreshold(),
+ conf.getCircuitBreakSleepWindowInSeconds(),
+ conf.getCircuitBreakAttemptRequestCount());
+ }
+
+ public CircuitBreakerImpl(
+ boolean enable,
+ int windowInSeconds,
+ int errorThresholdPercentage,
+ int requestVolumeThreshold,
+ int sleepWindowInSeconds,
+ int attemptRequestCount) {
+ this.enable = enable;
+ this.windowInSeconds = windowInSeconds;
+ this.errorThresholdPercentage = errorThresholdPercentage;
+ this.requestVolumeThreshold = requestVolumeThreshold;
+ this.sleepWindowInSeconds = sleepWindowInSeconds;
+ this.attemptRequestCount = attemptRequestCount;
+ this.metrics =
+ enable ? new CircuitBreakerMetricsImpl(windowInSeconds) : new NoOpCircuitBreakerMetrics();
+ this.metrics.addListener(getMetricsListener());
+ }
+
+ private MetricsListener getMetricsListener() {
+ return hc -> {
+ logger.debug("onNext " + hc.toString());
+ // check if we are past the requestVolumeThreshold
+ if (hc.getTotalRequests() < requestVolumeThreshold) {
+ // we are not past the minimum volume threshold for the stat window,
+ // so no change to circuit status.
+ // if it was CLOSED, it stays CLOSED
+ // if it was half-open, we need to wait for some successful command executions
+ // if it was open, we need to wait for sleep window to elapse
+ } else {
+ if (hc.getErrorPercentage() < errorThresholdPercentage) {
+ // we are not past the minimum error threshold for the stat window,
+ // so no change to circuit status.
+ // if it was CLOSED, it stays CLOSED
+ // if it was half-open, we need to wait for some successful command executions
+ // if it was open, we need to wait for sleep window to elapse
+ } else {
+ // our failure rate is too high, we need to set the state to OPEN
+ close2Open();
+ }
+ }
+ };
+ }
+
+ @Override
+ public CircuitBreakerMetrics getMetrics() {
+ return metrics;
+ }
+
+ @Override
+ public boolean allowRequest() {
+ if (!enable) {
+ return true;
+ }
+ return !isOpen();
+ }
+
+ boolean isOpen() {
+ return circuitOpened.get() >= 0;
+ }
+
+ Status getStatus() {
+ return status.get();
+ }
+
+ @Override
+ public void recordAttemptSuccess() {
+ CIRCUIT_BREAKER_ATTEMPT_COUNTER.labels("success").inc();
+ if (attemptSuccessCount.incrementAndGet() >= this.attemptRequestCount) {
+ halfOpen2Close();
+ }
+ }
+
+ @Override
+ public void recordAttemptFailure() {
+ CIRCUIT_BREAKER_ATTEMPT_COUNTER.labels("failure").inc();
+ halfOpen2Open();
+ }
+
+ @Override
+ public boolean attemptExecution() {
+ if (allowRequest()) {
+ return true;
+ } else {
+ if (isAfterSleepWindow()) {
+ // only the `attemptRequestCount` requests after sleep window should execute
+ // if all the executing commands succeed, the status will transition to CLOSED
+ // if some of the executing commands fail, the status will transition to OPEN
+ open2HalfOpen();
+ return attemptCount.incrementAndGet() <= attemptRequestCount;
+ } else {
+ return false;
+ }
+ }
+ }
+
+ private boolean isAfterSleepWindow() {
+ final long circuitOpenTime = circuitOpened.get();
+ final long currentTime = System.currentTimeMillis();
+ final long sleepWindowTime = (long) sleepWindowInSeconds * 1000;
+ return currentTime >= circuitOpenTime + sleepWindowTime;
+ }
+
+ private void close2Open() {
+ if (status.compareAndSet(Status.CLOSED, Status.OPEN)) {
+ // This thread wins the race to open the circuit
+ // it sets the start time for the sleep window
+ circuitOpened.set(System.currentTimeMillis());
+ logger.info("CLOSED => OPEN");
+ }
+ }
+
+ private void halfOpen2Close() {
+ if (status.compareAndSet(Status.HALF_OPEN, Status.CLOSED)) {
+ // This thread wins the race to close the circuit
+ circuitOpened.set(-1L);
+ logger.info("HALF_OPEN => CLOSED");
+ }
+ }
+
+ private void open2HalfOpen() {
+ if (status.compareAndSet(Status.OPEN, Status.HALF_OPEN)) {
+ // This thread wins the race to half close the circuit
+ // it resets the attempt count
+ attemptCount.set(0);
+ attemptSuccessCount.set(0);
+ logger.info("OPEN => HALF_OPEN");
+ }
+ }
+
+ private void halfOpen2Open() {
+ if (status.compareAndSet(Status.HALF_OPEN, Status.OPEN)) {
+ // This thread wins the race to re-open the circuit
+ // it resets the start time for the sleep window
+ circuitOpened.set(System.currentTimeMillis());
+ logger.info("HALF_OPEN => OPEN");
+ }
+ }
+
+ @Override
+ public void close() throws IOException {
+ metrics.close();
+ }
+}
diff --git a/src/main/java/org/tikv/service/failsafe/CircuitBreakerMetrics.java b/src/main/java/org/tikv/service/failsafe/CircuitBreakerMetrics.java
new file mode 100644
index 00000000000..6287a9f199e
--- /dev/null
+++ b/src/main/java/org/tikv/service/failsafe/CircuitBreakerMetrics.java
@@ -0,0 +1,29 @@
+/*
+ * Copyright 2021 PingCAP, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.tikv.service.failsafe;
+
+import java.io.Closeable;
+
+public interface CircuitBreakerMetrics extends Closeable {
+ /** Record a successful call. */
+ void recordSuccess();
+
+ /** Record a failure call. */
+ void recordFailure();
+
+ /** Add metrics listener. */
+ void addListener(MetricsListener metricsListener);
+}
diff --git a/src/main/java/org/tikv/service/failsafe/CircuitBreakerMetricsImpl.java b/src/main/java/org/tikv/service/failsafe/CircuitBreakerMetricsImpl.java
new file mode 100644
index 00000000000..da497efcb71
--- /dev/null
+++ b/src/main/java/org/tikv/service/failsafe/CircuitBreakerMetricsImpl.java
@@ -0,0 +1,120 @@
+/*
+ * Copyright 2021 PingCAP, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.tikv.service.failsafe;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.commons.lang3.concurrent.BasicThreadFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class CircuitBreakerMetricsImpl implements CircuitBreakerMetrics {
+ private static final Logger logger = LoggerFactory.getLogger(CircuitBreakerMetricsImpl.class);
+
+ private final int windowInMS;
+ private final List listeners;
+ private final AtomicReference currentMetrics;
+
+ private final ScheduledExecutorService scheduler;
+ private static final int SCHEDULER_INITIAL_DELAY = 1000;
+ private static final int SCHEDULER_PERIOD = 1000;
+
+ public CircuitBreakerMetricsImpl(int windowInSeconds) {
+ this.windowInMS = windowInSeconds * 1000;
+ this.listeners = new ArrayList<>();
+ this.currentMetrics = new AtomicReference<>(new SingleWindowMetrics());
+
+ scheduler =
+ new ScheduledThreadPoolExecutor(
+ 1,
+ new BasicThreadFactory.Builder()
+ .namingPattern("circuit-breaker-metrics-%d")
+ .daemon(true)
+ .build());
+
+ scheduler.scheduleAtFixedRate(
+ this::onReachCircuitWindow,
+ SCHEDULER_INITIAL_DELAY,
+ SCHEDULER_PERIOD,
+ TimeUnit.MILLISECONDS);
+ }
+
+ @Override
+ public void recordSuccess() {
+ currentMetrics.get().recordSuccess();
+ }
+
+ @Override
+ public void recordFailure() {
+ currentMetrics.get().recordFailure();
+ }
+
+ private void onReachCircuitWindow() {
+ SingleWindowMetrics singleWindowMetrics = currentMetrics.get();
+ if (System.currentTimeMillis() < singleWindowMetrics.getStartMS() + windowInMS) {
+ return;
+ }
+ if (!currentMetrics.compareAndSet(singleWindowMetrics, new SingleWindowMetrics())) {
+ return;
+ }
+ logger.debug("window timeout, reset SingleWindowMetrics");
+ HealthCounts healthCounts = singleWindowMetrics.getHealthCounts();
+ for (MetricsListener metricsListener : listeners) {
+ metricsListener.onNext(healthCounts);
+ }
+ }
+
+ @Override
+ public void addListener(MetricsListener metricsListener) {
+ listeners.add(metricsListener);
+ }
+
+ @Override
+ public void close() throws IOException {
+ scheduler.shutdown();
+ }
+
+ /** Instead of using SingleWindowMetrics, it is better to use RollingWindowMetrics. */
+ static class SingleWindowMetrics {
+ private final long startMS = System.currentTimeMillis();
+ private final AtomicLong totalCount = new AtomicLong(0);
+ private final AtomicLong errorCount = new AtomicLong(0);
+
+ public void recordSuccess() {
+ totalCount.incrementAndGet();
+ }
+
+ public void recordFailure() {
+ totalCount.incrementAndGet();
+
+ errorCount.incrementAndGet();
+ }
+
+ public HealthCounts getHealthCounts() {
+ return new HealthCounts(totalCount.get(), errorCount.get());
+ }
+
+ public long getStartMS() {
+ return startMS;
+ }
+ }
+}
diff --git a/src/main/java/org/tikv/service/failsafe/HealthCounts.java b/src/main/java/org/tikv/service/failsafe/HealthCounts.java
new file mode 100644
index 00000000000..68f986aa489
--- /dev/null
+++ b/src/main/java/org/tikv/service/failsafe/HealthCounts.java
@@ -0,0 +1,56 @@
+/*
+ * Copyright 2021 PingCAP, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.tikv.service.failsafe;
+
+public class HealthCounts {
+ private final long totalCount;
+ private final long errorCount;
+ private final int errorPercentage;
+
+ HealthCounts(long total, long error) {
+ this.totalCount = total;
+ this.errorCount = error;
+ if (totalCount > 0) {
+ this.errorPercentage = (int) ((double) errorCount / totalCount * 100);
+ } else {
+ this.errorPercentage = 0;
+ }
+ }
+
+ public long getTotalRequests() {
+ return totalCount;
+ }
+
+ public long getErrorCount() {
+ return errorCount;
+ }
+
+ public int getErrorPercentage() {
+ return errorPercentage;
+ }
+
+ @Override
+ public String toString() {
+ return "HealthCounts{"
+ + "totalCount="
+ + totalCount
+ + ", errorCount="
+ + errorCount
+ + ", errorPercentage="
+ + errorPercentage
+ + '}';
+ }
+}
diff --git a/src/main/java/org/tikv/service/failsafe/MetricsListener.java b/src/main/java/org/tikv/service/failsafe/MetricsListener.java
new file mode 100644
index 00000000000..2fd59b46617
--- /dev/null
+++ b/src/main/java/org/tikv/service/failsafe/MetricsListener.java
@@ -0,0 +1,20 @@
+/*
+ * Copyright 2021 PingCAP, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.tikv.service.failsafe;
+
+public interface MetricsListener {
+ void onNext(HealthCounts healthCounts);
+}
diff --git a/src/main/java/org/tikv/service/failsafe/NoOpCircuitBreakerMetrics.java b/src/main/java/org/tikv/service/failsafe/NoOpCircuitBreakerMetrics.java
new file mode 100644
index 00000000000..e5474fb6e12
--- /dev/null
+++ b/src/main/java/org/tikv/service/failsafe/NoOpCircuitBreakerMetrics.java
@@ -0,0 +1,40 @@
+/*
+ * Copyright 2021 PingCAP, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.tikv.service.failsafe;
+
+import java.io.IOException;
+
+public class NoOpCircuitBreakerMetrics implements CircuitBreakerMetrics {
+ @Override
+ public void recordSuccess() {
+ // do nothing
+ }
+
+ @Override
+ public void recordFailure() {
+ // do nothing
+ }
+
+ @Override
+ public void addListener(MetricsListener metricsListener) {
+ // do nothing
+ }
+
+ @Override
+ public void close() throws IOException {
+ // do nothing
+ }
+}
diff --git a/src/test/java/org/tikv/raw/CASTest.java b/src/test/java/org/tikv/raw/CASTest.java
index e8cccf5c99c..a331b4d06f9 100644
--- a/src/test/java/org/tikv/raw/CASTest.java
+++ b/src/test/java/org/tikv/raw/CASTest.java
@@ -74,7 +74,7 @@ public void rawPutIfAbsentTest() {
Optional res2 = client.putIfAbsent(key, value2, ttl);
assertEquals(res2.get(), value);
try {
- Thread.sleep(ttl * 1000);
+ Thread.sleep(ttl * 1000 + 100);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
diff --git a/src/test/java/org/tikv/raw/SmartRawKVClientTest.java b/src/test/java/org/tikv/raw/SmartRawKVClientTest.java
new file mode 100644
index 00000000000..61bb70d6507
--- /dev/null
+++ b/src/test/java/org/tikv/raw/SmartRawKVClientTest.java
@@ -0,0 +1,86 @@
+package org.tikv.raw;
+
+import static org.junit.Assert.assertTrue;
+
+import com.google.protobuf.ByteString;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.tikv.BaseRawKVTest;
+import org.tikv.common.TiConfiguration;
+import org.tikv.common.TiSession;
+import org.tikv.common.exception.CircuitBreakerOpenException;
+
+public class SmartRawKVClientTest extends BaseRawKVTest {
+ private boolean enable = true;
+ private int windowInSeconds = 2;
+ private int errorThresholdPercentage = 100;
+ private int requestVolumeThreshold = 10;
+ private int sleepWindowInSeconds = 1;
+ private int attemptRequestCount = 10;
+
+ private int sleepDelta = 100;
+
+ private TiSession session;
+ private SmartRawKVClient client;
+
+ @Before
+ public void setup() {
+ TiConfiguration conf = createTiConfiguration();
+ conf.setCircuitBreakEnable(enable);
+ conf.setCircuitBreakAvailabilityWindowInSeconds(windowInSeconds);
+ conf.setCircuitBreakAvailabilityErrorThresholdPercentage(errorThresholdPercentage);
+ conf.setCircuitBreakAvailabilityRequestVolumnThreshold(requestVolumeThreshold);
+ conf.setCircuitBreakSleepWindowInSeconds(sleepWindowInSeconds);
+ conf.setCircuitBreakAttemptRequestCount(attemptRequestCount);
+ session = TiSession.create(conf);
+ client = session.createSmartRawClient();
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ if (session != null) {
+ session.close();
+ }
+ }
+
+ @Test
+ public void testCircuitBreaker() throws InterruptedException {
+ // CLOSED => OPEN
+ {
+ for (int i = 1; i <= requestVolumeThreshold; i++) {
+ error();
+ }
+ Thread.sleep(windowInSeconds * 1000 + sleepDelta);
+
+ Exception error = null;
+ try {
+ client.get(ByteString.copyFromUtf8("key"));
+ assertTrue(false);
+ } catch (Exception e) {
+ error = e;
+ }
+ assertTrue(error instanceof CircuitBreakerOpenException);
+ }
+
+ // OPEN => CLOSED
+ {
+ Thread.sleep(sleepWindowInSeconds * 1000);
+ for (int i = 1; i <= attemptRequestCount; i++) {
+ success();
+ }
+ client.get(ByteString.copyFromUtf8("key"));
+ }
+ }
+
+ private void success() {
+ client.get(ByteString.copyFromUtf8("key"));
+ }
+
+ private void error() {
+ try {
+ client.callWithCircuitBreaker("error", () -> 1 / 0);
+ } catch (Exception ignored) {
+ }
+ }
+}
diff --git a/src/test/java/org/tikv/service/failsafe/CircuitBreakerMetricsTest.java b/src/test/java/org/tikv/service/failsafe/CircuitBreakerMetricsTest.java
new file mode 100644
index 00000000000..a8cbc3576fb
--- /dev/null
+++ b/src/test/java/org/tikv/service/failsafe/CircuitBreakerMetricsTest.java
@@ -0,0 +1,69 @@
+package org.tikv.service.failsafe;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicReference;
+import org.junit.Test;
+
+public class CircuitBreakerMetricsTest {
+ private static final int TEST_COUNT = 10;
+ private static final int WINDOW_IN_SECONDS = 1;
+ private static final int SLEEP_DELTA = 100;
+
+ @Test
+ public void testAllSuccess() throws InterruptedException, IOException {
+ CircuitBreakerMetricsImpl metrics = new CircuitBreakerMetricsImpl(WINDOW_IN_SECONDS);
+
+ AtomicReference healthCounts = new AtomicReference<>();
+ MetricsListener metricsListener = healthCounts::set;
+ metrics.addListener(metricsListener);
+
+ for (int i = 1; i <= TEST_COUNT; i++) {
+ metrics.recordSuccess();
+ }
+ Thread.sleep(WINDOW_IN_SECONDS * 1000 + SLEEP_DELTA);
+ assertNotNull(healthCounts.get());
+ assertEquals(healthCounts.get().getTotalRequests(), TEST_COUNT);
+ assertEquals(healthCounts.get().getErrorPercentage(), 0);
+ metrics.close();
+ }
+
+ @Test
+ public void testAllFailure() throws InterruptedException, IOException {
+ CircuitBreakerMetricsImpl metrics = new CircuitBreakerMetricsImpl(WINDOW_IN_SECONDS);
+
+ AtomicReference healthCounts = new AtomicReference<>();
+ MetricsListener metricsListener = healthCounts::set;
+ metrics.addListener(metricsListener);
+
+ for (int i = 1; i <= TEST_COUNT; i++) {
+ metrics.recordFailure();
+ }
+ Thread.sleep(WINDOW_IN_SECONDS * 1000 + SLEEP_DELTA);
+ assertNotNull(healthCounts.get());
+ assertEquals(healthCounts.get().getTotalRequests(), TEST_COUNT);
+ assertEquals(healthCounts.get().getErrorPercentage(), 100);
+ metrics.close();
+ }
+
+ @Test
+ public void testHalfFailure() throws InterruptedException, IOException {
+ CircuitBreakerMetricsImpl metrics = new CircuitBreakerMetricsImpl(WINDOW_IN_SECONDS);
+
+ AtomicReference healthCounts = new AtomicReference<>();
+ MetricsListener metricsListener = healthCounts::set;
+ metrics.addListener(metricsListener);
+
+ for (int i = 1; i <= TEST_COUNT; i++) {
+ metrics.recordFailure();
+ metrics.recordSuccess();
+ }
+ Thread.sleep(WINDOW_IN_SECONDS * 1000 + SLEEP_DELTA);
+ assertNotNull(healthCounts.get());
+ assertEquals(healthCounts.get().getTotalRequests(), TEST_COUNT * 2);
+ assertEquals(healthCounts.get().getErrorPercentage(), 50);
+ metrics.close();
+ }
+}
diff --git a/src/test/java/org/tikv/service/failsafe/CircuitBreakerTest.java b/src/test/java/org/tikv/service/failsafe/CircuitBreakerTest.java
new file mode 100644
index 00000000000..766d5bff709
--- /dev/null
+++ b/src/test/java/org/tikv/service/failsafe/CircuitBreakerTest.java
@@ -0,0 +1,69 @@
+package org.tikv.service.failsafe;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import org.junit.Test;
+
+public class CircuitBreakerTest {
+
+ @Test
+ public void testCircuitBreaker() throws InterruptedException {
+ boolean enable = true;
+ int windowInSeconds = 2;
+ int errorThresholdPercentage = 100;
+ int requestVolumeThreshold = 10;
+ int sleepWindowInSeconds = 1;
+ int attemptRequestCount = 10;
+
+ int sleepDelta = 100;
+
+ CircuitBreakerImpl circuitBreaker =
+ new CircuitBreakerImpl(
+ enable,
+ windowInSeconds,
+ errorThresholdPercentage,
+ requestVolumeThreshold,
+ sleepWindowInSeconds,
+ attemptRequestCount);
+ CircuitBreakerMetrics metrics = circuitBreaker.getMetrics();
+
+ // initial state: CLOSE
+ assertTrue(!circuitBreaker.isOpen());
+ assertEquals(circuitBreaker.getStatus(), CircuitBreaker.Status.CLOSED);
+
+ // CLOSE => OPEN
+ for (int i = 1; i <= requestVolumeThreshold; i++) {
+ metrics.recordFailure();
+ }
+ Thread.sleep(windowInSeconds * 1000 + sleepDelta);
+ assertTrue(circuitBreaker.isOpen());
+ assertEquals(circuitBreaker.getStatus(), CircuitBreaker.Status.OPEN);
+
+ // OPEN => HALF_OPEN
+ Thread.sleep(sleepWindowInSeconds * 1000);
+ assertTrue(circuitBreaker.attemptExecution());
+ assertTrue(circuitBreaker.isOpen());
+ assertEquals(circuitBreaker.getStatus(), CircuitBreaker.Status.HALF_OPEN);
+
+ // HALF_OPEN => OPEN
+ circuitBreaker.recordAttemptFailure();
+ assertTrue(circuitBreaker.isOpen());
+ assertEquals(circuitBreaker.getStatus(), CircuitBreaker.Status.OPEN);
+
+ // OPEN => HALF_OPEN
+ Thread.sleep(sleepWindowInSeconds * 1000 + sleepDelta);
+ assertTrue(circuitBreaker.attemptExecution());
+ circuitBreaker.recordAttemptSuccess();
+ assertTrue(circuitBreaker.isOpen());
+ assertEquals(circuitBreaker.getStatus(), CircuitBreaker.Status.HALF_OPEN);
+
+ // HALF_OPEN => CLOSED
+ for (int i = 1; i < attemptRequestCount; i++) {
+ assertTrue(circuitBreaker.attemptExecution());
+ circuitBreaker.recordAttemptSuccess();
+ }
+ assertTrue(!circuitBreaker.isOpen());
+ assertEquals(circuitBreaker.getStatus(), CircuitBreaker.Status.CLOSED);
+ }
+}