diff --git a/src/main/java/org/tikv/common/region/AbstractRegionStoreClient.java b/src/main/java/org/tikv/common/region/AbstractRegionStoreClient.java index 5983b73cb1c..a481b04843d 100644 --- a/src/main/java/org/tikv/common/region/AbstractRegionStoreClient.java +++ b/src/main/java/org/tikv/common/region/AbstractRegionStoreClient.java @@ -230,6 +230,7 @@ private Boolean seekLeaderStore(BackOffer backOffer) { // switch to leader store store = currentLeaderStore; updateClientStub(); + return true; } return false; } @@ -275,19 +276,27 @@ private Metapb.Peer switchLeaderStore(BackOffer backOffer) { List responses = new LinkedList<>(); for (Metapb.Peer peer : region.getFollowerList()) { ByteString key = region.getStartKey(); - TiStore peerStore = regionManager.getStoreById(peer.getStoreId(), backOffer); - ManagedChannel channel = - channelFactory.getChannel( - peerStore.getAddress(), regionManager.getPDClient().getHostMapping()); - TikvGrpc.TikvFutureStub stub = - TikvGrpc.newFutureStub(channel).withDeadlineAfter(timeout, TimeUnit.MILLISECONDS); - Kvrpcpb.RawGetRequest rawGetRequest = - Kvrpcpb.RawGetRequest.newBuilder() - .setContext(region.getReplicaContext(peer)) - .setKey(key) - .build(); - ListenableFuture task = stub.rawGet(rawGetRequest); - responses.add(new SwitchLeaderTask(task, peer)); + try { + TiStore peerStore = regionManager.getStoreById(peer.getStoreId(), backOffer); + ManagedChannel channel = + channelFactory.getChannel( + peerStore.getAddress(), regionManager.getPDClient().getHostMapping()); + TikvGrpc.TikvFutureStub stub = + TikvGrpc.newFutureStub(channel).withDeadlineAfter(timeout, TimeUnit.MILLISECONDS); + Kvrpcpb.RawGetRequest rawGetRequest = + Kvrpcpb.RawGetRequest.newBuilder() + .setContext(region.getReplicaContext(peer)) + .setKey(key) + .build(); + ListenableFuture task = stub.rawGet(rawGetRequest); + responses.add(new SwitchLeaderTask(task, peer)); + } catch (Exception e) { + logger.warn( + "switch region[{}] leader store to {} failed: {}", + region.getId(), + peer.getStoreId(), + e); + } } while (true) { try { @@ -326,22 +335,31 @@ private TiStore switchProxyStore(BackOffer backOffer) { List responses = new LinkedList<>(); for (Metapb.Peer peer : region.getFollowerList()) { ByteString key = region.getStartKey(); - TiStore peerStore = regionManager.getStoreById(peer.getStoreId(), backOffer); - ManagedChannel channel = - channelFactory.getChannel( - peerStore.getAddress(), regionManager.getPDClient().getHostMapping()); - TikvGrpc.TikvFutureStub stub = - TikvGrpc.newFutureStub(channel).withDeadlineAfter(forwardTimeout, TimeUnit.MILLISECONDS); - Metadata header = new Metadata(); - header.put(TiConfiguration.FORWARD_META_DATA_KEY, store.getStore().getAddress()); - Kvrpcpb.RawGetRequest rawGetRequest = - Kvrpcpb.RawGetRequest.newBuilder() - .setContext(region.getReplicaContext(region.getLeader())) - .setKey(key) - .build(); - ListenableFuture task = - MetadataUtils.attachHeaders(stub, header).rawGet(rawGetRequest); - responses.add(new ForwardCheckTask(task, peerStore.getStore())); + try { + TiStore peerStore = regionManager.getStoreById(peer.getStoreId(), backOffer); + ManagedChannel channel = + channelFactory.getChannel( + peerStore.getAddress(), regionManager.getPDClient().getHostMapping()); + TikvGrpc.TikvFutureStub stub = + TikvGrpc.newFutureStub(channel) + .withDeadlineAfter(forwardTimeout, TimeUnit.MILLISECONDS); + Metadata header = new Metadata(); + header.put(TiConfiguration.FORWARD_META_DATA_KEY, store.getStore().getAddress()); + Kvrpcpb.RawGetRequest rawGetRequest = + Kvrpcpb.RawGetRequest.newBuilder() + .setContext(region.getReplicaContext(region.getLeader())) + .setKey(key) + .build(); + ListenableFuture task = + MetadataUtils.attachHeaders(stub, header).rawGet(rawGetRequest); + responses.add(new ForwardCheckTask(task, peerStore.getStore())); + } catch (Exception e) { + logger.warn( + "switch region[{}] leader store to {} failed: {}", + region.getId(), + peer.getStoreId(), + e); + } } while (true) { try { diff --git a/src/main/java/org/tikv/common/region/RegionManager.java b/src/main/java/org/tikv/common/region/RegionManager.java index 35803c0d594..75a23f30a88 100644 --- a/src/main/java/org/tikv/common/region/RegionManager.java +++ b/src/main/java/org/tikv/common/region/RegionManager.java @@ -26,7 +26,6 @@ import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; -import java.util.stream.Collectors; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.tikv.common.ReadOnlyPDClient; @@ -47,6 +46,7 @@ @SuppressWarnings("UnstableApiUsage") public class RegionManager { + private static final Logger logger = LoggerFactory.getLogger(RegionManager.class); public static final Histogram GET_REGION_BY_KEY_REQUEST_LATENCY = HistogramUtils.buildDuration() @@ -205,22 +205,23 @@ public Pair getRegionStorePairByKey( } public TiRegion createRegion(Metapb.Region region, BackOffer backOffer) { - List peers = region.getPeersList(); - List stores = getRegionStore(peers, backOffer); - return new TiRegion(conf, region, null, peers, stores); + return createRegion(region, null, backOffer); } private TiRegion createRegion(Metapb.Region region, Metapb.Peer leader, BackOffer backOffer) { - List peers = region.getPeersList(); - List stores = getRegionStore(peers, backOffer); - return new TiRegion(conf, region, leader, peers, stores); - } - - private List getRegionStore(List peers, BackOffer backOffer) { - return peers - .stream() - .map(p -> getStoreById(p.getStoreId(), backOffer)) - .collect(Collectors.toList()); + List peers = new ArrayList<>(); + List stores = new ArrayList<>(); + for (Metapb.Peer peer : region.getPeersList()) { + try { + stores.add(getStoreById(peer.getStoreId(), backOffer)); + peers.add(peer); + } catch (Exception e) { + logger.warn("Store {} not found: {}", peer.getStoreId(), e.toString()); + } + } + Metapb.Region newRegion = + Metapb.Region.newBuilder().mergeFrom(region).clearPeers().addAllPeers(peers).build(); + return new TiRegion(conf, newRegion, leader, peers, stores); } private TiStore getStoreByIdWithBackOff(long id, BackOffer backOffer) { diff --git a/src/test/java/org/tikv/common/KVMockServer.java b/src/test/java/org/tikv/common/KVMockServer.java index 8032c9e613f..412d9dcfb43 100644 --- a/src/test/java/org/tikv/common/KVMockServer.java +++ b/src/test/java/org/tikv/common/KVMockServer.java @@ -27,10 +27,22 @@ import io.grpc.Server; import io.grpc.ServerBuilder; import io.grpc.Status; +import io.grpc.health.v1.HealthCheckRequest; +import io.grpc.health.v1.HealthCheckResponse; +import io.grpc.health.v1.HealthCheckResponse.ServingStatus; +import io.grpc.health.v1.HealthGrpc.HealthImplBase; +import io.grpc.stub.StreamObserver; import java.io.IOException; import java.net.ServerSocket; -import java.util.*; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.SortedMap; +import java.util.TreeMap; import java.util.stream.Collectors; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.tikv.common.key.Key; import org.tikv.common.region.TiRegion; import org.tikv.kvproto.Coprocessor; @@ -45,9 +57,11 @@ public class KVMockServer extends TikvGrpc.TikvImplBase { + private static final Logger logger = LoggerFactory.getLogger(KVMockServer.class); private int port; private Server server; private TiRegion region; + private State state = State.Normal; private final TreeMap dataMap = new TreeMap<>(); private final Map errorMap = new HashMap<>(); @@ -64,10 +78,23 @@ public class KVMockServer extends TikvGrpc.TikvImplBase { public static final int STORE_NOT_MATCH = 9; public static final int RAFT_ENTRY_TOO_LARGE = 10; + public enum State { + Normal, + Fail + } + + public void setState(State state) { + this.state = state; + } + public int getPort() { return port; } + public void setRegion(TiRegion region) { + this.region = region; + } + public void put(ByteString key, ByteString value) { dataMap.put(toRawKey(key), value); } @@ -97,7 +124,7 @@ private void verifyContext(Context context) throws Exception { if (context.getRegionId() != region.getId() || !context.getRegionEpoch().equals(region.getRegionEpoch()) || !context.getPeer().equals(region.getLeader())) { - throw new Exception(); + throw new Exception("context doesn't match"); } } @@ -106,6 +133,11 @@ public void rawGet( org.tikv.kvproto.Kvrpcpb.RawGetRequest request, io.grpc.stub.StreamObserver responseObserver) { try { + switch (state) { + case Fail: + throw new Exception(State.Fail.toString()); + default: + } verifyContext(request.getContext()); ByteString key = request.getKey(); @@ -116,7 +148,12 @@ public void rawGet( setErrorInfo(errorCode, errBuilder); builder.setRegionError(errBuilder.build()); } else { - builder.setValue(dataMap.get(toRawKey(key))); + Key rawKey = toRawKey(key); + ByteString value = dataMap.get(rawKey); + if (value == null) { + value = ByteString.EMPTY; + } + builder.setValue(value); } responseObserver.onNext(builder.build()); responseObserver.onCompleted(); @@ -139,7 +176,6 @@ public void rawPut( if (errorCode != null) { setErrorInfo(errorCode, errBuilder); builder.setRegionError(errBuilder.build()); - // builder.setError(""); } responseObserver.onNext(builder.build()); responseObserver.onCompleted(); @@ -349,14 +385,33 @@ public void coprocessor( } public int start(TiRegion region) throws IOException { + int port; try (ServerSocket s = new ServerSocket(0)) { port = s.getLocalPort(); } - server = ServerBuilder.forPort(port).addService(this).build().start(); + start(region, port); + return port; + } + + private static class HealCheck extends HealthImplBase { + @Override + public void check( + HealthCheckRequest request, StreamObserver responseObserver) { + responseObserver.onNext( + HealthCheckResponse.newBuilder().setStatus(ServingStatus.SERVING).build()); + responseObserver.onCompleted(); + } + } + + public void start(TiRegion region, int port) throws IOException { + this.port = port; this.region = region; + + logger.info("start mock server on port: " + port); + server = + ServerBuilder.forPort(port).addService(new HealCheck()).addService(this).build().start(); Runtime.getRuntime().addShutdownHook(new Thread(KVMockServer.this::stop)); - return port; } public void stop() { diff --git a/src/test/java/org/tikv/common/MockServerTest.java b/src/test/java/org/tikv/common/MockServerTest.java index de861622475..02cab4c46f4 100644 --- a/src/test/java/org/tikv/common/MockServerTest.java +++ b/src/test/java/org/tikv/common/MockServerTest.java @@ -29,14 +29,15 @@ import org.tikv.kvproto.Pdpb; public class MockServerTest extends PDMockServerTest { + public KVMockServer server; public int port; public TiRegion region; @Before @Override - public void setUp() throws IOException { - super.setUp(); + public void setup() throws IOException { + super.setup(); Metapb.Region r = Metapb.Region.newBuilder() @@ -62,9 +63,11 @@ public void setUp() throws IOException { r.getPeers(0), r.getPeersList(), s.stream().map(TiStore::new).collect(Collectors.toList())); - pdServer.addGetRegionResp(Pdpb.GetRegionResponse.newBuilder().setRegion(r).build()); + leader.addGetRegionListener( + request -> Pdpb.GetRegionResponse.newBuilder().setRegion(r).build()); for (Metapb.Store store : s) { - pdServer.addGetStoreResp(Pdpb.GetStoreResponse.newBuilder().setStore(store).build()); + leader.addGetStoreListener( + (request) -> Pdpb.GetStoreResponse.newBuilder().setStore(store).build()); } server = new KVMockServer(); port = server.start(region); diff --git a/src/test/java/org/tikv/common/MockThreeStoresTest.java b/src/test/java/org/tikv/common/MockThreeStoresTest.java new file mode 100644 index 00000000000..fa4dbe41e35 --- /dev/null +++ b/src/test/java/org/tikv/common/MockThreeStoresTest.java @@ -0,0 +1,129 @@ +/* + * Copyright 2022 TiKV Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.tikv.common; + +import com.google.common.collect.ImmutableList; +import com.google.protobuf.ByteString; +import java.io.IOException; +import java.net.ServerSocket; +import java.util.ArrayList; +import java.util.List; +import java.util.stream.Collectors; +import org.junit.After; +import org.junit.Before; +import org.tikv.common.region.TiRegion; +import org.tikv.common.region.TiStore; +import org.tikv.kvproto.Metapb; +import org.tikv.kvproto.Pdpb; + +public class MockThreeStoresTest extends PDMockServerTest { + + protected TiRegion region; + protected List servers = new ArrayList<>(); + protected List stores; + + @Before + @Override + public void setup() throws IOException { + super.setup(); + + int basePort; + try (ServerSocket s = new ServerSocket(0)) { + basePort = s.getLocalPort(); + } + + ImmutableList peers = + ImmutableList.of( + Metapb.Peer.newBuilder().setId(0x1).setStoreId(0x1).build(), + Metapb.Peer.newBuilder().setId(0x2).setStoreId(0x2).build(), + Metapb.Peer.newBuilder().setId(0x3).setStoreId(0x3).build()); + + Metapb.Region region = + Metapb.Region.newBuilder() + .setRegionEpoch(Metapb.RegionEpoch.newBuilder().setConfVer(1).setVersion(2)) + .setId(0xff) + .setStartKey(ByteString.EMPTY) + .setEndKey(ByteString.EMPTY) + .addAllPeers(peers) + .build(); + + stores = + ImmutableList.of( + Metapb.Store.newBuilder() + .setAddress("127.0.0.1:" + basePort) + .setVersion("5.0.0") + .setId(0x1) + .build(), + Metapb.Store.newBuilder() + .setAddress("127.0.0.1:" + (basePort + 1)) + .setVersion("5.0.0") + .setId(0x2) + .build(), + Metapb.Store.newBuilder() + .setAddress("127.0.0.1:" + (basePort + 2)) + .setVersion("5.0.0") + .setId(0x3) + .build()); + + for (PDMockServer server : pdServers) { + server.addGetRegionListener( + request -> + Pdpb.GetRegionResponse.newBuilder() + .setLeader(peers.get(0)) + .setRegion(region) + .build()); + server.addGetStoreListener( + (request) -> { + int i = (int) request.getStoreId() - 1; + return Pdpb.GetStoreResponse.newBuilder().setStore(stores.get(i)).build(); + }); + } + + this.region = + new TiRegion( + session.getConf(), + region, + region.getPeers(0), + region.getPeersList(), + stores.stream().map(TiStore::new).collect(Collectors.toList())); + for (int i = 0; i < 3; i++) { + KVMockServer server = new KVMockServer(); + server.start(this.region, basePort + i); + servers.add(server); + } + } + + public void put(ByteString key, ByteString value) { + for (KVMockServer server : servers) { + server.put(key, value); + } + } + + public void remove(ByteString key, ByteString value) { + for (KVMockServer server : servers) { + server.remove(key); + } + } + + @After + public void tearDown() { + for (KVMockServer server : servers) { + server.stop(); + } + } +} diff --git a/src/test/java/org/tikv/common/PDClientMockTest.java b/src/test/java/org/tikv/common/PDClientMockTest.java index 67d6bd64ad2..3ba0e374392 100644 --- a/src/test/java/org/tikv/common/PDClientMockTest.java +++ b/src/test/java/org/tikv/common/PDClientMockTest.java @@ -28,6 +28,7 @@ import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicInteger; import org.junit.Test; import org.tikv.common.exception.GrpcException; import org.tikv.common.meta.TiTimestamp; @@ -40,13 +41,13 @@ public class PDClientMockTest extends PDMockServerTest { - private static final String LOCAL_ADDR_IPV6 = "[::]"; + private static final String LOCAL_ADDR_IPV6 = "[::1]"; public static final String HTTP = "http://"; @Test public void testCreate() throws Exception { try (PDClient client = session.getPDClient()) { - assertEquals(LOCAL_ADDR + ":" + pdServer.port, client.getPdClientWrapper().getLeaderInfo()); + assertEquals(LOCAL_ADDR + ":" + leader.port, client.getPdClientWrapper().getLeaderInfo()); assertEquals(CLUSTER_ID, client.getHeader().getClusterId()); } } @@ -54,18 +55,17 @@ public void testCreate() throws Exception { @Test public void testSwitchLeader() throws Exception { try (PDClient client = session.getPDClient()) { - client.trySwitchLeader(HTTP + LOCAL_ADDR + ":" + (pdServer.port + 1)); + client.trySwitchLeader(HTTP + LOCAL_ADDR + ":" + (leader.port + 1)); assertEquals( - client.getPdClientWrapper().getLeaderInfo(), - HTTP + LOCAL_ADDR + ":" + (pdServer.port + 1)); + client.getPdClientWrapper().getLeaderInfo(), HTTP + LOCAL_ADDR + ":" + (leader.port + 1)); } tearDown(); - setUp(LOCAL_ADDR_IPV6); + setup(LOCAL_ADDR_IPV6); try (PDClient client = session.getPDClient()) { - client.trySwitchLeader(HTTP + LOCAL_ADDR_IPV6 + ":" + (pdServer.port + 2)); + client.trySwitchLeader(HTTP + LOCAL_ADDR_IPV6 + ":" + (leader.port + 2)); assertEquals( client.getPdClientWrapper().getLeaderInfo(), - HTTP + LOCAL_ADDR_IPV6 + ":" + (pdServer.port + 2)); + HTTP + LOCAL_ADDR_IPV6 + ":" + (leader.port + 2)); } } @@ -84,16 +84,17 @@ public void testGetRegionByKey() throws Exception { byte[] endKey = new byte[] {1, 0, 2, 5}; int confVer = 1026; int ver = 1027; - pdServer.addGetRegionResp( - GrpcUtils.makeGetRegionResponse( - pdServer.getClusterId(), - GrpcUtils.makeRegion( - 1, - ByteString.copyFrom(startKey), - ByteString.copyFrom(endKey), - GrpcUtils.makeRegionEpoch(confVer, ver), - GrpcUtils.makePeer(1, 10), - GrpcUtils.makePeer(2, 20)))); + leader.addGetRegionListener( + request -> + GrpcUtils.makeGetRegionResponse( + leader.getClusterId(), + GrpcUtils.makeRegion( + 1, + ByteString.copyFrom(startKey), + ByteString.copyFrom(endKey), + GrpcUtils.makeRegionEpoch(confVer, ver), + GrpcUtils.makePeer(1, 10), + GrpcUtils.makePeer(2, 20)))); try (PDClient client = session.getPDClient()) { Pair rl = client.getRegionByKey(defaultBackOff(), ByteString.EMPTY); @@ -115,16 +116,17 @@ public void testGetRegionById() throws Exception { int confVer = 1026; int ver = 1027; - pdServer.addGetRegionByIDResp( - GrpcUtils.makeGetRegionResponse( - pdServer.getClusterId(), - GrpcUtils.makeRegion( - 1, - ByteString.copyFrom(startKey), - ByteString.copyFrom(endKey), - GrpcUtils.makeRegionEpoch(confVer, ver), - GrpcUtils.makePeer(1, 10), - GrpcUtils.makePeer(2, 20)))); + leader.addGetRegionByIDListener( + request -> + GrpcUtils.makeGetRegionResponse( + leader.getClusterId(), + GrpcUtils.makeRegion( + 1, + ByteString.copyFrom(startKey), + ByteString.copyFrom(endKey), + GrpcUtils.makeRegionEpoch(confVer, ver), + GrpcUtils.makePeer(1, 10), + GrpcUtils.makePeer(2, 20)))); try (PDClient client = session.getPDClient()) { Pair rl = client.getRegionByID(defaultBackOff(), 0); Metapb.Region r = rl.first; @@ -142,15 +144,16 @@ public void testGetRegionById() throws Exception { public void testGetStore() throws Exception { long storeId = 1; String testAddress = "testAddress"; - pdServer.addGetStoreResp( - GrpcUtils.makeGetStoreResponse( - pdServer.getClusterId(), - GrpcUtils.makeStore( - storeId, - testAddress, - Metapb.StoreState.Up, - GrpcUtils.makeStoreLabel("k1", "v1"), - GrpcUtils.makeStoreLabel("k2", "v2")))); + leader.addGetStoreListener( + request -> + GrpcUtils.makeGetStoreResponse( + leader.getClusterId(), + GrpcUtils.makeStore( + storeId, + testAddress, + Metapb.StoreState.Up, + GrpcUtils.makeStoreLabel("k1", "v1"), + GrpcUtils.makeStoreLabel("k2", "v2")))); try (PDClient client = session.getPDClient()) { Store r = client.getStore(defaultBackOff(), storeId); assertEquals(storeId, r.getId()); @@ -161,10 +164,11 @@ public void testGetStore() throws Exception { assertEquals("v1", r.getLabels(0).getValue()); assertEquals("v2", r.getLabels(1).getValue()); - pdServer.addGetStoreResp( - GrpcUtils.makeGetStoreResponse( - pdServer.getClusterId(), - GrpcUtils.makeStore(storeId, testAddress, Metapb.StoreState.Tombstone))); + leader.addGetStoreListener( + request -> + GrpcUtils.makeGetStoreResponse( + leader.getClusterId(), + GrpcUtils.makeStore(storeId, testAddress, Metapb.StoreState.Tombstone))); assertEquals(StoreState.Tombstone, client.getStore(defaultBackOff(), storeId).getState()); } } @@ -177,11 +181,16 @@ private BackOffer defaultBackOff() { public void testRetryPolicy() throws Exception { long storeId = 1024; ExecutorService service = Executors.newCachedThreadPool(); - pdServer.addGetStoreResp(null); - pdServer.addGetStoreResp(null); - pdServer.addGetStoreResp( - GrpcUtils.makeGetStoreResponse( - pdServer.getClusterId(), GrpcUtils.makeStore(storeId, "", Metapb.StoreState.Up))); + AtomicInteger i = new AtomicInteger(); + leader.addGetStoreListener( + request -> { + if (i.getAndIncrement() < 2) { + return null; + } else { + return GrpcUtils.makeGetStoreResponse( + leader.getClusterId(), GrpcUtils.makeStore(storeId, "", Metapb.StoreState.Up)); + } + }); try (PDClient client = session.getPDClient()) { Callable storeCallable = () -> client.getStore(ConcreteBackOffer.newCustomBackOff(5000), 0); @@ -194,16 +203,17 @@ public void testRetryPolicy() throws Exception { } // Should fail - pdServer.addGetStoreResp(null); - pdServer.addGetStoreResp(null); - pdServer.addGetStoreResp(null); - pdServer.addGetStoreResp(null); - pdServer.addGetStoreResp(null); - pdServer.addGetStoreResp(null); - - pdServer.addGetStoreResp( - GrpcUtils.makeGetStoreResponse( - pdServer.getClusterId(), GrpcUtils.makeStore(storeId, "", Metapb.StoreState.Up))); + AtomicInteger j = new AtomicInteger(); + leader.addGetStoreListener( + request -> { + if (j.getAndIncrement() < 6) { + return null; + } else { + return GrpcUtils.makeGetStoreResponse( + leader.getClusterId(), GrpcUtils.makeStore(storeId, "", Metapb.StoreState.Up)); + } + }); + try { client.getStore(defaultBackOff(), 0); } catch (GrpcException e) { diff --git a/src/test/java/org/tikv/common/PDMockServer.java b/src/test/java/org/tikv/common/PDMockServer.java index 78ac6304077..103d643d83f 100644 --- a/src/test/java/org/tikv/common/PDMockServer.java +++ b/src/test/java/org/tikv/common/PDMockServer.java @@ -23,29 +23,37 @@ import io.grpc.stub.StreamObserver; import java.io.IOException; import java.net.ServerSocket; -import java.util.Deque; import java.util.Optional; -import java.util.concurrent.LinkedBlockingDeque; +import java.util.function.Function; import org.tikv.kvproto.PDGrpc; -import org.tikv.kvproto.Pdpb.*; +import org.tikv.kvproto.Pdpb.GetMembersRequest; +import org.tikv.kvproto.Pdpb.GetMembersResponse; +import org.tikv.kvproto.Pdpb.GetRegionByIDRequest; +import org.tikv.kvproto.Pdpb.GetRegionRequest; +import org.tikv.kvproto.Pdpb.GetRegionResponse; +import org.tikv.kvproto.Pdpb.GetStoreRequest; +import org.tikv.kvproto.Pdpb.GetStoreResponse; +import org.tikv.kvproto.Pdpb.TsoRequest; +import org.tikv.kvproto.Pdpb.TsoResponse; public class PDMockServer extends PDGrpc.PDImplBase { public int port; private long clusterId; - private Server server; - public void addGetMemberResp(GetMembersResponse r) { - getMembersResp.addLast(Optional.ofNullable(r)); - } + private Function getMembersListener; + private Function getStoreListener; + private Function getRegionListener; + private Function getRegionByIDListener; - private final Deque> getMembersResp = - new LinkedBlockingDeque>(); + public void addGetMembersListener(Function func) { + getMembersListener = func; + } @Override public void getMembers(GetMembersRequest request, StreamObserver resp) { try { - resp.onNext(getMembersResp.getFirst().get()); + resp.onNext(Optional.ofNullable(getMembersListener.apply(request)).get()); resp.onCompleted(); } catch (Exception e) { resp.onError(Status.INTERNAL.asRuntimeException()); @@ -72,47 +80,42 @@ public void onCompleted() { }; } - public void addGetRegionResp(GetRegionResponse r) { - getRegionResp.addLast(r); + public void addGetRegionListener(Function func) { + getRegionListener = func; } - private final Deque getRegionResp = new LinkedBlockingDeque<>(); - @Override public void getRegion(GetRegionRequest request, StreamObserver resp) { try { - resp.onNext(getRegionResp.removeFirst()); + resp.onNext(getRegionListener.apply(request)); resp.onCompleted(); } catch (Exception e) { resp.onError(Status.INTERNAL.asRuntimeException()); } } - public void addGetRegionByIDResp(GetRegionResponse r) { - getRegionByIDResp.addLast(r); + public void addGetRegionByIDListener(Function func) { + getRegionByIDListener = func; } - private final Deque getRegionByIDResp = new LinkedBlockingDeque<>(); - @Override public void getRegionByID(GetRegionByIDRequest request, StreamObserver resp) { try { - resp.onNext(getRegionByIDResp.removeFirst()); + resp.onNext(getRegionByIDListener.apply(request)); resp.onCompleted(); } catch (Exception e) { resp.onError(Status.INTERNAL.asRuntimeException()); } } - public void addGetStoreResp(GetStoreResponse r) { - getStoreResp.addLast(Optional.ofNullable(r)); + public void addGetStoreListener(Function func) { + getStoreListener = func; } - private final Deque> getStoreResp = new LinkedBlockingDeque<>(); - + @Override public void getStore(GetStoreRequest request, StreamObserver resp) { try { - resp.onNext(getStoreResp.removeFirst().get()); + resp.onNext(Optional.ofNullable(getStoreListener.apply(request)).get()); resp.onCompleted(); } catch (Exception e) { resp.onError(Status.INTERNAL.asRuntimeException()); @@ -120,10 +123,16 @@ public void getStore(GetStoreRequest request, StreamObserver r } public void start(long clusterId) throws IOException { + int port; try (ServerSocket s = new ServerSocket(0)) { port = s.getLocalPort(); } + start(clusterId, port); + } + + public void start(long clusterId, int port) throws IOException { this.clusterId = clusterId; + this.port = port; server = ServerBuilder.forPort(port).addService(this).build().start(); Runtime.getRuntime().addShutdownHook(new Thread(PDMockServer.this::stop)); diff --git a/src/test/java/org/tikv/common/PDMockServerTest.java b/src/test/java/org/tikv/common/PDMockServerTest.java index cd07935fdd9..dab114b6c05 100644 --- a/src/test/java/org/tikv/common/PDMockServerTest.java +++ b/src/test/java/org/tikv/common/PDMockServerTest.java @@ -18,40 +18,59 @@ package org.tikv.common; import java.io.IOException; +import java.net.ServerSocket; +import java.util.ArrayList; +import java.util.List; import org.junit.After; import org.junit.Before; public abstract class PDMockServerTest { protected static final String LOCAL_ADDR = "127.0.0.1"; static final long CLUSTER_ID = 1024; - protected static TiSession session; - protected PDMockServer pdServer; + protected TiSession session; + protected PDMockServer leader; + protected List pdServers = new ArrayList<>(); @Before - public void setUp() throws IOException { - setUp(LOCAL_ADDR); + public void setup() throws IOException { + setup(LOCAL_ADDR); } - void setUp(String addr) throws IOException { - pdServer = new PDMockServer(); - pdServer.start(CLUSTER_ID); - pdServer.addGetMemberResp( - GrpcUtils.makeGetMembersResponse( - pdServer.getClusterId(), - GrpcUtils.makeMember(1, "http://" + addr + ":" + pdServer.port), - GrpcUtils.makeMember(2, "http://" + addr + ":" + (pdServer.port + 1)), - GrpcUtils.makeMember(3, "http://" + addr + ":" + (pdServer.port + 2)))); - TiConfiguration conf = TiConfiguration.createDefault(addr + ":" + pdServer.port); - conf.setEnableGrpcForward(false); + void setup(String addr) throws IOException { + int basePort; + try (ServerSocket s = new ServerSocket(0)) { + basePort = s.getLocalPort(); + } + + for (int i = 0; i < 3; i++) { + PDMockServer server = new PDMockServer(); + server.start(CLUSTER_ID, basePort + i); + server.addGetMembersListener( + (request) -> + GrpcUtils.makeGetMembersResponse( + server.getClusterId(), + GrpcUtils.makeMember(1, "http://" + addr + ":" + basePort), + GrpcUtils.makeMember(2, "http://" + addr + ":" + (basePort + 1)), + GrpcUtils.makeMember(3, "http://" + addr + ":" + (basePort + 2)))); + pdServers.add(server); + if (i == 0) { + leader = server; + } + } + + TiConfiguration conf = TiConfiguration.createDefault(addr + ":" + leader.port); conf.setKvMode("RAW"); - conf.setTest(true); + conf.setWarmUpEnable(false); conf.setTimeout(2000); + conf.setEnableGrpcForward(true); session = TiSession.create(conf); } @After public void tearDown() throws Exception { session.close(); - pdServer.stop(); + for (PDMockServer server : pdServers) { + server.stop(); + } } } diff --git a/src/test/java/org/tikv/common/RegionManagerTest.java b/src/test/java/org/tikv/common/RegionManagerTest.java index 58687ef1059..6052640f9bc 100644 --- a/src/test/java/org/tikv/common/RegionManagerTest.java +++ b/src/test/java/org/tikv/common/RegionManagerTest.java @@ -24,6 +24,7 @@ import com.google.common.collect.TreeRangeMap; import com.google.protobuf.ByteString; import java.io.IOException; +import java.util.concurrent.atomic.AtomicInteger; import org.junit.Before; import org.junit.Test; import org.tikv.common.key.Key; @@ -36,12 +37,13 @@ import org.tikv.kvproto.Metapb.StoreState; public class RegionManagerTest extends PDMockServerTest { + private RegionManager mgr; @Before @Override - public void setUp() throws IOException { - super.setUp(); + public void setup() throws IOException { + super.setup(); mgr = session.getRegionManager(); } @@ -64,27 +66,30 @@ public void getRegionByKey() { int ver = 1027; long regionId = 233; String testAddress = "127.0.0.1"; - pdServer.addGetRegionResp( - GrpcUtils.makeGetRegionResponse( - pdServer.getClusterId(), - GrpcUtils.makeRegion( - regionId, - GrpcUtils.encodeKey(startKey.toByteArray()), - GrpcUtils.encodeKey(endKey.toByteArray()), - GrpcUtils.makeRegionEpoch(confVer, ver), - GrpcUtils.makePeer(1, 10), - GrpcUtils.makePeer(2, 20)))); - for (long id : new long[] {10, 20}) { - pdServer.addGetStoreResp( - GrpcUtils.makeGetStoreResponse( - pdServer.getClusterId(), - GrpcUtils.makeStore( - id, - testAddress, - Metapb.StoreState.Up, - GrpcUtils.makeStoreLabel("k1", "v1"), - GrpcUtils.makeStoreLabel("k2", "v2")))); - } + leader.addGetRegionListener( + request -> + GrpcUtils.makeGetRegionResponse( + leader.getClusterId(), + GrpcUtils.makeRegion( + regionId, + GrpcUtils.encodeKey(startKey.toByteArray()), + GrpcUtils.encodeKey(endKey.toByteArray()), + GrpcUtils.makeRegionEpoch(confVer, ver), + GrpcUtils.makePeer(1, 10), + GrpcUtils.makePeer(2, 20)))); + + AtomicInteger i = new AtomicInteger(0); + long[] ids = new long[] {10, 20}; + leader.addGetStoreListener( + (request -> + GrpcUtils.makeGetStoreResponse( + leader.getClusterId(), + GrpcUtils.makeStore( + ids[i.getAndIncrement()], + testAddress, + StoreState.Up, + GrpcUtils.makeStoreLabel("k1", "v1"), + GrpcUtils.makeStoreLabel("k2", "v2"))))); TiRegion region = mgr.getRegionByKey(startKey); assertEquals(region.getId(), regionId); @@ -103,27 +108,30 @@ public void getStoreByKey() { int confVer = 1026; int ver = 1027; long regionId = 233; - pdServer.addGetRegionResp( - GrpcUtils.makeGetRegionResponse( - pdServer.getClusterId(), - GrpcUtils.makeRegion( - regionId, - GrpcUtils.encodeKey(startKey.toByteArray()), - GrpcUtils.encodeKey(endKey.toByteArray()), - GrpcUtils.makeRegionEpoch(confVer, ver), - GrpcUtils.makePeer(storeId, 10), - GrpcUtils.makePeer(storeId + 1, 20)))); - for (long id : new long[] {10, 20}) { - pdServer.addGetStoreResp( - GrpcUtils.makeGetStoreResponse( - pdServer.getClusterId(), - GrpcUtils.makeStore( - id, - testAddress, - Metapb.StoreState.Up, - GrpcUtils.makeStoreLabel("k1", "v1"), - GrpcUtils.makeStoreLabel("k2", "v2")))); - } + leader.addGetRegionListener( + request -> + GrpcUtils.makeGetRegionResponse( + leader.getClusterId(), + GrpcUtils.makeRegion( + regionId, + GrpcUtils.encodeKey(startKey.toByteArray()), + GrpcUtils.encodeKey(endKey.toByteArray()), + GrpcUtils.makeRegionEpoch(confVer, ver), + GrpcUtils.makePeer(storeId, 10), + GrpcUtils.makePeer(storeId + 1, 20)))); + + AtomicInteger i = new AtomicInteger(0); + long[] ids = new long[] {10, 20}; + leader.addGetStoreListener( + (request -> + GrpcUtils.makeGetStoreResponse( + leader.getClusterId(), + GrpcUtils.makeStore( + ids[i.getAndIncrement()], + testAddress, + StoreState.Up, + GrpcUtils.makeStoreLabel("k1", "v1"), + GrpcUtils.makeStoreLabel("k2", "v2"))))); Pair pair = mgr.getRegionStorePairByKey(searchKey); assertEquals(pair.first.getId(), regionId); @@ -134,27 +142,29 @@ public void getStoreByKey() { public void getStoreById() { long storeId = 234; String testAddress = "testAddress"; - pdServer.addGetStoreResp( - GrpcUtils.makeGetStoreResponse( - pdServer.getClusterId(), - GrpcUtils.makeStore( - storeId, - testAddress, - Metapb.StoreState.Up, - GrpcUtils.makeStoreLabel("k1", "v1"), - GrpcUtils.makeStoreLabel("k2", "v2")))); + leader.addGetStoreListener( + request -> + GrpcUtils.makeGetStoreResponse( + leader.getClusterId(), + GrpcUtils.makeStore( + storeId, + testAddress, + Metapb.StoreState.Up, + GrpcUtils.makeStoreLabel("k1", "v1"), + GrpcUtils.makeStoreLabel("k2", "v2")))); TiStore store = mgr.getStoreById(storeId); assertEquals(store.getStore().getId(), storeId); - pdServer.addGetStoreResp( - GrpcUtils.makeGetStoreResponse( - pdServer.getClusterId(), - GrpcUtils.makeStore( - storeId + 1, - testAddress, - StoreState.Tombstone, - GrpcUtils.makeStoreLabel("k1", "v1"), - GrpcUtils.makeStoreLabel("k2", "v2")))); + leader.addGetStoreListener( + request -> + GrpcUtils.makeGetStoreResponse( + leader.getClusterId(), + GrpcUtils.makeStore( + storeId + 1, + testAddress, + StoreState.Tombstone, + GrpcUtils.makeStoreLabel("k1", "v1"), + GrpcUtils.makeStoreLabel("k2", "v2")))); try { mgr.getStoreById(storeId + 1); diff --git a/src/test/java/org/tikv/common/SeekLeaderStoreTest.java b/src/test/java/org/tikv/common/SeekLeaderStoreTest.java new file mode 100644 index 00000000000..201f10986f5 --- /dev/null +++ b/src/test/java/org/tikv/common/SeekLeaderStoreTest.java @@ -0,0 +1,78 @@ +/* + * Copyright 2022 TiKV Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.tikv.common; + +import com.google.protobuf.ByteString; +import java.util.concurrent.atomic.AtomicInteger; +import org.junit.Assert; +import org.junit.Test; +import org.tikv.common.KVMockServer.State; +import org.tikv.kvproto.Metapb; +import org.tikv.kvproto.Metapb.StoreState; +import org.tikv.kvproto.Pdpb; +import org.tikv.raw.RawKVClient; + +public class SeekLeaderStoreTest extends MockThreeStoresTest { + + private RawKVClient createClient() { + return session.createRawClient(); + } + + @Test + public void testSeekLeader() { + RawKVClient client = createClient(); + ByteString key = ByteString.copyFromUtf8("key"); + ByteString value = ByteString.copyFromUtf8("value"); + + put(key, value); + + Assert.assertEquals(value, client.get(key).get()); + servers.get(0).setState(State.Fail); + servers.get(1).setRegion(region.switchPeer(stores.get(1).getId())); + Assert.assertEquals(value, client.get(key).get()); + + remove(key, value); + } + + @Test + public void testSeekLeaderMeetInvalidStore() { + RawKVClient client = createClient(); + ByteString key = ByteString.copyFromUtf8("key"); + ByteString value = ByteString.copyFromUtf8("value"); + + put(key, value); + + servers.get(0).setState(State.Fail); + servers.get(2).setRegion(region.switchPeer(stores.get(2).getId())); + + AtomicInteger i = new AtomicInteger(0); + leader.addGetStoreListener( + request -> { + Metapb.Store.Builder storeBuilder = + Metapb.Store.newBuilder().mergeFrom(stores.get((int) request.getStoreId() - 1)); + if (request.getStoreId() == 0x2 && i.incrementAndGet() > 0) { + storeBuilder.setState(StoreState.Tombstone); + } + return Pdpb.GetStoreResponse.newBuilder().setStore(storeBuilder.build()).build(); + }); + + Assert.assertEquals(value, client.get(key).get()); + + remove(key, value); + } +}