Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 15 additions & 3 deletions src/main/java/org/tikv/common/TiSession.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,12 @@
import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.protobuf.ByteString;
import java.util.*;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;
Expand All @@ -34,8 +39,15 @@
import org.tikv.common.importer.SwitchTiKVModeClient;
import org.tikv.common.key.Key;
import org.tikv.common.meta.TiTimestamp;
import org.tikv.common.region.*;
import org.tikv.common.util.*;
import org.tikv.common.region.RegionManager;
import org.tikv.common.region.RegionStoreClient;
import org.tikv.common.region.TiRegion;
import org.tikv.common.region.TiStore;
import org.tikv.common.util.BackOffFunction;
import org.tikv.common.util.BackOffer;
import org.tikv.common.util.ChannelFactory;
import org.tikv.common.util.ConcreteBackOffer;
import org.tikv.common.util.Pair;
import org.tikv.kvproto.Errorpb;
import org.tikv.kvproto.ImportSstpb;
import org.tikv.kvproto.Metapb;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
import java.util.LinkedList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.*;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.tikv.common.AbstractGRPCClient;
Expand All @@ -48,6 +48,7 @@
public abstract class AbstractRegionStoreClient
extends AbstractGRPCClient<TikvGrpc.TikvBlockingStub, TikvGrpc.TikvFutureStub>
implements RegionErrorReceiver {

private static final Logger logger = LoggerFactory.getLogger(AbstractRegionStoreClient.class);

public static final Histogram SEEK_LEADER_STORE_DURATION =
Expand Down Expand Up @@ -230,6 +231,7 @@ private Boolean seekLeaderStore(BackOffer backOffer) {
// switch to leader store
store = currentLeaderStore;
updateClientStub();
return true;
}
return false;
}
Expand Down Expand Up @@ -374,6 +376,7 @@ private TiStore switchProxyStore(BackOffer backOffer) {
}

