From 6dfa1a8b03210d5b8ad1023f8ff594bfb0771d2a Mon Sep 17 00:00:00 2001 From: marsishandsome Date: Thu, 2 Dec 2021 13:11:19 +0800 Subject: [PATCH 01/17] add Circuit Breaker Signed-off-by: marsishandsome --- .../java/org/tikv/common/ConfigUtils.java | 19 ++ .../java/org/tikv/common/TiConfiguration.java | 77 +++++ src/main/java/org/tikv/common/TiSession.java | 6 + .../CircuitBreakerOpenException.java | 22 ++ .../tikv/common/failsafe/CircuitBreaker.java | 62 ++++ .../common/failsafe/CircuitBreakerImpl.java | 187 ++++++++++++ .../failsafe/CircuitBreakerMetrics.java | 27 ++ .../failsafe/CircuitBreakerMetricsImpl.java | 91 ++++++ .../tikv/common/failsafe/HealthCounts.java | 56 ++++ .../tikv/common/failsafe/MetricsListener.java | 20 ++ .../java/org/tikv/raw/BaseRawKVClient.java | 272 ++++++++++++++++++ .../tikv/raw/CircuitBreakerRawKVClient.java | 230 +++++++++++++++ src/main/java/org/tikv/raw/RawKVClient.java | 224 ++------------- .../failsafe/CircuitBreakerMetricsTest.java | 68 +++++ .../common/failsafe/CircuitBreakerTest.java | 68 +++++ .../raw/CircuitBreakerRawKVClientTest.java | 85 ++++++ 16 files changed, 1317 insertions(+), 197 deletions(-) create mode 100644 src/main/java/org/tikv/common/exception/CircuitBreakerOpenException.java create mode 100644 src/main/java/org/tikv/common/failsafe/CircuitBreaker.java create mode 100644 src/main/java/org/tikv/common/failsafe/CircuitBreakerImpl.java create mode 100644 src/main/java/org/tikv/common/failsafe/CircuitBreakerMetrics.java create mode 100644 src/main/java/org/tikv/common/failsafe/CircuitBreakerMetricsImpl.java create mode 100644 src/main/java/org/tikv/common/failsafe/HealthCounts.java create mode 100644 src/main/java/org/tikv/common/failsafe/MetricsListener.java create mode 100644 src/main/java/org/tikv/raw/BaseRawKVClient.java create mode 100644 src/main/java/org/tikv/raw/CircuitBreakerRawKVClient.java create mode 100644 src/test/java/org/tikv/common/failsafe/CircuitBreakerMetricsTest.java create mode 100644 src/test/java/org/tikv/common/failsafe/CircuitBreakerTest.java create mode 100644 src/test/java/org/tikv/raw/CircuitBreakerRawKVClientTest.java diff --git a/src/main/java/org/tikv/common/ConfigUtils.java b/src/main/java/org/tikv/common/ConfigUtils.java index 8f1c3e9d361..8f5f1613675 100644 --- a/src/main/java/org/tikv/common/ConfigUtils.java +++ b/src/main/java/org/tikv/common/ConfigUtils.java @@ -92,6 +92,18 @@ public class ConfigUtils { public static final String TIKV_KEY_CERT_CHAIN = "tikv.key_cert_chain"; public static final String TIKV_KEY_FILE = "tikv.key_file"; + public static final String TiKV_CIRCUIT_BREAK_ENABLE = "tikv.circuit.break.enable"; + public static final String TiKV_CIRCUIT_BREAK_AVAILABILITY_WINDOW_IN_SECONDS = + "tikv.circuit.break.trigger.availability.window_in_seconds"; + public static final String TiKV_CIRCUIT_BREAK_AVAILABILITY_ERROR_THRESHOLD_PERCENTAGE = + "tikv.circuit.break.trigger.availability.error_threshold_percentage"; + public static final String TiKV_CIRCUIT_BREAK_AVAILABILITY_REQUEST_VOLUMN_THRESHOLD = + "tikv.circuit.break.trigger.availability.request_volumn_threshold"; + public static final String TiKV_CIRCUIT_BREAK_SLEEP_WINDOW_IN_SECONDS = + "tikv.circuit.break.trigger.sleep_window_in_seconds"; + public static final String TiKV_CIRCUIT_BREAK_ATTEMPT_REQUEST_COUNT = + "tikv.circuit.break.trigger.attempt_request_count"; + public static final String TIFLASH_ENABLE = "tiflash.enable"; public static final String DEF_PD_ADDRESSES = "127.0.0.1:2379"; public static final String DEF_TIMEOUT = "200ms"; @@ -161,4 +173,11 @@ public class ConfigUtils { public static final int DEF_TIKV_GRPC_KEEPALIVE_TIMEOUT = 3; public static final boolean DEF_TIKV_TLS_ENABLE = false; public static final boolean DEF_TIFLASH_ENABLE = false; + + public static final boolean DEF_TiKV_CIRCUIT_BREAK_ENABLE = false; + public static final int DEF_TiKV_CIRCUIT_BREAK_AVAILABILITY_WINDOW_IN_SECONDS = 60; + public static final int DEF_TiKV_CIRCUIT_BREAK_AVAILABILITY_ERROR_THRESHOLD_PERCENTAGE = 100; + public static final int DEF_TiKV_CIRCUIT_BREAK_AVAILABILITY_REQUST_VOLUMN_THRESHOLD = 10; + public static final int DEF_TiKV_CIRCUIT_BREAK_SLEEP_WINDOW_IN_SECONDS = 20; + public static final int DEF_TiKV_CIRCUIT_BREAK_ATTEMPT_REQUEST_COUNT = 10; } diff --git a/src/main/java/org/tikv/common/TiConfiguration.java b/src/main/java/org/tikv/common/TiConfiguration.java index aad8df6d956..4cdb48a777f 100644 --- a/src/main/java/org/tikv/common/TiConfiguration.java +++ b/src/main/java/org/tikv/common/TiConfiguration.java @@ -130,6 +130,20 @@ private static void loadFromDefaultProperties() { setIfMissing(TIKV_RAWKV_CLEAN_TIMEOUT_IN_MS, DEF_TIKV_RAWKV_CLEAN_TIMEOUT_IN_MS); setIfMissing(TIKV_BO_REGION_MISS_BASE_IN_MS, DEF_TIKV_BO_REGION_MISS_BASE_IN_MS); setIfMissing(TIKV_RAWKV_SCAN_SLOWLOG_IN_MS, DEF_TIKV_RAWKV_SCAN_SLOWLOG_IN_MS); + setIfMissing(TiKV_CIRCUIT_BREAK_ENABLE, DEF_TiKV_CIRCUIT_BREAK_ENABLE); + setIfMissing( + TiKV_CIRCUIT_BREAK_AVAILABILITY_WINDOW_IN_SECONDS, + DEF_TiKV_CIRCUIT_BREAK_AVAILABILITY_WINDOW_IN_SECONDS); + setIfMissing( + TiKV_CIRCUIT_BREAK_AVAILABILITY_ERROR_THRESHOLD_PERCENTAGE, + DEF_TiKV_CIRCUIT_BREAK_AVAILABILITY_ERROR_THRESHOLD_PERCENTAGE); + setIfMissing( + TiKV_CIRCUIT_BREAK_AVAILABILITY_REQUEST_VOLUMN_THRESHOLD, + DEF_TiKV_CIRCUIT_BREAK_AVAILABILITY_REQUST_VOLUMN_THRESHOLD); + setIfMissing( + TiKV_CIRCUIT_BREAK_SLEEP_WINDOW_IN_SECONDS, DEF_TiKV_CIRCUIT_BREAK_SLEEP_WINDOW_IN_SECONDS); + setIfMissing( + TiKV_CIRCUIT_BREAK_ATTEMPT_REQUEST_COUNT, DEF_TiKV_CIRCUIT_BREAK_ATTEMPT_REQUEST_COUNT); } public static void listAll() { @@ -360,6 +374,16 @@ private static ReplicaRead getReplicaRead(String key) { private int keepaliveTime = getInt(TIKV_GRPC_KEEPALIVE_TIME); private int keepaliveTimeout = getInt(TIKV_GRPC_KEEPALIVE_TIMEOUT); + private boolean circuitBreakEnable = getBoolean(TiKV_CIRCUIT_BREAK_ENABLE); + private int circuitBreakAvailabilityWindowInSeconds = + getInt(TiKV_CIRCUIT_BREAK_AVAILABILITY_WINDOW_IN_SECONDS); + private int circuitBreakAvailabilityErrorThresholdPercentage = + getInt(TiKV_CIRCUIT_BREAK_AVAILABILITY_ERROR_THRESHOLD_PERCENTAGE); + private int circuitBreakAvailabilityRequestVolumnThreshold = + getInt(TiKV_CIRCUIT_BREAK_AVAILABILITY_REQUEST_VOLUMN_THRESHOLD); + private int circuitBreakSleepWindowInSeconds = getInt(TiKV_CIRCUIT_BREAK_SLEEP_WINDOW_IN_SECONDS); + private int circuitBreakAttemptRequestCount = getInt(TiKV_CIRCUIT_BREAK_ATTEMPT_REQUEST_COUNT); + public enum KVMode { TXN, RAW @@ -881,4 +905,57 @@ public int getRawKVScanSlowLogInMS() { public void setRawKVScanSlowLogInMS(int rawKVScanSlowLogInMS) { this.rawKVScanSlowLogInMS = rawKVScanSlowLogInMS; } + + public boolean isCircuitBreakEnable() { + return circuitBreakEnable; + } + + public void setCircuitBreakEnable(boolean circuitBreakEnable) { + this.circuitBreakEnable = circuitBreakEnable; + } + + public int getCircuitBreakAvailabilityWindowInSeconds() { + return circuitBreakAvailabilityWindowInSeconds; + } + + public void setCircuitBreakAvailabilityWindowInSeconds( + int circuitBreakAvailabilityWindowInSeconds) { + this.circuitBreakAvailabilityWindowInSeconds = circuitBreakAvailabilityWindowInSeconds; + } + + public int getCircuitBreakAvailabilityErrorThresholdPercentage() { + return circuitBreakAvailabilityErrorThresholdPercentage; + } + + public void setCircuitBreakAvailabilityErrorThresholdPercentage( + int circuitBreakAvailabilityErrorThresholdPercentage) { + this.circuitBreakAvailabilityErrorThresholdPercentage = + circuitBreakAvailabilityErrorThresholdPercentage; + } + + public int getCircuitBreakAvailabilityRequestVolumnThreshold() { + return circuitBreakAvailabilityRequestVolumnThreshold; + } + + public void setCircuitBreakAvailabilityRequestVolumnThreshold( + int circuitBreakAvailabilityRequestVolumnThreshold) { + this.circuitBreakAvailabilityRequestVolumnThreshold = + circuitBreakAvailabilityRequestVolumnThreshold; + } + + public int getCircuitBreakSleepWindowInSeconds() { + return circuitBreakSleepWindowInSeconds; + } + + public void setCircuitBreakSleepWindowInSeconds(int circuitBreakSleepWindowInSeconds) { + this.circuitBreakSleepWindowInSeconds = circuitBreakSleepWindowInSeconds; + } + + public int getCircuitBreakAttemptRequestCount() { + return circuitBreakAttemptRequestCount; + } + + public void setCircuitBreakAttemptRequestCount(int circuitBreakAttemptRequestCount) { + this.circuitBreakAttemptRequestCount = circuitBreakAttemptRequestCount; + } } diff --git a/src/main/java/org/tikv/common/TiSession.java b/src/main/java/org/tikv/common/TiSession.java index f3f10c9ccef..8b3a33f8f9a 100644 --- a/src/main/java/org/tikv/common/TiSession.java +++ b/src/main/java/org/tikv/common/TiSession.java @@ -42,6 +42,7 @@ import org.tikv.common.util.*; import org.tikv.kvproto.ImportSstpb; import org.tikv.kvproto.Metapb; +import org.tikv.raw.CircuitBreakerRawKVClient; import org.tikv.raw.RawKVClient; import org.tikv.txn.KVClient; import org.tikv.txn.TxnKVClient; @@ -126,6 +127,11 @@ public RawKVClient createRawClient() { return new RawKVClient(this, this.getRegionStoreClientBuilder()); } + public CircuitBreakerRawKVClient createCircuitBreakerRawClient() { + RawKVClient rawKVClient = createRawClient(); + return new CircuitBreakerRawKVClient(rawKVClient, getConf()); + } + public KVClient createKVClient() { checkIsClosed(); diff --git a/src/main/java/org/tikv/common/exception/CircuitBreakerOpenException.java b/src/main/java/org/tikv/common/exception/CircuitBreakerOpenException.java new file mode 100644 index 00000000000..08982575644 --- /dev/null +++ b/src/main/java/org/tikv/common/exception/CircuitBreakerOpenException.java @@ -0,0 +1,22 @@ +/* + * Copyright 2018 PingCAP, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.tikv.common.exception; + +public class CircuitBreakerOpenException extends RuntimeException { + public CircuitBreakerOpenException() { + super("Circuit Breaker Opened"); + } +} diff --git a/src/main/java/org/tikv/common/failsafe/CircuitBreaker.java b/src/main/java/org/tikv/common/failsafe/CircuitBreaker.java new file mode 100644 index 00000000000..cab27aecebb --- /dev/null +++ b/src/main/java/org/tikv/common/failsafe/CircuitBreaker.java @@ -0,0 +1,62 @@ +/* + * Copyright 2021 PingCAP, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.tikv.common.failsafe; + +public interface CircuitBreaker { + + enum Status { + CLOSED, + OPEN, + HALF_OPEN; + } + + /** + * Every requests asks this if it is allowed to proceed or not. It is idempotent and does not + * modify any internal state. + * + * @return boolean whether a request should be permitted + */ + boolean allowRequest(); + + /** + * Whether the circuit is currently open (tripped). + * + * @return boolean state of circuit breaker + */ + boolean isOpen(); + + /** + * Get the status of circuit breaker. + * + * @return The status of circuit breaker. + */ + Status getStatus(); + + /** + * Invoked at start of command execution to attempt an execution. This is non-idempotent - it may + * modify internal state. + */ + boolean attemptExecution(); + + /** Invoked on successful executions as part of feedback mechanism when in a half-open state. */ + void markAttemptSuccess(); + + /** Invoked on unsuccessful executions as part of feedback mechanism when in a half-open state. */ + void markAttemptFailure(); + + /** Get the Circuit Breaker Metrics Object. */ + CircuitBreakerMetrics getMetrics(); +} diff --git a/src/main/java/org/tikv/common/failsafe/CircuitBreakerImpl.java b/src/main/java/org/tikv/common/failsafe/CircuitBreakerImpl.java new file mode 100644 index 00000000000..e6f3cdf0d95 --- /dev/null +++ b/src/main/java/org/tikv/common/failsafe/CircuitBreakerImpl.java @@ -0,0 +1,187 @@ +/* + * Copyright 2021 PingCAP, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.tikv.common.failsafe; + +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.tikv.common.TiConfiguration; + +public class CircuitBreakerImpl implements CircuitBreaker { + private static final Logger logger = LoggerFactory.getLogger(CircuitBreakerImpl.class); + + private final boolean enable; + private final int windowInSeconds; + private final int errorThresholdPercentage; + private final int requestVolumeThreshold; + private final int sleepWindowInSeconds; + private final int attemptRequestCount; + + private final AtomicLong circuitOpened = new AtomicLong(-1); + private final AtomicReference status = new AtomicReference<>(Status.CLOSED); + private final AtomicLong attemptCount = new AtomicLong(0); + private final AtomicLong attemptSuccessCount = new AtomicLong(0); + + private final CircuitBreakerMetrics metrics; + + public CircuitBreakerImpl(TiConfiguration conf) { + this( + conf.isCircuitBreakEnable(), + conf.getCircuitBreakAvailabilityWindowInSeconds(), + conf.getCircuitBreakAvailabilityErrorThresholdPercentage(), + conf.getCircuitBreakAvailabilityRequestVolumnThreshold(), + conf.getCircuitBreakSleepWindowInSeconds(), + conf.getCircuitBreakAttemptRequestCount()); + } + + public CircuitBreakerImpl( + boolean enable, + int windowInSeconds, + int errorThresholdPercentage, + int requestVolumeThreshold, + int sleepWindowInSeconds, + int attemptRequestCount) { + this.enable = enable; + this.windowInSeconds = windowInSeconds; + this.errorThresholdPercentage = errorThresholdPercentage; + this.requestVolumeThreshold = requestVolumeThreshold; + this.sleepWindowInSeconds = sleepWindowInSeconds; + this.attemptRequestCount = attemptRequestCount; + this.metrics = new CircuitBreakerMetricsImpl(windowInSeconds); + this.metrics.addListener(getMetricsListener()); + } + + private MetricsListener getMetricsListener() { + return hc -> { + logger.info("onNext " + hc.toString()); + // check if we are past the requestVolumeThreshold + if (hc.getTotalRequests() < requestVolumeThreshold) { + // we are not past the minimum volume threshold for the stat window, + // so no change to circuit status. + // if it was CLOSED, it stays CLOSED + // if it was half-open, we need to wait for some successful command executions + // if it was open, we need to wait for sleep window to elapse + } else { + if (hc.getErrorPercentage() < errorThresholdPercentage) { + // we are not past the minimum error threshold for the stat window, + // so no change to circuit status. + // if it was CLOSED, it stays CLOSED + // if it was half-open, we need to wait for some successful command executions + // if it was open, we need to wait for sleep window to elapse + } else { + // our failure rate is too high, we need to set the state to OPEN + close2Open(); + } + } + }; + } + + @Override + public CircuitBreakerMetrics getMetrics() { + return metrics; + } + + @Override + public boolean allowRequest() { + if (!enable) { + return true; + } + return !isOpen(); + } + + @Override + public boolean isOpen() { + return circuitOpened.get() >= 0; + } + + @Override + public Status getStatus() { + return status.get(); + } + + @Override + public void markAttemptSuccess() { + if (attemptSuccessCount.incrementAndGet() >= this.attemptRequestCount) { + halfOpen2Close(); + } + } + + @Override + public void markAttemptFailure() { + halfOpen2Open(); + } + + @Override + public boolean attemptExecution() { + if (!isOpen()) { + return true; + } else { + if (isAfterSleepWindow()) { + // only the `attemptRequestCount` requests after sleep window should execute + // if all the executing commands succeed, the status will transition to CLOSED + // if some of the executing commands fail, the status will transition to OPEN + open2HalfOpen(); + return attemptCount.incrementAndGet() <= attemptRequestCount; + } else { + return false; + } + } + } + + private boolean isAfterSleepWindow() { + final long circuitOpenTime = circuitOpened.get(); + final long currentTime = System.currentTimeMillis(); + final long sleepWindowTime = (long) sleepWindowInSeconds * 1000; + return currentTime >= circuitOpenTime + sleepWindowTime; + } + + private void close2Open() { + if (status.compareAndSet(Status.CLOSED, Status.OPEN)) { + // This thread wins the race to open the circuit + // it sets the start time for the sleep window + circuitOpened.set(System.currentTimeMillis()); + logger.info("CLOSED => OPEN"); + } + } + + private void halfOpen2Close() { + if (status.compareAndSet(Status.HALF_OPEN, Status.CLOSED)) { + // This thread wins the race to close the circuit + circuitOpened.set(-1L); + logger.info("HALF_OPEN => CLOSED"); + } + } + + private void open2HalfOpen() { + if (status.compareAndSet(Status.OPEN, Status.HALF_OPEN)) { + // This thread wins the race to half close the circuit + // it resets the attempt count + attemptCount.set(0); + attemptSuccessCount.set(0); + logger.info("OPEN => HALF_OPEN"); + } + } + + private void halfOpen2Open() { + if (status.compareAndSet(Status.HALF_OPEN, Status.OPEN)) { + // This thread wins the race to re-open the circuit + // it resets the start time for the sleep window + circuitOpened.set(System.currentTimeMillis()); + logger.info("HALF_OPEN => OPEN"); + } + } +} diff --git a/src/main/java/org/tikv/common/failsafe/CircuitBreakerMetrics.java b/src/main/java/org/tikv/common/failsafe/CircuitBreakerMetrics.java new file mode 100644 index 00000000000..3a285b4a7df --- /dev/null +++ b/src/main/java/org/tikv/common/failsafe/CircuitBreakerMetrics.java @@ -0,0 +1,27 @@ +/* + * Copyright 2021 PingCAP, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.tikv.common.failsafe; + +public interface CircuitBreakerMetrics { + /** Record a successful call. */ + void success(); + + /** Record a failure call. */ + void failure(); + + /** Add metrics listener. */ + void addListener(MetricsListener metricsListener); +} diff --git a/src/main/java/org/tikv/common/failsafe/CircuitBreakerMetricsImpl.java b/src/main/java/org/tikv/common/failsafe/CircuitBreakerMetricsImpl.java new file mode 100644 index 00000000000..20f7e42d1af --- /dev/null +++ b/src/main/java/org/tikv/common/failsafe/CircuitBreakerMetricsImpl.java @@ -0,0 +1,91 @@ +/* + * Copyright 2021 PingCAP, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.tikv.common.failsafe; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class CircuitBreakerMetricsImpl implements CircuitBreakerMetrics { + private static final Logger logger = LoggerFactory.getLogger(CircuitBreakerMetricsImpl.class); + + private final int windowInMS; + private final List listeners; + private final AtomicReference currentMetrics; + + public CircuitBreakerMetricsImpl(int windowInSeconds) { + this.windowInMS = windowInSeconds * 1000; + listeners = new ArrayList<>(); + this.currentMetrics = new AtomicReference<>(new SingleWindowMetrics()); + } + + @Override + public void success() { + currentMetrics.get().success(); + checkTimeout(); + } + + @Override + public void failure() { + currentMetrics.get().failure(); + checkTimeout(); + } + + private void checkTimeout() { + SingleWindowMetrics singleWindowMetrics = currentMetrics.get(); + if (System.currentTimeMillis() >= singleWindowMetrics.getStartMS() + windowInMS) { + if (currentMetrics.compareAndSet(singleWindowMetrics, new SingleWindowMetrics())) { + logger.info("window timeout, reset SingleWindowMetrics"); + HealthCounts healthCounts = singleWindowMetrics.getHealthCounts(); + for (MetricsListener metricsListener : listeners) { + metricsListener.onNext(healthCounts); + } + } + } + } + + @Override + public void addListener(MetricsListener metricsListener) { + listeners.add(metricsListener); + } + + /** Instead of using SingleWindowMetrics, it is better to use RollingWindowMetrics. */ + static class SingleWindowMetrics { + private final long startMS = System.currentTimeMillis(); + private final AtomicLong totalCount = new AtomicLong(0); + private final AtomicLong errorCount = new AtomicLong(0); + + public void success() { + totalCount.incrementAndGet(); + } + + public void failure() { + totalCount.incrementAndGet(); + errorCount.incrementAndGet(); + } + + public HealthCounts getHealthCounts() { + return new HealthCounts(totalCount.get(), errorCount.get()); + } + + public long getStartMS() { + return startMS; + } + } +} diff --git a/src/main/java/org/tikv/common/failsafe/HealthCounts.java b/src/main/java/org/tikv/common/failsafe/HealthCounts.java new file mode 100644 index 00000000000..2a756902daf --- /dev/null +++ b/src/main/java/org/tikv/common/failsafe/HealthCounts.java @@ -0,0 +1,56 @@ +/* + * Copyright 2021 PingCAP, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.tikv.common.failsafe; + +public class HealthCounts { + private final long totalCount; + private final long errorCount; + private final int errorPercentage; + + HealthCounts(long total, long error) { + this.totalCount = total; + this.errorCount = error; + if (totalCount > 0) { + this.errorPercentage = (int) ((double) errorCount / totalCount * 100); + } else { + this.errorPercentage = 0; + } + } + + public long getTotalRequests() { + return totalCount; + } + + public long getErrorCount() { + return errorCount; + } + + public int getErrorPercentage() { + return errorPercentage; + } + + @Override + public String toString() { + return "HealthCounts{" + + "totalCount=" + + totalCount + + ", errorCount=" + + errorCount + + ", errorPercentage=" + + errorPercentage + + '}'; + } +} diff --git a/src/main/java/org/tikv/common/failsafe/MetricsListener.java b/src/main/java/org/tikv/common/failsafe/MetricsListener.java new file mode 100644 index 00000000000..51eda2de6a7 --- /dev/null +++ b/src/main/java/org/tikv/common/failsafe/MetricsListener.java @@ -0,0 +1,20 @@ +/* + * Copyright 2021 PingCAP, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.tikv.common.failsafe; + +public interface MetricsListener { + void onNext(HealthCounts healthCounts); +} diff --git a/src/main/java/org/tikv/raw/BaseRawKVClient.java b/src/main/java/org/tikv/raw/BaseRawKVClient.java new file mode 100644 index 00000000000..2c963f11e71 --- /dev/null +++ b/src/main/java/org/tikv/raw/BaseRawKVClient.java @@ -0,0 +1,272 @@ +/* + * Copyright 2021 PingCAP, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.tikv.raw; + +import com.google.protobuf.ByteString; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import org.tikv.common.util.Pair; +import org.tikv.common.util.ScanOption; +import org.tikv.kvproto.Kvrpcpb; + +public interface BaseRawKVClient extends AutoCloseable { + // https://www.github.com/pingcap/tidb/blob/master/store/tikv/rawkv.go + int MAX_RAW_SCAN_LIMIT = 10240; + int MAX_RAW_BATCH_LIMIT = 1024; + int RAW_BATCH_PUT_SIZE = 1024 * 1024; + int RAW_BATCH_GET_SIZE = 16 * 1024; + int RAW_BATCH_DELETE_SIZE = 16 * 1024; + + /** + * Put a raw key-value pair to TiKV + * + * @param key raw key + * @param value raw value + */ + void put(ByteString key, ByteString value); + + /** + * Put a raw key-value pair to TiKV + * + * @param key raw key + * @param value raw value + * @param ttl the ttl of the key (in seconds), 0 means the key will never be outdated + */ + void put(ByteString key, ByteString value, long ttl); + + /** + * Put a key-value pair if it does not exist. This API is atomic. + * + *

To use this API, please enable `tikv.enable_atomic_for_cas`. + * + * @param key key + * @param value value + * @return a ByteString. returns Optional.EMPTY if the value is written successfully. returns the + * previous key if the value already exists, and does not write to TiKV. + */ + Optional putIfAbsent(ByteString key, ByteString value); + + /** + * Put a key-value pair with TTL if it does not exist. This API is atomic. + * + *

To use this API, please enable `tikv.enable_atomic_for_cas`. + * + * @param key key + * @param value value + * @param ttl TTL of key (in seconds), 0 means the key will never be outdated. + * @return a ByteString. returns Optional.EMPTY if the value is written successfully. returns the + * previous key if the value already exists, and does not write to TiKV. + */ + Optional putIfAbsent(ByteString key, ByteString value, long ttl); + + /** + * Put a key-value pair if the prevValue matched the value in TiKV. This API is atomic. + * + *

To use this API, please enable `tikv.enable_atomic_for_cas`. + * + * @param key key + * @param value value + */ + void compareAndSet(ByteString key, Optional prevValue, ByteString value); + + /** + * pair if the prevValue matched the value in TiKV. This API is atomic. + * + *

To use this API, please enable `tikv.enable_atomic_for_cas`. + * + * @param key key + * @param value value + * @param ttl TTL of key (in seconds), 0 means the key will never be outdated. + */ + void compareAndSet(ByteString key, Optional prevValue, ByteString value, long ttl); + + /** + * Put a set of raw key-value pair to TiKV. + * + * @param kvPairs kvPairs + */ + void batchPut(Map kvPairs); + + /** + * Put a set of raw key-value pair to TiKV. + * + * @param kvPairs kvPairs + * @param ttl the TTL of keys to be put (in seconds), 0 means the keys will never be outdated + */ + void batchPut(Map kvPairs, long ttl); + + /** + * Get a raw key-value pair from TiKV if key exists + * + * @param key raw key + * @return a ByteString value if key exists, ByteString.EMPTY if key does not exist + */ + Optional get(ByteString key); + + /** + * Get a list of raw key-value pair from TiKV if key exists + * + * @param keys list of raw key + * @return a ByteString value if key exists, ByteString.EMPTY if key does not exist + */ + List batchGet(List keys); + + /** + * Delete a list of raw key-value pair from TiKV if key exists + * + * @param keys list of raw key + */ + void batchDelete(List keys); + + /** + * Get the TTL of a raw key from TiKV if key exists + * + * @param key raw key + * @return a Long indicating the TTL of key ttl is a non-null long value indicating TTL if key + * exists. - ttl=0 if the key will never be outdated. - ttl=null if the key does not exist + */ + Optional getKeyTTL(ByteString key); + + /** + * Create a new `batch scan` request with `keyOnly` option Once resolved this request will result + * in a set of scanners over the given keys. + * + *

WARNING: This method is experimental. The `each_limit` parameter does not work as expected. + * It does not limit the number of results returned of each range, instead it limits the number of + * results in each region of each range. As a result, you may get more than each_limit key-value + * pairs for each range. But you should not miss any entries. + * + * @param ranges a list of ranges + * @return a set of scanners for keys over the given keys. + */ + List> batchScanKeys(List> ranges, int eachLimit); + + /** + * Create a new `batch scan` request. Once resolved this request will result in a set of scanners + * over the given keys. + * + *

WARNING: This method is experimental. The `each_limit` parameter does not work as expected. + * It does not limit the number of results returned of each range, instead it limits the number of + * results in each region of each range. As a result, you may get more than each_limit key-value + * pairs for each range. But you should not miss any entries. + * + * @param ranges a list of `ScanOption` for each range + * @return a set of scanners over the given keys. + */ + List> batchScan(List ranges); + + /** + * Scan raw key-value pairs from TiKV in range [startKey, endKey) + * + * @param startKey raw start key, inclusive + * @param endKey raw end key, exclusive + * @param limit limit of key-value pairs scanned, should be less than {@link #MAX_RAW_SCAN_LIMIT} + * @return list of key-value pairs in range + */ + List scan(ByteString startKey, ByteString endKey, int limit); + + /** + * Scan raw key-value pairs from TiKV in range [startKey, endKey) + * + * @param startKey raw start key, inclusive + * @param endKey raw end key, exclusive + * @param limit limit of key-value pairs scanned, should be less than {@link #MAX_RAW_SCAN_LIMIT} + * @param keyOnly whether to scan in key-only mode + * @return list of key-value pairs in range + */ + List scan(ByteString startKey, ByteString endKey, int limit, boolean keyOnly); + + /** + * Scan raw key-value pairs from TiKV in range [startKey, ♾) + * + * @param startKey raw start key, inclusive + * @param limit limit of key-value pairs scanned, should be less than {@link #MAX_RAW_SCAN_LIMIT} + * @return list of key-value pairs in range + */ + List scan(ByteString startKey, int limit); + + /** + * Scan raw key-value pairs from TiKV in range [startKey, ♾) + * + * @param startKey raw start key, inclusive + * @param limit limit of key-value pairs scanned, should be less than {@link #MAX_RAW_SCAN_LIMIT} + * @param keyOnly whether to scan in key-only mode + * @return list of key-value pairs in range + */ + List scan(ByteString startKey, int limit, boolean keyOnly); + + /** + * Scan all raw key-value pairs from TiKV in range [startKey, endKey) + * + * @param startKey raw start key, inclusive + * @param endKey raw end key, exclusive + * @return list of key-value pairs in range + */ + List scan(ByteString startKey, ByteString endKey); + + /** + * Scan all raw key-value pairs from TiKV in range [startKey, endKey) + * + * @param startKey raw start key, inclusive + * @param endKey raw end key, exclusive + * @param keyOnly whether to scan in key-only mode + * @return list of key-value pairs in range + */ + List scan(ByteString startKey, ByteString endKey, boolean keyOnly); + + /** + * Scan keys with prefix + * + * @param prefixKey prefix key + * @param limit limit of keys retrieved + * @param keyOnly whether to scan in keyOnly mode + * @return kvPairs with the specified prefix + */ + List scanPrefix(ByteString prefixKey, int limit, boolean keyOnly); + + List scanPrefix(ByteString prefixKey); + + List scanPrefix(ByteString prefixKey, boolean keyOnly); + + /** + * Delete a raw key-value pair from TiKV if key exists + * + * @param key raw key to be deleted + */ + void delete(ByteString key); + + /** + * Delete all raw key-value pairs in range [startKey, endKey) from TiKV + * + *

Cautious, this API cannot be used concurrently, if multiple clients write keys into this + * range along with deleteRange API, the result will be undefined. + * + * @param startKey raw start key to be deleted + * @param endKey raw start key to be deleted + */ + void deleteRange(ByteString startKey, ByteString endKey); + + /** + * Delete all raw key-value pairs with the prefix `key` from TiKV + * + *

Cautious, this API cannot be used concurrently, if multiple clients write keys into this + * range along with deleteRange API, the result will be undefined. + * + * @param key prefix of keys to be deleted + */ + void deletePrefix(ByteString key); +} diff --git a/src/main/java/org/tikv/raw/CircuitBreakerRawKVClient.java b/src/main/java/org/tikv/raw/CircuitBreakerRawKVClient.java new file mode 100644 index 00000000000..f7621db0e80 --- /dev/null +++ b/src/main/java/org/tikv/raw/CircuitBreakerRawKVClient.java @@ -0,0 +1,230 @@ +/* + * Copyright 2021 PingCAP, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.tikv.raw; + +import com.google.protobuf.ByteString; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.tikv.common.TiConfiguration; +import org.tikv.common.exception.CircuitBreakerOpenException; +import org.tikv.common.failsafe.CircuitBreaker; +import org.tikv.common.failsafe.CircuitBreakerImpl; +import org.tikv.common.failsafe.CircuitBreakerMetrics; +import org.tikv.common.util.Pair; +import org.tikv.common.util.ScanOption; +import org.tikv.kvproto.Kvrpcpb; + +public class CircuitBreakerRawKVClient implements BaseRawKVClient { + private static final Logger logger = LoggerFactory.getLogger(CircuitBreakerRawKVClient.class); + + private final BaseRawKVClient client; + private final CircuitBreaker circuitBreaker; + private final CircuitBreakerMetrics circuitBreakerMetrics; + + public CircuitBreakerRawKVClient(BaseRawKVClient client, TiConfiguration conf) { + this.client = client; + this.circuitBreaker = new CircuitBreakerImpl(conf); + this.circuitBreakerMetrics = this.circuitBreaker.getMetrics(); + } + + @Override + public void put(ByteString key, ByteString value) { + callWithCircuitBreaker(() -> client.put(key, value)); + } + + @Override + public void put(ByteString key, ByteString value, long ttl) { + callWithCircuitBreaker(() -> client.put(key, value, ttl)); + } + + @Override + public Optional putIfAbsent(ByteString key, ByteString value) { + return callWithCircuitBreaker(() -> client.putIfAbsent(key, value)); + } + + @Override + public Optional putIfAbsent(ByteString key, ByteString value, long ttl) { + return callWithCircuitBreaker(() -> client.putIfAbsent(key, value, ttl)); + } + + @Override + public void compareAndSet(ByteString key, Optional prevValue, ByteString value) { + callWithCircuitBreaker(() -> client.compareAndSet(key, prevValue, value)); + } + + @Override + public void compareAndSet( + ByteString key, Optional prevValue, ByteString value, long ttl) { + callWithCircuitBreaker(() -> client.compareAndSet(key, prevValue, value, ttl)); + } + + @Override + public void batchPut(Map kvPairs) { + callWithCircuitBreaker(() -> client.batchPut(kvPairs)); + } + + @Override + public void batchPut(Map kvPairs, long ttl) { + callWithCircuitBreaker(() -> client.batchPut(kvPairs, ttl)); + } + + @Override + public Optional get(ByteString key) { + return callWithCircuitBreaker(() -> client.get(key)); + } + + @Override + public List batchGet(List keys) { + return callWithCircuitBreaker(() -> client.batchGet(keys)); + } + + @Override + public void batchDelete(List keys) { + callWithCircuitBreaker(() -> client.batchDelete(keys)); + } + + @Override + public Optional getKeyTTL(ByteString key) { + return callWithCircuitBreaker(() -> client.getKeyTTL(key)); + } + + @Override + public List> batchScanKeys( + List> ranges, int eachLimit) { + return callWithCircuitBreaker(() -> client.batchScanKeys(ranges, eachLimit)); + } + + @Override + public List> batchScan(List ranges) { + return callWithCircuitBreaker(() -> client.batchScan(ranges)); + } + + @Override + public List scan(ByteString startKey, ByteString endKey, int limit) { + return callWithCircuitBreaker(() -> client.scan(startKey, endKey, limit)); + } + + @Override + public List scan( + ByteString startKey, ByteString endKey, int limit, boolean keyOnly) { + return callWithCircuitBreaker(() -> client.scan(startKey, endKey, limit, keyOnly)); + } + + @Override + public List scan(ByteString startKey, int limit) { + return callWithCircuitBreaker(() -> client.scan(startKey, limit)); + } + + @Override + public List scan(ByteString startKey, int limit, boolean keyOnly) { + return callWithCircuitBreaker(() -> client.scan(startKey, limit, keyOnly)); + } + + @Override + public List scan(ByteString startKey, ByteString endKey) { + return callWithCircuitBreaker(() -> client.scan(startKey, endKey)); + } + + @Override + public List scan(ByteString startKey, ByteString endKey, boolean keyOnly) { + return callWithCircuitBreaker(() -> client.scan(startKey, endKey, keyOnly)); + } + + @Override + public List scanPrefix(ByteString prefixKey, int limit, boolean keyOnly) { + return callWithCircuitBreaker(() -> client.scanPrefix(prefixKey, limit, keyOnly)); + } + + @Override + public List scanPrefix(ByteString prefixKey) { + return callWithCircuitBreaker(() -> client.scanPrefix(prefixKey)); + } + + @Override + public List scanPrefix(ByteString prefixKey, boolean keyOnly) { + return callWithCircuitBreaker(() -> client.scanPrefix(prefixKey, keyOnly)); + } + + @Override + public void delete(ByteString key) { + callWithCircuitBreaker(() -> client.delete(key)); + } + + @Override + public void deleteRange(ByteString startKey, ByteString endKey) { + callWithCircuitBreaker(() -> client.deleteRange(startKey, endKey)); + } + + @Override + public void deletePrefix(ByteString key) { + callWithCircuitBreaker(() -> client.deletePrefix(key)); + } + + T callWithCircuitBreaker(Function1 func) { + if (circuitBreaker.allowRequest()) { + try { + T result = func.apply(); + circuitBreakerMetrics.success(); + return result; + } catch (Exception e) { + circuitBreakerMetrics.failure(); + throw e; + } + } else if (circuitBreaker.attemptExecution()) { + logger.info("attemptExecution"); + try { + T result = func.apply(); + circuitBreakerMetrics.success(); + circuitBreaker.markAttemptSuccess(); + logger.info("markSuccess"); + return result; + } catch (Exception e) { + circuitBreakerMetrics.failure(); + circuitBreaker.markAttemptFailure(); + logger.info("markNonSuccess"); + throw e; + } + } else { + logger.warn("Circuit Breaker Opened"); + throw new CircuitBreakerOpenException(); + } + } + + void callWithCircuitBreaker(Function0 func) { + callWithCircuitBreaker( + (Function1) + () -> { + func.apply(); + return null; + }); + } + + @Override + public void close() throws Exception { + client.close(); + } + + public interface Function1 { + T apply(); + } + + public interface Function0 { + void apply(); + } +} diff --git a/src/main/java/org/tikv/raw/RawKVClient.java b/src/main/java/org/tikv/raw/RawKVClient.java index 7ffd1ed0514..336a48536c9 100644 --- a/src/main/java/org/tikv/raw/RawKVClient.java +++ b/src/main/java/org/tikv/raw/RawKVClient.java @@ -44,7 +44,7 @@ import org.tikv.common.util.*; import org.tikv.kvproto.Kvrpcpb.KvPair; -public class RawKVClient implements AutoCloseable { +public class RawKVClient implements BaseRawKVClient { private final TiSession tiSession; private final RegionStoreClientBuilder clientBuilder; private final TiConfiguration conf; @@ -56,15 +56,6 @@ public class RawKVClient implements AutoCloseable { private final ExecutorService deleteRangeThreadPool; private static final Logger logger = LoggerFactory.getLogger(RawKVClient.class); - // https://www.github.com/pingcap/tidb/blob/master/store/tikv/rawkv.go - private static final int MAX_RAW_SCAN_LIMIT = 10240; - private static final int MAX_RAW_BATCH_LIMIT = 1024; - private static final int RAW_BATCH_PUT_SIZE = 1024 * 1024; // 1 MB - private static final int RAW_BATCH_GET_SIZE = 16 * 1024; // 16 K - private static final int RAW_BATCH_DELETE_SIZE = 16 * 1024; // 16 K - private static final int RAW_BATCH_SCAN_SIZE = 16; - private static final int RAW_BATCH_PAIR_COUNT = 512; - public static final Histogram RAW_REQUEST_LATENCY = Histogram.build() .name("client_java_raw_requests_latency") @@ -106,23 +97,12 @@ public RawKVClient(TiSession session, RegionStoreClientBuilder clientBuilder) { @Override public void close() {} - /** - * Put a raw key-value pair to TiKV - * - * @param key raw key - * @param value raw value - */ + @Override public void put(ByteString key, ByteString value) { put(key, value, 0); } - /** - * Put a raw key-value pair to TiKV - * - * @param key raw key - * @param value raw value - * @param ttl the ttl of the key (in seconds), 0 means the key will never be outdated - */ + @Override public void put(ByteString key, ByteString value, long ttl) { String label = "client_raw_put"; Histogram.Timer requestTimer = RAW_REQUEST_LATENCY.labels(label).startTimer(); @@ -159,31 +139,12 @@ public void put(ByteString key, ByteString value, long ttl) { } } - /** - * Put a key-value pair if it does not exist. This API is atomic. - * - *

To use this API, please enable `tikv.enable_atomic_for_cas`. - * - * @param key key - * @param value value - * @return a ByteString. returns Optional.EMPTY if the value is written successfully. returns the - * previous key if the value already exists, and does not write to TiKV. - */ + @Override public Optional putIfAbsent(ByteString key, ByteString value) { return putIfAbsent(key, value, 0L); } - /** - * Put a key-value pair with TTL if it does not exist. This API is atomic. - * - *

To use this API, please enable `tikv.enable_atomic_for_cas`. - * - * @param key key - * @param value value - * @param ttl TTL of key (in seconds), 0 means the key will never be outdated. - * @return a ByteString. returns Optional.EMPTY if the value is written successfully. returns the - * previous key if the value already exists, and does not write to TiKV. - */ + @Override public Optional putIfAbsent(ByteString key, ByteString value, long ttl) { try { compareAndSet(key, Optional.empty(), value, ttl); @@ -193,28 +154,13 @@ public Optional putIfAbsent(ByteString key, ByteString value, long t } } - /** - * Put a key-value pair if the prevValue matched the value in TiKV. This API is atomic. - * - *

To use this API, please enable `tikv.enable_atomic_for_cas`. - * - * @param key key - * @param value value - */ + @Override public void compareAndSet(ByteString key, Optional prevValue, ByteString value) throws RawCASConflictException { compareAndSet(key, prevValue, value, 0L); } - /** - * pair if the prevValue matched the value in TiKV. This API is atomic. - * - *

To use this API, please enable `tikv.enable_atomic_for_cas`. - * - * @param key key - * @param value value - * @param ttl TTL of key (in seconds), 0 means the key will never be outdated. - */ + @Override public void compareAndSet( ByteString key, Optional prevValue, ByteString value, long ttl) throws RawCASConflictException { @@ -258,21 +204,12 @@ public void compareAndSet( } } - /** - * Put a set of raw key-value pair to TiKV. - * - * @param kvPairs kvPairs - */ + @Override public void batchPut(Map kvPairs) { batchPut(kvPairs, 0); } - /** - * Put a set of raw key-value pair to TiKV. - * - * @param kvPairs kvPairs - * @param ttl the TTL of keys to be put (in seconds), 0 means the keys will never be outdated - */ + @Override public void batchPut(Map kvPairs, long ttl) { String label = "client_raw_batch_put"; Histogram.Timer requestTimer = RAW_REQUEST_LATENCY.labels(label).startTimer(); @@ -301,12 +238,7 @@ public void batchPut(Map kvPairs, long ttl) { } } - /** - * Get a raw key-value pair from TiKV if key exists - * - * @param key raw key - * @return a ByteString value if key exists, ByteString.EMPTY if key does not exist - */ + @Override public Optional get(ByteString key) { String label = "client_raw_get"; Histogram.Timer requestTimer = RAW_REQUEST_LATENCY.labels(label).startTimer(); @@ -344,12 +276,7 @@ public Optional get(ByteString key) { } } - /** - * Get a list of raw key-value pair from TiKV if key exists - * - * @param keys list of raw key - * @return a ByteString value if key exists, ByteString.EMPTY if key does not exist - */ + @Override public List batchGet(List keys) { String label = "client_raw_batch_get"; Histogram.Timer requestTimer = RAW_REQUEST_LATENCY.labels(label).startTimer(); @@ -379,11 +306,7 @@ public List batchGet(List keys) { } } - /** - * Delete a list of raw key-value pair from TiKV if key exists - * - * @param keys list of raw key - */ + @Override public void batchDelete(List keys) { String label = "client_raw_batch_delete"; Histogram.Timer requestTimer = RAW_REQUEST_LATENCY.labels(label).startTimer(); @@ -413,13 +336,7 @@ public void batchDelete(List keys) { } } - /** - * Get the TTL of a raw key from TiKV if key exists - * - * @param key raw key - * @return a Long indicating the TTL of key ttl is a non-null long value indicating TTL if key - * exists. - ttl=0 if the key will never be outdated. - ttl=null if the key does not exist - */ + @Override public Optional getKeyTTL(ByteString key) { String label = "client_raw_get_key_ttl"; Histogram.Timer requestTimer = RAW_REQUEST_LATENCY.labels(label).startTimer(); @@ -456,18 +373,7 @@ public Optional getKeyTTL(ByteString key) { } } - /** - * Create a new `batch scan` request with `keyOnly` option Once resolved this request will result - * in a set of scanners over the given keys. - * - *

WARNING: This method is experimental. The `each_limit` parameter does not work as expected. - * It does not limit the number of results returned of each range, instead it limits the number of - * results in each region of each range. As a result, you may get more than each_limit key-value - * pairs for each range. But you should not miss any entries. - * - * @param ranges a list of ranges - * @return a set of scanners for keys over the given keys. - */ + @Override public List> batchScanKeys( List> ranges, int eachLimit) { return batchScan( @@ -487,18 +393,7 @@ public List> batchScanKeys( .collect(Collectors.toList()); } - /** - * Create a new `batch scan` request. Once resolved this request will result in a set of scanners - * over the given keys. - * - *

WARNING: This method is experimental. The `each_limit` parameter does not work as expected. - * It does not limit the number of results returned of each range, instead it limits the number of - * results in each region of each range. As a result, you may get more than each_limit key-value - * pairs for each range. But you should not miss any entries. - * - * @param ranges a list of `ScanOption` for each range - * @return a set of scanners over the given keys. - */ + @Override public List> batchScan(List ranges) { String label = "client_raw_batch_scan"; Histogram.Timer requestTimer = RAW_REQUEST_LATENCY.labels(label).startTimer(); @@ -549,27 +444,12 @@ public List> batchScan(List ranges) { } } - /** - * Scan raw key-value pairs from TiKV in range [startKey, endKey) - * - * @param startKey raw start key, inclusive - * @param endKey raw end key, exclusive - * @param limit limit of key-value pairs scanned, should be less than {@link #MAX_RAW_SCAN_LIMIT} - * @return list of key-value pairs in range - */ + @Override public List scan(ByteString startKey, ByteString endKey, int limit) { return scan(startKey, endKey, limit, false); } - /** - * Scan raw key-value pairs from TiKV in range [startKey, endKey) - * - * @param startKey raw start key, inclusive - * @param endKey raw end key, exclusive - * @param limit limit of key-value pairs scanned, should be less than {@link #MAX_RAW_SCAN_LIMIT} - * @param keyOnly whether to scan in key-only mode - * @return list of key-value pairs in range - */ + @Override public List scan(ByteString startKey, ByteString endKey, int limit, boolean keyOnly) { String label = "client_raw_scan"; Histogram.Timer requestTimer = RAW_REQUEST_LATENCY.labels(label).startTimer(); @@ -604,48 +484,22 @@ public List scan(ByteString startKey, ByteString endKey, int limit, bool } } - /** - * Scan raw key-value pairs from TiKV in range [startKey, ♾) - * - * @param startKey raw start key, inclusive - * @param limit limit of key-value pairs scanned, should be less than {@link #MAX_RAW_SCAN_LIMIT} - * @return list of key-value pairs in range - */ + @Override public List scan(ByteString startKey, int limit) { return scan(startKey, limit, false); } - /** - * Scan raw key-value pairs from TiKV in range [startKey, ♾) - * - * @param startKey raw start key, inclusive - * @param limit limit of key-value pairs scanned, should be less than {@link #MAX_RAW_SCAN_LIMIT} - * @param keyOnly whether to scan in key-only mode - * @return list of key-value pairs in range - */ + @Override public List scan(ByteString startKey, int limit, boolean keyOnly) { return scan(startKey, ByteString.EMPTY, limit, keyOnly); } - /** - * Scan all raw key-value pairs from TiKV in range [startKey, endKey) - * - * @param startKey raw start key, inclusive - * @param endKey raw end key, exclusive - * @return list of key-value pairs in range - */ + @Override public List scan(ByteString startKey, ByteString endKey) { return scan(startKey, endKey, false); } - /** - * Scan all raw key-value pairs from TiKV in range [startKey, endKey) - * - * @param startKey raw start key, inclusive - * @param endKey raw end key, exclusive - * @param keyOnly whether to scan in key-only mode - * @return list of key-value pairs in range - */ + @Override public List scan(ByteString startKey, ByteString endKey, boolean keyOnly) { String label = "client_raw_scan_without_limit"; Histogram.Timer requestTimer = RAW_REQUEST_LATENCY.labels(label).startTimer(); @@ -701,31 +555,22 @@ private List scan(ScanOption scanOption) { return scan(startKey, endKey, limit, keyOnly); } - /** - * Scan keys with prefix - * - * @param prefixKey prefix key - * @param limit limit of keys retrieved - * @param keyOnly whether to scan in keyOnly mode - * @return kvPairs with the specified prefix - */ + @Override public List scanPrefix(ByteString prefixKey, int limit, boolean keyOnly) { return scan(prefixKey, Key.toRawKey(prefixKey).nextPrefix().toByteString(), limit, keyOnly); } + @Override public List scanPrefix(ByteString prefixKey) { return scan(prefixKey, Key.toRawKey(prefixKey).nextPrefix().toByteString()); } + @Override public List scanPrefix(ByteString prefixKey, boolean keyOnly) { return scan(prefixKey, Key.toRawKey(prefixKey).nextPrefix().toByteString(), keyOnly); } - /** - * Delete a raw key-value pair from TiKV if key exists - * - * @param key raw key to be deleted - */ + @Override public void delete(ByteString key) { String label = "client_raw_delete"; Histogram.Timer requestTimer = RAW_REQUEST_LATENCY.labels(label).startTimer(); @@ -762,15 +607,7 @@ public void delete(ByteString key) { } } - /** - * Delete all raw key-value pairs in range [startKey, endKey) from TiKV - * - *

Cautious, this API cannot be used concurrently, if multiple clients write keys into this - * range along with deleteRange API, the result will be undefined. - * - * @param startKey raw start key to be deleted - * @param endKey raw start key to be deleted - */ + @Override public synchronized void deleteRange(ByteString startKey, ByteString endKey) { String label = "client_raw_delete_range"; Histogram.Timer requestTimer = RAW_REQUEST_LATENCY.labels(label).startTimer(); @@ -789,14 +626,7 @@ public synchronized void deleteRange(ByteString startKey, ByteString endKey) { } } - /** - * Delete all raw key-value pairs with the prefix `key` from TiKV - * - *

Cautious, this API cannot be used concurrently, if multiple clients write keys into this - * range along with deleteRange API, the result will be undefined. - * - * @param key prefix of keys to be deleted - */ + @Override public synchronized void deletePrefix(ByteString key) { ByteString endKey = Key.toRawKey(key).nextPrefix().toByteString(); deleteRange(key, endKey); diff --git a/src/test/java/org/tikv/common/failsafe/CircuitBreakerMetricsTest.java b/src/test/java/org/tikv/common/failsafe/CircuitBreakerMetricsTest.java new file mode 100644 index 00000000000..010f847e210 --- /dev/null +++ b/src/test/java/org/tikv/common/failsafe/CircuitBreakerMetricsTest.java @@ -0,0 +1,68 @@ +package org.tikv.common.failsafe; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; + +import java.util.concurrent.atomic.AtomicReference; +import org.junit.Test; + +public class CircuitBreakerMetricsTest { + private static final int TEST_COUNT = 10; + private static final int WINDOW_IN_SECONDS = 1; + + @Test + public void testAllSuccess() throws InterruptedException { + CircuitBreakerMetricsImpl metrics = new CircuitBreakerMetricsImpl(WINDOW_IN_SECONDS); + + AtomicReference healthCounts = new AtomicReference<>(); + MetricsListener metricsListener = healthCounts::set; + metrics.addListener(metricsListener); + + for (int i = 1; i < TEST_COUNT; i++) { + metrics.success(); + } + Thread.sleep(WINDOW_IN_SECONDS * 1000); + metrics.success(); + assertNotNull(healthCounts.get()); + assertEquals(healthCounts.get().getTotalRequests(), TEST_COUNT); + assertEquals(healthCounts.get().getErrorPercentage(), 0); + } + + @Test + public void testAllFailure() throws InterruptedException { + CircuitBreakerMetricsImpl metrics = new CircuitBreakerMetricsImpl(WINDOW_IN_SECONDS); + + AtomicReference healthCounts = new AtomicReference<>(); + MetricsListener metricsListener = healthCounts::set; + metrics.addListener(metricsListener); + + for (int i = 1; i < TEST_COUNT; i++) { + metrics.failure(); + } + Thread.sleep(WINDOW_IN_SECONDS * 1000); + metrics.failure(); + assertNotNull(healthCounts.get()); + assertEquals(healthCounts.get().getTotalRequests(), TEST_COUNT); + assertEquals(healthCounts.get().getErrorPercentage(), 100); + } + + @Test + public void testHalfFailure() throws InterruptedException { + CircuitBreakerMetricsImpl metrics = new CircuitBreakerMetricsImpl(WINDOW_IN_SECONDS); + + AtomicReference healthCounts = new AtomicReference<>(); + MetricsListener metricsListener = healthCounts::set; + metrics.addListener(metricsListener); + + for (int i = 1; i < TEST_COUNT; i++) { + metrics.failure(); + metrics.success(); + } + metrics.failure(); + Thread.sleep(WINDOW_IN_SECONDS * 1000); + metrics.success(); + assertNotNull(healthCounts.get()); + assertEquals(healthCounts.get().getTotalRequests(), TEST_COUNT * 2); + assertEquals(healthCounts.get().getErrorPercentage(), 50); + } +} diff --git a/src/test/java/org/tikv/common/failsafe/CircuitBreakerTest.java b/src/test/java/org/tikv/common/failsafe/CircuitBreakerTest.java new file mode 100644 index 00000000000..b2f09ead687 --- /dev/null +++ b/src/test/java/org/tikv/common/failsafe/CircuitBreakerTest.java @@ -0,0 +1,68 @@ +package org.tikv.common.failsafe; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import org.junit.Test; + +public class CircuitBreakerTest { + + @Test + public void testCircuitBreaker() throws InterruptedException { + boolean enable = true; + int windowInSeconds = 2; + int errorThresholdPercentage = 100; + int requestVolumeThreshold = 10; + int sleepWindowInSeconds = 1; + int attemptRequestCount = 10; + + CircuitBreakerImpl circuitBreaker = + new CircuitBreakerImpl( + enable, + windowInSeconds, + errorThresholdPercentage, + requestVolumeThreshold, + sleepWindowInSeconds, + attemptRequestCount); + CircuitBreakerMetrics metrics = circuitBreaker.getMetrics(); + + // initial state: CLOSE + assertTrue(!circuitBreaker.isOpen()); + assertEquals(circuitBreaker.getStatus(), CircuitBreaker.Status.CLOSED); + + // CLOSE => OPEN + for (int i = 1; i <= requestVolumeThreshold; i++) { + metrics.failure(); + } + Thread.sleep(windowInSeconds * 1000); + metrics.failure(); + assertTrue(circuitBreaker.isOpen()); + assertEquals(circuitBreaker.getStatus(), CircuitBreaker.Status.OPEN); + + // OPEN => HALF_OPEN + Thread.sleep(sleepWindowInSeconds * 1000); + assertTrue(circuitBreaker.attemptExecution()); + assertTrue(circuitBreaker.isOpen()); + assertEquals(circuitBreaker.getStatus(), CircuitBreaker.Status.HALF_OPEN); + + // HALF_OPEN => OPEN + circuitBreaker.markAttemptFailure(); + assertTrue(circuitBreaker.isOpen()); + assertEquals(circuitBreaker.getStatus(), CircuitBreaker.Status.OPEN); + + // OPEN => HALF_OPEN + Thread.sleep(sleepWindowInSeconds * 1000); + assertTrue(circuitBreaker.attemptExecution()); + circuitBreaker.markAttemptSuccess(); + assertTrue(circuitBreaker.isOpen()); + assertEquals(circuitBreaker.getStatus(), CircuitBreaker.Status.HALF_OPEN); + + // HALF_OPEN => CLOSED + for (int i = 1; i < attemptRequestCount; i++) { + assertTrue(circuitBreaker.attemptExecution()); + circuitBreaker.markAttemptSuccess(); + } + assertTrue(!circuitBreaker.isOpen()); + assertEquals(circuitBreaker.getStatus(), CircuitBreaker.Status.CLOSED); + } +} diff --git a/src/test/java/org/tikv/raw/CircuitBreakerRawKVClientTest.java b/src/test/java/org/tikv/raw/CircuitBreakerRawKVClientTest.java new file mode 100644 index 00000000000..3693031f17f --- /dev/null +++ b/src/test/java/org/tikv/raw/CircuitBreakerRawKVClientTest.java @@ -0,0 +1,85 @@ +package org.tikv.raw; + +import static org.junit.Assert.assertTrue; + +import com.google.protobuf.ByteString; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.tikv.BaseRawKVTest; +import org.tikv.common.TiConfiguration; +import org.tikv.common.TiSession; +import org.tikv.common.exception.CircuitBreakerOpenException; + +public class CircuitBreakerRawKVClientTest extends BaseRawKVTest { + private boolean enable = true; + private int windowInSeconds = 2; + private int errorThresholdPercentage = 100; + private int requestVolumeThreshold = 10; + private int sleepWindowInSeconds = 1; + private int attemptRequestCount = 10; + + private TiSession session; + private CircuitBreakerRawKVClient client; + + @Before + public void setup() { + TiConfiguration conf = createTiConfiguration(); + conf.setCircuitBreakEnable(enable); + conf.setCircuitBreakAvailabilityWindowInSeconds(windowInSeconds); + conf.setCircuitBreakAvailabilityErrorThresholdPercentage(errorThresholdPercentage); + conf.setCircuitBreakAvailabilityRequestVolumnThreshold(requestVolumeThreshold); + conf.setCircuitBreakSleepWindowInSeconds(sleepWindowInSeconds); + conf.setCircuitBreakAttemptRequestCount(attemptRequestCount); + session = TiSession.create(conf); + client = session.createCircuitBreakerRawClient(); + } + + @After + public void tearDown() throws Exception { + if (session != null) { + session.close(); + } + } + + @Test + public void testCircuitBreaker() throws InterruptedException { + // CLOSED => OPEN + { + for (int i = 1; i < requestVolumeThreshold; i++) { + error(); + } + Thread.sleep(windowInSeconds * 1000); + error(); + + Exception error = null; + try { + client.get(ByteString.copyFromUtf8("key")); + assertTrue(false); + } catch (Exception e) { + error = e; + } + assertTrue(error instanceof CircuitBreakerOpenException); + } + + // OPEN => CLOSED + { + Thread.sleep(sleepWindowInSeconds * 1000); + for (int i = 1; i <= attemptRequestCount; i++) { + success(); + } + client.get(ByteString.copyFromUtf8("key")); + } + } + + private void success() { + client.get(ByteString.copyFromUtf8("key")); + } + + private void error() { + try { + client.callWithCircuitBreaker(() -> 1 / 0); + } catch (Exception ignored) { + } + } +} From b16a67d90f9209a6c950b4e012078d0946e9b4bc Mon Sep 17 00:00:00 2001 From: marsishandsome Date: Wed, 8 Dec 2021 10:32:45 +0800 Subject: [PATCH 02/17] use Timer to check metrics timeout Signed-off-by: marsishandsome --- .../tikv/common/failsafe/CircuitBreaker.java | 4 ++- .../common/failsafe/CircuitBreakerImpl.java | 6 ++++ .../failsafe/CircuitBreakerMetrics.java | 4 ++- .../failsafe/CircuitBreakerMetricsImpl.java | 29 +++++++++++++++++-- .../tikv/raw/CircuitBreakerRawKVClient.java | 17 ++++++++++- .../failsafe/CircuitBreakerMetricsTest.java | 27 ++++++++--------- .../common/failsafe/CircuitBreakerTest.java | 7 +++-- .../raw/CircuitBreakerRawKVClientTest.java | 7 +++-- 8 files changed, 76 insertions(+), 25 deletions(-) diff --git a/src/main/java/org/tikv/common/failsafe/CircuitBreaker.java b/src/main/java/org/tikv/common/failsafe/CircuitBreaker.java index cab27aecebb..054b00bb4a0 100644 --- a/src/main/java/org/tikv/common/failsafe/CircuitBreaker.java +++ b/src/main/java/org/tikv/common/failsafe/CircuitBreaker.java @@ -15,7 +15,9 @@ package org.tikv.common.failsafe; -public interface CircuitBreaker { +import java.io.Closeable; + +public interface CircuitBreaker extends Closeable { enum Status { CLOSED, diff --git a/src/main/java/org/tikv/common/failsafe/CircuitBreakerImpl.java b/src/main/java/org/tikv/common/failsafe/CircuitBreakerImpl.java index e6f3cdf0d95..5d48746b98a 100644 --- a/src/main/java/org/tikv/common/failsafe/CircuitBreakerImpl.java +++ b/src/main/java/org/tikv/common/failsafe/CircuitBreakerImpl.java @@ -15,6 +15,7 @@ package org.tikv.common.failsafe; +import java.io.IOException; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; import org.slf4j.Logger; @@ -184,4 +185,9 @@ private void halfOpen2Open() { logger.info("HALF_OPEN => OPEN"); } } + + @Override + public void close() throws IOException { + metrics.close(); + } } diff --git a/src/main/java/org/tikv/common/failsafe/CircuitBreakerMetrics.java b/src/main/java/org/tikv/common/failsafe/CircuitBreakerMetrics.java index 3a285b4a7df..c8c0ff9da94 100644 --- a/src/main/java/org/tikv/common/failsafe/CircuitBreakerMetrics.java +++ b/src/main/java/org/tikv/common/failsafe/CircuitBreakerMetrics.java @@ -15,7 +15,9 @@ package org.tikv.common.failsafe; -public interface CircuitBreakerMetrics { +import java.io.Closeable; + +public interface CircuitBreakerMetrics extends Closeable { /** Record a successful call. */ void success(); diff --git a/src/main/java/org/tikv/common/failsafe/CircuitBreakerMetricsImpl.java b/src/main/java/org/tikv/common/failsafe/CircuitBreakerMetricsImpl.java index 20f7e42d1af..5d9cb071919 100644 --- a/src/main/java/org/tikv/common/failsafe/CircuitBreakerMetricsImpl.java +++ b/src/main/java/org/tikv/common/failsafe/CircuitBreakerMetricsImpl.java @@ -15,10 +15,15 @@ package org.tikv.common.failsafe; +import java.io.IOException; import java.util.ArrayList; import java.util.List; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; +import org.apache.commons.lang3.concurrent.BasicThreadFactory; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -29,22 +34,35 @@ public class CircuitBreakerMetricsImpl implements CircuitBreakerMetrics { private final List listeners; private final AtomicReference currentMetrics; + private final ScheduledExecutorService scheduler; + private static final int SCHEDULER_INITIAL_DELAY = 1000; + private static final int SCHEDULER_PERIOD = 1000; + public CircuitBreakerMetricsImpl(int windowInSeconds) { this.windowInMS = windowInSeconds * 1000; - listeners = new ArrayList<>(); + this.listeners = new ArrayList<>(); this.currentMetrics = new AtomicReference<>(new SingleWindowMetrics()); + + scheduler = + new ScheduledThreadPoolExecutor( + 1, + new BasicThreadFactory.Builder() + .namingPattern("circuit-breaker-metrics-%d") + .daemon(true) + .build()); + + scheduler.scheduleAtFixedRate( + this::checkTimeout, SCHEDULER_INITIAL_DELAY, SCHEDULER_PERIOD, TimeUnit.MILLISECONDS); } @Override public void success() { currentMetrics.get().success(); - checkTimeout(); } @Override public void failure() { currentMetrics.get().failure(); - checkTimeout(); } private void checkTimeout() { @@ -65,6 +83,11 @@ public void addListener(MetricsListener metricsListener) { listeners.add(metricsListener); } + @Override + public void close() throws IOException { + scheduler.shutdown(); + } + /** Instead of using SingleWindowMetrics, it is better to use RollingWindowMetrics. */ static class SingleWindowMetrics { private final long startMS = System.currentTimeMillis(); diff --git a/src/main/java/org/tikv/raw/CircuitBreakerRawKVClient.java b/src/main/java/org/tikv/raw/CircuitBreakerRawKVClient.java index f7621db0e80..4735ad28e87 100644 --- a/src/main/java/org/tikv/raw/CircuitBreakerRawKVClient.java +++ b/src/main/java/org/tikv/raw/CircuitBreakerRawKVClient.java @@ -217,7 +217,22 @@ void callWithCircuitBreaker(Function0 func) { @Override public void close() throws Exception { - client.close(); + Exception err = null; + try { + client.close(); + } catch (Exception e) { + err = e; + } + + try { + circuitBreaker.close(); + } catch (Exception e) { + err = e; + } + + if (err != null) { + throw err; + } } public interface Function1 { diff --git a/src/test/java/org/tikv/common/failsafe/CircuitBreakerMetricsTest.java b/src/test/java/org/tikv/common/failsafe/CircuitBreakerMetricsTest.java index 010f847e210..323bf32c706 100644 --- a/src/test/java/org/tikv/common/failsafe/CircuitBreakerMetricsTest.java +++ b/src/test/java/org/tikv/common/failsafe/CircuitBreakerMetricsTest.java @@ -3,66 +3,67 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; +import java.io.IOException; import java.util.concurrent.atomic.AtomicReference; import org.junit.Test; public class CircuitBreakerMetricsTest { private static final int TEST_COUNT = 10; private static final int WINDOW_IN_SECONDS = 1; + private static final int SLEEP_DELTA = 100; @Test - public void testAllSuccess() throws InterruptedException { + public void testAllSuccess() throws InterruptedException, IOException { CircuitBreakerMetricsImpl metrics = new CircuitBreakerMetricsImpl(WINDOW_IN_SECONDS); AtomicReference healthCounts = new AtomicReference<>(); MetricsListener metricsListener = healthCounts::set; metrics.addListener(metricsListener); - for (int i = 1; i < TEST_COUNT; i++) { + for (int i = 1; i <= TEST_COUNT; i++) { metrics.success(); } - Thread.sleep(WINDOW_IN_SECONDS * 1000); - metrics.success(); + Thread.sleep(WINDOW_IN_SECONDS * 1000 + SLEEP_DELTA); assertNotNull(healthCounts.get()); assertEquals(healthCounts.get().getTotalRequests(), TEST_COUNT); assertEquals(healthCounts.get().getErrorPercentage(), 0); + metrics.close(); } @Test - public void testAllFailure() throws InterruptedException { + public void testAllFailure() throws InterruptedException, IOException { CircuitBreakerMetricsImpl metrics = new CircuitBreakerMetricsImpl(WINDOW_IN_SECONDS); AtomicReference healthCounts = new AtomicReference<>(); MetricsListener metricsListener = healthCounts::set; metrics.addListener(metricsListener); - for (int i = 1; i < TEST_COUNT; i++) { + for (int i = 1; i <= TEST_COUNT; i++) { metrics.failure(); } - Thread.sleep(WINDOW_IN_SECONDS * 1000); - metrics.failure(); + Thread.sleep(WINDOW_IN_SECONDS * 1000 + SLEEP_DELTA); assertNotNull(healthCounts.get()); assertEquals(healthCounts.get().getTotalRequests(), TEST_COUNT); assertEquals(healthCounts.get().getErrorPercentage(), 100); + metrics.close(); } @Test - public void testHalfFailure() throws InterruptedException { + public void testHalfFailure() throws InterruptedException, IOException { CircuitBreakerMetricsImpl metrics = new CircuitBreakerMetricsImpl(WINDOW_IN_SECONDS); AtomicReference healthCounts = new AtomicReference<>(); MetricsListener metricsListener = healthCounts::set; metrics.addListener(metricsListener); - for (int i = 1; i < TEST_COUNT; i++) { + for (int i = 1; i <= TEST_COUNT; i++) { metrics.failure(); metrics.success(); } - metrics.failure(); - Thread.sleep(WINDOW_IN_SECONDS * 1000); - metrics.success(); + Thread.sleep(WINDOW_IN_SECONDS * 1000 + SLEEP_DELTA); assertNotNull(healthCounts.get()); assertEquals(healthCounts.get().getTotalRequests(), TEST_COUNT * 2); assertEquals(healthCounts.get().getErrorPercentage(), 50); + metrics.close(); } } diff --git a/src/test/java/org/tikv/common/failsafe/CircuitBreakerTest.java b/src/test/java/org/tikv/common/failsafe/CircuitBreakerTest.java index b2f09ead687..1bb21c3a638 100644 --- a/src/test/java/org/tikv/common/failsafe/CircuitBreakerTest.java +++ b/src/test/java/org/tikv/common/failsafe/CircuitBreakerTest.java @@ -16,6 +16,8 @@ public void testCircuitBreaker() throws InterruptedException { int sleepWindowInSeconds = 1; int attemptRequestCount = 10; + int sleepDelta = 100; + CircuitBreakerImpl circuitBreaker = new CircuitBreakerImpl( enable, @@ -34,8 +36,7 @@ public void testCircuitBreaker() throws InterruptedException { for (int i = 1; i <= requestVolumeThreshold; i++) { metrics.failure(); } - Thread.sleep(windowInSeconds * 1000); - metrics.failure(); + Thread.sleep(windowInSeconds * 1000 + sleepDelta); assertTrue(circuitBreaker.isOpen()); assertEquals(circuitBreaker.getStatus(), CircuitBreaker.Status.OPEN); @@ -51,7 +52,7 @@ public void testCircuitBreaker() throws InterruptedException { assertEquals(circuitBreaker.getStatus(), CircuitBreaker.Status.OPEN); // OPEN => HALF_OPEN - Thread.sleep(sleepWindowInSeconds * 1000); + Thread.sleep(sleepWindowInSeconds * 1000 + sleepDelta); assertTrue(circuitBreaker.attemptExecution()); circuitBreaker.markAttemptSuccess(); assertTrue(circuitBreaker.isOpen()); diff --git a/src/test/java/org/tikv/raw/CircuitBreakerRawKVClientTest.java b/src/test/java/org/tikv/raw/CircuitBreakerRawKVClientTest.java index 3693031f17f..1c6d0b647b7 100644 --- a/src/test/java/org/tikv/raw/CircuitBreakerRawKVClientTest.java +++ b/src/test/java/org/tikv/raw/CircuitBreakerRawKVClientTest.java @@ -19,6 +19,8 @@ public class CircuitBreakerRawKVClientTest extends BaseRawKVTest { private int sleepWindowInSeconds = 1; private int attemptRequestCount = 10; + private int sleepDelta = 100; + private TiSession session; private CircuitBreakerRawKVClient client; @@ -46,11 +48,10 @@ public void tearDown() throws Exception { public void testCircuitBreaker() throws InterruptedException { // CLOSED => OPEN { - for (int i = 1; i < requestVolumeThreshold; i++) { + for (int i = 1; i <= requestVolumeThreshold; i++) { error(); } - Thread.sleep(windowInSeconds * 1000); - error(); + Thread.sleep(windowInSeconds * 1000 + sleepDelta); Exception error = null; try { From 591473fbb7e9512a7eec9569911e379a040b5cf2 Mon Sep 17 00:00:00 2001 From: marsishandsome Date: Wed, 8 Dec 2021 10:52:13 +0800 Subject: [PATCH 03/17] rename tikv.circuit.break to tikv.circuit_break Signed-off-by: marsishandsome --- src/main/java/org/tikv/common/ConfigUtils.java | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/src/main/java/org/tikv/common/ConfigUtils.java b/src/main/java/org/tikv/common/ConfigUtils.java index 8f5f1613675..940fe646f8a 100644 --- a/src/main/java/org/tikv/common/ConfigUtils.java +++ b/src/main/java/org/tikv/common/ConfigUtils.java @@ -92,17 +92,17 @@ public class ConfigUtils { public static final String TIKV_KEY_CERT_CHAIN = "tikv.key_cert_chain"; public static final String TIKV_KEY_FILE = "tikv.key_file"; - public static final String TiKV_CIRCUIT_BREAK_ENABLE = "tikv.circuit.break.enable"; + public static final String TiKV_CIRCUIT_BREAK_ENABLE = "tikv.circuit_break.enable"; public static final String TiKV_CIRCUIT_BREAK_AVAILABILITY_WINDOW_IN_SECONDS = - "tikv.circuit.break.trigger.availability.window_in_seconds"; + "tikv.circuit_break.trigger.availability.window_in_seconds"; public static final String TiKV_CIRCUIT_BREAK_AVAILABILITY_ERROR_THRESHOLD_PERCENTAGE = - "tikv.circuit.break.trigger.availability.error_threshold_percentage"; + "tikv.circuit_break.trigger.availability.error_threshold_percentage"; public static final String TiKV_CIRCUIT_BREAK_AVAILABILITY_REQUEST_VOLUMN_THRESHOLD = - "tikv.circuit.break.trigger.availability.request_volumn_threshold"; + "tikv.circuit_break.trigger.availability.request_volumn_threshold"; public static final String TiKV_CIRCUIT_BREAK_SLEEP_WINDOW_IN_SECONDS = - "tikv.circuit.break.trigger.sleep_window_in_seconds"; + "tikv.circuit_break.trigger.sleep_window_in_seconds"; public static final String TiKV_CIRCUIT_BREAK_ATTEMPT_REQUEST_COUNT = - "tikv.circuit.break.trigger.attempt_request_count"; + "tikv.circuit_break.trigger.attempt_request_count"; public static final String TIFLASH_ENABLE = "tiflash.enable"; public static final String DEF_PD_ADDRESSES = "127.0.0.1:2379"; From 1df58eb3a14a19835873931d6d4704ec0bea9597 Mon Sep 17 00:00:00 2001 From: marsishandsome Date: Wed, 8 Dec 2021 11:11:13 +0800 Subject: [PATCH 04/17] remove isOpen and getStatus from CircuitBreaker Signed-off-by: marsishandsome --- .../org/tikv/common/failsafe/CircuitBreaker.java | 14 -------------- .../tikv/common/failsafe/CircuitBreakerImpl.java | 6 ++---- 2 files changed, 2 insertions(+), 18 deletions(-) diff --git a/src/main/java/org/tikv/common/failsafe/CircuitBreaker.java b/src/main/java/org/tikv/common/failsafe/CircuitBreaker.java index 054b00bb4a0..2fec0a9f47a 100644 --- a/src/main/java/org/tikv/common/failsafe/CircuitBreaker.java +++ b/src/main/java/org/tikv/common/failsafe/CircuitBreaker.java @@ -33,20 +33,6 @@ enum Status { */ boolean allowRequest(); - /** - * Whether the circuit is currently open (tripped). - * - * @return boolean state of circuit breaker - */ - boolean isOpen(); - - /** - * Get the status of circuit breaker. - * - * @return The status of circuit breaker. - */ - Status getStatus(); - /** * Invoked at start of command execution to attempt an execution. This is non-idempotent - it may * modify internal state. diff --git a/src/main/java/org/tikv/common/failsafe/CircuitBreakerImpl.java b/src/main/java/org/tikv/common/failsafe/CircuitBreakerImpl.java index 5d48746b98a..bf0fa881ef3 100644 --- a/src/main/java/org/tikv/common/failsafe/CircuitBreakerImpl.java +++ b/src/main/java/org/tikv/common/failsafe/CircuitBreakerImpl.java @@ -104,13 +104,11 @@ public boolean allowRequest() { return !isOpen(); } - @Override - public boolean isOpen() { + boolean isOpen() { return circuitOpened.get() >= 0; } - @Override - public Status getStatus() { + Status getStatus() { return status.get(); } From 5ac0c350c7f4805e1d7d40870c7bb107668de096 Mon Sep 17 00:00:00 2001 From: marsishandsome Date: Wed, 8 Dec 2021 12:37:40 +0800 Subject: [PATCH 05/17] rename success() -> recordSuccess() Signed-off-by: marsishandsome --- .../org/tikv/common/failsafe/CircuitBreaker.java | 4 ++-- .../org/tikv/common/failsafe/CircuitBreakerImpl.java | 4 ++-- .../tikv/common/failsafe/CircuitBreakerMetrics.java | 4 ++-- .../common/failsafe/CircuitBreakerMetricsImpl.java | 12 ++++++------ .../java/org/tikv/raw/CircuitBreakerRawKVClient.java | 12 ++++++------ .../common/failsafe/CircuitBreakerMetricsTest.java | 8 ++++---- .../org/tikv/common/failsafe/CircuitBreakerTest.java | 8 ++++---- 7 files changed, 26 insertions(+), 26 deletions(-) diff --git a/src/main/java/org/tikv/common/failsafe/CircuitBreaker.java b/src/main/java/org/tikv/common/failsafe/CircuitBreaker.java index 2fec0a9f47a..d327f80de66 100644 --- a/src/main/java/org/tikv/common/failsafe/CircuitBreaker.java +++ b/src/main/java/org/tikv/common/failsafe/CircuitBreaker.java @@ -40,10 +40,10 @@ enum Status { boolean attemptExecution(); /** Invoked on successful executions as part of feedback mechanism when in a half-open state. */ - void markAttemptSuccess(); + void recordAttemptSuccess(); /** Invoked on unsuccessful executions as part of feedback mechanism when in a half-open state. */ - void markAttemptFailure(); + void recordAttemptFailure(); /** Get the Circuit Breaker Metrics Object. */ CircuitBreakerMetrics getMetrics(); diff --git a/src/main/java/org/tikv/common/failsafe/CircuitBreakerImpl.java b/src/main/java/org/tikv/common/failsafe/CircuitBreakerImpl.java index bf0fa881ef3..5881ac85051 100644 --- a/src/main/java/org/tikv/common/failsafe/CircuitBreakerImpl.java +++ b/src/main/java/org/tikv/common/failsafe/CircuitBreakerImpl.java @@ -113,14 +113,14 @@ Status getStatus() { } @Override - public void markAttemptSuccess() { + public void recordAttemptSuccess() { if (attemptSuccessCount.incrementAndGet() >= this.attemptRequestCount) { halfOpen2Close(); } } @Override - public void markAttemptFailure() { + public void recordAttemptFailure() { halfOpen2Open(); } diff --git a/src/main/java/org/tikv/common/failsafe/CircuitBreakerMetrics.java b/src/main/java/org/tikv/common/failsafe/CircuitBreakerMetrics.java index c8c0ff9da94..3131b42bcdb 100644 --- a/src/main/java/org/tikv/common/failsafe/CircuitBreakerMetrics.java +++ b/src/main/java/org/tikv/common/failsafe/CircuitBreakerMetrics.java @@ -19,10 +19,10 @@ public interface CircuitBreakerMetrics extends Closeable { /** Record a successful call. */ - void success(); + void recordSuccess(); /** Record a failure call. */ - void failure(); + void recordFailure(); /** Add metrics listener. */ void addListener(MetricsListener metricsListener); diff --git a/src/main/java/org/tikv/common/failsafe/CircuitBreakerMetricsImpl.java b/src/main/java/org/tikv/common/failsafe/CircuitBreakerMetricsImpl.java index 5d9cb071919..956b0323bef 100644 --- a/src/main/java/org/tikv/common/failsafe/CircuitBreakerMetricsImpl.java +++ b/src/main/java/org/tikv/common/failsafe/CircuitBreakerMetricsImpl.java @@ -56,13 +56,13 @@ public CircuitBreakerMetricsImpl(int windowInSeconds) { } @Override - public void success() { - currentMetrics.get().success(); + public void recordSuccess() { + currentMetrics.get().recordSuccess(); } @Override - public void failure() { - currentMetrics.get().failure(); + public void recordFailure() { + currentMetrics.get().recordFailure(); } private void checkTimeout() { @@ -94,11 +94,11 @@ static class SingleWindowMetrics { private final AtomicLong totalCount = new AtomicLong(0); private final AtomicLong errorCount = new AtomicLong(0); - public void success() { + public void recordSuccess() { totalCount.incrementAndGet(); } - public void failure() { + public void recordFailure() { totalCount.incrementAndGet(); errorCount.incrementAndGet(); } diff --git a/src/main/java/org/tikv/raw/CircuitBreakerRawKVClient.java b/src/main/java/org/tikv/raw/CircuitBreakerRawKVClient.java index 4735ad28e87..8ea98a7f57d 100644 --- a/src/main/java/org/tikv/raw/CircuitBreakerRawKVClient.java +++ b/src/main/java/org/tikv/raw/CircuitBreakerRawKVClient.java @@ -180,23 +180,23 @@ T callWithCircuitBreaker(Function1 func) { if (circuitBreaker.allowRequest()) { try { T result = func.apply(); - circuitBreakerMetrics.success(); + circuitBreakerMetrics.recordSuccess(); return result; } catch (Exception e) { - circuitBreakerMetrics.failure(); + circuitBreakerMetrics.recordFailure(); throw e; } } else if (circuitBreaker.attemptExecution()) { logger.info("attemptExecution"); try { T result = func.apply(); - circuitBreakerMetrics.success(); - circuitBreaker.markAttemptSuccess(); + circuitBreakerMetrics.recordSuccess(); + circuitBreaker.recordAttemptSuccess(); logger.info("markSuccess"); return result; } catch (Exception e) { - circuitBreakerMetrics.failure(); - circuitBreaker.markAttemptFailure(); + circuitBreakerMetrics.recordFailure(); + circuitBreaker.recordAttemptFailure(); logger.info("markNonSuccess"); throw e; } diff --git a/src/test/java/org/tikv/common/failsafe/CircuitBreakerMetricsTest.java b/src/test/java/org/tikv/common/failsafe/CircuitBreakerMetricsTest.java index 323bf32c706..973bf5dfb82 100644 --- a/src/test/java/org/tikv/common/failsafe/CircuitBreakerMetricsTest.java +++ b/src/test/java/org/tikv/common/failsafe/CircuitBreakerMetricsTest.java @@ -21,7 +21,7 @@ public void testAllSuccess() throws InterruptedException, IOException { metrics.addListener(metricsListener); for (int i = 1; i <= TEST_COUNT; i++) { - metrics.success(); + metrics.recordSuccess(); } Thread.sleep(WINDOW_IN_SECONDS * 1000 + SLEEP_DELTA); assertNotNull(healthCounts.get()); @@ -39,7 +39,7 @@ public void testAllFailure() throws InterruptedException, IOException { metrics.addListener(metricsListener); for (int i = 1; i <= TEST_COUNT; i++) { - metrics.failure(); + metrics.recordFailure(); } Thread.sleep(WINDOW_IN_SECONDS * 1000 + SLEEP_DELTA); assertNotNull(healthCounts.get()); @@ -57,8 +57,8 @@ public void testHalfFailure() throws InterruptedException, IOException { metrics.addListener(metricsListener); for (int i = 1; i <= TEST_COUNT; i++) { - metrics.failure(); - metrics.success(); + metrics.recordFailure(); + metrics.recordSuccess(); } Thread.sleep(WINDOW_IN_SECONDS * 1000 + SLEEP_DELTA); assertNotNull(healthCounts.get()); diff --git a/src/test/java/org/tikv/common/failsafe/CircuitBreakerTest.java b/src/test/java/org/tikv/common/failsafe/CircuitBreakerTest.java index 1bb21c3a638..0ab9b5caca1 100644 --- a/src/test/java/org/tikv/common/failsafe/CircuitBreakerTest.java +++ b/src/test/java/org/tikv/common/failsafe/CircuitBreakerTest.java @@ -34,7 +34,7 @@ public void testCircuitBreaker() throws InterruptedException { // CLOSE => OPEN for (int i = 1; i <= requestVolumeThreshold; i++) { - metrics.failure(); + metrics.recordFailure(); } Thread.sleep(windowInSeconds * 1000 + sleepDelta); assertTrue(circuitBreaker.isOpen()); @@ -47,21 +47,21 @@ public void testCircuitBreaker() throws InterruptedException { assertEquals(circuitBreaker.getStatus(), CircuitBreaker.Status.HALF_OPEN); // HALF_OPEN => OPEN - circuitBreaker.markAttemptFailure(); + circuitBreaker.recordAttemptFailure(); assertTrue(circuitBreaker.isOpen()); assertEquals(circuitBreaker.getStatus(), CircuitBreaker.Status.OPEN); // OPEN => HALF_OPEN Thread.sleep(sleepWindowInSeconds * 1000 + sleepDelta); assertTrue(circuitBreaker.attemptExecution()); - circuitBreaker.markAttemptSuccess(); + circuitBreaker.recordAttemptSuccess(); assertTrue(circuitBreaker.isOpen()); assertEquals(circuitBreaker.getStatus(), CircuitBreaker.Status.HALF_OPEN); // HALF_OPEN => CLOSED for (int i = 1; i < attemptRequestCount; i++) { assertTrue(circuitBreaker.attemptExecution()); - circuitBreaker.markAttemptSuccess(); + circuitBreaker.recordAttemptSuccess(); } assertTrue(!circuitBreaker.isOpen()); assertEquals(circuitBreaker.getStatus(), CircuitBreaker.Status.CLOSED); From 42b8f40c3d0f799c437a8ad49462e358faa751d9 Mon Sep 17 00:00:00 2001 From: marsishandsome Date: Wed, 8 Dec 2021 12:41:19 +0800 Subject: [PATCH 06/17] isOpen -> allowRequest Signed-off-by: marsishandsome --- src/main/java/org/tikv/common/failsafe/CircuitBreakerImpl.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/java/org/tikv/common/failsafe/CircuitBreakerImpl.java b/src/main/java/org/tikv/common/failsafe/CircuitBreakerImpl.java index 5881ac85051..847e93b8754 100644 --- a/src/main/java/org/tikv/common/failsafe/CircuitBreakerImpl.java +++ b/src/main/java/org/tikv/common/failsafe/CircuitBreakerImpl.java @@ -126,7 +126,7 @@ public void recordAttemptFailure() { @Override public boolean attemptExecution() { - if (!isOpen()) { + if (allowRequest()) { return true; } else { if (isAfterSleepWindow()) { From 069308f004ab74ff48ad2e64fd89a473cc41d0f4 Mon Sep 17 00:00:00 2001 From: marsishandsome Date: Wed, 8 Dec 2021 13:06:46 +0800 Subject: [PATCH 07/17] move to package org.tikv.service Signed-off-by: marsishandsome --- .../java/org/tikv/raw/CircuitBreakerRawKVClient.java | 12 ++++++------ src/main/java/org/tikv/raw/RawKVClient.java | 2 +- .../{BaseRawKVClient.java => RawKVClientBase.java} | 2 +- .../{common => service}/failsafe/CircuitBreaker.java | 2 +- .../failsafe/CircuitBreakerImpl.java | 2 +- .../failsafe/CircuitBreakerMetrics.java | 2 +- .../failsafe/CircuitBreakerMetricsImpl.java | 2 +- .../{common => service}/failsafe/HealthCounts.java | 2 +- .../failsafe/MetricsListener.java | 2 +- .../failsafe/CircuitBreakerMetricsTest.java | 2 +- .../failsafe/CircuitBreakerTest.java | 2 +- 11 files changed, 16 insertions(+), 16 deletions(-) rename src/main/java/org/tikv/raw/{BaseRawKVClient.java => RawKVClientBase.java} (99%) rename src/main/java/org/tikv/{common => service}/failsafe/CircuitBreaker.java (97%) rename src/main/java/org/tikv/{common => service}/failsafe/CircuitBreakerImpl.java (99%) rename src/main/java/org/tikv/{common => service}/failsafe/CircuitBreakerMetrics.java (95%) rename src/main/java/org/tikv/{common => service}/failsafe/CircuitBreakerMetricsImpl.java (99%) rename src/main/java/org/tikv/{common => service}/failsafe/HealthCounts.java (97%) rename src/main/java/org/tikv/{common => service}/failsafe/MetricsListener.java (94%) rename src/test/java/org/tikv/{common => service}/failsafe/CircuitBreakerMetricsTest.java (98%) rename src/test/java/org/tikv/{common => service}/failsafe/CircuitBreakerTest.java (98%) diff --git a/src/main/java/org/tikv/raw/CircuitBreakerRawKVClient.java b/src/main/java/org/tikv/raw/CircuitBreakerRawKVClient.java index 8ea98a7f57d..38e23f0d17f 100644 --- a/src/main/java/org/tikv/raw/CircuitBreakerRawKVClient.java +++ b/src/main/java/org/tikv/raw/CircuitBreakerRawKVClient.java @@ -23,21 +23,21 @@ import org.slf4j.LoggerFactory; import org.tikv.common.TiConfiguration; import org.tikv.common.exception.CircuitBreakerOpenException; -import org.tikv.common.failsafe.CircuitBreaker; -import org.tikv.common.failsafe.CircuitBreakerImpl; -import org.tikv.common.failsafe.CircuitBreakerMetrics; +import org.tikv.service.failsafe.CircuitBreaker; +import org.tikv.service.failsafe.CircuitBreakerImpl; +import org.tikv.service.failsafe.CircuitBreakerMetrics; import org.tikv.common.util.Pair; import org.tikv.common.util.ScanOption; import org.tikv.kvproto.Kvrpcpb; -public class CircuitBreakerRawKVClient implements BaseRawKVClient { +public class CircuitBreakerRawKVClient implements RawKVClientBase { private static final Logger logger = LoggerFactory.getLogger(CircuitBreakerRawKVClient.class); - private final BaseRawKVClient client; + private final RawKVClientBase client; private final CircuitBreaker circuitBreaker; private final CircuitBreakerMetrics circuitBreakerMetrics; - public CircuitBreakerRawKVClient(BaseRawKVClient client, TiConfiguration conf) { + public CircuitBreakerRawKVClient(RawKVClientBase client, TiConfiguration conf) { this.client = client; this.circuitBreaker = new CircuitBreakerImpl(conf); this.circuitBreakerMetrics = this.circuitBreaker.getMetrics(); diff --git a/src/main/java/org/tikv/raw/RawKVClient.java b/src/main/java/org/tikv/raw/RawKVClient.java index 336a48536c9..c0e507c6b5f 100644 --- a/src/main/java/org/tikv/raw/RawKVClient.java +++ b/src/main/java/org/tikv/raw/RawKVClient.java @@ -44,7 +44,7 @@ import org.tikv.common.util.*; import org.tikv.kvproto.Kvrpcpb.KvPair; -public class RawKVClient implements BaseRawKVClient { +public class RawKVClient implements RawKVClientBase { private final TiSession tiSession; private final RegionStoreClientBuilder clientBuilder; private final TiConfiguration conf; diff --git a/src/main/java/org/tikv/raw/BaseRawKVClient.java b/src/main/java/org/tikv/raw/RawKVClientBase.java similarity index 99% rename from src/main/java/org/tikv/raw/BaseRawKVClient.java rename to src/main/java/org/tikv/raw/RawKVClientBase.java index 2c963f11e71..9c55f0afb40 100644 --- a/src/main/java/org/tikv/raw/BaseRawKVClient.java +++ b/src/main/java/org/tikv/raw/RawKVClientBase.java @@ -23,7 +23,7 @@ import org.tikv.common.util.ScanOption; import org.tikv.kvproto.Kvrpcpb; -public interface BaseRawKVClient extends AutoCloseable { +public interface RawKVClientBase extends AutoCloseable { // https://www.github.com/pingcap/tidb/blob/master/store/tikv/rawkv.go int MAX_RAW_SCAN_LIMIT = 10240; int MAX_RAW_BATCH_LIMIT = 1024; diff --git a/src/main/java/org/tikv/common/failsafe/CircuitBreaker.java b/src/main/java/org/tikv/service/failsafe/CircuitBreaker.java similarity index 97% rename from src/main/java/org/tikv/common/failsafe/CircuitBreaker.java rename to src/main/java/org/tikv/service/failsafe/CircuitBreaker.java index d327f80de66..327f9799a9f 100644 --- a/src/main/java/org/tikv/common/failsafe/CircuitBreaker.java +++ b/src/main/java/org/tikv/service/failsafe/CircuitBreaker.java @@ -13,7 +13,7 @@ * limitations under the License. */ -package org.tikv.common.failsafe; +package org.tikv.service.failsafe; import java.io.Closeable; diff --git a/src/main/java/org/tikv/common/failsafe/CircuitBreakerImpl.java b/src/main/java/org/tikv/service/failsafe/CircuitBreakerImpl.java similarity index 99% rename from src/main/java/org/tikv/common/failsafe/CircuitBreakerImpl.java rename to src/main/java/org/tikv/service/failsafe/CircuitBreakerImpl.java index 847e93b8754..08b3a1da58a 100644 --- a/src/main/java/org/tikv/common/failsafe/CircuitBreakerImpl.java +++ b/src/main/java/org/tikv/service/failsafe/CircuitBreakerImpl.java @@ -13,7 +13,7 @@ * limitations under the License. */ -package org.tikv.common.failsafe; +package org.tikv.service.failsafe; import java.io.IOException; import java.util.concurrent.atomic.AtomicLong; diff --git a/src/main/java/org/tikv/common/failsafe/CircuitBreakerMetrics.java b/src/main/java/org/tikv/service/failsafe/CircuitBreakerMetrics.java similarity index 95% rename from src/main/java/org/tikv/common/failsafe/CircuitBreakerMetrics.java rename to src/main/java/org/tikv/service/failsafe/CircuitBreakerMetrics.java index 3131b42bcdb..6287a9f199e 100644 --- a/src/main/java/org/tikv/common/failsafe/CircuitBreakerMetrics.java +++ b/src/main/java/org/tikv/service/failsafe/CircuitBreakerMetrics.java @@ -13,7 +13,7 @@ * limitations under the License. */ -package org.tikv.common.failsafe; +package org.tikv.service.failsafe; import java.io.Closeable; diff --git a/src/main/java/org/tikv/common/failsafe/CircuitBreakerMetricsImpl.java b/src/main/java/org/tikv/service/failsafe/CircuitBreakerMetricsImpl.java similarity index 99% rename from src/main/java/org/tikv/common/failsafe/CircuitBreakerMetricsImpl.java rename to src/main/java/org/tikv/service/failsafe/CircuitBreakerMetricsImpl.java index 956b0323bef..3261c26b8ea 100644 --- a/src/main/java/org/tikv/common/failsafe/CircuitBreakerMetricsImpl.java +++ b/src/main/java/org/tikv/service/failsafe/CircuitBreakerMetricsImpl.java @@ -13,7 +13,7 @@ * limitations under the License. */ -package org.tikv.common.failsafe; +package org.tikv.service.failsafe; import java.io.IOException; import java.util.ArrayList; diff --git a/src/main/java/org/tikv/common/failsafe/HealthCounts.java b/src/main/java/org/tikv/service/failsafe/HealthCounts.java similarity index 97% rename from src/main/java/org/tikv/common/failsafe/HealthCounts.java rename to src/main/java/org/tikv/service/failsafe/HealthCounts.java index 2a756902daf..68f986aa489 100644 --- a/src/main/java/org/tikv/common/failsafe/HealthCounts.java +++ b/src/main/java/org/tikv/service/failsafe/HealthCounts.java @@ -13,7 +13,7 @@ * limitations under the License. */ -package org.tikv.common.failsafe; +package org.tikv.service.failsafe; public class HealthCounts { private final long totalCount; diff --git a/src/main/java/org/tikv/common/failsafe/MetricsListener.java b/src/main/java/org/tikv/service/failsafe/MetricsListener.java similarity index 94% rename from src/main/java/org/tikv/common/failsafe/MetricsListener.java rename to src/main/java/org/tikv/service/failsafe/MetricsListener.java index 51eda2de6a7..2fd59b46617 100644 --- a/src/main/java/org/tikv/common/failsafe/MetricsListener.java +++ b/src/main/java/org/tikv/service/failsafe/MetricsListener.java @@ -13,7 +13,7 @@ * limitations under the License. */ -package org.tikv.common.failsafe; +package org.tikv.service.failsafe; public interface MetricsListener { void onNext(HealthCounts healthCounts); diff --git a/src/test/java/org/tikv/common/failsafe/CircuitBreakerMetricsTest.java b/src/test/java/org/tikv/service/failsafe/CircuitBreakerMetricsTest.java similarity index 98% rename from src/test/java/org/tikv/common/failsafe/CircuitBreakerMetricsTest.java rename to src/test/java/org/tikv/service/failsafe/CircuitBreakerMetricsTest.java index 973bf5dfb82..a8cbc3576fb 100644 --- a/src/test/java/org/tikv/common/failsafe/CircuitBreakerMetricsTest.java +++ b/src/test/java/org/tikv/service/failsafe/CircuitBreakerMetricsTest.java @@ -1,4 +1,4 @@ -package org.tikv.common.failsafe; +package org.tikv.service.failsafe; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; diff --git a/src/test/java/org/tikv/common/failsafe/CircuitBreakerTest.java b/src/test/java/org/tikv/service/failsafe/CircuitBreakerTest.java similarity index 98% rename from src/test/java/org/tikv/common/failsafe/CircuitBreakerTest.java rename to src/test/java/org/tikv/service/failsafe/CircuitBreakerTest.java index 0ab9b5caca1..766d5bff709 100644 --- a/src/test/java/org/tikv/common/failsafe/CircuitBreakerTest.java +++ b/src/test/java/org/tikv/service/failsafe/CircuitBreakerTest.java @@ -1,4 +1,4 @@ -package org.tikv.common.failsafe; +package org.tikv.service.failsafe; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; From f4559cced9e1c03b00de69ca5bb54496ec69964c Mon Sep 17 00:00:00 2001 From: marsishandsome Date: Wed, 8 Dec 2021 13:24:18 +0800 Subject: [PATCH 08/17] rename CircuitBreakerRawKVClient to SmartRawKVClient Signed-off-by: marsishandsome --- src/main/java/org/tikv/common/TiSession.java | 6 +++--- ...BreakerRawKVClient.java => SmartRawKVClient.java} | 12 ++++++------ ...awKVClientTest.java => SmartRawKVClientTest.java} | 6 +++--- 3 files changed, 12 insertions(+), 12 deletions(-) rename src/main/java/org/tikv/raw/{CircuitBreakerRawKVClient.java => SmartRawKVClient.java} (96%) rename src/test/java/org/tikv/raw/{CircuitBreakerRawKVClientTest.java => SmartRawKVClientTest.java} (93%) diff --git a/src/main/java/org/tikv/common/TiSession.java b/src/main/java/org/tikv/common/TiSession.java index 8b3a33f8f9a..d1171dbcf4f 100644 --- a/src/main/java/org/tikv/common/TiSession.java +++ b/src/main/java/org/tikv/common/TiSession.java @@ -42,8 +42,8 @@ import org.tikv.common.util.*; import org.tikv.kvproto.ImportSstpb; import org.tikv.kvproto.Metapb; -import org.tikv.raw.CircuitBreakerRawKVClient; import org.tikv.raw.RawKVClient; +import org.tikv.raw.SmartRawKVClient; import org.tikv.txn.KVClient; import org.tikv.txn.TxnKVClient; @@ -127,9 +127,9 @@ public RawKVClient createRawClient() { return new RawKVClient(this, this.getRegionStoreClientBuilder()); } - public CircuitBreakerRawKVClient createCircuitBreakerRawClient() { + public SmartRawKVClient createSmartRawClient() { RawKVClient rawKVClient = createRawClient(); - return new CircuitBreakerRawKVClient(rawKVClient, getConf()); + return new SmartRawKVClient(rawKVClient, getConf()); } public KVClient createKVClient() { diff --git a/src/main/java/org/tikv/raw/CircuitBreakerRawKVClient.java b/src/main/java/org/tikv/raw/SmartRawKVClient.java similarity index 96% rename from src/main/java/org/tikv/raw/CircuitBreakerRawKVClient.java rename to src/main/java/org/tikv/raw/SmartRawKVClient.java index 38e23f0d17f..e7a75db232a 100644 --- a/src/main/java/org/tikv/raw/CircuitBreakerRawKVClient.java +++ b/src/main/java/org/tikv/raw/SmartRawKVClient.java @@ -23,21 +23,21 @@ import org.slf4j.LoggerFactory; import org.tikv.common.TiConfiguration; import org.tikv.common.exception.CircuitBreakerOpenException; -import org.tikv.service.failsafe.CircuitBreaker; -import org.tikv.service.failsafe.CircuitBreakerImpl; -import org.tikv.service.failsafe.CircuitBreakerMetrics; import org.tikv.common.util.Pair; import org.tikv.common.util.ScanOption; import org.tikv.kvproto.Kvrpcpb; +import org.tikv.service.failsafe.CircuitBreaker; +import org.tikv.service.failsafe.CircuitBreakerImpl; +import org.tikv.service.failsafe.CircuitBreakerMetrics; -public class CircuitBreakerRawKVClient implements RawKVClientBase { - private static final Logger logger = LoggerFactory.getLogger(CircuitBreakerRawKVClient.class); +public class SmartRawKVClient implements RawKVClientBase { + private static final Logger logger = LoggerFactory.getLogger(SmartRawKVClient.class); private final RawKVClientBase client; private final CircuitBreaker circuitBreaker; private final CircuitBreakerMetrics circuitBreakerMetrics; - public CircuitBreakerRawKVClient(RawKVClientBase client, TiConfiguration conf) { + public SmartRawKVClient(RawKVClientBase client, TiConfiguration conf) { this.client = client; this.circuitBreaker = new CircuitBreakerImpl(conf); this.circuitBreakerMetrics = this.circuitBreaker.getMetrics(); diff --git a/src/test/java/org/tikv/raw/CircuitBreakerRawKVClientTest.java b/src/test/java/org/tikv/raw/SmartRawKVClientTest.java similarity index 93% rename from src/test/java/org/tikv/raw/CircuitBreakerRawKVClientTest.java rename to src/test/java/org/tikv/raw/SmartRawKVClientTest.java index 1c6d0b647b7..7c52049271e 100644 --- a/src/test/java/org/tikv/raw/CircuitBreakerRawKVClientTest.java +++ b/src/test/java/org/tikv/raw/SmartRawKVClientTest.java @@ -11,7 +11,7 @@ import org.tikv.common.TiSession; import org.tikv.common.exception.CircuitBreakerOpenException; -public class CircuitBreakerRawKVClientTest extends BaseRawKVTest { +public class SmartRawKVClientTest extends BaseRawKVTest { private boolean enable = true; private int windowInSeconds = 2; private int errorThresholdPercentage = 100; @@ -22,7 +22,7 @@ public class CircuitBreakerRawKVClientTest extends BaseRawKVTest { private int sleepDelta = 100; private TiSession session; - private CircuitBreakerRawKVClient client; + private SmartRawKVClient client; @Before public void setup() { @@ -34,7 +34,7 @@ public void setup() { conf.setCircuitBreakSleepWindowInSeconds(sleepWindowInSeconds); conf.setCircuitBreakAttemptRequestCount(attemptRequestCount); session = TiSession.create(conf); - client = session.createCircuitBreakerRawClient(); + client = session.createSmartRawClient(); } @After From 09718d02e088da9dade301a51300d896b3d3af7e Mon Sep 17 00:00:00 2001 From: marsishandsome Date: Tue, 7 Dec 2021 16:54:44 +0800 Subject: [PATCH 09/17] add metrics for CircuitBreaker Signed-off-by: marsishandsome --- .../java/org/tikv/raw/SmartRawKVClient.java | 102 +++++++++++++----- .../tikv/service/failsafe/CircuitBreaker.java | 16 ++- .../service/failsafe/CircuitBreakerImpl.java | 21 ++++ .../failsafe/CircuitBreakerMetricsImpl.java | 25 ++++- .../org/tikv/raw/SmartRawKVClientTest.java | 2 +- 5 files changed, 131 insertions(+), 35 deletions(-) diff --git a/src/main/java/org/tikv/raw/SmartRawKVClient.java b/src/main/java/org/tikv/raw/SmartRawKVClient.java index e7a75db232a..f6824bc5e72 100644 --- a/src/main/java/org/tikv/raw/SmartRawKVClient.java +++ b/src/main/java/org/tikv/raw/SmartRawKVClient.java @@ -16,6 +16,8 @@ package org.tikv.raw; import com.google.protobuf.ByteString; +import io.prometheus.client.Counter; +import io.prometheus.client.Histogram; import java.util.List; import java.util.Map; import java.util.Optional; @@ -33,6 +35,34 @@ public class SmartRawKVClient implements RawKVClientBase { private static final Logger logger = LoggerFactory.getLogger(SmartRawKVClient.class); + private static final Histogram REQUEST_LATENCY = + Histogram.build() + .name("client_java_smart_raw_requests_latency") + .help("client smart raw request latency.") + .labelNames("type") + .register(); + + private static final Counter REQUEST_SUCCESS = + Counter.build() + .name("client_java_smart_raw_requests_success") + .help("client smart raw request success.") + .labelNames("type") + .register(); + + private static final Counter REQUEST_FAILURE = + Counter.build() + .name("client_java_smart_raw_requests_failure") + .help("client smart raw request failure.") + .labelNames("type") + .register(); + + private static final Counter CIRCUIT_BREAKER_OPENED = + Counter.build() + .name("client_java_smart_raw_circuit_breaker_opened") + .help("client smart raw circuit breaker opened.") + .labelNames("type") + .register(); + private final RawKVClientBase client; private final CircuitBreaker circuitBreaker; private final CircuitBreakerMetrics circuitBreakerMetrics; @@ -45,138 +75,152 @@ public SmartRawKVClient(RawKVClientBase client, TiConfiguration conf) { @Override public void put(ByteString key, ByteString value) { - callWithCircuitBreaker(() -> client.put(key, value)); + callWithCircuitBreaker("put", () -> client.put(key, value)); } @Override public void put(ByteString key, ByteString value, long ttl) { - callWithCircuitBreaker(() -> client.put(key, value, ttl)); + callWithCircuitBreaker("put", () -> client.put(key, value, ttl)); } @Override public Optional putIfAbsent(ByteString key, ByteString value) { - return callWithCircuitBreaker(() -> client.putIfAbsent(key, value)); + return callWithCircuitBreaker("putIfAbsent", () -> client.putIfAbsent(key, value)); } @Override public Optional putIfAbsent(ByteString key, ByteString value, long ttl) { - return callWithCircuitBreaker(() -> client.putIfAbsent(key, value, ttl)); + return callWithCircuitBreaker("putIfAbsent", () -> client.putIfAbsent(key, value, ttl)); } @Override public void compareAndSet(ByteString key, Optional prevValue, ByteString value) { - callWithCircuitBreaker(() -> client.compareAndSet(key, prevValue, value)); + callWithCircuitBreaker("compareAndSet", () -> client.compareAndSet(key, prevValue, value)); } @Override public void compareAndSet( ByteString key, Optional prevValue, ByteString value, long ttl) { - callWithCircuitBreaker(() -> client.compareAndSet(key, prevValue, value, ttl)); + callWithCircuitBreaker("compareAndSet", () -> client.compareAndSet(key, prevValue, value, ttl)); } @Override public void batchPut(Map kvPairs) { - callWithCircuitBreaker(() -> client.batchPut(kvPairs)); + callWithCircuitBreaker("batchPut", () -> client.batchPut(kvPairs)); } @Override public void batchPut(Map kvPairs, long ttl) { - callWithCircuitBreaker(() -> client.batchPut(kvPairs, ttl)); + callWithCircuitBreaker("batchPut", () -> client.batchPut(kvPairs, ttl)); } @Override public Optional get(ByteString key) { - return callWithCircuitBreaker(() -> client.get(key)); + return callWithCircuitBreaker("get", () -> client.get(key)); } @Override public List batchGet(List keys) { - return callWithCircuitBreaker(() -> client.batchGet(keys)); + return callWithCircuitBreaker("batchGet", () -> client.batchGet(keys)); } @Override public void batchDelete(List keys) { - callWithCircuitBreaker(() -> client.batchDelete(keys)); + callWithCircuitBreaker("batchDelete", () -> client.batchDelete(keys)); } @Override public Optional getKeyTTL(ByteString key) { - return callWithCircuitBreaker(() -> client.getKeyTTL(key)); + return callWithCircuitBreaker("getKeyTTL", () -> client.getKeyTTL(key)); } @Override public List> batchScanKeys( List> ranges, int eachLimit) { - return callWithCircuitBreaker(() -> client.batchScanKeys(ranges, eachLimit)); + return callWithCircuitBreaker("batchScanKeys", () -> client.batchScanKeys(ranges, eachLimit)); } @Override public List> batchScan(List ranges) { - return callWithCircuitBreaker(() -> client.batchScan(ranges)); + return callWithCircuitBreaker("batchScan", () -> client.batchScan(ranges)); } @Override public List scan(ByteString startKey, ByteString endKey, int limit) { - return callWithCircuitBreaker(() -> client.scan(startKey, endKey, limit)); + return callWithCircuitBreaker("scan", () -> client.scan(startKey, endKey, limit)); } @Override public List scan( ByteString startKey, ByteString endKey, int limit, boolean keyOnly) { - return callWithCircuitBreaker(() -> client.scan(startKey, endKey, limit, keyOnly)); + return callWithCircuitBreaker("scan", () -> client.scan(startKey, endKey, limit, keyOnly)); } @Override public List scan(ByteString startKey, int limit) { - return callWithCircuitBreaker(() -> client.scan(startKey, limit)); + return callWithCircuitBreaker("scan", () -> client.scan(startKey, limit)); } @Override public List scan(ByteString startKey, int limit, boolean keyOnly) { - return callWithCircuitBreaker(() -> client.scan(startKey, limit, keyOnly)); + return callWithCircuitBreaker("scan", () -> client.scan(startKey, limit, keyOnly)); } @Override public List scan(ByteString startKey, ByteString endKey) { - return callWithCircuitBreaker(() -> client.scan(startKey, endKey)); + return callWithCircuitBreaker("scan", () -> client.scan(startKey, endKey)); } @Override public List scan(ByteString startKey, ByteString endKey, boolean keyOnly) { - return callWithCircuitBreaker(() -> client.scan(startKey, endKey, keyOnly)); + return callWithCircuitBreaker("scan", () -> client.scan(startKey, endKey, keyOnly)); } @Override public List scanPrefix(ByteString prefixKey, int limit, boolean keyOnly) { - return callWithCircuitBreaker(() -> client.scanPrefix(prefixKey, limit, keyOnly)); + return callWithCircuitBreaker("scanPrefix", () -> client.scanPrefix(prefixKey, limit, keyOnly)); } @Override public List scanPrefix(ByteString prefixKey) { - return callWithCircuitBreaker(() -> client.scanPrefix(prefixKey)); + return callWithCircuitBreaker("scanPrefix", () -> client.scanPrefix(prefixKey)); } @Override public List scanPrefix(ByteString prefixKey, boolean keyOnly) { - return callWithCircuitBreaker(() -> client.scanPrefix(prefixKey, keyOnly)); + return callWithCircuitBreaker("scanPrefix", () -> client.scanPrefix(prefixKey, keyOnly)); } @Override public void delete(ByteString key) { - callWithCircuitBreaker(() -> client.delete(key)); + callWithCircuitBreaker("delete", () -> client.delete(key)); } @Override public void deleteRange(ByteString startKey, ByteString endKey) { - callWithCircuitBreaker(() -> client.deleteRange(startKey, endKey)); + callWithCircuitBreaker("deleteRange", () -> client.deleteRange(startKey, endKey)); } @Override public void deletePrefix(ByteString key) { - callWithCircuitBreaker(() -> client.deletePrefix(key)); + callWithCircuitBreaker("deletePrefix", () -> client.deletePrefix(key)); + } + + T callWithCircuitBreaker(String funcName, Function1 func) { + Histogram.Timer requestTimer = REQUEST_LATENCY.labels(funcName).startTimer(); + try { + T result = callWithCircuitBreaker0(funcName, func); + REQUEST_SUCCESS.labels(funcName).inc(); + return result; + } catch (Exception e) { + REQUEST_FAILURE.labels(funcName).inc(); + throw e; + } finally { + requestTimer.observeDuration(); + } } - T callWithCircuitBreaker(Function1 func) { + private T callWithCircuitBreaker0(String funcName, Function1 func) { if (circuitBreaker.allowRequest()) { try { T result = func.apply(); @@ -202,12 +246,14 @@ T callWithCircuitBreaker(Function1 func) { } } else { logger.warn("Circuit Breaker Opened"); + CIRCUIT_BREAKER_OPENED.labels(funcName).inc(); throw new CircuitBreakerOpenException(); } } - void callWithCircuitBreaker(Function0 func) { + private void callWithCircuitBreaker(String funcName, Function0 func) { callWithCircuitBreaker( + funcName, (Function1) () -> { func.apply(); diff --git a/src/main/java/org/tikv/service/failsafe/CircuitBreaker.java b/src/main/java/org/tikv/service/failsafe/CircuitBreaker.java index 327f9799a9f..e1c4f1e2bc4 100644 --- a/src/main/java/org/tikv/service/failsafe/CircuitBreaker.java +++ b/src/main/java/org/tikv/service/failsafe/CircuitBreaker.java @@ -20,9 +20,19 @@ public interface CircuitBreaker extends Closeable { enum Status { - CLOSED, - OPEN, - HALF_OPEN; + CLOSED(0), + HALF_OPEN(1), + OPEN(2); + + private final int value; + + private Status(int value) { + this.value = value; + } + + public int getValue() { + return value; + } } /** diff --git a/src/main/java/org/tikv/service/failsafe/CircuitBreakerImpl.java b/src/main/java/org/tikv/service/failsafe/CircuitBreakerImpl.java index 08b3a1da58a..d8c4dca1607 100644 --- a/src/main/java/org/tikv/service/failsafe/CircuitBreakerImpl.java +++ b/src/main/java/org/tikv/service/failsafe/CircuitBreakerImpl.java @@ -15,6 +15,8 @@ package org.tikv.service.failsafe; +import io.prometheus.client.Counter; +import io.prometheus.client.Gauge; import java.io.IOException; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; @@ -25,6 +27,19 @@ public class CircuitBreakerImpl implements CircuitBreaker { private static final Logger logger = LoggerFactory.getLogger(CircuitBreakerImpl.class); + private static final Gauge CIRCUIT_BREAKER_STATUS = + Gauge.build() + .name("client_java_circuit_breaker_status") + .help("client circuit breaker status.") + .register(); + + private static final Counter CIRCUIT_BREAKER_ATTEMPT_COUNTER = + Counter.build() + .name("client_java_circuit_breaker_attempt_counter") + .help("client circuit breaker attempt counter.") + .labelNames("type") + .register(); + private final boolean enable; private final int windowInSeconds; private final int errorThresholdPercentage; @@ -114,6 +129,7 @@ Status getStatus() { @Override public void recordAttemptSuccess() { + CIRCUIT_BREAKER_ATTEMPT_COUNTER.labels("success").inc(); if (attemptSuccessCount.incrementAndGet() >= this.attemptRequestCount) { halfOpen2Close(); } @@ -121,6 +137,7 @@ public void recordAttemptSuccess() { @Override public void recordAttemptFailure() { + CIRCUIT_BREAKER_ATTEMPT_COUNTER.labels("failure").inc(); halfOpen2Open(); } @@ -154,6 +171,7 @@ private void close2Open() { // it sets the start time for the sleep window circuitOpened.set(System.currentTimeMillis()); logger.info("CLOSED => OPEN"); + CIRCUIT_BREAKER_STATUS.set(Status.OPEN.getValue()); } } @@ -162,6 +180,7 @@ private void halfOpen2Close() { // This thread wins the race to close the circuit circuitOpened.set(-1L); logger.info("HALF_OPEN => CLOSED"); + CIRCUIT_BREAKER_STATUS.set(Status.CLOSED.getValue()); } } @@ -172,6 +191,7 @@ private void open2HalfOpen() { attemptCount.set(0); attemptSuccessCount.set(0); logger.info("OPEN => HALF_OPEN"); + CIRCUIT_BREAKER_STATUS.set(Status.HALF_OPEN.getValue()); } } @@ -181,6 +201,7 @@ private void halfOpen2Open() { // it resets the start time for the sleep window circuitOpened.set(System.currentTimeMillis()); logger.info("HALF_OPEN => OPEN"); + CIRCUIT_BREAKER_STATUS.set(Status.OPEN.getValue()); } } diff --git a/src/main/java/org/tikv/service/failsafe/CircuitBreakerMetricsImpl.java b/src/main/java/org/tikv/service/failsafe/CircuitBreakerMetricsImpl.java index 3261c26b8ea..f27766bb8be 100644 --- a/src/main/java/org/tikv/service/failsafe/CircuitBreakerMetricsImpl.java +++ b/src/main/java/org/tikv/service/failsafe/CircuitBreakerMetricsImpl.java @@ -15,6 +15,7 @@ package org.tikv.service.failsafe; +import io.prometheus.client.Gauge; import java.io.IOException; import java.util.ArrayList; import java.util.List; @@ -30,6 +31,15 @@ public class CircuitBreakerMetricsImpl implements CircuitBreakerMetrics { private static final Logger logger = LoggerFactory.getLogger(CircuitBreakerMetricsImpl.class); + private static final Gauge CIRCUIT_BREAKER_REQUEST_IN_WINDOW = + Gauge.build() + .name("client_java_circuit_breaker_request_in_window") + .help("client circuit breaker request in window.") + .labelNames("type") + .register(); + private static final String TOTAL_LABEL = "total"; + private static final String ERROR_LABEL = "error"; + private final int windowInMS; private final List listeners; private final AtomicReference currentMetrics; @@ -94,13 +104,22 @@ static class SingleWindowMetrics { private final AtomicLong totalCount = new AtomicLong(0); private final AtomicLong errorCount = new AtomicLong(0); + public SingleWindowMetrics() { + CIRCUIT_BREAKER_REQUEST_IN_WINDOW.labels(TOTAL_LABEL).set(0); + CIRCUIT_BREAKER_REQUEST_IN_WINDOW.labels(ERROR_LABEL).set(0); + } + public void recordSuccess() { - totalCount.incrementAndGet(); + long total = totalCount.incrementAndGet(); + CIRCUIT_BREAKER_REQUEST_IN_WINDOW.labels(TOTAL_LABEL).set(total); } public void recordFailure() { - totalCount.incrementAndGet(); - errorCount.incrementAndGet(); + long total = totalCount.incrementAndGet(); + CIRCUIT_BREAKER_REQUEST_IN_WINDOW.labels(TOTAL_LABEL).set(total); + + long error = errorCount.incrementAndGet(); + CIRCUIT_BREAKER_REQUEST_IN_WINDOW.labels(ERROR_LABEL).set(error); } public HealthCounts getHealthCounts() { diff --git a/src/test/java/org/tikv/raw/SmartRawKVClientTest.java b/src/test/java/org/tikv/raw/SmartRawKVClientTest.java index 7c52049271e..61bb70d6507 100644 --- a/src/test/java/org/tikv/raw/SmartRawKVClientTest.java +++ b/src/test/java/org/tikv/raw/SmartRawKVClientTest.java @@ -79,7 +79,7 @@ private void success() { private void error() { try { - client.callWithCircuitBreaker(() -> 1 / 0); + client.callWithCircuitBreaker("error", () -> 1 / 0); } catch (Exception ignored) { } } From 8beee24caf0d694a1124564e4e431ffa4e92c33d Mon Sep 17 00:00:00 2001 From: marsishandsome Date: Wed, 8 Dec 2021 14:35:34 +0800 Subject: [PATCH 10/17] fix ut Signed-off-by: marsishandsome --- src/test/java/org/tikv/raw/CASTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/test/java/org/tikv/raw/CASTest.java b/src/test/java/org/tikv/raw/CASTest.java index e8cccf5c99c..a331b4d06f9 100644 --- a/src/test/java/org/tikv/raw/CASTest.java +++ b/src/test/java/org/tikv/raw/CASTest.java @@ -74,7 +74,7 @@ public void rawPutIfAbsentTest() { Optional res2 = client.putIfAbsent(key, value2, ttl); assertEquals(res2.get(), value); try { - Thread.sleep(ttl * 1000); + Thread.sleep(ttl * 1000 + 100); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } From f3f8869cd4a650fddf06965c85c3c9465cfdb815 Mon Sep 17 00:00:00 2001 From: marsishandsome Date: Wed, 8 Dec 2021 16:51:27 +0800 Subject: [PATCH 11/17] remove error metrics Signed-off-by: marsishandsome --- .../service/failsafe/CircuitBreakerImpl.java | 11 --------- .../failsafe/CircuitBreakerMetricsImpl.java | 24 +++---------------- 2 files changed, 3 insertions(+), 32 deletions(-) diff --git a/src/main/java/org/tikv/service/failsafe/CircuitBreakerImpl.java b/src/main/java/org/tikv/service/failsafe/CircuitBreakerImpl.java index d8c4dca1607..1dd3249b95c 100644 --- a/src/main/java/org/tikv/service/failsafe/CircuitBreakerImpl.java +++ b/src/main/java/org/tikv/service/failsafe/CircuitBreakerImpl.java @@ -16,7 +16,6 @@ package org.tikv.service.failsafe; import io.prometheus.client.Counter; -import io.prometheus.client.Gauge; import java.io.IOException; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; @@ -27,12 +26,6 @@ public class CircuitBreakerImpl implements CircuitBreaker { private static final Logger logger = LoggerFactory.getLogger(CircuitBreakerImpl.class); - private static final Gauge CIRCUIT_BREAKER_STATUS = - Gauge.build() - .name("client_java_circuit_breaker_status") - .help("client circuit breaker status.") - .register(); - private static final Counter CIRCUIT_BREAKER_ATTEMPT_COUNTER = Counter.build() .name("client_java_circuit_breaker_attempt_counter") @@ -171,7 +164,6 @@ private void close2Open() { // it sets the start time for the sleep window circuitOpened.set(System.currentTimeMillis()); logger.info("CLOSED => OPEN"); - CIRCUIT_BREAKER_STATUS.set(Status.OPEN.getValue()); } } @@ -180,7 +172,6 @@ private void halfOpen2Close() { // This thread wins the race to close the circuit circuitOpened.set(-1L); logger.info("HALF_OPEN => CLOSED"); - CIRCUIT_BREAKER_STATUS.set(Status.CLOSED.getValue()); } } @@ -191,7 +182,6 @@ private void open2HalfOpen() { attemptCount.set(0); attemptSuccessCount.set(0); logger.info("OPEN => HALF_OPEN"); - CIRCUIT_BREAKER_STATUS.set(Status.HALF_OPEN.getValue()); } } @@ -201,7 +191,6 @@ private void halfOpen2Open() { // it resets the start time for the sleep window circuitOpened.set(System.currentTimeMillis()); logger.info("HALF_OPEN => OPEN"); - CIRCUIT_BREAKER_STATUS.set(Status.OPEN.getValue()); } } diff --git a/src/main/java/org/tikv/service/failsafe/CircuitBreakerMetricsImpl.java b/src/main/java/org/tikv/service/failsafe/CircuitBreakerMetricsImpl.java index f27766bb8be..896dfbd670e 100644 --- a/src/main/java/org/tikv/service/failsafe/CircuitBreakerMetricsImpl.java +++ b/src/main/java/org/tikv/service/failsafe/CircuitBreakerMetricsImpl.java @@ -15,7 +15,6 @@ package org.tikv.service.failsafe; -import io.prometheus.client.Gauge; import java.io.IOException; import java.util.ArrayList; import java.util.List; @@ -31,15 +30,6 @@ public class CircuitBreakerMetricsImpl implements CircuitBreakerMetrics { private static final Logger logger = LoggerFactory.getLogger(CircuitBreakerMetricsImpl.class); - private static final Gauge CIRCUIT_BREAKER_REQUEST_IN_WINDOW = - Gauge.build() - .name("client_java_circuit_breaker_request_in_window") - .help("client circuit breaker request in window.") - .labelNames("type") - .register(); - private static final String TOTAL_LABEL = "total"; - private static final String ERROR_LABEL = "error"; - private final int windowInMS; private final List listeners; private final AtomicReference currentMetrics; @@ -104,22 +94,14 @@ static class SingleWindowMetrics { private final AtomicLong totalCount = new AtomicLong(0); private final AtomicLong errorCount = new AtomicLong(0); - public SingleWindowMetrics() { - CIRCUIT_BREAKER_REQUEST_IN_WINDOW.labels(TOTAL_LABEL).set(0); - CIRCUIT_BREAKER_REQUEST_IN_WINDOW.labels(ERROR_LABEL).set(0); - } - public void recordSuccess() { - long total = totalCount.incrementAndGet(); - CIRCUIT_BREAKER_REQUEST_IN_WINDOW.labels(TOTAL_LABEL).set(total); + totalCount.incrementAndGet(); } public void recordFailure() { - long total = totalCount.incrementAndGet(); - CIRCUIT_BREAKER_REQUEST_IN_WINDOW.labels(TOTAL_LABEL).set(total); + totalCount.incrementAndGet(); - long error = errorCount.incrementAndGet(); - CIRCUIT_BREAKER_REQUEST_IN_WINDOW.labels(ERROR_LABEL).set(error); + errorCount.incrementAndGet(); } public HealthCounts getHealthCounts() { From 929f3db66c6794ba9b8a4fd042de67c4bf33154d Mon Sep 17 00:00:00 2001 From: marsishandsome Date: Wed, 8 Dec 2021 17:29:00 +0800 Subject: [PATCH 12/17] address code review Signed-off-by: marsishandsome --- .../failsafe/CircuitBreakerMetricsImpl.java | 22 ++++++++++--------- 1 file changed, 12 insertions(+), 10 deletions(-) diff --git a/src/main/java/org/tikv/service/failsafe/CircuitBreakerMetricsImpl.java b/src/main/java/org/tikv/service/failsafe/CircuitBreakerMetricsImpl.java index 896dfbd670e..cf34d5ea716 100644 --- a/src/main/java/org/tikv/service/failsafe/CircuitBreakerMetricsImpl.java +++ b/src/main/java/org/tikv/service/failsafe/CircuitBreakerMetricsImpl.java @@ -52,7 +52,7 @@ public CircuitBreakerMetricsImpl(int windowInSeconds) { .build()); scheduler.scheduleAtFixedRate( - this::checkTimeout, SCHEDULER_INITIAL_DELAY, SCHEDULER_PERIOD, TimeUnit.MILLISECONDS); + this::onReachCircuitWindow, SCHEDULER_INITIAL_DELAY, SCHEDULER_PERIOD, TimeUnit.MILLISECONDS); } @Override @@ -65,16 +65,18 @@ public void recordFailure() { currentMetrics.get().recordFailure(); } - private void checkTimeout() { + private void onReachCircuitWindow() { SingleWindowMetrics singleWindowMetrics = currentMetrics.get(); - if (System.currentTimeMillis() >= singleWindowMetrics.getStartMS() + windowInMS) { - if (currentMetrics.compareAndSet(singleWindowMetrics, new SingleWindowMetrics())) { - logger.info("window timeout, reset SingleWindowMetrics"); - HealthCounts healthCounts = singleWindowMetrics.getHealthCounts(); - for (MetricsListener metricsListener : listeners) { - metricsListener.onNext(healthCounts); - } - } + if (System.currentTimeMillis() < singleWindowMetrics.getStartMS() + windowInMS) { + return; + } + if (!currentMetrics.compareAndSet(singleWindowMetrics, new SingleWindowMetrics())) { + return; + } + logger.info("window timeout, reset SingleWindowMetrics"); + HealthCounts healthCounts = singleWindowMetrics.getHealthCounts(); + for (MetricsListener metricsListener : listeners) { + metricsListener.onNext(healthCounts); } } From 9932daba652f9754253da551de41783fc55bf9b8 Mon Sep 17 00:00:00 2001 From: marsishandsome Date: Wed, 8 Dec 2021 17:43:54 +0800 Subject: [PATCH 13/17] fix close problem Signed-off-by: marsishandsome --- .../java/org/tikv/raw/SmartRawKVClient.java | 18 ++---------------- .../failsafe/CircuitBreakerMetricsImpl.java | 5 ++++- 2 files changed, 6 insertions(+), 17 deletions(-) diff --git a/src/main/java/org/tikv/raw/SmartRawKVClient.java b/src/main/java/org/tikv/raw/SmartRawKVClient.java index f6824bc5e72..847e003f8e8 100644 --- a/src/main/java/org/tikv/raw/SmartRawKVClient.java +++ b/src/main/java/org/tikv/raw/SmartRawKVClient.java @@ -263,22 +263,8 @@ private void callWithCircuitBreaker(String funcName, Function0 func) { @Override public void close() throws Exception { - Exception err = null; - try { - client.close(); - } catch (Exception e) { - err = e; - } - - try { - circuitBreaker.close(); - } catch (Exception e) { - err = e; - } - - if (err != null) { - throw err; - } + circuitBreaker.close(); + client.close(); } public interface Function1 { diff --git a/src/main/java/org/tikv/service/failsafe/CircuitBreakerMetricsImpl.java b/src/main/java/org/tikv/service/failsafe/CircuitBreakerMetricsImpl.java index cf34d5ea716..f4a3407da09 100644 --- a/src/main/java/org/tikv/service/failsafe/CircuitBreakerMetricsImpl.java +++ b/src/main/java/org/tikv/service/failsafe/CircuitBreakerMetricsImpl.java @@ -52,7 +52,10 @@ public CircuitBreakerMetricsImpl(int windowInSeconds) { .build()); scheduler.scheduleAtFixedRate( - this::onReachCircuitWindow, SCHEDULER_INITIAL_DELAY, SCHEDULER_PERIOD, TimeUnit.MILLISECONDS); + this::onReachCircuitWindow, + SCHEDULER_INITIAL_DELAY, + SCHEDULER_PERIOD, + TimeUnit.MILLISECONDS); } @Override From 0015363f4c571518198afc3017ec590b68155111 Mon Sep 17 00:00:00 2001 From: marsishandsome Date: Wed, 8 Dec 2021 17:57:32 +0800 Subject: [PATCH 14/17] update log level Signed-off-by: marsishandsome --- src/main/java/org/tikv/raw/SmartRawKVClient.java | 8 ++++---- .../org/tikv/service/failsafe/CircuitBreakerImpl.java | 2 +- .../tikv/service/failsafe/CircuitBreakerMetricsImpl.java | 2 +- 3 files changed, 6 insertions(+), 6 deletions(-) diff --git a/src/main/java/org/tikv/raw/SmartRawKVClient.java b/src/main/java/org/tikv/raw/SmartRawKVClient.java index 847e003f8e8..a616f05a8e6 100644 --- a/src/main/java/org/tikv/raw/SmartRawKVClient.java +++ b/src/main/java/org/tikv/raw/SmartRawKVClient.java @@ -231,21 +231,21 @@ private T callWithCircuitBreaker0(String funcName, Function1 func) { throw e; } } else if (circuitBreaker.attemptExecution()) { - logger.info("attemptExecution"); + logger.debug("attemptExecution"); try { T result = func.apply(); circuitBreakerMetrics.recordSuccess(); circuitBreaker.recordAttemptSuccess(); - logger.info("markSuccess"); + logger.debug("markSuccess"); return result; } catch (Exception e) { circuitBreakerMetrics.recordFailure(); circuitBreaker.recordAttemptFailure(); - logger.info("markNonSuccess"); + logger.debug("markNonSuccess"); throw e; } } else { - logger.warn("Circuit Breaker Opened"); + logger.debug("Circuit Breaker Opened"); CIRCUIT_BREAKER_OPENED.labels(funcName).inc(); throw new CircuitBreakerOpenException(); } diff --git a/src/main/java/org/tikv/service/failsafe/CircuitBreakerImpl.java b/src/main/java/org/tikv/service/failsafe/CircuitBreakerImpl.java index 1dd3249b95c..074ed298d89 100644 --- a/src/main/java/org/tikv/service/failsafe/CircuitBreakerImpl.java +++ b/src/main/java/org/tikv/service/failsafe/CircuitBreakerImpl.java @@ -76,7 +76,7 @@ public CircuitBreakerImpl( private MetricsListener getMetricsListener() { return hc -> { - logger.info("onNext " + hc.toString()); + logger.debug("onNext " + hc.toString()); // check if we are past the requestVolumeThreshold if (hc.getTotalRequests() < requestVolumeThreshold) { // we are not past the minimum volume threshold for the stat window, diff --git a/src/main/java/org/tikv/service/failsafe/CircuitBreakerMetricsImpl.java b/src/main/java/org/tikv/service/failsafe/CircuitBreakerMetricsImpl.java index f4a3407da09..da497efcb71 100644 --- a/src/main/java/org/tikv/service/failsafe/CircuitBreakerMetricsImpl.java +++ b/src/main/java/org/tikv/service/failsafe/CircuitBreakerMetricsImpl.java @@ -76,7 +76,7 @@ private void onReachCircuitWindow() { if (!currentMetrics.compareAndSet(singleWindowMetrics, new SingleWindowMetrics())) { return; } - logger.info("window timeout, reset SingleWindowMetrics"); + logger.debug("window timeout, reset SingleWindowMetrics"); HealthCounts healthCounts = singleWindowMetrics.getHealthCounts(); for (MetricsListener metricsListener : listeners) { metricsListener.onNext(healthCounts); From cddd86fb72aa70390131c6148e742974c54bc6d6 Mon Sep 17 00:00:00 2001 From: marsishandsome Date: Wed, 8 Dec 2021 18:01:00 +0800 Subject: [PATCH 15/17] address code review Signed-off-by: marsishandsome --- src/main/java/org/tikv/raw/SmartRawKVClient.java | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/src/main/java/org/tikv/raw/SmartRawKVClient.java b/src/main/java/org/tikv/raw/SmartRawKVClient.java index a616f05a8e6..80bb572c819 100644 --- a/src/main/java/org/tikv/raw/SmartRawKVClient.java +++ b/src/main/java/org/tikv/raw/SmartRawKVClient.java @@ -65,12 +65,10 @@ public class SmartRawKVClient implements RawKVClientBase { private final RawKVClientBase client; private final CircuitBreaker circuitBreaker; - private final CircuitBreakerMetrics circuitBreakerMetrics; public SmartRawKVClient(RawKVClientBase client, TiConfiguration conf) { this.client = client; this.circuitBreaker = new CircuitBreakerImpl(conf); - this.circuitBreakerMetrics = this.circuitBreaker.getMetrics(); } @Override @@ -224,22 +222,22 @@ private T callWithCircuitBreaker0(String funcName, Function1 func) { if (circuitBreaker.allowRequest()) { try { T result = func.apply(); - circuitBreakerMetrics.recordSuccess(); + circuitBreaker.getMetrics().recordSuccess(); return result; } catch (Exception e) { - circuitBreakerMetrics.recordFailure(); + circuitBreaker.getMetrics().recordFailure(); throw e; } } else if (circuitBreaker.attemptExecution()) { logger.debug("attemptExecution"); try { T result = func.apply(); - circuitBreakerMetrics.recordSuccess(); + circuitBreaker.getMetrics().recordSuccess(); circuitBreaker.recordAttemptSuccess(); logger.debug("markSuccess"); return result; } catch (Exception e) { - circuitBreakerMetrics.recordFailure(); + circuitBreaker.getMetrics().recordFailure(); circuitBreaker.recordAttemptFailure(); logger.debug("markNonSuccess"); throw e; From b6e8636ff365198af74efc99077844d0459caaf7 Mon Sep 17 00:00:00 2001 From: marsishandsome Date: Wed, 8 Dec 2021 18:14:24 +0800 Subject: [PATCH 16/17] format Signed-off-by: marsishandsome --- src/main/java/org/tikv/raw/SmartRawKVClient.java | 1 - 1 file changed, 1 deletion(-) diff --git a/src/main/java/org/tikv/raw/SmartRawKVClient.java b/src/main/java/org/tikv/raw/SmartRawKVClient.java index 80bb572c819..33a8981e81b 100644 --- a/src/main/java/org/tikv/raw/SmartRawKVClient.java +++ b/src/main/java/org/tikv/raw/SmartRawKVClient.java @@ -30,7 +30,6 @@ import org.tikv.kvproto.Kvrpcpb; import org.tikv.service.failsafe.CircuitBreaker; import org.tikv.service.failsafe.CircuitBreakerImpl; -import org.tikv.service.failsafe.CircuitBreakerMetrics; public class SmartRawKVClient implements RawKVClientBase { private static final Logger logger = LoggerFactory.getLogger(SmartRawKVClient.class); From 5fcf19b7c70f938f938742531498df6f2f1b454a Mon Sep 17 00:00:00 2001 From: marsishandsome Date: Wed, 8 Dec 2021 23:04:33 +0800 Subject: [PATCH 17/17] use NoOpCircuitBreakerMetrics if enable=false Signed-off-by: marsishandsome --- .../service/failsafe/CircuitBreakerImpl.java | 3 +- .../failsafe/NoOpCircuitBreakerMetrics.java | 40 +++++++++++++++++++ 2 files changed, 42 insertions(+), 1 deletion(-) create mode 100644 src/main/java/org/tikv/service/failsafe/NoOpCircuitBreakerMetrics.java diff --git a/src/main/java/org/tikv/service/failsafe/CircuitBreakerImpl.java b/src/main/java/org/tikv/service/failsafe/CircuitBreakerImpl.java index 074ed298d89..7f2231bd7ab 100644 --- a/src/main/java/org/tikv/service/failsafe/CircuitBreakerImpl.java +++ b/src/main/java/org/tikv/service/failsafe/CircuitBreakerImpl.java @@ -70,7 +70,8 @@ public CircuitBreakerImpl( this.requestVolumeThreshold = requestVolumeThreshold; this.sleepWindowInSeconds = sleepWindowInSeconds; this.attemptRequestCount = attemptRequestCount; - this.metrics = new CircuitBreakerMetricsImpl(windowInSeconds); + this.metrics = + enable ? new CircuitBreakerMetricsImpl(windowInSeconds) : new NoOpCircuitBreakerMetrics(); this.metrics.addListener(getMetricsListener()); } diff --git a/src/main/java/org/tikv/service/failsafe/NoOpCircuitBreakerMetrics.java b/src/main/java/org/tikv/service/failsafe/NoOpCircuitBreakerMetrics.java new file mode 100644 index 00000000000..e5474fb6e12 --- /dev/null +++ b/src/main/java/org/tikv/service/failsafe/NoOpCircuitBreakerMetrics.java @@ -0,0 +1,40 @@ +/* + * Copyright 2021 PingCAP, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.tikv.service.failsafe; + +import java.io.IOException; + +public class NoOpCircuitBreakerMetrics implements CircuitBreakerMetrics { + @Override + public void recordSuccess() { + // do nothing + } + + @Override + public void recordFailure() { + // do nothing + } + + @Override + public void addListener(MetricsListener metricsListener) { + // do nothing + } + + @Override + public void close() throws IOException { + // do nothing + } +}