Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
75 changes: 46 additions & 29 deletions src/main/java/org/tikv/common/region/AbstractRegionStoreClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -277,19 +277,27 @@ private Metapb.Peer switchLeaderStore(BackOffer backOffer) {
List<SwitchLeaderTask> responses = new LinkedList<>();
for (Metapb.Peer peer : region.getFollowerList()) {
ByteString key = region.getStartKey();
TiStore peerStore = regionManager.getStoreById(peer.getStoreId(), backOffer);
ManagedChannel channel =
channelFactory.getChannel(
peerStore.getAddress(), regionManager.getPDClient().getHostMapping());
TikvGrpc.TikvFutureStub stub =
TikvGrpc.newFutureStub(channel).withDeadlineAfter(timeout, TimeUnit.MILLISECONDS);
Kvrpcpb.RawGetRequest rawGetRequest =
Kvrpcpb.RawGetRequest.newBuilder()
.setContext(region.getReplicaContext(peer))
.setKey(key)
.build();
ListenableFuture<Kvrpcpb.RawGetResponse> task = stub.rawGet(rawGetRequest);
responses.add(new SwitchLeaderTask(task, peer));
try {
TiStore peerStore = regionManager.getStoreById(peer.getStoreId(), backOffer);
ManagedChannel channel =
channelFactory.getChannel(
peerStore.getAddress(), regionManager.getPDClient().getHostMapping());
TikvGrpc.TikvFutureStub stub =
TikvGrpc.newFutureStub(channel).withDeadlineAfter(timeout, TimeUnit.MILLISECONDS);
Kvrpcpb.RawGetRequest rawGetRequest =
Kvrpcpb.RawGetRequest.newBuilder()
.setContext(region.getReplicaContext(peer))
.setKey(key)
.build();
ListenableFuture<Kvrpcpb.RawGetResponse> task = stub.rawGet(rawGetRequest);
responses.add(new SwitchLeaderTask(task, peer));
} catch (Exception e) {
logger.warn(
"switch region[{}] leader store to {} failed: {}",
region.getId(),
peer.getStoreId(),
e);
}
}
while (true) {
try {
Expand Down Expand Up @@ -328,22 +336,31 @@ private TiStore switchProxyStore(BackOffer backOffer) {
List<ForwardCheckTask> responses = new LinkedList<>();
for (Metapb.Peer peer : region.getFollowerList()) {
ByteString key = region.getStartKey();
TiStore peerStore = regionManager.getStoreById(peer.getStoreId(), backOffer);
ManagedChannel channel =
channelFactory.getChannel(
peerStore.getAddress(), regionManager.getPDClient().getHostMapping());
TikvGrpc.TikvFutureStub stub =
TikvGrpc.newFutureStub(channel).withDeadlineAfter(forwardTimeout, TimeUnit.MILLISECONDS);
Metadata header = new Metadata();
header.put(TiConfiguration.FORWARD_META_DATA_KEY, store.getStore().getAddress());
Kvrpcpb.RawGetRequest rawGetRequest =
Kvrpcpb.RawGetRequest.newBuilder()
.setContext(region.getReplicaContext(region.getLeader()))
.setKey(key)
.build();
ListenableFuture<Kvrpcpb.RawGetResponse> task =
MetadataUtils.attachHeaders(stub, header).rawGet(rawGetRequest);
responses.add(new ForwardCheckTask(task, peerStore.getStore()));
try {
TiStore peerStore = regionManager.getStoreById(peer.getStoreId(), backOffer);
ManagedChannel channel =
channelFactory.getChannel(
peerStore.getAddress(), regionManager.getPDClient().getHostMapping());
TikvGrpc.TikvFutureStub stub =
TikvGrpc.newFutureStub(channel)
.withDeadlineAfter(forwardTimeout, TimeUnit.MILLISECONDS);
Metadata header = new Metadata();
header.put(TiConfiguration.FORWARD_META_DATA_KEY, store.getStore().getAddress());
Kvrpcpb.RawGetRequest rawGetRequest =
Kvrpcpb.RawGetRequest.newBuilder()
.setContext(region.getReplicaContext(region.getLeader()))
.setKey(key)
.build();
ListenableFuture<Kvrpcpb.RawGetResponse> task =
MetadataUtils.attachHeaders(stub, header).rawGet(rawGetRequest);
responses.add(new ForwardCheckTask(task, peerStore.getStore()));
} catch (Exception e) {
logger.warn(
"switch region[{}] leader store to {} failed: {}",
region.getId(),
peer.getStoreId(),
e);
}
}
while (true) {
try {
Expand Down
29 changes: 15 additions & 14 deletions src/main/java/org/tikv/common/region/RegionManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.tikv.common.ReadOnlyPDClient;
Expand All @@ -47,6 +46,7 @@

@SuppressWarnings("UnstableApiUsage")
public class RegionManager {

private static final Logger logger = LoggerFactory.getLogger(RegionManager.class);
public static final Histogram GET_REGION_BY_KEY_REQUEST_LATENCY =
HistogramUtils.buildDuration()
Expand Down Expand Up @@ -205,22 +205,23 @@ public Pair<TiRegion, TiStore> getRegionStorePairByKey(
}

public TiRegion createRegion(Metapb.Region region, BackOffer backOffer) {
List<Metapb.Peer> peers = region.getPeersList();
List<TiStore> stores = getRegionStore(peers, backOffer);
return new TiRegion(conf, region, null, peers, stores);
return createRegion(region, null, backOffer);
}

private TiRegion createRegion(Metapb.Region region, Metapb.Peer leader, BackOffer backOffer) {
List<Metapb.Peer> peers = region.getPeersList();
List<TiStore> stores = getRegionStore(peers, backOffer);
return new TiRegion(conf, region, leader, peers, stores);
}

private List<TiStore> getRegionStore(List<Metapb.Peer> peers, BackOffer backOffer) {
return peers
.stream()
.map(p -> getStoreById(p.getStoreId(), backOffer))
.collect(Collectors.toList());
List<Metapb.Peer> peers = new ArrayList<>();
List<TiStore> stores = new ArrayList<>();
for (Metapb.Peer peer : region.getPeersList()) {
try {
stores.add(getStoreById(peer.getStoreId(), backOffer));
peers.add(peer);
} catch (Exception e) {
logger.warn("Store {} not found: {}", peer.getStoreId(), e.toString());
}
}
Metapb.Region newRegion =
Metapb.Region.newBuilder().mergeFrom(region).clearPeers().addAllPeers(peers).build();
return new TiRegion(conf, newRegion, leader, peers, stores);
}

private TiStore getStoreByIdWithBackOff(long id, BackOffer backOffer) {
Expand Down
2 changes: 1 addition & 1 deletion src/test/java/org/tikv/common/PDClientMockTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@

public class PDClientMockTest extends PDMockServerTest {

private static final String LOCAL_ADDR_IPV6 = "[::]";
private static final String LOCAL_ADDR_IPV6 = "[::1]";
public static final String HTTP = "http://";

@Test
Expand Down
6 changes: 4 additions & 2 deletions src/test/java/org/tikv/common/PDMockServerTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ public void setup() throws IOException {

void setup(String addr) throws IOException {
int basePort;
try (ServerSocket s = new ServerSocket(51820)) {
try (ServerSocket s = new ServerSocket(0)) {
basePort = s.getLocalPort();
}

Expand All @@ -54,9 +54,11 @@ void setup(String addr) throws IOException {
GrpcUtils.makeMember(2, "http://" + addr + ":" + (basePort + 1)),
GrpcUtils.makeMember(3, "http://" + addr + ":" + (basePort + 2))));
pdServers.add(server);
if (i == 0) {
leader = server;
}
}

leader = pdServers.get(0);
TiConfiguration conf = TiConfiguration.createDefault(addr + ":" + leader.port);
conf.setKvMode("RAW");
conf.setWarmUpEnable(false);
Expand Down
35 changes: 34 additions & 1 deletion src/test/java/org/tikv/common/SeekLeaderStoreTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,13 @@
package org.tikv.common;

import com.google.protobuf.ByteString;
import java.util.concurrent.atomic.AtomicInteger;
import org.junit.Assert;
import org.junit.Test;
import org.tikv.common.KVMockServer.State;
import org.tikv.kvproto.Metapb;
import org.tikv.kvproto.Metapb.StoreState;
import org.tikv.kvproto.Pdpb;
import org.tikv.raw.RawKVClient;

public class SeekLeaderStoreTest extends MockThreeStoresTest {
Expand All @@ -34,12 +38,41 @@ public void testSeekLeader() {
RawKVClient client = createClient();
ByteString key = ByteString.copyFromUtf8("key");
ByteString value = ByteString.copyFromUtf8("value");

put(key, value);

client.put(key, value);
Assert.assertEquals(value, client.get(key).get());
servers.get(0).setState(State.Fail);
servers.get(1).setRegion(region.switchPeer(stores.get(1).getId()));
Assert.assertEquals(value, client.get(key).get());

remove(key, value);
}

@Test
public void testSeekLeaderMeetInvalidStore() {
RawKVClient client = createClient();
ByteString key = ByteString.copyFromUtf8("key");
ByteString value = ByteString.copyFromUtf8("value");

put(key, value);

servers.get(0).setState(State.Fail);
servers.get(2).setRegion(region.switchPeer(stores.get(2).getId()));

AtomicInteger i = new AtomicInteger(0);
leader.addGetStoreListener(
request -> {
Metapb.Store.Builder storeBuilder =
Metapb.Store.newBuilder().mergeFrom(stores.get((int) request.getStoreId() - 1));
if (request.getStoreId() == 0x2 && i.incrementAndGet() > 0) {
storeBuilder.setState(StoreState.Tombstone);
}
return Pdpb.GetStoreResponse.newBuilder().setStore(storeBuilder.build()).build();
});

Assert.assertEquals(value, client.get(key).get());

remove(key, value);
}
}