Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions .ci/integration_test.groovy
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
def call(ghprbActualCommit, ghprbPullId, ghprbPullTitle, ghprbPullLink, ghprbPullDescription, credentialsId) {

def TIDB_BRANCH = "release-4.0"
def TIKV_BRANCH = "release-4.0"
def PD_BRANCH = "release-4.0"
def TIDB_BRANCH = "release-5.0"
def TIKV_BRANCH = "release-5.0"
def PD_BRANCH = "release-5.0"

// parse tidb branch
def m1 = ghprbCommentBody =~ /tidb\s*=\s*([^\s\\]+)(\s|\\|$)/
Expand Down
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ pub.sh

# ignore compiled classes
target
.classpath

# ignore version info
src/main/java/com/pingcap/tikv/TiVersion.java
Expand Down
3 changes: 3 additions & 0 deletions config/tikv.toml
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,6 @@
[raftstore]
# set store capacity, if no set, use disk capacity.
capacity = "8G"

[storage]
enable-ttl = true
6 changes: 4 additions & 2 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -57,14 +57,16 @@
</scm>

<properties>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<protobuf.version>3.5.1</protobuf.version>
<log4j.version>1.2.17</log4j.version>
<slf4j.version>1.7.16</slf4j.version>
<grpc.version>1.24.0</grpc.version>
<powermock.version>1.6.6</powermock.version>
<jackson.version>2.10.0</jackson.version>
<jackson.version>2.12.3</jackson.version>
<trove4j.version>3.0.1</trove4j.version>
<jetcd.version>0.4.1</jetcd.version>
<joda-time.version>2.9.9</joda-time.version>
Expand Down Expand Up @@ -386,7 +388,7 @@
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-javadoc-plugin</artifactId>
<version>2.9.1</version>
<version>3.0.1</version>
<configuration>
<skip>${javadoc.skip}</skip>
</configuration>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/*
* Copyright 2021 PingCAP, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.tikv.common.exception;

import com.google.protobuf.ByteString;
import java.util.Optional;
import org.tikv.common.codec.KeyUtils;

public class RawCASConflictException extends RuntimeException {

private final ByteString key;
private final Optional<ByteString> expectedPrevValue;
private final Optional<ByteString> prevValue;

public RawCASConflictException(
ByteString key, Optional<ByteString> expectedPrevValue, Optional<ByteString> prevValue) {
super(
String.format(
"key=%s expectedPrevValue=%s prevValue=%s",
KeyUtils.formatBytes(key), expectedPrevValue, prevValue));
this.key = key;
this.expectedPrevValue = expectedPrevValue;
this.prevValue = prevValue;
}

public ByteString getKey() {
return this.key;
}

public Optional<ByteString> getExpectedPrevValue() {
return this.expectedPrevValue;
}

public Optional<ByteString> getPrevValue() {
return this.prevValue;
}
}
46 changes: 31 additions & 15 deletions src/main/java/org/tikv/common/region/RegionStoreClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -797,7 +797,7 @@ public List<TiRegion> splitRegion(Iterable<ByteString> splitKeys) {

// APIs for Raw Scan/Put/Get/Delete

public ByteString rawGet(BackOffer backOffer, ByteString key) {
public Optional<ByteString> rawGet(BackOffer backOffer, ByteString key) {
Histogram.Timer requestTimer =
GRPC_RAW_REQUEST_LATENCY.labels("client_grpc_raw_get").startTimer();
try {
Expand All @@ -817,7 +817,7 @@ public ByteString rawGet(BackOffer backOffer, ByteString key) {
}
}

private ByteString rawGetHelper(RawGetResponse resp) {
private Optional<ByteString> rawGetHelper(RawGetResponse resp) {
if (resp == null) {
this.regionManager.onRequestFail(region);
throw new TiClientInternalException("RawGetResponse failed without a cause");
Expand All @@ -829,10 +829,14 @@ private ByteString rawGetHelper(RawGetResponse resp) {
if (resp.hasRegionError()) {
throw new RegionException(resp.getRegionError());
}
return resp.getValue();
if (resp.getNotFound()) {
return Optional.empty();
} else {
return Optional.of(resp.getValue());
}
}

public Long rawGetKeyTTL(BackOffer backOffer, ByteString key) {
public Optional<Long> rawGetKeyTTL(BackOffer backOffer, ByteString key) {
Histogram.Timer requestTimer =
GRPC_RAW_REQUEST_LATENCY.labels("client_grpc_raw_get_key_ttl").startTimer();
try {
Expand All @@ -853,7 +857,7 @@ public Long rawGetKeyTTL(BackOffer backOffer, ByteString key) {
}
}

private Long rawGetKeyTTLHelper(RawGetKeyTTLResponse resp) {
private Optional<Long> rawGetKeyTTLHelper(RawGetKeyTTLResponse resp) {
if (resp == null) {
this.regionManager.onRequestFail(region);
throw new TiClientInternalException("RawGetResponse failed without a cause");
Expand All @@ -866,9 +870,9 @@ private Long rawGetKeyTTLHelper(RawGetKeyTTLResponse resp) {
throw new RegionException(resp.getRegionError());
}
if (resp.getNotFound()) {
return null;
return Optional.empty();
}
return resp.getTtl();
return Optional.of(resp.getTtl());
}

public void rawDelete(BackOffer backOffer, ByteString key) {
Expand Down Expand Up @@ -944,8 +948,13 @@ private void rawPutHelper(RawPutResponse resp) {
}
}

public ByteString rawPutIfAbsent(
BackOffer backOffer, ByteString key, ByteString value, long ttl) {
public void rawCompareAndSet(
BackOffer backOffer,
ByteString key,
Optional<ByteString> prevValue,
ByteString value,
long ttl)
throws RawCASConflictException {
Histogram.Timer requestTimer =
GRPC_RAW_REQUEST_LATENCY.labels("client_grpc_raw_put_if_absent").startTimer();
try {
Expand All @@ -955,7 +964,8 @@ public ByteString rawPutIfAbsent(
.setContext(region.getReplicaContext(storeType))
.setKey(key)
.setValue(value)
.setPreviousNotExist(true)
.setPreviousValue(prevValue.orElse(ByteString.EMPTY))
.setPreviousNotExist(!prevValue.isPresent())
.setTtl(ttl)
.build();

Expand All @@ -964,13 +974,15 @@ public ByteString rawPutIfAbsent(
regionManager, this, resp -> resp.hasRegionError() ? resp.getRegionError() : null);
RawCASResponse resp =
callWithRetry(backOffer, TikvGrpc.getRawCompareAndSwapMethod(), factory, handler);
return rawPutIfAbsentHelper(resp);
rawCompareAndSetHelper(key, prevValue, resp);
} finally {
requestTimer.observeDuration();
}
}

private ByteString rawPutIfAbsentHelper(RawCASResponse resp) {
private void rawCompareAndSetHelper(
ByteString key, Optional<ByteString> expectedPrevValue, RawCASResponse resp)
throws RawCASConflictException {
if (resp == null) {
this.regionManager.onRequestFail(region);
throw new TiClientInternalException("RawPutResponse failed without a cause");
Expand All @@ -982,10 +994,14 @@ private ByteString rawPutIfAbsentHelper(RawCASResponse resp) {
if (resp.hasRegionError()) {
throw new RegionException(resp.getRegionError());
}
if (resp.getSucceed()) {
return ByteString.EMPTY;
if (!resp.getSucceed()) {
if (resp.getPreviousNotExist()) {
throw new RawCASConflictException(key, expectedPrevValue, Optional.empty());
} else {
throw new RawCASConflictException(
key, expectedPrevValue, Optional.of(resp.getPreviousValue()));
}
}
return resp.getPreviousValue();
}

public List<KvPair> rawBatchGet(BackOffer backoffer, List<ByteString> keys) {
Expand Down
56 changes: 43 additions & 13 deletions src/main/java/org/tikv/raw/RawKVClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.slf4j.LoggerFactory;
import org.tikv.common.TiConfiguration;
import org.tikv.common.TiSession;
import org.tikv.common.exception.RawCASConflictException;
import org.tikv.common.exception.TiKVException;
import org.tikv.common.key.Key;
import org.tikv.common.operation.iterator.RawScanIterator;
Expand Down Expand Up @@ -139,10 +140,10 @@ public void put(ByteString key, ByteString value, long ttl) {
*
* @param key key
* @param value value
* @return a ByteString. returns ByteString.EMPTY if the value is written successfully. returns
* the previous key if the value already exists, and does not write to TiKV.
* @return a ByteString. returns Optional.EMPTY if the value is written successfully. returns the
* previous key if the value already exists, and does not write to TiKV.
*/
public ByteString putIfAbsent(ByteString key, ByteString value) {
public Optional<ByteString> putIfAbsent(ByteString key, ByteString value) {
return putIfAbsent(key, value, 0L);
}

Expand All @@ -152,20 +153,49 @@ public ByteString putIfAbsent(ByteString key, ByteString value) {
* @param key key
* @param value value
* @param ttl TTL of key (in seconds), 0 means the key will never be outdated.
* @return a ByteString. returns ByteString.EMPTY if the value is written successfully. returns
* the previous key if the value already exists, and does not write to TiKV.
* @return a ByteString. returns Optional.EMPTY if the value is written successfully. returns the
* previous key if the value already exists, and does not write to TiKV.
*/
public ByteString putIfAbsent(ByteString key, ByteString value, long ttl) {
String label = "client_raw_put_if_absent";
public Optional<ByteString> putIfAbsent(ByteString key, ByteString value, long ttl) {
try {
compareAndSet(key, Optional.empty(), value, ttl);
return Optional.empty();
} catch (RawCASConflictException e) {
return e.getPrevValue();
}
}

/**
* Put a key-value pair if the prevValue matched the value in TiKV. This API is atomic.
*
* @param key key
* @param value value
*/
public void compareAndSet(ByteString key, Optional<ByteString> prevValue, ByteString value)
throws RawCASConflictException {
compareAndSet(key, prevValue, value, 0L);
}

/**
* pair if the prevValue matched the value in TiKV. This API is atomic.
*
* @param key key
* @param value value
* @param ttl TTL of key (in seconds), 0 means the key will never be outdated.
*/
public void compareAndSet(
ByteString key, Optional<ByteString> prevValue, ByteString value, long ttl)
throws RawCASConflictException {
String label = "client_raw_compare_and_set";
Histogram.Timer requestTimer = RAW_REQUEST_LATENCY.labels(label).startTimer();
try {
BackOffer backOffer = defaultBackOff();
while (true) {
RegionStoreClient client = clientBuilder.build(key);
try {
ByteString result = client.rawPutIfAbsent(backOffer, key, value, ttl);
client.rawCompareAndSet(backOffer, key, prevValue, value, ttl);
RAW_REQUEST_SUCCESS.labels(label).inc();
return result;
return;
} catch (final TiKVException e) {
backOffer.doBackOff(BackOffFunction.BackOffFuncType.BoRegionMiss, e);
}
Expand Down Expand Up @@ -236,15 +266,15 @@ private void batchPut(Map<ByteString, ByteString> kvPairs, long ttl, boolean ato
* @param key raw key
* @return a ByteString value if key exists, ByteString.EMPTY if key does not exist
*/
public ByteString get(ByteString key) {
public Optional<ByteString> get(ByteString key) {
String label = "client_raw_get";
Histogram.Timer requestTimer = RAW_REQUEST_LATENCY.labels(label).startTimer();
try {
BackOffer backOffer = defaultBackOff();
while (true) {
RegionStoreClient client = clientBuilder.build(key);
try {
ByteString result = client.rawGet(defaultBackOff(), key);
Optional<ByteString> result = client.rawGet(defaultBackOff(), key);
RAW_REQUEST_SUCCESS.labels(label).inc();
return result;
} catch (final TiKVException e) {
Expand Down Expand Up @@ -322,15 +352,15 @@ private void batchDelete(List<ByteString> keys, boolean atomic) {
* @return a Long indicating the TTL of key ttl is a non-null long value indicating TTL if key
* exists. - ttl=0 if the key will never be outdated. - ttl=null if the key does not exist
*/
public Long getKeyTTL(ByteString key) {
public Optional<Long> getKeyTTL(ByteString key) {
String label = "client_raw_get_key_ttl";
Histogram.Timer requestTimer = RAW_REQUEST_LATENCY.labels(label).startTimer();
try {
BackOffer backOffer = defaultBackOff();
while (true) {
RegionStoreClient client = clientBuilder.build(key);
try {
Long result = client.rawGetKeyTTL(defaultBackOff(), key);
Optional<Long> result = client.rawGetKeyTTL(defaultBackOff(), key);
RAW_REQUEST_SUCCESS.labels(label).inc();
return result;
} catch (final TiKVException e) {
Expand Down
7 changes: 4 additions & 3 deletions src/test/java/org/tikv/common/RegionStoreClientTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import com.google.common.collect.ImmutableList;
import com.google.protobuf.ByteString;
import java.util.List;
import java.util.Optional;
import org.junit.Test;
import org.tikv.common.region.RegionManager;
import org.tikv.common.region.RegionStoreClient;
Expand Down Expand Up @@ -65,13 +66,13 @@ public void rawGetTest() throws Exception {

public void doRawGetTest(RegionStoreClient client) throws Exception {
server.put("key1", "value1");
ByteString value = client.rawGet(defaultBackOff(), ByteString.copyFromUtf8("key1"));
assertEquals(ByteString.copyFromUtf8("value1"), value);
Optional<ByteString> value = client.rawGet(defaultBackOff(), ByteString.copyFromUtf8("key1"));
assertEquals(ByteString.copyFromUtf8("value1"), value.get());

server.putError("error1", KVMockServer.NOT_LEADER);
// since not_leader is retryable, so the result should be correct.
value = client.rawGet(defaultBackOff(), ByteString.copyFromUtf8("key1"));
assertEquals(ByteString.copyFromUtf8("value1"), value);
assertEquals(ByteString.copyFromUtf8("value1"), value.get());

server.putError("failure", KVMockServer.STALE_EPOCH);
try {
Expand Down
Loading