private static class SwitchLeaderTask {

private final ListenableFuture<Kvrpcpb.RawGetResponse> task;
private final Metapb.Peer peer;

Expand All @@ -384,6 +387,7 @@ private SwitchLeaderTask(ListenableFuture<Kvrpcpb.RawGetResponse> task, Metapb.P
}

private static class ForwardCheckTask {

private final ListenableFuture<Kvrpcpb.RawGetResponse> task;
private final Metapb.Store store;

Expand Down
2 changes: 2 additions & 0 deletions src/main/java/org/tikv/common/region/TiRegion.java
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import org.tikv.kvproto.Metapb.Region;

public class TiRegion implements Serializable {

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you can do code format in another PR

private static final Logger logger = LoggerFactory.getLogger(TiRegion.class);

private final Region meta;
Expand Down Expand Up @@ -269,6 +270,7 @@ public String toString() {
}

public class RegionVerID {

final long id;
final long confVer;
final long ver;
Expand Down
67 changes: 61 additions & 6 deletions src/test/java/org/tikv/common/KVMockServer.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,22 @@
import io.grpc.Server;
import io.grpc.ServerBuilder;
import io.grpc.Status;
import io.grpc.health.v1.HealthCheckRequest;
import io.grpc.health.v1.HealthCheckResponse;
import io.grpc.health.v1.HealthCheckResponse.ServingStatus;
import io.grpc.health.v1.HealthGrpc.HealthImplBase;
import io.grpc.stub.StreamObserver;
import java.io.IOException;
import java.net.ServerSocket;
import java.util.*;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.tikv.common.key.Key;
import org.tikv.common.region.TiRegion;
import org.tikv.kvproto.Coprocessor;
Expand All @@ -45,9 +57,11 @@

public class KVMockServer extends TikvGrpc.TikvImplBase {

private static final Logger logger = LoggerFactory.getLogger(KVMockServer.class);
private int port;
private Server server;
private TiRegion region;
private State state = State.Normal;
private final TreeMap<Key, ByteString> dataMap = new TreeMap<>();
private final Map<ByteString, Integer> errorMap = new HashMap<>();

Expand All @@ -64,10 +78,23 @@ public class KVMockServer extends TikvGrpc.TikvImplBase {
public static final int STORE_NOT_MATCH = 9;
public static final int RAFT_ENTRY_TOO_LARGE = 10;

public enum State {
Normal,
Fail
}

public void setState(State state) {
this.state = state;
}

public int getPort() {
return port;
}

public void setRegion(TiRegion region) {
this.region = region;
}

public void put(ByteString key, ByteString value) {
dataMap.put(toRawKey(key), value);
}
Expand Down Expand Up @@ -97,7 +124,7 @@ private void verifyContext(Context context) throws Exception {
if (context.getRegionId() != region.getId()
|| !context.getRegionEpoch().equals(region.getRegionEpoch())
|| !context.getPeer().equals(region.getLeader())) {
throw new Exception();
throw new Exception("context doesn't match");
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it would be better to specifically point out the mismatched field in context.

}
}

Expand All @@ -106,6 +133,11 @@ public void rawGet(
org.tikv.kvproto.Kvrpcpb.RawGetRequest request,
io.grpc.stub.StreamObserver<org.tikv.kvproto.Kvrpcpb.RawGetResponse> responseObserver) {
try {
switch (state) {
case Fail:
throw new Exception(State.Fail.toString());
default:
}
verifyContext(request.getContext());
ByteString key = request.getKey();

Expand All @@ -116,7 +148,12 @@ public void rawGet(
setErrorInfo(errorCode, errBuilder);
builder.setRegionError(errBuilder.build());
} else {
builder.setValue(dataMap.get(toRawKey(key)));
Key rawKey = toRawKey(key);
ByteString value = dataMap.get(rawKey);
if (value == null) {
value = ByteString.EMPTY;
}
builder.setValue(value);
}
responseObserver.onNext(builder.build());
responseObserver.onCompleted();
Expand All @@ -139,7 +176,6 @@ public void rawPut(
if (errorCode != null) {
setErrorInfo(errorCode, errBuilder);
builder.setRegionError(errBuilder.build());
// builder.setError("");
}
responseObserver.onNext(builder.build());
responseObserver.onCompleted();
Expand Down Expand Up @@ -349,14 +385,33 @@ public void coprocessor(
}

public int start(TiRegion region) throws IOException {
int port;
try (ServerSocket s = new ServerSocket(0)) {
port = s.getLocalPort();
}
server = ServerBuilder.forPort(port).addService(this).build().start();
start(region, port);
return port;
}

private static class HealCheck extends HealthImplBase {

@Override
public void check(
HealthCheckRequest request, StreamObserver<HealthCheckResponse> responseObserver) {
responseObserver.onNext(
HealthCheckResponse.newBuilder().setStatus(ServingStatus.SERVING).build());
responseObserver.onCompleted();
}
}

public void start(TiRegion region, int port) throws IOException {
this.port = port;
this.region = region;

logger.info("start mock server on port: " + port);
server =
ServerBuilder.forPort(port).addService(new HealCheck()).addService(this).build().start();
Runtime.getRuntime().addShutdownHook(new Thread(KVMockServer.this::stop));
return port;
}

public void stop() {
Expand Down
11 changes: 7 additions & 4 deletions src/test/java/org/tikv/common/MockServerTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,14 +29,15 @@
import org.tikv.kvproto.Pdpb;

public class MockServerTest extends PDMockServerTest {

public KVMockServer server;
public int port;
public TiRegion region;

@Before
@Override
public void setUp() throws IOException {
super.setUp();
public void setup() throws IOException {
super.setup();

Metapb.Region r =
Metapb.Region.newBuilder()
Expand All @@ -62,9 +63,11 @@ public void setUp() throws IOException {
r.getPeers(0),
r.getPeersList(),
s.stream().map(TiStore::new).collect(Collectors.toList()));
pdServer.addGetRegionResp(Pdpb.GetRegionResponse.newBuilder().setRegion(r).build());
leader.addGetRegionListener(
request -> Pdpb.GetRegionResponse.newBuilder().setRegion(r).build());
for (Metapb.Store store : s) {
pdServer.addGetStoreResp(Pdpb.GetStoreResponse.newBuilder().setStore(store).build());
leader.addGetStoreListener(
(request) -> Pdpb.GetStoreResponse.newBuilder().setStore(store).build());
}
server = new KVMockServer();
port = server.start(region);
Expand Down
129 changes: 129 additions & 0 deletions src/test/java/org/tikv/common/MockThreeStoresTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
/*
* Copyright 2022 TiKV Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package org.tikv.common;

import com.google.common.collect.ImmutableList;
import com.google.protobuf.ByteString;
import java.io.IOException;
import java.net.ServerSocket;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import org.junit.After;
import org.junit.Before;
import org.tikv.common.region.TiRegion;
import org.tikv.common.region.TiStore;
import org.tikv.kvproto.Metapb;
import org.tikv.kvproto.Pdpb;

public class MockThreeStoresTest extends PDMockServerTest {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

rename MockThreeStoresTest to MockTiKVClusterTest?
The store number can be config?

Copy link
Member Author

@iosmanthus iosmanthus Feb 17, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I just want to keep it simple enough, for now, refactoring will be introduced in the future.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Create an issue to describe this refactoring?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could I just add some comments to this class?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sure


protected TiRegion region;
protected List<KVMockServer> servers = new ArrayList<>();
protected List<Metapb.Store> stores;

@Before
@Override
public void setup() throws IOException {
super.setup();

int basePort;
try (ServerSocket s = new ServerSocket(0)) {
basePort = s.getLocalPort();
}

ImmutableList<Metapb.Peer> peers =
ImmutableList.of(
Metapb.Peer.newBuilder().setId(0x1).setStoreId(0x1).build(),
Metapb.Peer.newBuilder().setId(0x2).setStoreId(0x2).build(),
Metapb.Peer.newBuilder().setId(0x3).setStoreId(0x3).build());

Metapb.Region region =
Metapb.Region.newBuilder()
.setRegionEpoch(Metapb.RegionEpoch.newBuilder().setConfVer(1).setVersion(2))
.setId(0xff)
.setStartKey(ByteString.EMPTY)
.setEndKey(ByteString.EMPTY)
.addAllPeers(peers)
.build();

stores =
ImmutableList.of(
Metapb.Store.newBuilder()
.setAddress("127.0.0.1:" + basePort)
.setVersion("5.0.0")
.setId(0x1)
.build(),
Metapb.Store.newBuilder()
.setAddress("127.0.0.1:" + (basePort + 1))
.setVersion("5.0.0")
.setId(0x2)
.build(),
Metapb.Store.newBuilder()
.setAddress("127.0.0.1:" + (basePort + 2))
.setVersion("5.0.0")
.setId(0x3)
.build());

for (PDMockServer server : pdServers) {
server.addGetRegionListener(
request ->
Pdpb.GetRegionResponse.newBuilder()
.setLeader(peers.get(0))
.setRegion(region)
.build());
server.addGetStoreListener(
(request) -> {
int i = (int) request.getStoreId() - 1;
return Pdpb.GetStoreResponse.newBuilder().setStore(stores.get(i)).build();
});
}

this.region =
new TiRegion(
session.getConf(),
region,
region.getPeers(0),
region.getPeersList(),
stores.stream().map(TiStore::new).collect(Collectors.toList()));
for (int i = 0; i < 3; i++) {
KVMockServer server = new KVMockServer();
server.start(this.region, basePort + i);
servers.add(server);
}
}

public void put(ByteString key, ByteString value) {
for (KVMockServer server : servers) {
server.put(key, value);
}
}

public void remove(ByteString key, ByteString value) {
for (KVMockServer server : servers) {
server.remove(key);
}
}

@After
public void tearDown() {
for (KVMockServer server : servers) {
server.stop();
}
}
}
Loading