Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
76 changes: 47 additions & 29 deletions src/main/java/org/tikv/common/region/AbstractRegionStoreClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,7 @@ private Boolean seekLeaderStore(BackOffer backOffer) {
// switch to leader store
store = currentLeaderStore;
updateClientStub();
return true;
}
return false;
}
Expand Down Expand Up @@ -275,19 +276,27 @@ private Metapb.Peer switchLeaderStore(BackOffer backOffer) {
List<SwitchLeaderTask> responses = new LinkedList<>();
for (Metapb.Peer peer : region.getFollowerList()) {
ByteString key = region.getStartKey();
TiStore peerStore = regionManager.getStoreById(peer.getStoreId(), backOffer);
ManagedChannel channel =
channelFactory.getChannel(
peerStore.getAddress(), regionManager.getPDClient().getHostMapping());
TikvGrpc.TikvFutureStub stub =
TikvGrpc.newFutureStub(channel).withDeadlineAfter(timeout, TimeUnit.MILLISECONDS);
Kvrpcpb.RawGetRequest rawGetRequest =
Kvrpcpb.RawGetRequest.newBuilder()
.setContext(region.getReplicaContext(peer))
.setKey(key)
.build();
ListenableFuture<Kvrpcpb.RawGetResponse> task = stub.rawGet(rawGetRequest);
responses.add(new SwitchLeaderTask(task, peer));
try {
TiStore peerStore = regionManager.getStoreById(peer.getStoreId(), backOffer);
ManagedChannel channel =
channelFactory.getChannel(
peerStore.getAddress(), regionManager.getPDClient().getHostMapping());
TikvGrpc.TikvFutureStub stub =
TikvGrpc.newFutureStub(channel).withDeadlineAfter(timeout, TimeUnit.MILLISECONDS);
Kvrpcpb.RawGetRequest rawGetRequest =
Kvrpcpb.RawGetRequest.newBuilder()
.setContext(region.getReplicaContext(peer))
.setKey(key)
.build();
ListenableFuture<Kvrpcpb.RawGetResponse> task = stub.rawGet(rawGetRequest);
responses.add(new SwitchLeaderTask(task, peer));
} catch (Exception e) {
logger.warn(
"switch region[{}] leader store to {} failed: {}",
region.getId(),
peer.getStoreId(),
e);
}
}
while (true) {
try {
Expand Down Expand Up @@ -326,22 +335,31 @@ private TiStore switchProxyStore(BackOffer backOffer) {
List<ForwardCheckTask> responses = new LinkedList<>();
for (Metapb.Peer peer : region.getFollowerList()) {
ByteString key = region.getStartKey();
TiStore peerStore = regionManager.getStoreById(peer.getStoreId(), backOffer);
ManagedChannel channel =
channelFactory.getChannel(
peerStore.getAddress(), regionManager.getPDClient().getHostMapping());
TikvGrpc.TikvFutureStub stub =
TikvGrpc.newFutureStub(channel).withDeadlineAfter(forwardTimeout, TimeUnit.MILLISECONDS);
Metadata header = new Metadata();
header.put(TiConfiguration.FORWARD_META_DATA_KEY, store.getStore().getAddress());
Kvrpcpb.RawGetRequest rawGetRequest =
Kvrpcpb.RawGetRequest.newBuilder()
.setContext(region.getReplicaContext(region.getLeader()))
.setKey(key)
.build();
ListenableFuture<Kvrpcpb.RawGetResponse> task =
MetadataUtils.attachHeaders(stub, header).rawGet(rawGetRequest);
responses.add(new ForwardCheckTask(task, peerStore.getStore()));
try {
TiStore peerStore = regionManager.getStoreById(peer.getStoreId(), backOffer);
ManagedChannel channel =
channelFactory.getChannel(
peerStore.getAddress(), regionManager.getPDClient().getHostMapping());
TikvGrpc.TikvFutureStub stub =
TikvGrpc.newFutureStub(channel)
.withDeadlineAfter(forwardTimeout, TimeUnit.MILLISECONDS);
Metadata header = new Metadata();
header.put(TiConfiguration.FORWARD_META_DATA_KEY, store.getStore().getAddress());
Kvrpcpb.RawGetRequest rawGetRequest =
Kvrpcpb.RawGetRequest.newBuilder()
.setContext(region.getReplicaContext(region.getLeader()))
.setKey(key)
.build();
ListenableFuture<Kvrpcpb.RawGetResponse> task =
MetadataUtils.attachHeaders(stub, header).rawGet(rawGetRequest);
responses.add(new ForwardCheckTask(task, peerStore.getStore()));
} catch (Exception e) {
logger.warn(
"switch region[{}] leader store to {} failed: {}",
region.getId(),
peer.getStoreId(),
e);
}
}
while (true) {
try {
Expand Down
29 changes: 15 additions & 14 deletions src/main/java/org/tikv/common/region/RegionManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.tikv.common.ReadOnlyPDClient;
Expand All @@ -47,6 +46,7 @@

@SuppressWarnings("UnstableApiUsage")
public class RegionManager {

private static final Logger logger = LoggerFactory.getLogger(RegionManager.class);
public static final Histogram GET_REGION_BY_KEY_REQUEST_LATENCY =
HistogramUtils.buildDuration()
Expand Down Expand Up @@ -205,22 +205,23 @@ public Pair<TiRegion, TiStore> getRegionStorePairByKey(
}

public TiRegion createRegion(Metapb.Region region, BackOffer backOffer) {
List<Metapb.Peer> peers = region.getPeersList();
List<TiStore> stores = getRegionStore(peers, backOffer);
return new TiRegion(conf, region, null, peers, stores);
return createRegion(region, null, backOffer);
}

private TiRegion createRegion(Metapb.Region region, Metapb.Peer leader, BackOffer backOffer) {
List<Metapb.Peer> peers = region.getPeersList();
List<TiStore> stores = getRegionStore(peers, backOffer);
return new TiRegion(conf, region, leader, peers, stores);
}

private List<TiStore> getRegionStore(List<Metapb.Peer> peers, BackOffer backOffer) {
return peers
.stream()
.map(p -> getStoreById(p.getStoreId(), backOffer))
.collect(Collectors.toList());
List<Metapb.Peer> peers = new ArrayList<>();
List<TiStore> stores = new ArrayList<>();
for (Metapb.Peer peer : region.getPeersList()) {
try {
stores.add(getStoreById(peer.getStoreId(), backOffer));
peers.add(peer);
} catch (Exception e) {
logger.warn("Store {} not found: {}", peer.getStoreId(), e.toString());
}
}
Metapb.Region newRegion =
Metapb.Region.newBuilder().mergeFrom(region).clearPeers().addAllPeers(peers).build();
return new TiRegion(conf, newRegion, leader, peers, stores);
}

private TiStore getStoreByIdWithBackOff(long id, BackOffer backOffer) {
Expand Down
67 changes: 61 additions & 6 deletions src/test/java/org/tikv/common/KVMockServer.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,22 @@
import io.grpc.Server;
import io.grpc.ServerBuilder;
import io.grpc.Status;
import io.grpc.health.v1.HealthCheckRequest;
import io.grpc.health.v1.HealthCheckResponse;
import io.grpc.health.v1.HealthCheckResponse.ServingStatus;
import io.grpc.health.v1.HealthGrpc.HealthImplBase;
import io.grpc.stub.StreamObserver;
import java.io.IOException;
import java.net.ServerSocket;
import java.util.*;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.tikv.common.key.Key;
import org.tikv.common.region.TiRegion;
import org.tikv.kvproto.Coprocessor;
Expand All @@ -45,9 +57,11 @@

public class KVMockServer extends TikvGrpc.TikvImplBase {

private static final Logger logger = LoggerFactory.getLogger(KVMockServer.class);
private int port;
private Server server;
private TiRegion region;
private State state = State.Normal;
private final TreeMap<Key, ByteString> dataMap = new TreeMap<>();
private final Map<ByteString, Integer> errorMap = new HashMap<>();

Expand All @@ -64,10 +78,23 @@ public class KVMockServer extends TikvGrpc.TikvImplBase {
public static final int STORE_NOT_MATCH = 9;
public static final int RAFT_ENTRY_TOO_LARGE = 10;

public enum State {
Normal,
Fail
}

public void setState(State state) {
this.state = state;
}

public int getPort() {
return port;
}

public void setRegion(TiRegion region) {
this.region = region;
}

public void put(ByteString key, ByteString value) {
dataMap.put(toRawKey(key), value);
}
Expand Down Expand Up @@ -97,7 +124,7 @@ private void verifyContext(Context context) throws Exception {
if (context.getRegionId() != region.getId()
|| !context.getRegionEpoch().equals(region.getRegionEpoch())
|| !context.getPeer().equals(region.getLeader())) {
throw new Exception();
throw new Exception("context doesn't match");
}
}

Expand All @@ -106,6 +133,11 @@ public void rawGet(
org.tikv.kvproto.Kvrpcpb.RawGetRequest request,
io.grpc.stub.StreamObserver<org.tikv.kvproto.Kvrpcpb.RawGetResponse> responseObserver) {
try {
switch (state) {
case Fail:
throw new Exception(State.Fail.toString());
default:
}
verifyContext(request.getContext());
ByteString key = request.getKey();

Expand All @@ -116,7 +148,12 @@ public void rawGet(
setErrorInfo(errorCode, errBuilder);
builder.setRegionError(errBuilder.build());
} else {
builder.setValue(dataMap.get(toRawKey(key)));
Key rawKey = toRawKey(key);
ByteString value = dataMap.get(rawKey);
if (value == null) {
value = ByteString.EMPTY;
}
builder.setValue(value);
}
responseObserver.onNext(builder.build());
responseObserver.onCompleted();
Expand All @@ -139,7 +176,6 @@ public void rawPut(
if (errorCode != null) {
setErrorInfo(errorCode, errBuilder);
builder.setRegionError(errBuilder.build());
// builder.setError("");
}
responseObserver.onNext(builder.build());
responseObserver.onCompleted();
Expand Down Expand Up @@ -349,14 +385,33 @@ public void coprocessor(
}

public int start(TiRegion region) throws IOException {
int port;
try (ServerSocket s = new ServerSocket(0)) {
port = s.getLocalPort();
}
server = ServerBuilder.forPort(port).addService(this).build().start();
start(region, port);
return port;
}

private static class HealCheck extends HealthImplBase {

@Override
public void check(
HealthCheckRequest request, StreamObserver<HealthCheckResponse> responseObserver) {
responseObserver.onNext(
HealthCheckResponse.newBuilder().setStatus(ServingStatus.SERVING).build());
responseObserver.onCompleted();
}
}

public void start(TiRegion region, int port) throws IOException {
this.port = port;
this.region = region;

logger.info("start mock server on port: " + port);
server =
ServerBuilder.forPort(port).addService(new HealCheck()).addService(this).build().start();
Runtime.getRuntime().addShutdownHook(new Thread(KVMockServer.this::stop));
return port;
}

public void stop() {
Expand Down
11 changes: 7 additions & 4 deletions src/test/java/org/tikv/common/MockServerTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,14 +29,15 @@
import org.tikv.kvproto.Pdpb;

public class MockServerTest extends PDMockServerTest {

public KVMockServer server;
public int port;
public TiRegion region;

@Before
@Override
public void setUp() throws IOException {
super.setUp();
public void setup() throws IOException {
super.setup();

Metapb.Region r =
Metapb.Region.newBuilder()
Expand All @@ -62,9 +63,11 @@ public void setUp() throws IOException {
r.getPeers(0),
r.getPeersList(),
s.stream().map(TiStore::new).collect(Collectors.toList()));
pdServer.addGetRegionResp(Pdpb.GetRegionResponse.newBuilder().setRegion(r).build());
leader.addGetRegionListener(
request -> Pdpb.GetRegionResponse.newBuilder().setRegion(r).build());
for (Metapb.Store store : s) {
pdServer.addGetStoreResp(Pdpb.GetStoreResponse.newBuilder().setStore(store).build());
leader.addGetStoreListener(
(request) -> Pdpb.GetStoreResponse.newBuilder().setStore(store).build());
}
server = new KVMockServer();
port = server.start(region);
Expand Down
Loading