Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 29 additions & 21 deletions src/main/java/org/tikv/common/region/RegionStoreClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -357,7 +357,7 @@ public List<KvPair> scan(
this,
lockResolverClient,
resp -> resp.hasRegionError() ? resp.getRegionError() : null,
resp -> null,
resp -> resp.hasError() ? resp.getError() : null,
resolveLockResult -> addResolvedLocks(version, resolveLockResult.getResolvedLocks()),
version,
forWrite);
Expand All @@ -366,13 +366,14 @@ public List<KvPair> scan(
// we need to update region after retry
region = regionManager.getRegionByKey(startKey, backOffer);

if (isScanSuccess(backOffer, resp)) {
return doScan(resp);
if (handleScanResponse(backOffer, resp, version, forWrite)) {
return resp.getPairsList();
}
}
}

private boolean isScanSuccess(BackOffer backOffer, ScanResponse resp) {
private boolean handleScanResponse(
BackOffer backOffer, ScanResponse resp, long version, boolean forWrite) {
if (resp == null) {
this.regionManager.onRequestFail(region);
throw new TiClientInternalException("ScanResponse failed without a cause");
Expand All @@ -381,28 +382,35 @@ private boolean isScanSuccess(BackOffer backOffer, ScanResponse resp) {
backOffer.doBackOff(BoRegionMiss, new RegionException(resp.getRegionError()));
return false;
}
return true;
}

// TODO: resolve locks after scan
private List<KvPair> doScan(ScanResponse resp) {
// Check if kvPair contains error, it should be a Lock if hasError is true.
List<KvPair> kvPairs = resp.getPairsList();
List<KvPair> newKvPairs = new ArrayList<>();
for (KvPair kvPair : kvPairs) {
// Resolve locks
// Note: Memory lock conflict is returned by both `ScanResponse.error` &
// `ScanResponse.pairs[0].error`, while other key errors are returned by
// `ScanResponse.pairs.error`
// See https://github.com/pingcap/kvproto/pull/697
List<Lock> locks = new ArrayList<>();
for (KvPair kvPair : resp.getPairsList()) {
if (kvPair.hasError()) {
Lock lock = AbstractLockResolverClient.extractLockFromKeyErr(kvPair.getError(), codec);
newKvPairs.add(
KvPair.newBuilder()
.setError(kvPair.getError())
.setValue(kvPair.getValue())
.setKey(lock.getKey())
.build());
} else {
newKvPairs.add(codec.decodeKvPair(kvPair));
locks.add(lock);
}
}
if (!locks.isEmpty()) {
ResolveLockResult resolveLockResult =
lockResolverClient.resolveLocks(backOffer, version, locks, forWrite);
addResolvedLocks(version, resolveLockResult.getResolvedLocks());

long msBeforeExpired = resolveLockResult.getMsBeforeTxnExpired();
if (msBeforeExpired > 0) {
// if not resolve all locks, we wait and retry
backOffer.doBackOffWithMaxSleep(
BoTxnLockFast, msBeforeExpired, new KeyException(locks.toString()));
}

return false;
}
return Collections.unmodifiableList(newKvPairs);

return true;
}

public List<KvPair> scan(BackOffer backOffer, ByteString startKey, long version) {
Expand Down
173 changes: 168 additions & 5 deletions src/test/java/org/tikv/common/KVMockServer.java
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.tikv.common.key.Key;
import org.tikv.common.meta.TiTimestamp;
import org.tikv.common.region.TiRegion;
import org.tikv.kvproto.Coprocessor;
import org.tikv.kvproto.Errorpb;
Expand All @@ -67,6 +68,10 @@ public class KVMockServer extends TikvGrpc.TikvImplBase {

private final Map<Key, Supplier<Kvrpcpb.KeyError.Builder>> keyErrMap = new HashMap<>();

private final Map<Key, Supplier<Kvrpcpb.LockInfo.Builder>> lockMap = new HashMap<>();
private final Map<Long, Supplier<Kvrpcpb.CheckTxnStatusResponse.Builder>> txnStatusMap =
new HashMap<>();

// for KV error
public static final int ABORT = 1;
public static final int RETRY = 2;
Expand Down Expand Up @@ -117,9 +122,68 @@ public void putError(String key, Supplier<Errorpb.Error.Builder> builder) {
regionErrMap.put(toRawKey(key.getBytes(StandardCharsets.UTF_8)), builder);
}

public void removeError(String key) {
regionErrMap.remove(toRawKey(key.getBytes(StandardCharsets.UTF_8)));
}

// putWithLock is used to "prewrite" key-value without "commit"
public void putWithLock(
ByteString key, ByteString value, ByteString primaryKey, Long startTs, Long ttl) {
put(key, value);

Kvrpcpb.LockInfo.Builder lock =
Kvrpcpb.LockInfo.newBuilder()
.setPrimaryLock(primaryKey)
.setLockVersion(startTs)
.setKey(key)
.setLockTtl(ttl);
lockMap.put(toRawKey(key), () -> lock);
}

public void removeLock(ByteString key) {
lockMap.remove(toRawKey(key));
}

public boolean hasLock(ByteString key) {
return lockMap.containsKey(toRawKey(key));
}

// putTxnStatus is used to save transaction status
// commitTs > 0: committed
// commitTs == 0 && key is empty: rollback
// commitTs == 0 && key not empty: locked by key
public void putTxnStatus(Long startTs, Long commitTs, ByteString key) {
if (commitTs > 0 || (commitTs == 0 && key.isEmpty())) { // committed || rollback
Kvrpcpb.CheckTxnStatusResponse.Builder txnStatus =
Kvrpcpb.CheckTxnStatusResponse.newBuilder()
.setCommitVersion(commitTs)
.setLockTtl(0)
.setAction(Kvrpcpb.Action.NoAction);
txnStatusMap.put(startTs, () -> txnStatus);
} else { // locked
Kvrpcpb.LockInfo.Builder lock = lockMap.get(toRawKey(key)).get();
Kvrpcpb.CheckTxnStatusResponse.Builder txnStatus =
Kvrpcpb.CheckTxnStatusResponse.newBuilder()
.setCommitVersion(commitTs)
.setLockTtl(lock.getLockTtl())
.setAction(Kvrpcpb.Action.NoAction)
.setLockInfo(lock);
txnStatusMap.put(startTs, () -> txnStatus);
}
}

// putTxnStatus is used to save transaction status
// commitTs > 0: committed
// commitTs == 0: rollback
public void putTxnStatus(Long startTs, Long commitTs) {
putTxnStatus(startTs, commitTs, ByteString.EMPTY);
}

public void clearAllMap() {
dataMap.clear();
regionErrMap.clear();
lockMap.clear();
txnStatusMap.clear();
}

private Errorpb.Error verifyContext(Context context) throws Exception {
Expand Down Expand Up @@ -255,9 +319,12 @@ public void kvGet(
return;
}

Supplier<Kvrpcpb.LockInfo.Builder> lock = lockMap.get(key);
Supplier<Kvrpcpb.KeyError.Builder> errProvider = keyErrMap.remove(key);
if (errProvider != null) {
builder.setError(errProvider.get().build());
} else if (lock != null) {
builder.setError(Kvrpcpb.KeyError.newBuilder().setLocked(lock.get()));
} else {
ByteString value = dataMap.get(key);
builder.setValue(value);
Expand Down Expand Up @@ -299,11 +366,17 @@ public void kvScan(
kvs.entrySet()
.stream()
.map(
kv ->
Kvrpcpb.KvPair.newBuilder()
.setKey(kv.getKey().toByteString())
.setValue(kv.getValue())
.build())
kv -> {
Kvrpcpb.KvPair.Builder kvBuilder =
Kvrpcpb.KvPair.newBuilder()
.setKey(kv.getKey().toByteString())
.setValue(kv.getValue());
Supplier<Kvrpcpb.LockInfo.Builder> lock = lockMap.get(kv.getKey());
if (lock != null) {
kvBuilder.setError(Kvrpcpb.KeyError.newBuilder().setLocked(lock.get()));
}
return kvBuilder.build();
})
.collect(Collectors.toList()));
}
responseObserver.onNext(builder.build());
Expand Down Expand Up @@ -354,6 +427,96 @@ public void kvBatchGet(
}
}

@Override
public void kvCheckTxnStatus(
org.tikv.kvproto.Kvrpcpb.CheckTxnStatusRequest request,
io.grpc.stub.StreamObserver<org.tikv.kvproto.Kvrpcpb.CheckTxnStatusResponse>
responseObserver) {
logger.info("KVMockServer.kvCheckTxnStatus");
try {
Long startTs = request.getLockTs();
Long currentTs = request.getCurrentTs();
logger.info("kvCheckTxnStatus for txn: " + startTs);
Kvrpcpb.CheckTxnStatusResponse.Builder builder = Kvrpcpb.CheckTxnStatusResponse.newBuilder();

Error e = verifyContext(request.getContext());
if (e != null) {
responseObserver.onNext(builder.setRegionError(e).build());
responseObserver.onCompleted();
return;
}

Supplier<Kvrpcpb.CheckTxnStatusResponse.Builder> txnStatus = txnStatusMap.get(startTs);
if (txnStatus != null) {
Kvrpcpb.CheckTxnStatusResponse resp = txnStatus.get().build();
if (resp.getCommitVersion() == 0
&& resp.getLockTtl() > 0
&& TiTimestamp.extractPhysical(startTs) + resp.getLockInfo().getLockTtl()
< TiTimestamp.extractPhysical(currentTs)) {
ByteString key = resp.getLockInfo().getKey();
logger.info(
String.format(
"kvCheckTxnStatus rollback expired txn: %d, remove lock: %s",
startTs, key.toStringUtf8()));
removeLock(key);
putTxnStatus(startTs, 0L, ByteString.EMPTY);
resp = txnStatusMap.get(startTs).get().build();
}
logger.info("kvCheckTxnStatus resp: " + resp);
responseObserver.onNext(resp);
} else {
builder.setError(
Kvrpcpb.KeyError.newBuilder()
.setTxnNotFound(
Kvrpcpb.TxnNotFound.newBuilder()
.setPrimaryKey(request.getPrimaryKey())
.setStartTs(startTs)));
logger.info("kvCheckTxnStatus, TxnNotFound");
responseObserver.onNext(builder.build());
}
responseObserver.onCompleted();
} catch (Exception e) {
logger.error("kvCheckTxnStatus error: " + e);
responseObserver.onError(Status.INTERNAL.asRuntimeException());
}
}

@Override
public void kvResolveLock(
org.tikv.kvproto.Kvrpcpb.ResolveLockRequest request,
io.grpc.stub.StreamObserver<org.tikv.kvproto.Kvrpcpb.ResolveLockResponse> responseObserver) {
logger.info("KVMockServer.kvResolveLock");
try {
Long startTs = request.getStartVersion();
Long commitTs = request.getCommitVersion();
logger.info(
String.format(
"kvResolveLock for txn: %d, commitTs: %d, keys: %d",
startTs, commitTs, request.getKeysCount()));
Kvrpcpb.ResolveLockResponse.Builder builder = Kvrpcpb.ResolveLockResponse.newBuilder();

Error e = verifyContext(request.getContext());
if (e != null) {
responseObserver.onNext(builder.setRegionError(e).build());
responseObserver.onCompleted();
return;
}

if (request.getKeysCount() == 0) {
lockMap.entrySet().removeIf(entry -> entry.getValue().get().getLockVersion() == startTs);
} else {
for (int i = 0; i < request.getKeysCount(); i++) {
removeLock(request.getKeys(i));
}
}

responseObserver.onNext(builder.build());
responseObserver.onCompleted();
} catch (Exception e) {
responseObserver.onError(Status.INTERNAL.asRuntimeException());
}
}

@Override
public void coprocessor(
org.tikv.kvproto.Coprocessor.Request requestWrap,
Expand Down
6 changes: 4 additions & 2 deletions src/test/java/org/tikv/common/MockServerTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ public class MockServerTest extends PDMockServerTest {
public void setup() throws IOException {
super.setup();

port = GrpcUtils.getFreePort();

Metapb.Region r =
Metapb.Region.newBuilder()
.setRegionEpoch(Metapb.RegionEpoch.newBuilder().setConfVer(1).setVersion(2))
Expand All @@ -51,7 +53,7 @@ public void setup() throws IOException {
List<Metapb.Store> s =
ImmutableList.of(
Metapb.Store.newBuilder()
.setAddress("localhost:1234")
.setAddress(LOCAL_ADDR + ":" + port)
.setVersion("5.0.0")
.setId(13)
.build());
Expand All @@ -70,6 +72,6 @@ public void setup() throws IOException {
(request) -> Pdpb.GetStoreResponse.newBuilder().setStore(store).build());
}
server = new KVMockServer();
port = server.start(region);
server.start(region, port);
}
}
7 changes: 5 additions & 2 deletions src/test/java/org/tikv/common/PDClientMockTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -74,9 +74,12 @@ public void testSwitchLeader() throws Exception {
@Test
public void testTso() throws Exception {
try (PDClient client = session.getPDClient()) {
Long current = System.currentTimeMillis();
TiTimestamp ts = client.getTimestamp(defaultBackOff());
// Test pdServer is set to generate physical == logical + 1
assertEquals(ts.getPhysical(), ts.getLogical() + 1);
// Test pdServer is set to generate physical to current, logical to 1
assertTrue(ts.getPhysical() >= current);
assertTrue(ts.getPhysical() < current + 100);
assertEquals(ts.getLogical(), 1);
}
}

Expand Down
16 changes: 13 additions & 3 deletions src/test/java/org/tikv/common/PDMockServer.java
Original file line number Diff line number Diff line change
Expand Up @@ -75,8 +75,17 @@ public void getMembers(GetMembersRequest request, StreamObserver<GetMembersRespo
@Override
public StreamObserver<TsoRequest> tso(StreamObserver<TsoResponse> resp) {
return new StreamObserver<TsoRequest>() {
private int physical = 1;
private int logical = 0;
private long physical = System.currentTimeMillis();
private long logical = 0;

private void updateTso() {
logical++;
if (logical >= (1 << 18)) {
logical = 0;
physical++;
}
physical = Math.max(physical, System.currentTimeMillis());
}

@Override
public void onNext(TsoRequest value) {}
Expand All @@ -86,7 +95,8 @@ public void onError(Throwable t) {}

@Override
public void onCompleted() {
resp.onNext(GrpcUtils.makeTsoResponse(clusterId, physical++, logical++));
updateTso();
resp.onNext(GrpcUtils.makeTsoResponse(clusterId, physical, logical));
resp.onCompleted();
}
};
Expand Down
Loading