diff --git a/.ci/integration_test.groovy b/.ci/integration_test.groovy index c5cb281c96e..2d7a28ff4ea 100644 --- a/.ci/integration_test.groovy +++ b/.ci/integration_test.groovy @@ -1,8 +1,8 @@ def call(ghprbActualCommit, ghprbPullId, ghprbPullTitle, ghprbPullLink, ghprbPullDescription, credentialsId) { - def TIDB_BRANCH = "release-4.0" - def TIKV_BRANCH = "release-4.0" - def PD_BRANCH = "release-4.0" + def TIDB_BRANCH = "release-5.0" + def TIKV_BRANCH = "release-5.0" + def PD_BRANCH = "release-5.0" // parse tidb branch def m1 = ghprbCommentBody =~ /tidb\s*=\s*([^\s\\]+)(\s|\\|$)/ diff --git a/.gitignore b/.gitignore index c0b20beaed2..3d372b4332e 100644 --- a/.gitignore +++ b/.gitignore @@ -5,6 +5,7 @@ pub.sh # ignore compiled classes target +.classpath # ignore version info src/main/java/com/pingcap/tikv/TiVersion.java diff --git a/config/tikv.toml b/config/tikv.toml index 287abc69e41..f525fa753c5 100644 --- a/config/tikv.toml +++ b/config/tikv.toml @@ -3,3 +3,6 @@ [raftstore] # set store capacity, if no set, use disk capacity. capacity = "8G" + +[storage] +enable-ttl = true diff --git a/pom.xml b/pom.xml index 4e4cff5be70..90e7cf0c466 100644 --- a/pom.xml +++ b/pom.xml @@ -57,6 +57,8 @@ + 1.8 + 1.8 UTF-8 UTF-8 3.5.1 @@ -64,7 +66,7 @@ 1.7.16 1.24.0 1.6.6 - 2.10.0 + 2.12.3 3.0.1 0.4.1 2.9.9 @@ -386,7 +388,7 @@ org.apache.maven.plugins maven-javadoc-plugin - 2.9.1 + 3.0.1 ${javadoc.skip} diff --git a/src/main/java/org/tikv/common/exception/RawCASConflictException.java b/src/main/java/org/tikv/common/exception/RawCASConflictException.java new file mode 100644 index 00000000000..a1b066beb44 --- /dev/null +++ b/src/main/java/org/tikv/common/exception/RawCASConflictException.java @@ -0,0 +1,50 @@ +/* + * Copyright 2021 PingCAP, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.tikv.common.exception; + +import com.google.protobuf.ByteString; +import java.util.Optional; +import org.tikv.common.codec.KeyUtils; + +public class RawCASConflictException extends RuntimeException { + + private final ByteString key; + private final Optional expectedPrevValue; + private final Optional prevValue; + + public RawCASConflictException( + ByteString key, Optional expectedPrevValue, Optional prevValue) { + super( + String.format( + "key=%s expectedPrevValue=%s prevValue=%s", + KeyUtils.formatBytes(key), expectedPrevValue, prevValue)); + this.key = key; + this.expectedPrevValue = expectedPrevValue; + this.prevValue = prevValue; + } + + public ByteString getKey() { + return this.key; + } + + public Optional getExpectedPrevValue() { + return this.expectedPrevValue; + } + + public Optional getPrevValue() { + return this.prevValue; + } +} diff --git a/src/main/java/org/tikv/common/region/RegionStoreClient.java b/src/main/java/org/tikv/common/region/RegionStoreClient.java index e6f37a99495..7fbfade535b 100644 --- a/src/main/java/org/tikv/common/region/RegionStoreClient.java +++ b/src/main/java/org/tikv/common/region/RegionStoreClient.java @@ -797,7 +797,7 @@ public List splitRegion(Iterable splitKeys) { // APIs for Raw Scan/Put/Get/Delete - public ByteString rawGet(BackOffer backOffer, ByteString key) { + public Optional rawGet(BackOffer backOffer, ByteString key) { Histogram.Timer requestTimer = GRPC_RAW_REQUEST_LATENCY.labels("client_grpc_raw_get").startTimer(); try { @@ -817,7 +817,7 @@ public ByteString rawGet(BackOffer backOffer, ByteString key) { } } - private ByteString rawGetHelper(RawGetResponse resp) { + private Optional rawGetHelper(RawGetResponse resp) { if (resp == null) { this.regionManager.onRequestFail(region); throw new TiClientInternalException("RawGetResponse failed without a cause"); @@ -829,10 +829,14 @@ private ByteString rawGetHelper(RawGetResponse resp) { if (resp.hasRegionError()) { throw new RegionException(resp.getRegionError()); } - return resp.getValue(); + if (resp.getNotFound()) { + return Optional.empty(); + } else { + return Optional.of(resp.getValue()); + } } - public Long rawGetKeyTTL(BackOffer backOffer, ByteString key) { + public Optional rawGetKeyTTL(BackOffer backOffer, ByteString key) { Histogram.Timer requestTimer = GRPC_RAW_REQUEST_LATENCY.labels("client_grpc_raw_get_key_ttl").startTimer(); try { @@ -853,7 +857,7 @@ public Long rawGetKeyTTL(BackOffer backOffer, ByteString key) { } } - private Long rawGetKeyTTLHelper(RawGetKeyTTLResponse resp) { + private Optional rawGetKeyTTLHelper(RawGetKeyTTLResponse resp) { if (resp == null) { this.regionManager.onRequestFail(region); throw new TiClientInternalException("RawGetResponse failed without a cause"); @@ -866,9 +870,9 @@ private Long rawGetKeyTTLHelper(RawGetKeyTTLResponse resp) { throw new RegionException(resp.getRegionError()); } if (resp.getNotFound()) { - return null; + return Optional.empty(); } - return resp.getTtl(); + return Optional.of(resp.getTtl()); } public void rawDelete(BackOffer backOffer, ByteString key) { @@ -944,8 +948,13 @@ private void rawPutHelper(RawPutResponse resp) { } } - public ByteString rawPutIfAbsent( - BackOffer backOffer, ByteString key, ByteString value, long ttl) { + public void rawCompareAndSet( + BackOffer backOffer, + ByteString key, + Optional prevValue, + ByteString value, + long ttl) + throws RawCASConflictException { Histogram.Timer requestTimer = GRPC_RAW_REQUEST_LATENCY.labels("client_grpc_raw_put_if_absent").startTimer(); try { @@ -955,7 +964,8 @@ public ByteString rawPutIfAbsent( .setContext(region.getReplicaContext(storeType)) .setKey(key) .setValue(value) - .setPreviousNotExist(true) + .setPreviousValue(prevValue.orElse(ByteString.EMPTY)) + .setPreviousNotExist(!prevValue.isPresent()) .setTtl(ttl) .build(); @@ -964,13 +974,15 @@ public ByteString rawPutIfAbsent( regionManager, this, resp -> resp.hasRegionError() ? resp.getRegionError() : null); RawCASResponse resp = callWithRetry(backOffer, TikvGrpc.getRawCompareAndSwapMethod(), factory, handler); - return rawPutIfAbsentHelper(resp); + rawCompareAndSetHelper(key, prevValue, resp); } finally { requestTimer.observeDuration(); } } - private ByteString rawPutIfAbsentHelper(RawCASResponse resp) { + private void rawCompareAndSetHelper( + ByteString key, Optional expectedPrevValue, RawCASResponse resp) + throws RawCASConflictException { if (resp == null) { this.regionManager.onRequestFail(region); throw new TiClientInternalException("RawPutResponse failed without a cause"); @@ -982,10 +994,14 @@ private ByteString rawPutIfAbsentHelper(RawCASResponse resp) { if (resp.hasRegionError()) { throw new RegionException(resp.getRegionError()); } - if (resp.getSucceed()) { - return ByteString.EMPTY; + if (!resp.getSucceed()) { + if (resp.getPreviousNotExist()) { + throw new RawCASConflictException(key, expectedPrevValue, Optional.empty()); + } else { + throw new RawCASConflictException( + key, expectedPrevValue, Optional.of(resp.getPreviousValue())); + } } - return resp.getPreviousValue(); } public List rawBatchGet(BackOffer backoffer, List keys) { diff --git a/src/main/java/org/tikv/raw/RawKVClient.java b/src/main/java/org/tikv/raw/RawKVClient.java index 40dc7124de4..cd37bf71592 100644 --- a/src/main/java/org/tikv/raw/RawKVClient.java +++ b/src/main/java/org/tikv/raw/RawKVClient.java @@ -27,6 +27,7 @@ import org.slf4j.LoggerFactory; import org.tikv.common.TiConfiguration; import org.tikv.common.TiSession; +import org.tikv.common.exception.RawCASConflictException; import org.tikv.common.exception.TiKVException; import org.tikv.common.key.Key; import org.tikv.common.operation.iterator.RawScanIterator; @@ -139,10 +140,10 @@ public void put(ByteString key, ByteString value, long ttl) { * * @param key key * @param value value - * @return a ByteString. returns ByteString.EMPTY if the value is written successfully. returns - * the previous key if the value already exists, and does not write to TiKV. + * @return a ByteString. returns Optional.EMPTY if the value is written successfully. returns the + * previous key if the value already exists, and does not write to TiKV. */ - public ByteString putIfAbsent(ByteString key, ByteString value) { + public Optional putIfAbsent(ByteString key, ByteString value) { return putIfAbsent(key, value, 0L); } @@ -152,20 +153,49 @@ public ByteString putIfAbsent(ByteString key, ByteString value) { * @param key key * @param value value * @param ttl TTL of key (in seconds), 0 means the key will never be outdated. - * @return a ByteString. returns ByteString.EMPTY if the value is written successfully. returns - * the previous key if the value already exists, and does not write to TiKV. + * @return a ByteString. returns Optional.EMPTY if the value is written successfully. returns the + * previous key if the value already exists, and does not write to TiKV. */ - public ByteString putIfAbsent(ByteString key, ByteString value, long ttl) { - String label = "client_raw_put_if_absent"; + public Optional putIfAbsent(ByteString key, ByteString value, long ttl) { + try { + compareAndSet(key, Optional.empty(), value, ttl); + return Optional.empty(); + } catch (RawCASConflictException e) { + return e.getPrevValue(); + } + } + + /** + * Put a key-value pair if the prevValue matched the value in TiKV. This API is atomic. + * + * @param key key + * @param value value + */ + public void compareAndSet(ByteString key, Optional prevValue, ByteString value) + throws RawCASConflictException { + compareAndSet(key, prevValue, value, 0L); + } + + /** + * pair if the prevValue matched the value in TiKV. This API is atomic. + * + * @param key key + * @param value value + * @param ttl TTL of key (in seconds), 0 means the key will never be outdated. + */ + public void compareAndSet( + ByteString key, Optional prevValue, ByteString value, long ttl) + throws RawCASConflictException { + String label = "client_raw_compare_and_set"; Histogram.Timer requestTimer = RAW_REQUEST_LATENCY.labels(label).startTimer(); try { BackOffer backOffer = defaultBackOff(); while (true) { RegionStoreClient client = clientBuilder.build(key); try { - ByteString result = client.rawPutIfAbsent(backOffer, key, value, ttl); + client.rawCompareAndSet(backOffer, key, prevValue, value, ttl); RAW_REQUEST_SUCCESS.labels(label).inc(); - return result; + return; } catch (final TiKVException e) { backOffer.doBackOff(BackOffFunction.BackOffFuncType.BoRegionMiss, e); } @@ -236,7 +266,7 @@ private void batchPut(Map kvPairs, long ttl, boolean ato * @param key raw key * @return a ByteString value if key exists, ByteString.EMPTY if key does not exist */ - public ByteString get(ByteString key) { + public Optional get(ByteString key) { String label = "client_raw_get"; Histogram.Timer requestTimer = RAW_REQUEST_LATENCY.labels(label).startTimer(); try { @@ -244,7 +274,7 @@ public ByteString get(ByteString key) { while (true) { RegionStoreClient client = clientBuilder.build(key); try { - ByteString result = client.rawGet(defaultBackOff(), key); + Optional result = client.rawGet(defaultBackOff(), key); RAW_REQUEST_SUCCESS.labels(label).inc(); return result; } catch (final TiKVException e) { @@ -322,7 +352,7 @@ private void batchDelete(List keys, boolean atomic) { * @return a Long indicating the TTL of key ttl is a non-null long value indicating TTL if key * exists. - ttl=0 if the key will never be outdated. - ttl=null if the key does not exist */ - public Long getKeyTTL(ByteString key) { + public Optional getKeyTTL(ByteString key) { String label = "client_raw_get_key_ttl"; Histogram.Timer requestTimer = RAW_REQUEST_LATENCY.labels(label).startTimer(); try { @@ -330,7 +360,7 @@ public Long getKeyTTL(ByteString key) { while (true) { RegionStoreClient client = clientBuilder.build(key); try { - Long result = client.rawGetKeyTTL(defaultBackOff(), key); + Optional result = client.rawGetKeyTTL(defaultBackOff(), key); RAW_REQUEST_SUCCESS.labels(label).inc(); return result; } catch (final TiKVException e) { diff --git a/src/test/java/org/tikv/common/RegionStoreClientTest.java b/src/test/java/org/tikv/common/RegionStoreClientTest.java index e74c5823aef..670b36ccf69 100644 --- a/src/test/java/org/tikv/common/RegionStoreClientTest.java +++ b/src/test/java/org/tikv/common/RegionStoreClientTest.java @@ -20,6 +20,7 @@ import com.google.common.collect.ImmutableList; import com.google.protobuf.ByteString; import java.util.List; +import java.util.Optional; import org.junit.Test; import org.tikv.common.region.RegionManager; import org.tikv.common.region.RegionStoreClient; @@ -65,13 +66,13 @@ public void rawGetTest() throws Exception { public void doRawGetTest(RegionStoreClient client) throws Exception { server.put("key1", "value1"); - ByteString value = client.rawGet(defaultBackOff(), ByteString.copyFromUtf8("key1")); - assertEquals(ByteString.copyFromUtf8("value1"), value); + Optional value = client.rawGet(defaultBackOff(), ByteString.copyFromUtf8("key1")); + assertEquals(ByteString.copyFromUtf8("value1"), value.get()); server.putError("error1", KVMockServer.NOT_LEADER); // since not_leader is retryable, so the result should be correct. value = client.rawGet(defaultBackOff(), ByteString.copyFromUtf8("key1")); - assertEquals(ByteString.copyFromUtf8("value1"), value); + assertEquals(ByteString.copyFromUtf8("value1"), value.get()); server.putError("failure", KVMockServer.STALE_EPOCH); try { diff --git a/src/test/java/org/tikv/raw/RawKVClientTest.java b/src/test/java/org/tikv/raw/RawKVClientTest.java index b6be9eeeedb..c394666fa1a 100644 --- a/src/test/java/org/tikv/raw/RawKVClientTest.java +++ b/src/test/java/org/tikv/raw/RawKVClientTest.java @@ -9,14 +9,15 @@ import java.util.stream.Collectors; import org.apache.commons.lang3.RandomStringUtils; import org.junit.After; +import org.junit.Assert; import org.junit.Before; -import org.junit.Ignore; import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.tikv.common.TiConfiguration; import org.tikv.common.TiSession; import org.tikv.common.codec.KeyUtils; +import org.tikv.common.exception.RawCASConflictException; import org.tikv.common.exception.TiKVException; import org.tikv.common.key.Key; import org.tikv.common.util.FastByteComparisons; @@ -95,30 +96,45 @@ public void tearDown() throws Exception { } } - // tikv-4.0 does not support atomic api - @Ignore - public void atomicAPITest() { + @Test + public void rawCASTest() { + if (!initialized) return; + ByteString key = ByteString.copyFromUtf8("key_atomic"); + ByteString value = ByteString.copyFromUtf8("value"); + ByteString value2 = ByteString.copyFromUtf8("value2"); + client.delete(key); + client.compareAndSet(key, Optional.empty(), value); + Assert.assertEquals(value, client.get(key).get()); + try { + client.compareAndSet(key, Optional.empty(), value2); + Assert.fail("compareAndSet should fail."); + } catch (RawCASConflictException err) { + Assert.assertEquals(value, err.getPrevValue().get()); + } + } + + @Test + public void rawPutIfAbsentTest() { if (!initialized) return; long ttl = 10; ByteString key = ByteString.copyFromUtf8("key_atomic"); ByteString value = ByteString.copyFromUtf8("value"); ByteString value2 = ByteString.copyFromUtf8("value2"); client.delete(key); - ByteString res1 = client.putIfAbsent(key, value, ttl); - assertTrue(res1.isEmpty()); - ByteString res2 = client.putIfAbsent(key, value2, ttl); - assertEquals(value, res2); + Optional res1 = client.putIfAbsent(key, value, ttl); + assertFalse(res1.isPresent()); + Optional res2 = client.putIfAbsent(key, value2, ttl); + assertEquals(res2.get(), value); try { Thread.sleep(ttl * 1000); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } - ByteString res3 = client.putIfAbsent(key, value, ttl); - assertTrue(res3.isEmpty()); + Optional res3 = client.putIfAbsent(key, value, ttl); + assertFalse(res3.isPresent()); } - // tikv-4.0 doest not support ttl - @Ignore + @Test public void getKeyTTLTest() { if (!initialized) return; long ttl = 10; @@ -126,19 +142,19 @@ public void getKeyTTLTest() { ByteString value = ByteString.copyFromUtf8("value"); client.put(key, value, ttl); for (int i = 0; i < 9; i++) { - Long t = client.getKeyTTL(key); - logger.info("current ttl of key is " + t); + Optional t = client.getKeyTTL(key); + logger.info("current ttl of key is " + t.orElse(null)); try { Thread.sleep(1000); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } } - Long t = client.getKeyTTL(key); - if (t == null) { - logger.info("key outdated."); + Optional t = client.getKeyTTL(key); + if (t.isPresent()) { + logger.info("key not outdated: " + t.get()); } else { - logger.info("key not outdated: " + t); + logger.info("key outdated."); } } @@ -275,10 +291,10 @@ public void simpleTest() { try { checkDeleteRange(ByteString.EMPTY, ByteString.EMPTY); - checkEmpty(kv); - checkEmpty(kv1); - checkEmpty(kv2); - checkEmpty(kv3); + checkNotExist(key); + checkNotExist(key1); + checkNotExist(key2); + checkNotExist(key3); checkPut(kv); checkPut(kv1); checkPut(kv2); @@ -532,7 +548,7 @@ private void rawGetTest(int getCases, boolean benchmark) { } else { int i = 0; for (Map.Entry pair : data.entrySet()) { - assertEquals(pair.getValue(), client.get(pair.getKey())); + assertEquals(client.get(pair.getKey()), Optional.of(pair.getValue())); i++; if (i >= getCases) { break; @@ -795,13 +811,13 @@ private void checkPut(Kvrpcpb.KvPair kv) { private void checkPut(ByteString key, ByteString value) { client.put(key, value); - assertEquals(value, client.get(key)); + assertEquals(client.get(key).orElse(null), value); } private void checkBatchPut(Map kvPairs) { client.batchPut(kvPairs); for (Map.Entry kvPair : kvPairs.entrySet()) { - assertEquals(kvPair.getValue(), client.get(kvPair.getKey())); + assertEquals(client.get(kvPair.getKey()).orElse(null), kvPair.getValue()); } } @@ -863,7 +879,7 @@ private void checkBatchScanKeys(List> ranges) { private void checkDelete(ByteString key) { client.delete(key); - checkEmpty(key); + checkNotExist(key); } private void checkDeleteRange(ByteString startKey, ByteString endKey) { @@ -876,30 +892,26 @@ private void checkDeleteRange(ByteString startKey, ByteString endKey) { private void checkPutTTL(ByteString key, ByteString value, long ttl) { client.put(key, value, ttl); - assertEquals(value, client.get(key)); + assert client.get(key).orElse(null).equals(value); } private void checkGetKeyTTL(ByteString key, long ttl) { - Long t = client.getKeyTTL(key); - assertNotNull(t); - assertTrue(t <= ttl && t > 0); + Optional t = client.getKeyTTL(key); + assertTrue(t.isPresent()); + assertTrue(t.get() <= ttl && t.get() > 0); } private void checkGetTTLTimeOut(ByteString key) { - assertTrue(client.get(key).isEmpty()); + assertFalse(client.get(key).isPresent()); } private void checkGetKeyTTLTimeOut(ByteString key) { - Long t = client.getKeyTTL(key); - assertNull(t); - } - - private void checkEmpty(Kvrpcpb.KvPair kv) { - checkEmpty(kv.getKey()); + Optional t = client.getKeyTTL(key); + assertFalse(t.isPresent()); } - private void checkEmpty(ByteString key) { - assertTrue(client.get(key).isEmpty()); + private void checkNotExist(ByteString key) { + assertFalse(client.get(key).isPresent()); } private static ByteString rawKey(String key) {