From cf18ce849ae750367ed5787512b5baa8cfe436f6 Mon Sep 17 00:00:00 2001 From: Andy Lok Date: Fri, 11 Jun 2021 16:32:49 +0800 Subject: [PATCH 01/12] Add CompareAndSet for RawClient and make Get returns Optional Signed-off-by: Andy Lok --- pom.xml | 6 +- .../exception/RawCASConflictException.java | 50 +++++++++++++++++ .../tikv/common/region/RegionStoreClient.java | 46 ++++++++++----- src/main/java/org/tikv/raw/RawKVClient.java | 30 +++++----- .../tikv/common/RegionStoreClientTest.java | 7 ++- .../java/org/tikv/raw/RawKVClientTest.java | 56 +++++++++---------- 6 files changed, 129 insertions(+), 66 deletions(-) create mode 100644 src/main/java/org/tikv/common/exception/RawCASConflictException.java diff --git a/pom.xml b/pom.xml index 4e4cff5be70..90e7cf0c466 100644 --- a/pom.xml +++ b/pom.xml @@ -57,6 +57,8 @@ + 1.8 + 1.8 UTF-8 UTF-8 3.5.1 @@ -64,7 +66,7 @@ 1.7.16 1.24.0 1.6.6 - 2.10.0 + 2.12.3 3.0.1 0.4.1 2.9.9 @@ -386,7 +388,7 @@ org.apache.maven.plugins maven-javadoc-plugin - 2.9.1 + 3.0.1 ${javadoc.skip} diff --git a/src/main/java/org/tikv/common/exception/RawCASConflictException.java b/src/main/java/org/tikv/common/exception/RawCASConflictException.java new file mode 100644 index 00000000000..5d3bd066d69 --- /dev/null +++ b/src/main/java/org/tikv/common/exception/RawCASConflictException.java @@ -0,0 +1,50 @@ +/* + * Copyright 2020 PingCAP, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.tikv.common.exception; + +import com.google.protobuf.ByteString; +import java.util.Optional; +import org.tikv.common.codec.KeyUtils; + +public class RawCASConflictException extends RuntimeException { + + private final ByteString key; + private final Optional expectedPrevValue; + private final Optional currValue; + + public RawCASConflictException( + ByteString key, Optional expectedPrevValue, Optional currValue) { + super( + String.format( + "key=%s expectedPrevValue=%s currValue=%s", + KeyUtils.formatBytes(key), expectedPrevValue, currValue)); + this.key = key; + this.expectedPrevValue = expectedPrevValue; + this.currValue = currValue; + } + + public ByteString getKey() { + return this.key; + } + + public Optional getExpectedPrevValue() { + return this.expectedPrevValue; + } + + public Optional getCurrValue() { + return this.currValue; + } +} diff --git a/src/main/java/org/tikv/common/region/RegionStoreClient.java b/src/main/java/org/tikv/common/region/RegionStoreClient.java index 6ea51aa9cb8..ff48887afba 100644 --- a/src/main/java/org/tikv/common/region/RegionStoreClient.java +++ b/src/main/java/org/tikv/common/region/RegionStoreClient.java @@ -796,7 +796,7 @@ public List splitRegion(Iterable splitKeys) { // APIs for Raw Scan/Put/Get/Delete - public ByteString rawGet(BackOffer backOffer, ByteString key) { + public Optional rawGet(BackOffer backOffer, ByteString key) { Histogram.Timer requestTimer = GRPC_RAW_REQUEST_LATENCY.labels("client_grpc_raw_get").startTimer(); try { @@ -816,7 +816,7 @@ public ByteString rawGet(BackOffer backOffer, ByteString key) { } } - private ByteString rawGetHelper(RawGetResponse resp) { + private Optional rawGetHelper(RawGetResponse resp) { if (resp == null) { this.regionManager.onRequestFail(region); throw new TiClientInternalException("RawGetResponse failed without a cause"); @@ -828,10 +828,14 @@ private ByteString rawGetHelper(RawGetResponse resp) { if (resp.hasRegionError()) { throw new RegionException(resp.getRegionError()); } - return resp.getValue(); + if (resp.getNotFound()) { + return Optional.empty(); + } else { + return Optional.of(resp.getValue()); + } } - public Long rawGetKeyTTL(BackOffer backOffer, ByteString key) { + public Optional rawGetKeyTTL(BackOffer backOffer, ByteString key) { Histogram.Timer requestTimer = GRPC_RAW_REQUEST_LATENCY.labels("client_grpc_raw_get_key_ttl").startTimer(); try { @@ -852,7 +856,7 @@ public Long rawGetKeyTTL(BackOffer backOffer, ByteString key) { } } - private Long rawGetKeyTTLHelper(RawGetKeyTTLResponse resp) { + private Optional rawGetKeyTTLHelper(RawGetKeyTTLResponse resp) { if (resp == null) { this.regionManager.onRequestFail(region); throw new TiClientInternalException("RawGetResponse failed without a cause"); @@ -865,9 +869,9 @@ private Long rawGetKeyTTLHelper(RawGetKeyTTLResponse resp) { throw new RegionException(resp.getRegionError()); } if (resp.getNotFound()) { - return null; + return Optional.empty(); } - return resp.getTtl(); + return Optional.of(resp.getTtl()); } public void rawDelete(BackOffer backOffer, ByteString key) { @@ -943,8 +947,13 @@ private void rawPutHelper(RawPutResponse resp) { } } - public ByteString rawPutIfAbsent( - BackOffer backOffer, ByteString key, ByteString value, long ttl) { + public void rawCompareAndSet( + BackOffer backOffer, + ByteString key, + Optional prevValue, + ByteString value, + long ttl) + throws RawCASConflictException { Histogram.Timer requestTimer = GRPC_RAW_REQUEST_LATENCY.labels("client_grpc_raw_put_if_absent").startTimer(); try { @@ -954,7 +963,8 @@ public ByteString rawPutIfAbsent( .setContext(region.getReplicaContext(storeType)) .setKey(key) .setValue(value) - .setPreviousNotExist(true) + .setPreviousValue(prevValue.orElse(ByteString.EMPTY)) + .setPreviousNotExist(!prevValue.isPresent()) .setTtl(ttl) .build(); @@ -963,13 +973,15 @@ public ByteString rawPutIfAbsent( regionManager, this, resp -> resp.hasRegionError() ? resp.getRegionError() : null); RawCASResponse resp = callWithRetry(backOffer, TikvGrpc.getRawCompareAndSwapMethod(), factory, handler); - return rawPutIfAbsentHelper(resp); + rawCompareAndSetHelper(key, prevValue, resp); } finally { requestTimer.observeDuration(); } } - private ByteString rawPutIfAbsentHelper(RawCASResponse resp) { + private void rawCompareAndSetHelper( + ByteString key, Optional expectedPrevValue, RawCASResponse resp) + throws RawCASConflictException { if (resp == null) { this.regionManager.onRequestFail(region); throw new TiClientInternalException("RawPutResponse failed without a cause"); @@ -981,10 +993,14 @@ private ByteString rawPutIfAbsentHelper(RawCASResponse resp) { if (resp.hasRegionError()) { throw new RegionException(resp.getRegionError()); } - if (resp.getSucceed()) { - return ByteString.EMPTY; + if (!resp.getSucceed()) { + if (resp.getPreviousNotExist()) { + throw new RawCASConflictException(key, expectedPrevValue, Optional.empty()); + } else { + throw new RawCASConflictException( + key, expectedPrevValue, Optional.of(resp.getPreviousValue())); + } } - return resp.getPreviousValue(); } public List rawBatchGet(BackOffer backoffer, List keys) { diff --git a/src/main/java/org/tikv/raw/RawKVClient.java b/src/main/java/org/tikv/raw/RawKVClient.java index 9e590e20f5b..26c36777325 100644 --- a/src/main/java/org/tikv/raw/RawKVClient.java +++ b/src/main/java/org/tikv/raw/RawKVClient.java @@ -27,6 +27,7 @@ import org.slf4j.LoggerFactory; import org.tikv.common.TiConfiguration; import org.tikv.common.TiSession; +import org.tikv.common.exception.RawCASConflictException; import org.tikv.common.exception.TiKVException; import org.tikv.common.key.Key; import org.tikv.common.operation.iterator.RawScanIterator; @@ -135,37 +136,34 @@ public void put(ByteString key, ByteString value, long ttl) { } /** - * Put a key-value pair if it does not exist. This API is atomic. + * Put a key-value pair if the prevValue matched the value in TiKV. This API is atomic. * * @param key key * @param value value - * @return a ByteString. returns ByteString.EMPTY if the value is written successfully. returns - * the previous key if the value already exists, and does not write to TiKV. */ - public ByteString putIfAbsent(ByteString key, ByteString value) { - return putIfAbsent(key, value, 0L); + public void compareAndSet(ByteString key, Optional prevValue, ByteString value) { + compareAndSet(key, prevValue, value, 0L); } /** - * Put a key-value pair with TTL if it does not exist. This API is atomic. + * Put a key-value pair if the prevValue matched the value in TiKV. This API is atomic. * * @param key key * @param value value * @param ttl TTL of key (in seconds), 0 means the key will never be outdated. - * @return a ByteString. returns ByteString.EMPTY if the value is written successfully. returns - * the previous key if the value already exists, and does not write to TiKV. */ - public ByteString putIfAbsent(ByteString key, ByteString value, long ttl) { - String label = "client_raw_put_if_absent"; + public void compareAndSet( + ByteString key, Optional prevValue, ByteString value, long ttl) + throws RawCASConflictException { + String label = "client_raw_compare_and_swap"; Histogram.Timer requestTimer = RAW_REQUEST_LATENCY.labels(label).startTimer(); try { BackOffer backOffer = defaultBackOff(); while (true) { RegionStoreClient client = clientBuilder.build(key); try { - ByteString result = client.rawPutIfAbsent(backOffer, key, value, ttl); + client.rawCompareAndSet(backOffer, key, prevValue, value, ttl); RAW_REQUEST_SUCCESS.labels(label).inc(); - return result; } catch (final TiKVException e) { backOffer.doBackOff(BackOffFunction.BackOffFuncType.BoRegionMiss, e); } @@ -236,7 +234,7 @@ private void batchPut(Map kvPairs, long ttl, boolean ato * @param key raw key * @return a ByteString value if key exists, ByteString.EMPTY if key does not exist */ - public ByteString get(ByteString key) { + public Optional get(ByteString key) { String label = "client_raw_get"; Histogram.Timer requestTimer = RAW_REQUEST_LATENCY.labels(label).startTimer(); try { @@ -244,7 +242,7 @@ public ByteString get(ByteString key) { while (true) { RegionStoreClient client = clientBuilder.build(key); try { - ByteString result = client.rawGet(defaultBackOff(), key); + Optional result = client.rawGet(defaultBackOff(), key); RAW_REQUEST_SUCCESS.labels(label).inc(); return result; } catch (final TiKVException e) { @@ -322,7 +320,7 @@ private void batchDelete(List keys, boolean atomic) { * @return a Long indicating the TTL of key ttl is a non-null long value indicating TTL if key * exists. - ttl=0 if the key will never be outdated. - ttl=null if the key does not exist */ - public Long getKeyTTL(ByteString key) { + public Optional getKeyTTL(ByteString key) { String label = "client_raw_get_key_ttl"; Histogram.Timer requestTimer = RAW_REQUEST_LATENCY.labels(label).startTimer(); try { @@ -330,7 +328,7 @@ public Long getKeyTTL(ByteString key) { while (true) { RegionStoreClient client = clientBuilder.build(key); try { - Long result = client.rawGetKeyTTL(defaultBackOff(), key); + Optional result = client.rawGetKeyTTL(defaultBackOff(), key); RAW_REQUEST_SUCCESS.labels(label).inc(); return result; } catch (final TiKVException e) { diff --git a/src/test/java/org/tikv/common/RegionStoreClientTest.java b/src/test/java/org/tikv/common/RegionStoreClientTest.java index e74c5823aef..670b36ccf69 100644 --- a/src/test/java/org/tikv/common/RegionStoreClientTest.java +++ b/src/test/java/org/tikv/common/RegionStoreClientTest.java @@ -20,6 +20,7 @@ import com.google.common.collect.ImmutableList; import com.google.protobuf.ByteString; import java.util.List; +import java.util.Optional; import org.junit.Test; import org.tikv.common.region.RegionManager; import org.tikv.common.region.RegionStoreClient; @@ -65,13 +66,13 @@ public void rawGetTest() throws Exception { public void doRawGetTest(RegionStoreClient client) throws Exception { server.put("key1", "value1"); - ByteString value = client.rawGet(defaultBackOff(), ByteString.copyFromUtf8("key1")); - assertEquals(ByteString.copyFromUtf8("value1"), value); + Optional value = client.rawGet(defaultBackOff(), ByteString.copyFromUtf8("key1")); + assertEquals(ByteString.copyFromUtf8("value1"), value.get()); server.putError("error1", KVMockServer.NOT_LEADER); // since not_leader is retryable, so the result should be correct. value = client.rawGet(defaultBackOff(), ByteString.copyFromUtf8("key1")); - assertEquals(ByteString.copyFromUtf8("value1"), value); + assertEquals(ByteString.copyFromUtf8("value1"), value.get()); server.putError("failure", KVMockServer.STALE_EPOCH); try { diff --git a/src/test/java/org/tikv/raw/RawKVClientTest.java b/src/test/java/org/tikv/raw/RawKVClientTest.java index 968a77f8a94..dcb7a961df6 100644 --- a/src/test/java/org/tikv/raw/RawKVClientTest.java +++ b/src/test/java/org/tikv/raw/RawKVClientTest.java @@ -15,6 +15,7 @@ import org.tikv.common.TiConfiguration; import org.tikv.common.TiSession; import org.tikv.common.codec.KeyUtils; +import org.tikv.common.exception.RawCASConflictException; import org.tikv.common.exception.TiKVException; import org.tikv.common.key.Key; import org.tikv.common.util.FastByteComparisons; @@ -102,17 +103,12 @@ public void atomicAPITest() { ByteString value = ByteString.copyFromUtf8("value"); ByteString value2 = ByteString.copyFromUtf8("value2"); client.delete(key); - ByteString res1 = client.putIfAbsent(key, value, ttl); - assert res1.isEmpty(); - ByteString res2 = client.putIfAbsent(key, value2, ttl); - assert res2.equals(value); + client.compareAndSet(key, null, value, ttl); try { - Thread.sleep(ttl * 1000); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); + client.compareAndSet(key, null, value2, ttl); + } catch (RawCASConflictException err) { + assert err.getCurrValue() == Optional.of(value); } - ByteString res3 = client.putIfAbsent(key, value, ttl); - assert res3.isEmpty(); } // tikv-4.0 doest not support ttl @@ -124,19 +120,19 @@ public void getKeyTTLTest() { ByteString value = ByteString.copyFromUtf8("value"); client.put(key, value, ttl); for (int i = 0; i < 9; i++) { - Long t = client.getKeyTTL(key); - logger.info("current ttl of key is " + t); + Optional t = client.getKeyTTL(key); + logger.info("current ttl of key is " + t.orElse(null)); try { Thread.sleep(1000); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } } - Long t = client.getKeyTTL(key); - if (t == null) { - logger.info("key outdated."); + Optional t = client.getKeyTTL(key); + if (t.isPresent()) { + logger.info("key not outdated: " + t.get()); } else { - logger.info("key not outdated: " + t); + logger.info("key outdated."); } } @@ -267,8 +263,8 @@ public void simpleTest() { Kvrpcpb.KvPair kv2 = Kvrpcpb.KvPair.newBuilder().setKey(key2).setValue(value2).build(); try { - checkEmpty(key1); - checkEmpty(key2); + checkNotExist(key1); + checkNotExist(key2); checkPut(key1, value1); checkPut(key2, value2); List result = new ArrayList<>(); @@ -513,7 +509,7 @@ private void rawGetTest(int getCases, boolean benchmark) { } else { int i = 0; for (Map.Entry pair : data.entrySet()) { - assert client.get(pair.getKey()).equals(pair.getValue()); + assert client.get(pair.getKey()).equals(Optional.of(pair.getValue())); i++; if (i >= getCases) { break; @@ -772,13 +768,13 @@ private void checkBatchGet(List keys) { private void checkPut(ByteString key, ByteString value) { client.put(key, value); - assert client.get(key).equals(value); + assert client.get(key).orElse(null).equals(value); } private void checkBatchPut(Map kvPairs) { client.batchPut(kvPairs); for (Map.Entry kvPair : kvPairs.entrySet()) { - assert client.get(kvPair.getKey()).equals(kvPair.getValue()); + assert client.get(kvPair.getKey()).orElse(null).equals(kvPair.getValue()); } } @@ -840,7 +836,7 @@ private void checkBatchScanKeys(List> ranges) { private void checkDelete(ByteString key) { client.delete(key); - checkEmpty(key); + checkNotExist(key); } private void checkDeleteRange(ByteString startKey, ByteString endKey) { @@ -853,26 +849,26 @@ private void checkDeleteRange(ByteString startKey, ByteString endKey) { private void checkPutTTL(ByteString key, ByteString value, long ttl) { client.put(key, value, ttl); - assert client.get(key).equals(value); + assert client.get(key).orElse(null).equals(value); } private void checkGetKeyTTL(ByteString key, long ttl) { - Long t = client.getKeyTTL(key); - assert t != null; - assert t <= ttl && t > 0; + Optional t = client.getKeyTTL(key); + assert t.isPresent(); + assert t.get() <= ttl && t.get() > 0; } private void checkGetTTLTimeOut(ByteString key) { - assert client.get(key).isEmpty(); + assert !client.get(key).isPresent(); } private void checkGetKeyTTLTimeOut(ByteString key) { - Long t = client.getKeyTTL(key); - assert t == null; + Optional t = client.getKeyTTL(key); + assert !t.isPresent(); } - private void checkEmpty(ByteString key) { - assert client.get(key).isEmpty(); + private void checkNotExist(ByteString key) { + assert !client.get(key).isPresent(); } private static ByteString rawKey(String key) { From 203ce5146a64684842e9cbb8a195396e8d172b69 Mon Sep 17 00:00:00 2001 From: Andy Lok Date: Fri, 11 Jun 2021 18:17:59 +0800 Subject: [PATCH 02/12] Apply suggestions from code review Signed-off-by: Andy Lok Co-authored-by: Liangliang Gu --- .../java/org/tikv/common/exception/RawCASConflictException.java | 2 +- src/main/java/org/tikv/raw/RawKVClient.java | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/src/main/java/org/tikv/common/exception/RawCASConflictException.java b/src/main/java/org/tikv/common/exception/RawCASConflictException.java index 5d3bd066d69..73512f8efd7 100644 --- a/src/main/java/org/tikv/common/exception/RawCASConflictException.java +++ b/src/main/java/org/tikv/common/exception/RawCASConflictException.java @@ -1,5 +1,5 @@ /* - * Copyright 2020 PingCAP, Inc. + * Copyright 2021 PingCAP, Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/src/main/java/org/tikv/raw/RawKVClient.java b/src/main/java/org/tikv/raw/RawKVClient.java index 26c36777325..ecdc8c6943a 100644 --- a/src/main/java/org/tikv/raw/RawKVClient.java +++ b/src/main/java/org/tikv/raw/RawKVClient.java @@ -141,7 +141,7 @@ public void put(ByteString key, ByteString value, long ttl) { * @param key key * @param value value */ - public void compareAndSet(ByteString key, Optional prevValue, ByteString value) { + public void compareAndSet(ByteString key, Optional prevValue, ByteString value) throws RawCASConflictException { compareAndSet(key, prevValue, value, 0L); } From 9c14257ff49a62608687ba04282c19cb21c6a34f Mon Sep 17 00:00:00 2001 From: Andy Lok Date: Fri, 11 Jun 2021 18:30:53 +0800 Subject: [PATCH 03/12] Format code Signed-off-by: Andy Lok --- .classpath | 73 +++++++++++++++++++++ .settings/org.eclipse.core.resources.prefs | 9 +++ .settings/org.eclipse.jdt.apt.core.prefs | 2 + .settings/org.eclipse.jdt.core.prefs | 9 +++ .settings/org.eclipse.m2e.core.prefs | 4 ++ .vscode/settings.json | 3 + src/main/java/org/tikv/raw/RawKVClient.java | 3 +- 7 files changed, 102 insertions(+), 1 deletion(-) create mode 100644 .classpath create mode 100644 .settings/org.eclipse.core.resources.prefs create mode 100644 .settings/org.eclipse.jdt.apt.core.prefs create mode 100644 .settings/org.eclipse.jdt.core.prefs create mode 100644 .settings/org.eclipse.m2e.core.prefs create mode 100644 .vscode/settings.json diff --git a/.classpath b/.classpath new file mode 100644 index 00000000000..c30d349d92d --- /dev/null +++ b/.classpath @@ -0,0 +1,73 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/.settings/org.eclipse.core.resources.prefs b/.settings/org.eclipse.core.resources.prefs new file mode 100644 index 00000000000..7ad0634b708 --- /dev/null +++ b/.settings/org.eclipse.core.resources.prefs @@ -0,0 +1,9 @@ +eclipse.preferences.version=1 +encoding//src/main/java=UTF-8 +encoding//src/test/java=UTF-8 +encoding//src/test/resources=UTF-8 +encoding//target/generated-sources/antlr4/java/org/tikv/common/parser=UTF-8 +encoding//target/generated-sources/protobuf/grpc-java=UTF-8 +encoding//target/generated-sources/protobuf/java=UTF-8 +encoding/=UTF-8 +encoding/proto=UTF-8 diff --git a/.settings/org.eclipse.jdt.apt.core.prefs b/.settings/org.eclipse.jdt.apt.core.prefs new file mode 100644 index 00000000000..d4313d4b25e --- /dev/null +++ b/.settings/org.eclipse.jdt.apt.core.prefs @@ -0,0 +1,2 @@ +eclipse.preferences.version=1 +org.eclipse.jdt.apt.aptEnabled=false diff --git a/.settings/org.eclipse.jdt.core.prefs b/.settings/org.eclipse.jdt.core.prefs new file mode 100644 index 00000000000..1b6e1ef22f9 --- /dev/null +++ b/.settings/org.eclipse.jdt.core.prefs @@ -0,0 +1,9 @@ +eclipse.preferences.version=1 +org.eclipse.jdt.core.compiler.codegen.targetPlatform=1.8 +org.eclipse.jdt.core.compiler.compliance=1.8 +org.eclipse.jdt.core.compiler.problem.enablePreviewFeatures=disabled +org.eclipse.jdt.core.compiler.problem.forbiddenReference=warning +org.eclipse.jdt.core.compiler.problem.reportPreviewFeatures=ignore +org.eclipse.jdt.core.compiler.processAnnotations=disabled +org.eclipse.jdt.core.compiler.release=disabled +org.eclipse.jdt.core.compiler.source=1.8 diff --git a/.settings/org.eclipse.m2e.core.prefs b/.settings/org.eclipse.m2e.core.prefs new file mode 100644 index 00000000000..f897a7f1cb2 --- /dev/null +++ b/.settings/org.eclipse.m2e.core.prefs @@ -0,0 +1,4 @@ +activeProfiles= +eclipse.preferences.version=1 +resolveWorkspaceProjects=true +version=1 diff --git a/.vscode/settings.json b/.vscode/settings.json new file mode 100644 index 00000000000..c5f3f6b9c75 --- /dev/null +++ b/.vscode/settings.json @@ -0,0 +1,3 @@ +{ + "java.configuration.updateBuildConfiguration": "interactive" +} \ No newline at end of file diff --git a/src/main/java/org/tikv/raw/RawKVClient.java b/src/main/java/org/tikv/raw/RawKVClient.java index ecdc8c6943a..f5688325e2f 100644 --- a/src/main/java/org/tikv/raw/RawKVClient.java +++ b/src/main/java/org/tikv/raw/RawKVClient.java @@ -141,7 +141,8 @@ public void put(ByteString key, ByteString value, long ttl) { * @param key key * @param value value */ - public void compareAndSet(ByteString key, Optional prevValue, ByteString value) throws RawCASConflictException { + public void compareAndSet(ByteString key, Optional prevValue, ByteString value) + throws RawCASConflictException { compareAndSet(key, prevValue, value, 0L); } From 7a102b63c27f949994f48f9a7eef42b63e20dfbd Mon Sep 17 00:00:00 2001 From: Andy Lok Date: Tue, 15 Jun 2021 13:27:22 +0800 Subject: [PATCH 04/12] Add putIfAbsent Signed-off-by: Andy Lok --- src/main/java/org/tikv/raw/RawKVClient.java | 32 ++++++++++++++++++- .../java/org/tikv/raw/RawKVClientTest.java | 24 +++++++++++++- 2 files changed, 54 insertions(+), 2 deletions(-) diff --git a/src/main/java/org/tikv/raw/RawKVClient.java b/src/main/java/org/tikv/raw/RawKVClient.java index f5688325e2f..7401ab4a432 100644 --- a/src/main/java/org/tikv/raw/RawKVClient.java +++ b/src/main/java/org/tikv/raw/RawKVClient.java @@ -135,6 +135,36 @@ public void put(ByteString key, ByteString value, long ttl) { } } + /** + * Put a key-value pair if it does not exist. This API is atomic. + * + * @param key key + * @param value value + * @return a ByteString. returns Optional.EMPTY if the value is written successfully. returns the + * previous key if the value already exists, and does not write to TiKV. + */ + public Optional putIfAbsent(ByteString key, ByteString value) { + return putIfAbsent(key, value, 0L); + } + + /** + * Put a key-value pair with TTL if it does not exist. This API is atomic. + * + * @param key key + * @param value value + * @param ttl TTL of key (in seconds), 0 means the key will never be outdated. + * @return a ByteString. returns Optional.EMPTY if the value is written successfully. returns the + * previous key if the value already exists, and does not write to TiKV. + */ + public Optional putIfAbsent(ByteString key, ByteString value, long ttl) { + try { + compareAndSet(key, Optional.empty(), value, ttl); + return Optional.empty(); + } catch (RawCASConflictException e) { + return e.getCurrValue(); + } + } + /** * Put a key-value pair if the prevValue matched the value in TiKV. This API is atomic. * @@ -147,7 +177,7 @@ public void compareAndSet(ByteString key, Optional prevValue, ByteSt } /** - * Put a key-value pair if the prevValue matched the value in TiKV. This API is atomic. + * pair if the prevValue matched the value in TiKV. This API is atomic. * * @param key key * @param value value diff --git a/src/test/java/org/tikv/raw/RawKVClientTest.java b/src/test/java/org/tikv/raw/RawKVClientTest.java index dcb7a961df6..48287bca832 100644 --- a/src/test/java/org/tikv/raw/RawKVClientTest.java +++ b/src/test/java/org/tikv/raw/RawKVClientTest.java @@ -96,7 +96,7 @@ public void tearDown() throws Exception { // tikv-4.0 does not support atomic api @Ignore - public void atomicAPITest() { + public void rawCASTest() { if (!initialized) return; long ttl = 10; ByteString key = ByteString.copyFromUtf8("key_atomic"); @@ -111,6 +111,28 @@ public void atomicAPITest() { } } + // tikv-4.0 does not support atomic api + @Ignore + public void rawPutIfAbsentTest() { + if (!initialized) return; + long ttl = 10; + ByteString key = ByteString.copyFromUtf8("key_atomic"); + ByteString value = ByteString.copyFromUtf8("value"); + ByteString value2 = ByteString.copyFromUtf8("value2"); + client.delete(key); + ByteString res1 = client.putIfAbsent(key, value, ttl); + assert res1.isEmpty(); + ByteString res2 = client.putIfAbsent(key, value2, ttl); + assert res2.equals(value); + try { + Thread.sleep(ttl * 1000); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + ByteString res3 = client.putIfAbsent(key, value, ttl); + assert res3.isEmpty(); + } + // tikv-4.0 doest not support ttl @Ignore public void getKeyTTLTest() { From 438bdf9cc2b4b5700b0527676f1ddb219aba0d2e Mon Sep 17 00:00:00 2001 From: Andy Lok Date: Tue, 15 Jun 2021 15:58:48 +0800 Subject: [PATCH 05/12] Rename sth Signed-off-by: Andy Lok --- .../common/exception/RawCASConflictException.java | 14 +++++++------- src/main/java/org/tikv/raw/RawKVClient.java | 2 +- src/test/java/org/tikv/raw/RawKVClientTest.java | 2 +- 3 files changed, 9 insertions(+), 9 deletions(-) diff --git a/src/main/java/org/tikv/common/exception/RawCASConflictException.java b/src/main/java/org/tikv/common/exception/RawCASConflictException.java index 73512f8efd7..a1b066beb44 100644 --- a/src/main/java/org/tikv/common/exception/RawCASConflictException.java +++ b/src/main/java/org/tikv/common/exception/RawCASConflictException.java @@ -23,17 +23,17 @@ public class RawCASConflictException extends RuntimeException { private final ByteString key; private final Optional expectedPrevValue; - private final Optional currValue; + private final Optional prevValue; public RawCASConflictException( - ByteString key, Optional expectedPrevValue, Optional currValue) { + ByteString key, Optional expectedPrevValue, Optional prevValue) { super( String.format( - "key=%s expectedPrevValue=%s currValue=%s", - KeyUtils.formatBytes(key), expectedPrevValue, currValue)); + "key=%s expectedPrevValue=%s prevValue=%s", + KeyUtils.formatBytes(key), expectedPrevValue, prevValue)); this.key = key; this.expectedPrevValue = expectedPrevValue; - this.currValue = currValue; + this.prevValue = prevValue; } public ByteString getKey() { @@ -44,7 +44,7 @@ public Optional getExpectedPrevValue() { return this.expectedPrevValue; } - public Optional getCurrValue() { - return this.currValue; + public Optional getPrevValue() { + return this.prevValue; } } diff --git a/src/main/java/org/tikv/raw/RawKVClient.java b/src/main/java/org/tikv/raw/RawKVClient.java index 7401ab4a432..075f956584c 100644 --- a/src/main/java/org/tikv/raw/RawKVClient.java +++ b/src/main/java/org/tikv/raw/RawKVClient.java @@ -161,7 +161,7 @@ public Optional putIfAbsent(ByteString key, ByteString value, long t compareAndSet(key, Optional.empty(), value, ttl); return Optional.empty(); } catch (RawCASConflictException e) { - return e.getCurrValue(); + return e.getPrevValue(); } } diff --git a/src/test/java/org/tikv/raw/RawKVClientTest.java b/src/test/java/org/tikv/raw/RawKVClientTest.java index 48287bca832..67a252b4934 100644 --- a/src/test/java/org/tikv/raw/RawKVClientTest.java +++ b/src/test/java/org/tikv/raw/RawKVClientTest.java @@ -107,7 +107,7 @@ public void rawCASTest() { try { client.compareAndSet(key, null, value2, ttl); } catch (RawCASConflictException err) { - assert err.getCurrValue() == Optional.of(value); + assert err.getPrevValue() == Optional.of(value); } } From dc60a08b8849b452db3d52bf73195207c117de20 Mon Sep 17 00:00:00 2001 From: Andy Lok Date: Wed, 16 Jun 2021 14:41:45 +0800 Subject: [PATCH 06/12] Remove .vscode Signed-off-by: Andy Lok --- .vscode/settings.json | 3 --- 1 file changed, 3 deletions(-) delete mode 100644 .vscode/settings.json diff --git a/.vscode/settings.json b/.vscode/settings.json deleted file mode 100644 index c5f3f6b9c75..00000000000 --- a/.vscode/settings.json +++ /dev/null @@ -1,3 +0,0 @@ -{ - "java.configuration.updateBuildConfiguration": "interactive" -} \ No newline at end of file From 2d892d99762c238b12cf0be031e3728b2409f0f8 Mon Sep 17 00:00:00 2001 From: Andy Lok Date: Wed, 16 Jun 2021 14:45:21 +0800 Subject: [PATCH 07/12] Remove .settings Signed-off-by: Andy Lok --- .settings/org.eclipse.core.resources.prefs | 9 --------- .settings/org.eclipse.jdt.apt.core.prefs | 2 -- .settings/org.eclipse.jdt.core.prefs | 9 --------- .settings/org.eclipse.m2e.core.prefs | 4 ---- 4 files changed, 24 deletions(-) delete mode 100644 .settings/org.eclipse.core.resources.prefs delete mode 100644 .settings/org.eclipse.jdt.apt.core.prefs delete mode 100644 .settings/org.eclipse.jdt.core.prefs delete mode 100644 .settings/org.eclipse.m2e.core.prefs diff --git a/.settings/org.eclipse.core.resources.prefs b/.settings/org.eclipse.core.resources.prefs deleted file mode 100644 index 7ad0634b708..00000000000 --- a/.settings/org.eclipse.core.resources.prefs +++ /dev/null @@ -1,9 +0,0 @@ -eclipse.preferences.version=1 -encoding//src/main/java=UTF-8 -encoding//src/test/java=UTF-8 -encoding//src/test/resources=UTF-8 -encoding//target/generated-sources/antlr4/java/org/tikv/common/parser=UTF-8 -encoding//target/generated-sources/protobuf/grpc-java=UTF-8 -encoding//target/generated-sources/protobuf/java=UTF-8 -encoding/=UTF-8 -encoding/proto=UTF-8 diff --git a/.settings/org.eclipse.jdt.apt.core.prefs b/.settings/org.eclipse.jdt.apt.core.prefs deleted file mode 100644 index d4313d4b25e..00000000000 --- a/.settings/org.eclipse.jdt.apt.core.prefs +++ /dev/null @@ -1,2 +0,0 @@ -eclipse.preferences.version=1 -org.eclipse.jdt.apt.aptEnabled=false diff --git a/.settings/org.eclipse.jdt.core.prefs b/.settings/org.eclipse.jdt.core.prefs deleted file mode 100644 index 1b6e1ef22f9..00000000000 --- a/.settings/org.eclipse.jdt.core.prefs +++ /dev/null @@ -1,9 +0,0 @@ -eclipse.preferences.version=1 -org.eclipse.jdt.core.compiler.codegen.targetPlatform=1.8 -org.eclipse.jdt.core.compiler.compliance=1.8 -org.eclipse.jdt.core.compiler.problem.enablePreviewFeatures=disabled -org.eclipse.jdt.core.compiler.problem.forbiddenReference=warning -org.eclipse.jdt.core.compiler.problem.reportPreviewFeatures=ignore -org.eclipse.jdt.core.compiler.processAnnotations=disabled -org.eclipse.jdt.core.compiler.release=disabled -org.eclipse.jdt.core.compiler.source=1.8 diff --git a/.settings/org.eclipse.m2e.core.prefs b/.settings/org.eclipse.m2e.core.prefs deleted file mode 100644 index f897a7f1cb2..00000000000 --- a/.settings/org.eclipse.m2e.core.prefs +++ /dev/null @@ -1,4 +0,0 @@ -activeProfiles= -eclipse.preferences.version=1 -resolveWorkspaceProjects=true -version=1 From cdb103ec25287acb276b40222ac5126510854a9c Mon Sep 17 00:00:00 2001 From: Andy Lok Date: Wed, 16 Jun 2021 14:46:51 +0800 Subject: [PATCH 08/12] Delete .classpath Signed-off-by: Andy Lok --- .classpath | 73 ------------------------------------------------------ 1 file changed, 73 deletions(-) delete mode 100644 .classpath diff --git a/.classpath b/.classpath deleted file mode 100644 index c30d349d92d..00000000000 --- a/.classpath +++ /dev/null @@ -1,73 +0,0 @@ - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - From 4e79ed7aa4d76db632ed377cb13c878e841de5e1 Mon Sep 17 00:00:00 2001 From: Andy Lok Date: Thu, 17 Jun 2021 15:12:07 +0800 Subject: [PATCH 09/12] Fix deadloop Signed-off-by: Andy Lok --- src/main/java/org/tikv/raw/RawKVClient.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/main/java/org/tikv/raw/RawKVClient.java b/src/main/java/org/tikv/raw/RawKVClient.java index 075f956584c..1f9305e41c2 100644 --- a/src/main/java/org/tikv/raw/RawKVClient.java +++ b/src/main/java/org/tikv/raw/RawKVClient.java @@ -186,7 +186,7 @@ public void compareAndSet(ByteString key, Optional prevValue, ByteSt public void compareAndSet( ByteString key, Optional prevValue, ByteString value, long ttl) throws RawCASConflictException { - String label = "client_raw_compare_and_swap"; + String label = "client_raw_compare_and_set"; Histogram.Timer requestTimer = RAW_REQUEST_LATENCY.labels(label).startTimer(); try { BackOffer backOffer = defaultBackOff(); @@ -195,6 +195,7 @@ public void compareAndSet( try { client.rawCompareAndSet(backOffer, key, prevValue, value, ttl); RAW_REQUEST_SUCCESS.labels(label).inc(); + return; } catch (final TiKVException e) { backOffer.doBackOff(BackOffFunction.BackOffFuncType.BoRegionMiss, e); } From 80ca8bde9a5124976a5c69e6705bd714e0fdf0ce Mon Sep 17 00:00:00 2001 From: Andy Lok Date: Thu, 17 Jun 2021 17:29:14 +0800 Subject: [PATCH 10/12] Enable TTL and CAS test Signed-off-by: Andy Lok --- .../java/org/tikv/raw/RawKVClientTest.java | 32 +++++++++---------- 1 file changed, 15 insertions(+), 17 deletions(-) diff --git a/src/test/java/org/tikv/raw/RawKVClientTest.java b/src/test/java/org/tikv/raw/RawKVClientTest.java index 67a252b4934..3d73c9ccb64 100644 --- a/src/test/java/org/tikv/raw/RawKVClientTest.java +++ b/src/test/java/org/tikv/raw/RawKVClientTest.java @@ -7,8 +7,8 @@ import java.util.stream.Collectors; import org.apache.commons.lang3.RandomStringUtils; import org.junit.After; +import org.junit.Assert; import org.junit.Before; -import org.junit.Ignore; import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -94,25 +94,24 @@ public void tearDown() throws Exception { } } - // tikv-4.0 does not support atomic api - @Ignore + @Test public void rawCASTest() { if (!initialized) return; - long ttl = 10; ByteString key = ByteString.copyFromUtf8("key_atomic"); ByteString value = ByteString.copyFromUtf8("value"); ByteString value2 = ByteString.copyFromUtf8("value2"); client.delete(key); - client.compareAndSet(key, null, value, ttl); + client.compareAndSet(key, Optional.empty(), value); + Assert.assertEquals(value, client.get(key).get()); try { - client.compareAndSet(key, null, value2, ttl); + client.compareAndSet(key, Optional.empty(), value2); + Assert.fail("compareAndSet should fail."); } catch (RawCASConflictException err) { - assert err.getPrevValue() == Optional.of(value); + Assert.assertEquals(value, err.getPrevValue().get()); } } - // tikv-4.0 does not support atomic api - @Ignore + @Test public void rawPutIfAbsentTest() { if (!initialized) return; long ttl = 10; @@ -120,21 +119,20 @@ public void rawPutIfAbsentTest() { ByteString value = ByteString.copyFromUtf8("value"); ByteString value2 = ByteString.copyFromUtf8("value2"); client.delete(key); - ByteString res1 = client.putIfAbsent(key, value, ttl); - assert res1.isEmpty(); - ByteString res2 = client.putIfAbsent(key, value2, ttl); - assert res2.equals(value); + Optional res1 = client.putIfAbsent(key, value, ttl); + assert !res1.isPresent(); + Optional res2 = client.putIfAbsent(key, value2, ttl); + assert res2.equals(Optional.of(value)); try { Thread.sleep(ttl * 1000); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } - ByteString res3 = client.putIfAbsent(key, value, ttl); - assert res3.isEmpty(); + Optional res3 = client.putIfAbsent(key, value, ttl); + assert !res3.isPresent(); } - // tikv-4.0 doest not support ttl - @Ignore + @Test public void getKeyTTLTest() { if (!initialized) return; long ttl = 10; From d4eac7f3df5b8996292f04e55d416973dd1b38ff Mon Sep 17 00:00:00 2001 From: Andy Lok Date: Thu, 17 Jun 2021 18:17:59 +0800 Subject: [PATCH 11/12] Fix test Signed-off-by: Andy Lok --- .gitignore | 1 + src/test/java/org/tikv/raw/RawKVClientTest.java | 10 +++++----- 2 files changed, 6 insertions(+), 5 deletions(-) diff --git a/.gitignore b/.gitignore index c0b20beaed2..3d372b4332e 100644 --- a/.gitignore +++ b/.gitignore @@ -5,6 +5,7 @@ pub.sh # ignore compiled classes target +.classpath # ignore version info src/main/java/com/pingcap/tikv/TiVersion.java diff --git a/src/test/java/org/tikv/raw/RawKVClientTest.java b/src/test/java/org/tikv/raw/RawKVClientTest.java index a14fb309387..c394666fa1a 100644 --- a/src/test/java/org/tikv/raw/RawKVClientTest.java +++ b/src/test/java/org/tikv/raw/RawKVClientTest.java @@ -124,7 +124,7 @@ public void rawPutIfAbsentTest() { Optional res1 = client.putIfAbsent(key, value, ttl); assertFalse(res1.isPresent()); Optional res2 = client.putIfAbsent(key, value2, ttl); - assertEquals(res2, Optional.of(value)); + assertEquals(res2.get(), value); try { Thread.sleep(ttl * 1000); } catch (InterruptedException e) { @@ -291,10 +291,10 @@ public void simpleTest() { try { checkDeleteRange(ByteString.EMPTY, ByteString.EMPTY); - checkNotExist(kv); - checkNotExist(kv1); - checkNotExist(kv2); - checkNotExist(kv3); + checkNotExist(key); + checkNotExist(key1); + checkNotExist(key2); + checkNotExist(key3); checkPut(kv); checkPut(kv1); checkPut(kv2); From 5fdd79c0d8bb0bb790eb4405f8c923aad978e319 Mon Sep 17 00:00:00 2001 From: Andy Lok Date: Fri, 18 Jun 2021 13:36:01 +0800 Subject: [PATCH 12/12] rebase #202 Signed-off-by: Andy Lok --- .ci/integration_test.groovy | 6 +++--- config/tikv.toml | 3 +++ 2 files changed, 6 insertions(+), 3 deletions(-) diff --git a/.ci/integration_test.groovy b/.ci/integration_test.groovy index c5cb281c96e..2d7a28ff4ea 100644 --- a/.ci/integration_test.groovy +++ b/.ci/integration_test.groovy @@ -1,8 +1,8 @@ def call(ghprbActualCommit, ghprbPullId, ghprbPullTitle, ghprbPullLink, ghprbPullDescription, credentialsId) { - def TIDB_BRANCH = "release-4.0" - def TIKV_BRANCH = "release-4.0" - def PD_BRANCH = "release-4.0" + def TIDB_BRANCH = "release-5.0" + def TIKV_BRANCH = "release-5.0" + def PD_BRANCH = "release-5.0" // parse tidb branch def m1 = ghprbCommentBody =~ /tidb\s*=\s*([^\s\\]+)(\s|\\|$)/ diff --git a/config/tikv.toml b/config/tikv.toml index 287abc69e41..f525fa753c5 100644 --- a/config/tikv.toml +++ b/config/tikv.toml @@ -3,3 +3,6 @@ [raftstore] # set store capacity, if no set, use disk capacity. capacity = "8G" + +[storage] +enable-ttl = true