From 035befc16a548a645ebb1eec889fb1a6e867165d Mon Sep 17 00:00:00 2001 From: iosmanthus Date: Mon, 14 Feb 2022 18:07:35 +0800 Subject: [PATCH 01/12] wip: introduce MockThreeStoresTest to test seek leader store Signed-off-by: iosmanthus --- src/main/java/org/tikv/common/TiSession.java | 22 ++++- .../java/org/tikv/common/KVMockServer.java | 44 +++++++-- .../java/org/tikv/common/MockServerTest.java | 8 +- .../org/tikv/common/MockThreeStoresTest.java | 98 +++++++++++++++++++ .../org/tikv/common/PDClientMockTest.java | 54 +++++----- .../java/org/tikv/common/PDMockServer.java | 27 ++++- .../org/tikv/common/PDMockServerTest.java | 49 +++++++--- .../org/tikv/common/RegionManagerTest.java | 28 +++--- .../org/tikv/common/SeekLeaderStoreTest.java | 29 ++++++ 9 files changed, 286 insertions(+), 73 deletions(-) create mode 100644 src/test/java/org/tikv/common/MockThreeStoresTest.java create mode 100644 src/test/java/org/tikv/common/SeekLeaderStoreTest.java diff --git a/src/main/java/org/tikv/common/TiSession.java b/src/main/java/org/tikv/common/TiSession.java index a8d6fe25c78..06c43150eeb 100644 --- a/src/main/java/org/tikv/common/TiSession.java +++ b/src/main/java/org/tikv/common/TiSession.java @@ -22,7 +22,12 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.util.concurrent.ThreadFactoryBuilder; import com.google.protobuf.ByteString; -import java.util.*; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Properties; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.stream.Collectors; @@ -34,8 +39,15 @@ import org.tikv.common.importer.SwitchTiKVModeClient; import org.tikv.common.key.Key; import org.tikv.common.meta.TiTimestamp; -import org.tikv.common.region.*; -import org.tikv.common.util.*; +import org.tikv.common.region.RegionManager; +import org.tikv.common.region.RegionStoreClient; +import org.tikv.common.region.TiRegion; +import org.tikv.common.region.TiStore; +import org.tikv.common.util.BackOffFunction; +import org.tikv.common.util.BackOffer; +import org.tikv.common.util.ChannelFactory; +import org.tikv.common.util.ConcreteBackOffer; +import org.tikv.common.util.Pair; import org.tikv.kvproto.Errorpb; import org.tikv.kvproto.ImportSstpb; import org.tikv.kvproto.Metapb; @@ -164,6 +176,10 @@ private static VersionInfo getVersionInfo() { } private synchronized void warmUp() { + if (conf.isTest()) { + logger.info("skip warm up in test mode"); + return; + } long warmUpStartTime = System.nanoTime(); BackOffer backOffer = ConcreteBackOffer.newRawKVBackOff(); try { diff --git a/src/test/java/org/tikv/common/KVMockServer.java b/src/test/java/org/tikv/common/KVMockServer.java index 8032c9e613f..8b012ea24ca 100644 --- a/src/test/java/org/tikv/common/KVMockServer.java +++ b/src/test/java/org/tikv/common/KVMockServer.java @@ -29,7 +29,12 @@ import io.grpc.Status; import java.io.IOException; import java.net.ServerSocket; -import java.util.*; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.SortedMap; +import java.util.TreeMap; import java.util.stream.Collectors; import org.tikv.common.key.Key; import org.tikv.common.region.TiRegion; @@ -48,6 +53,7 @@ public class KVMockServer extends TikvGrpc.TikvImplBase { private int port; private Server server; private TiRegion region; + private State state = State.Normal; private final TreeMap dataMap = new TreeMap<>(); private final Map errorMap = new HashMap<>(); @@ -64,6 +70,19 @@ public class KVMockServer extends TikvGrpc.TikvImplBase { public static final int STORE_NOT_MATCH = 9; public static final int RAFT_ENTRY_TOO_LARGE = 10; + public enum State { + Normal, + Fail + } + + public void setState(State state) { + this.state = state; + } + + public State getState() { + return state; + } + public int getPort() { return port; } @@ -106,6 +125,11 @@ public void rawGet( org.tikv.kvproto.Kvrpcpb.RawGetRequest request, io.grpc.stub.StreamObserver responseObserver) { try { + switch (state) { + case Fail: + throw new Exception(State.Fail.toString()); + default: + } verifyContext(request.getContext()); ByteString key = request.getKey(); @@ -125,7 +149,9 @@ public void rawGet( } } - /** */ + /** + * + */ public void rawPut( org.tikv.kvproto.Kvrpcpb.RawPutRequest request, io.grpc.stub.StreamObserver responseObserver) { @@ -139,7 +165,6 @@ public void rawPut( if (errorCode != null) { setErrorInfo(errorCode, errBuilder); builder.setRegionError(errBuilder.build()); - // builder.setError(""); } responseObserver.onNext(builder.build()); responseObserver.onCompleted(); @@ -168,7 +193,9 @@ private void setErrorInfo(int errorCode, Errorpb.Error.Builder errBuilder) { } } - /** */ + /** + * + */ public void rawDelete( org.tikv.kvproto.Kvrpcpb.RawDeleteRequest request, io.grpc.stub.StreamObserver responseObserver) { @@ -349,14 +376,19 @@ public void coprocessor( } public int start(TiRegion region) throws IOException { + int port; try (ServerSocket s = new ServerSocket(0)) { port = s.getLocalPort(); } - server = ServerBuilder.forPort(port).addService(this).build().start(); + start(region, port); + return port; + } + public void start(TiRegion region, int port) throws IOException { + this.port = port; + server = ServerBuilder.forPort(port).addService(this).build().start(); this.region = region; Runtime.getRuntime().addShutdownHook(new Thread(KVMockServer.this::stop)); - return port; } public void stop() { diff --git a/src/test/java/org/tikv/common/MockServerTest.java b/src/test/java/org/tikv/common/MockServerTest.java index de861622475..50b9e19b3a1 100644 --- a/src/test/java/org/tikv/common/MockServerTest.java +++ b/src/test/java/org/tikv/common/MockServerTest.java @@ -35,8 +35,8 @@ public class MockServerTest extends PDMockServerTest { @Before @Override - public void setUp() throws IOException { - super.setUp(); + public void setup() throws IOException { + super.setup(); Metapb.Region r = Metapb.Region.newBuilder() @@ -62,9 +62,9 @@ public void setUp() throws IOException { r.getPeers(0), r.getPeersList(), s.stream().map(TiStore::new).collect(Collectors.toList())); - pdServer.addGetRegionResp(Pdpb.GetRegionResponse.newBuilder().setRegion(r).build()); + leader.addGetRegionResp(Pdpb.GetRegionResponse.newBuilder().setRegion(r).build()); for (Metapb.Store store : s) { - pdServer.addGetStoreResp(Pdpb.GetStoreResponse.newBuilder().setStore(store).build()); + leader.addGetStoreResp(Pdpb.GetStoreResponse.newBuilder().setStore(store).build()); } server = new KVMockServer(); port = server.start(region); diff --git a/src/test/java/org/tikv/common/MockThreeStoresTest.java b/src/test/java/org/tikv/common/MockThreeStoresTest.java new file mode 100644 index 00000000000..b5b52924226 --- /dev/null +++ b/src/test/java/org/tikv/common/MockThreeStoresTest.java @@ -0,0 +1,98 @@ +package org.tikv.common; + +import com.google.common.collect.ImmutableList; +import com.google.protobuf.ByteString; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.stream.Collectors; +import org.junit.After; +import org.junit.Before; +import org.tikv.common.region.TiRegion; +import org.tikv.common.region.TiStore; +import org.tikv.kvproto.Metapb; +import org.tikv.kvproto.Pdpb; + +public class MockThreeStoresTest extends PDMockServerTest { + + public TiRegion region; + public ArrayList servers = new ArrayList<>(); + + @Before + @Override + public void setup() throws IOException { + super.setup(); + + int basePort = 0xabcd; + + Metapb.Region region = + Metapb.Region.newBuilder() + .setRegionEpoch(Metapb.RegionEpoch.newBuilder().setConfVer(1).setVersion(2)) + .setId(0xff) + .setStartKey(ByteString.EMPTY) + .setEndKey(ByteString.EMPTY) + .addPeers(Metapb.Peer.newBuilder().setId(0x1).setStoreId(0x1)) + .addPeers(Metapb.Peer.newBuilder().setId(0x2).setStoreId(0x2)) + .addPeers(Metapb.Peer.newBuilder().setId(0x3).setStoreId(0x3)) + .build(); + + List stores = + ImmutableList.of( + Metapb.Store.newBuilder() + .setAddress("127.0.0.1:" + basePort) + .setVersion("5.0.0") + .setId(0x1) + .build(), + Metapb.Store.newBuilder() + .setAddress("127.0.0.1:" + (basePort + 1)) + .setVersion("5.0.0") + .setVersion("5.0.0") + .setId(0x2) + .build(), + Metapb.Store.newBuilder() + .setAddress("127.0.0.1:" + (basePort + 2)) + .setVersion("5.0.0") + .setId(0x3) + .build() + ); + + for (PDMockServer server : pdServers) { + server.addGetRegionResp(Pdpb.GetRegionResponse.newBuilder().setRegion(region).build()); + for (Metapb.Store store : stores) { + server.addGetStoreResp(Pdpb.GetStoreResponse.newBuilder().setStore(store).build()); + } + } + + this.region = + new TiRegion( + session.getConf(), + region, + region.getPeers(0), + region.getPeersList(), + stores.stream().map(TiStore::new).collect(Collectors.toList())); + for (int i = 0; i < 3; i++) { + KVMockServer server = new KVMockServer(); + server.start(this.region, basePort + i); + servers.add(server); + } + } + + public void put(ByteString key, ByteString value) { + for (KVMockServer server : servers) { + server.put(key, value); + } + } + + public void remove(ByteString key, ByteString value) { + for (KVMockServer server : servers) { + server.remove(key); + } + } + + @After + public void tearDown() { + for (KVMockServer server : servers) { + server.stop(); + } + } +} diff --git a/src/test/java/org/tikv/common/PDClientMockTest.java b/src/test/java/org/tikv/common/PDClientMockTest.java index 67d6bd64ad2..1bc68212e47 100644 --- a/src/test/java/org/tikv/common/PDClientMockTest.java +++ b/src/test/java/org/tikv/common/PDClientMockTest.java @@ -46,7 +46,7 @@ public class PDClientMockTest extends PDMockServerTest { @Test public void testCreate() throws Exception { try (PDClient client = session.getPDClient()) { - assertEquals(LOCAL_ADDR + ":" + pdServer.port, client.getPdClientWrapper().getLeaderInfo()); + assertEquals(LOCAL_ADDR + ":" + leader.port, client.getPdClientWrapper().getLeaderInfo()); assertEquals(CLUSTER_ID, client.getHeader().getClusterId()); } } @@ -54,18 +54,18 @@ public void testCreate() throws Exception { @Test public void testSwitchLeader() throws Exception { try (PDClient client = session.getPDClient()) { - client.trySwitchLeader(HTTP + LOCAL_ADDR + ":" + (pdServer.port + 1)); + client.trySwitchLeader(HTTP + LOCAL_ADDR + ":" + (leader.port + 1)); assertEquals( client.getPdClientWrapper().getLeaderInfo(), - HTTP + LOCAL_ADDR + ":" + (pdServer.port + 1)); + HTTP + LOCAL_ADDR + ":" + (leader.port + 1)); } tearDown(); - setUp(LOCAL_ADDR_IPV6); + setup(LOCAL_ADDR_IPV6); try (PDClient client = session.getPDClient()) { - client.trySwitchLeader(HTTP + LOCAL_ADDR_IPV6 + ":" + (pdServer.port + 2)); + client.trySwitchLeader(HTTP + LOCAL_ADDR_IPV6 + ":" + (leader.port + 2)); assertEquals( client.getPdClientWrapper().getLeaderInfo(), - HTTP + LOCAL_ADDR_IPV6 + ":" + (pdServer.port + 2)); + HTTP + LOCAL_ADDR_IPV6 + ":" + (leader.port + 2)); } } @@ -84,9 +84,9 @@ public void testGetRegionByKey() throws Exception { byte[] endKey = new byte[] {1, 0, 2, 5}; int confVer = 1026; int ver = 1027; - pdServer.addGetRegionResp( + leader.addGetRegionResp( GrpcUtils.makeGetRegionResponse( - pdServer.getClusterId(), + leader.getClusterId(), GrpcUtils.makeRegion( 1, ByteString.copyFrom(startKey), @@ -115,9 +115,9 @@ public void testGetRegionById() throws Exception { int confVer = 1026; int ver = 1027; - pdServer.addGetRegionByIDResp( + leader.addGetRegionByIDResp( GrpcUtils.makeGetRegionResponse( - pdServer.getClusterId(), + leader.getClusterId(), GrpcUtils.makeRegion( 1, ByteString.copyFrom(startKey), @@ -142,9 +142,9 @@ public void testGetRegionById() throws Exception { public void testGetStore() throws Exception { long storeId = 1; String testAddress = "testAddress"; - pdServer.addGetStoreResp( + leader.addGetStoreResp( GrpcUtils.makeGetStoreResponse( - pdServer.getClusterId(), + leader.getClusterId(), GrpcUtils.makeStore( storeId, testAddress, @@ -161,9 +161,9 @@ public void testGetStore() throws Exception { assertEquals("v1", r.getLabels(0).getValue()); assertEquals("v2", r.getLabels(1).getValue()); - pdServer.addGetStoreResp( + leader.addGetStoreResp( GrpcUtils.makeGetStoreResponse( - pdServer.getClusterId(), + leader.getClusterId(), GrpcUtils.makeStore(storeId, testAddress, Metapb.StoreState.Tombstone))); assertEquals(StoreState.Tombstone, client.getStore(defaultBackOff(), storeId).getState()); } @@ -177,11 +177,11 @@ private BackOffer defaultBackOff() { public void testRetryPolicy() throws Exception { long storeId = 1024; ExecutorService service = Executors.newCachedThreadPool(); - pdServer.addGetStoreResp(null); - pdServer.addGetStoreResp(null); - pdServer.addGetStoreResp( + leader.addGetStoreResp(null); + leader.addGetStoreResp(null); + leader.addGetStoreResp( GrpcUtils.makeGetStoreResponse( - pdServer.getClusterId(), GrpcUtils.makeStore(storeId, "", Metapb.StoreState.Up))); + leader.getClusterId(), GrpcUtils.makeStore(storeId, "", Metapb.StoreState.Up))); try (PDClient client = session.getPDClient()) { Callable storeCallable = () -> client.getStore(ConcreteBackOffer.newCustomBackOff(5000), 0); @@ -194,16 +194,16 @@ public void testRetryPolicy() throws Exception { } // Should fail - pdServer.addGetStoreResp(null); - pdServer.addGetStoreResp(null); - pdServer.addGetStoreResp(null); - pdServer.addGetStoreResp(null); - pdServer.addGetStoreResp(null); - pdServer.addGetStoreResp(null); - - pdServer.addGetStoreResp( + leader.addGetStoreResp(null); + leader.addGetStoreResp(null); + leader.addGetStoreResp(null); + leader.addGetStoreResp(null); + leader.addGetStoreResp(null); + leader.addGetStoreResp(null); + + leader.addGetStoreResp( GrpcUtils.makeGetStoreResponse( - pdServer.getClusterId(), GrpcUtils.makeStore(storeId, "", Metapb.StoreState.Up))); + leader.getClusterId(), GrpcUtils.makeStore(storeId, "", Metapb.StoreState.Up))); try { client.getStore(defaultBackOff(), 0); } catch (GrpcException e) { diff --git a/src/test/java/org/tikv/common/PDMockServer.java b/src/test/java/org/tikv/common/PDMockServer.java index 78ac6304077..917b1c7fdd6 100644 --- a/src/test/java/org/tikv/common/PDMockServer.java +++ b/src/test/java/org/tikv/common/PDMockServer.java @@ -26,10 +26,22 @@ import java.util.Deque; import java.util.Optional; import java.util.concurrent.LinkedBlockingDeque; +import java.util.stream.Stream; import org.tikv.kvproto.PDGrpc; -import org.tikv.kvproto.Pdpb.*; +import org.tikv.kvproto.Pdpb.GetAllStoresRequest; +import org.tikv.kvproto.Pdpb.GetAllStoresResponse; +import org.tikv.kvproto.Pdpb.GetMembersRequest; +import org.tikv.kvproto.Pdpb.GetMembersResponse; +import org.tikv.kvproto.Pdpb.GetRegionByIDRequest; +import org.tikv.kvproto.Pdpb.GetRegionRequest; +import org.tikv.kvproto.Pdpb.GetRegionResponse; +import org.tikv.kvproto.Pdpb.GetStoreRequest; +import org.tikv.kvproto.Pdpb.GetStoreResponse; +import org.tikv.kvproto.Pdpb.TsoRequest; +import org.tikv.kvproto.Pdpb.TsoResponse; public class PDMockServer extends PDGrpc.PDImplBase { + public int port; private long clusterId; @@ -59,10 +71,12 @@ public StreamObserver tso(StreamObserver resp) { private int logical = 0; @Override - public void onNext(TsoRequest value) {} + public void onNext(TsoRequest value) { + } @Override - public void onError(Throwable t) {} + public void onError(Throwable t) { + } @Override public void onCompleted() { @@ -110,6 +124,7 @@ public void addGetStoreResp(GetStoreResponse r) { private final Deque> getStoreResp = new LinkedBlockingDeque<>(); + @Override public void getStore(GetStoreRequest request, StreamObserver resp) { try { resp.onNext(getStoreResp.removeFirst().get()); @@ -120,10 +135,16 @@ public void getStore(GetStoreRequest request, StreamObserver r } public void start(long clusterId) throws IOException { + int port; try (ServerSocket s = new ServerSocket(0)) { port = s.getLocalPort(); } + start(clusterId, port); + } + + public void start(long clusterId, int port) throws IOException { this.clusterId = clusterId; + this.port = port; server = ServerBuilder.forPort(port).addService(this).build().start(); Runtime.getRuntime().addShutdownHook(new Thread(PDMockServer.this::stop)); diff --git a/src/test/java/org/tikv/common/PDMockServerTest.java b/src/test/java/org/tikv/common/PDMockServerTest.java index cd07935fdd9..ba097d8a4a3 100644 --- a/src/test/java/org/tikv/common/PDMockServerTest.java +++ b/src/test/java/org/tikv/common/PDMockServerTest.java @@ -18,40 +18,57 @@ package org.tikv.common; import java.io.IOException; +import java.net.ServerSocket; +import java.util.ArrayList; +import java.util.List; import org.junit.After; import org.junit.Before; public abstract class PDMockServerTest { + protected static final String LOCAL_ADDR = "127.0.0.1"; static final long CLUSTER_ID = 1024; - protected static TiSession session; - protected PDMockServer pdServer; + protected TiSession session; + protected PDMockServer leader; + protected List pdServers = new ArrayList<>(); @Before - public void setUp() throws IOException { - setUp(LOCAL_ADDR); + public void setup() throws IOException { + setup(LOCAL_ADDR); } - void setUp(String addr) throws IOException { - pdServer = new PDMockServer(); - pdServer.start(CLUSTER_ID); - pdServer.addGetMemberResp( - GrpcUtils.makeGetMembersResponse( - pdServer.getClusterId(), - GrpcUtils.makeMember(1, "http://" + addr + ":" + pdServer.port), - GrpcUtils.makeMember(2, "http://" + addr + ":" + (pdServer.port + 1)), - GrpcUtils.makeMember(3, "http://" + addr + ":" + (pdServer.port + 2)))); - TiConfiguration conf = TiConfiguration.createDefault(addr + ":" + pdServer.port); + void setup(String addr) throws IOException { + int basePort; + try (ServerSocket s = new ServerSocket(0)) { + basePort = s.getLocalPort(); + } + + for (int i = 0; i < 3; i++) { + PDMockServer server = new PDMockServer(); + server.start(CLUSTER_ID, basePort + i); + server.addGetMemberResp( + GrpcUtils.makeGetMembersResponse( + server.getClusterId(), + GrpcUtils.makeMember(1, "http://" + addr + ":" + basePort), + GrpcUtils.makeMember(2, "http://" + addr + ":" + (basePort + 1)), + GrpcUtils.makeMember(3, "http://" + addr + ":" + (basePort + 2)))); + pdServers.add(server); + } + + leader = pdServers.get(0); + TiConfiguration conf = TiConfiguration.createDefault(addr + ":" + leader.port); conf.setEnableGrpcForward(false); conf.setKvMode("RAW"); conf.setTest(true); - conf.setTimeout(2000); + conf.setTimeout(20000000); session = TiSession.create(conf); } @After public void tearDown() throws Exception { session.close(); - pdServer.stop(); + for (PDMockServer server : pdServers) { + server.stop(); + } } } diff --git a/src/test/java/org/tikv/common/RegionManagerTest.java b/src/test/java/org/tikv/common/RegionManagerTest.java index 58687ef1059..0729abf43a7 100644 --- a/src/test/java/org/tikv/common/RegionManagerTest.java +++ b/src/test/java/org/tikv/common/RegionManagerTest.java @@ -40,8 +40,8 @@ public class RegionManagerTest extends PDMockServerTest { @Before @Override - public void setUp() throws IOException { - super.setUp(); + public void setup() throws IOException { + super.setup(); mgr = session.getRegionManager(); } @@ -64,9 +64,9 @@ public void getRegionByKey() { int ver = 1027; long regionId = 233; String testAddress = "127.0.0.1"; - pdServer.addGetRegionResp( + leader.addGetRegionResp( GrpcUtils.makeGetRegionResponse( - pdServer.getClusterId(), + leader.getClusterId(), GrpcUtils.makeRegion( regionId, GrpcUtils.encodeKey(startKey.toByteArray()), @@ -75,9 +75,9 @@ public void getRegionByKey() { GrpcUtils.makePeer(1, 10), GrpcUtils.makePeer(2, 20)))); for (long id : new long[] {10, 20}) { - pdServer.addGetStoreResp( + leader.addGetStoreResp( GrpcUtils.makeGetStoreResponse( - pdServer.getClusterId(), + leader.getClusterId(), GrpcUtils.makeStore( id, testAddress, @@ -103,9 +103,9 @@ public void getStoreByKey() { int confVer = 1026; int ver = 1027; long regionId = 233; - pdServer.addGetRegionResp( + leader.addGetRegionResp( GrpcUtils.makeGetRegionResponse( - pdServer.getClusterId(), + leader.getClusterId(), GrpcUtils.makeRegion( regionId, GrpcUtils.encodeKey(startKey.toByteArray()), @@ -114,9 +114,9 @@ public void getStoreByKey() { GrpcUtils.makePeer(storeId, 10), GrpcUtils.makePeer(storeId + 1, 20)))); for (long id : new long[] {10, 20}) { - pdServer.addGetStoreResp( + leader.addGetStoreResp( GrpcUtils.makeGetStoreResponse( - pdServer.getClusterId(), + leader.getClusterId(), GrpcUtils.makeStore( id, testAddress, @@ -134,9 +134,9 @@ public void getStoreByKey() { public void getStoreById() { long storeId = 234; String testAddress = "testAddress"; - pdServer.addGetStoreResp( + leader.addGetStoreResp( GrpcUtils.makeGetStoreResponse( - pdServer.getClusterId(), + leader.getClusterId(), GrpcUtils.makeStore( storeId, testAddress, @@ -146,9 +146,9 @@ public void getStoreById() { TiStore store = mgr.getStoreById(storeId); assertEquals(store.getStore().getId(), storeId); - pdServer.addGetStoreResp( + leader.addGetStoreResp( GrpcUtils.makeGetStoreResponse( - pdServer.getClusterId(), + leader.getClusterId(), GrpcUtils.makeStore( storeId + 1, testAddress, diff --git a/src/test/java/org/tikv/common/SeekLeaderStoreTest.java b/src/test/java/org/tikv/common/SeekLeaderStoreTest.java new file mode 100644 index 00000000000..d29fbc1d41e --- /dev/null +++ b/src/test/java/org/tikv/common/SeekLeaderStoreTest.java @@ -0,0 +1,29 @@ +package org.tikv.common; + +import com.google.protobuf.ByteString; +import org.junit.Assert; +import org.junit.Test; +import org.tikv.common.KVMockServer.State; +import org.tikv.raw.RawKVClient; + +public class SeekLeaderStoreTest extends MockThreeStoresTest { + + private RawKVClient createClient() { + RawKVClient client = session.createRawClient(); + return client; + } + + @Test + public void testSeekLeader() { + RawKVClient client = createClient(); + ByteString key = ByteString.copyFromUtf8("key"); + ByteString value = ByteString.copyFromUtf8("value"); + put(key, value); + + client.put(key, value); + Assert.assertEquals(value, client.get(key).get()); + servers.get(0).setState(State.Fail); + // Assert.assertEquals(State.Normal, servers.get(1).getState()); + Assert.assertEquals(value, client.get(key).get()); + } +} From a8a4fa23fafdcb576dc500c88c569926e181acb0 Mon Sep 17 00:00:00 2001 From: iosmanthus Date: Tue, 15 Feb 2022 22:47:58 +0800 Subject: [PATCH 02/12] finish first stage test for seekLeaderStore Signed-off-by: iosmanthus --- .../region/AbstractRegionStoreClient.java | 10 ++- .../java/org/tikv/common/region/TiRegion.java | 26 ++++---- .../java/org/tikv/common/KVMockServer.java | 40 ++++++++++- .../java/org/tikv/common/MockServerTest.java | 7 +- .../org/tikv/common/MockThreeStoresTest.java | 34 ++++++---- .../org/tikv/common/PDClientMockTest.java | 51 +++++++------- .../java/org/tikv/common/PDMockServer.java | 46 ++++++------- .../org/tikv/common/PDMockServerTest.java | 6 +- .../org/tikv/common/RegionManagerTest.java | 66 ++++++++++--------- .../org/tikv/common/SeekLeaderStoreTest.java | 4 +- 10 files changed, 172 insertions(+), 118 deletions(-) diff --git a/src/main/java/org/tikv/common/region/AbstractRegionStoreClient.java b/src/main/java/org/tikv/common/region/AbstractRegionStoreClient.java index fbff9a17ed5..846aac5ebbd 100644 --- a/src/main/java/org/tikv/common/region/AbstractRegionStoreClient.java +++ b/src/main/java/org/tikv/common/region/AbstractRegionStoreClient.java @@ -29,7 +29,7 @@ import java.util.LinkedList; import java.util.List; import java.util.Set; -import java.util.concurrent.*; +import java.util.concurrent.TimeUnit; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.tikv.common.AbstractGRPCClient; @@ -46,6 +46,7 @@ public abstract class AbstractRegionStoreClient extends AbstractGRPCClient implements RegionErrorReceiver { + private static final Logger logger = LoggerFactory.getLogger(AbstractRegionStoreClient.class); public static final Histogram SEEK_LEADER_STORE_DURATION = @@ -100,7 +101,8 @@ protected TikvGrpc.TikvFutureStub getAsyncStub() { } @Override - public void close() throws GrpcException {} + public void close() throws GrpcException { + } /** * onNotLeader deals with NotLeaderError and returns whether re-splitting key range is needed @@ -210,6 +212,7 @@ private Boolean seekLeaderStore(BackOffer backOffer) { // switch to leader store store = currentLeaderStore; updateClientStub(); + return true; } return false; } @@ -292,6 +295,7 @@ private Metapb.Peer switchLeaderStore(BackOffer backOffer) { } } } catch (Exception ignored) { + logger.error(ignored.getMessage() + " peer is " + task.peer.getId()); } } if (unfinished.isEmpty()) { @@ -354,6 +358,7 @@ private TiStore switchProxyStore(BackOffer backOffer) { } private static class SwitchLeaderTask { + private final ListenableFuture task; private final Metapb.Peer peer; @@ -364,6 +369,7 @@ private SwitchLeaderTask(ListenableFuture task, Metapb.P } private static class ForwardCheckTask { + private final ListenableFuture task; private final Metapb.Store store; diff --git a/src/main/java/org/tikv/common/region/TiRegion.java b/src/main/java/org/tikv/common/region/TiRegion.java index 71b326c46e3..048101f94e8 100644 --- a/src/main/java/org/tikv/common/region/TiRegion.java +++ b/src/main/java/org/tikv/common/region/TiRegion.java @@ -40,6 +40,7 @@ import org.tikv.kvproto.Metapb.Region; public class TiRegion implements Serializable { + private static final Logger logger = LoggerFactory.getLogger(TiRegion.class); private final Region meta; @@ -203,23 +204,23 @@ public TiRegion switchPeer(long leaderStoreID) { public boolean isMoreThan(ByteString key) { return FastByteComparisons.compareTo( - meta.getStartKey().toByteArray(), - 0, - meta.getStartKey().size(), - key.toByteArray(), - 0, - key.size()) + meta.getStartKey().toByteArray(), + 0, + meta.getStartKey().size(), + key.toByteArray(), + 0, + key.size()) > 0; } public boolean isLessThan(ByteString key) { return FastByteComparisons.compareTo( - meta.getEndKey().toByteArray(), - 0, - meta.getEndKey().size(), - key.toByteArray(), - 0, - key.size()) + meta.getEndKey().toByteArray(), + 0, + meta.getEndKey().size(), + key.toByteArray(), + 0, + key.size()) <= 0; } @@ -269,6 +270,7 @@ public String toString() { } public class RegionVerID { + final long id; final long confVer; final long ver; diff --git a/src/test/java/org/tikv/common/KVMockServer.java b/src/test/java/org/tikv/common/KVMockServer.java index 8b012ea24ca..607ff22f16e 100644 --- a/src/test/java/org/tikv/common/KVMockServer.java +++ b/src/test/java/org/tikv/common/KVMockServer.java @@ -27,6 +27,11 @@ import io.grpc.Server; import io.grpc.ServerBuilder; import io.grpc.Status; +import io.grpc.health.v1.HealthCheckRequest; +import io.grpc.health.v1.HealthCheckResponse; +import io.grpc.health.v1.HealthCheckResponse.ServingStatus; +import io.grpc.health.v1.HealthGrpc.HealthImplBase; +import io.grpc.stub.StreamObserver; import java.io.IOException; import java.net.ServerSocket; import java.util.HashMap; @@ -36,6 +41,8 @@ import java.util.SortedMap; import java.util.TreeMap; import java.util.stream.Collectors; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.tikv.common.key.Key; import org.tikv.common.region.TiRegion; import org.tikv.kvproto.Coprocessor; @@ -50,6 +57,7 @@ public class KVMockServer extends TikvGrpc.TikvImplBase { + private final static Logger logger = LoggerFactory.getLogger(KVMockServer.class); private int port; private Server server; private TiRegion region; @@ -87,6 +95,10 @@ public int getPort() { return port; } + public void setRegion(TiRegion region) { + this.region = region; + } + public void put(ByteString key, ByteString value) { dataMap.put(toRawKey(key), value); } @@ -116,7 +128,7 @@ private void verifyContext(Context context) throws Exception { if (context.getRegionId() != region.getId() || !context.getRegionEpoch().equals(region.getRegionEpoch()) || !context.getPeer().equals(region.getLeader())) { - throw new Exception(); + throw new Exception("context doesn't match"); } } @@ -140,7 +152,12 @@ public void rawGet( setErrorInfo(errorCode, errBuilder); builder.setRegionError(errBuilder.build()); } else { - builder.setValue(dataMap.get(toRawKey(key))); + Key rawKey = toRawKey(key); + ByteString value = dataMap.get(rawKey); + if (value == null) { + value = ByteString.EMPTY; + } + builder.setValue(value); } responseObserver.onNext(builder.build()); responseObserver.onCompleted(); @@ -384,10 +401,27 @@ public int start(TiRegion region) throws IOException { return port; } + private static class HealCheck extends HealthImplBase { + + @Override + public void check(HealthCheckRequest request, + StreamObserver responseObserver) { + responseObserver.onNext( + HealthCheckResponse.newBuilder().setStatus(ServingStatus.SERVING).build()); + responseObserver.onCompleted(); + } + } + public void start(TiRegion region, int port) throws IOException { this.port = port; - server = ServerBuilder.forPort(port).addService(this).build().start(); this.region = region; + + logger.info("start mock server on port: " + port); + server = ServerBuilder.forPort(port) + .addService(new HealCheck()) + .addService(this) + .build() + .start(); Runtime.getRuntime().addShutdownHook(new Thread(KVMockServer.this::stop)); } diff --git a/src/test/java/org/tikv/common/MockServerTest.java b/src/test/java/org/tikv/common/MockServerTest.java index 50b9e19b3a1..02cab4c46f4 100644 --- a/src/test/java/org/tikv/common/MockServerTest.java +++ b/src/test/java/org/tikv/common/MockServerTest.java @@ -29,6 +29,7 @@ import org.tikv.kvproto.Pdpb; public class MockServerTest extends PDMockServerTest { + public KVMockServer server; public int port; public TiRegion region; @@ -62,9 +63,11 @@ public void setup() throws IOException { r.getPeers(0), r.getPeersList(), s.stream().map(TiStore::new).collect(Collectors.toList())); - leader.addGetRegionResp(Pdpb.GetRegionResponse.newBuilder().setRegion(r).build()); + leader.addGetRegionListener( + request -> Pdpb.GetRegionResponse.newBuilder().setRegion(r).build()); for (Metapb.Store store : s) { - leader.addGetStoreResp(Pdpb.GetStoreResponse.newBuilder().setStore(store).build()); + leader.addGetStoreListener( + (request) -> Pdpb.GetStoreResponse.newBuilder().setStore(store).build()); } server = new KVMockServer(); port = server.start(region); diff --git a/src/test/java/org/tikv/common/MockThreeStoresTest.java b/src/test/java/org/tikv/common/MockThreeStoresTest.java index b5b52924226..20eaa053586 100644 --- a/src/test/java/org/tikv/common/MockThreeStoresTest.java +++ b/src/test/java/org/tikv/common/MockThreeStoresTest.java @@ -3,6 +3,7 @@ import com.google.common.collect.ImmutableList; import com.google.protobuf.ByteString; import java.io.IOException; +import java.net.ServerSocket; import java.util.ArrayList; import java.util.List; import java.util.stream.Collectors; @@ -15,15 +16,25 @@ public class MockThreeStoresTest extends PDMockServerTest { - public TiRegion region; - public ArrayList servers = new ArrayList<>(); + protected TiRegion region; + protected List servers = new ArrayList<>(); + protected List stores; @Before @Override public void setup() throws IOException { super.setup(); - int basePort = 0xabcd; + int basePort; + try (ServerSocket s = new ServerSocket(0)) { + basePort = s.getLocalPort(); + } + + ImmutableList peers = ImmutableList.of( + Metapb.Peer.newBuilder().setId(0x1).setStoreId(0x1).build(), + Metapb.Peer.newBuilder().setId(0x2).setStoreId(0x2).build(), + Metapb.Peer.newBuilder().setId(0x3).setStoreId(0x3).build() + ); Metapb.Region region = Metapb.Region.newBuilder() @@ -31,12 +42,10 @@ public void setup() throws IOException { .setId(0xff) .setStartKey(ByteString.EMPTY) .setEndKey(ByteString.EMPTY) - .addPeers(Metapb.Peer.newBuilder().setId(0x1).setStoreId(0x1)) - .addPeers(Metapb.Peer.newBuilder().setId(0x2).setStoreId(0x2)) - .addPeers(Metapb.Peer.newBuilder().setId(0x3).setStoreId(0x3)) + .addAllPeers(peers) .build(); - List stores = + stores = ImmutableList.of( Metapb.Store.newBuilder() .setAddress("127.0.0.1:" + basePort) @@ -46,7 +55,6 @@ public void setup() throws IOException { Metapb.Store.newBuilder() .setAddress("127.0.0.1:" + (basePort + 1)) .setVersion("5.0.0") - .setVersion("5.0.0") .setId(0x2) .build(), Metapb.Store.newBuilder() @@ -57,10 +65,12 @@ public void setup() throws IOException { ); for (PDMockServer server : pdServers) { - server.addGetRegionResp(Pdpb.GetRegionResponse.newBuilder().setRegion(region).build()); - for (Metapb.Store store : stores) { - server.addGetStoreResp(Pdpb.GetStoreResponse.newBuilder().setStore(store).build()); - } + server.addGetRegionListener(request -> + Pdpb.GetRegionResponse.newBuilder().setLeader(peers.get(0)).setRegion(region).build()); + server.addGetStoreListener((request) -> { + int i = (int) request.getStoreId() - 1; + return Pdpb.GetStoreResponse.newBuilder().setStore(stores.get(i)).build(); + }); } this.region = diff --git a/src/test/java/org/tikv/common/PDClientMockTest.java b/src/test/java/org/tikv/common/PDClientMockTest.java index 1bc68212e47..e1961f529c9 100644 --- a/src/test/java/org/tikv/common/PDClientMockTest.java +++ b/src/test/java/org/tikv/common/PDClientMockTest.java @@ -28,6 +28,7 @@ import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicInteger; import org.junit.Test; import org.tikv.common.exception.GrpcException; import org.tikv.common.meta.TiTimestamp; @@ -80,11 +81,11 @@ public void testTso() throws Exception { @Test public void testGetRegionByKey() throws Exception { - byte[] startKey = new byte[] {1, 0, 2, 4}; - byte[] endKey = new byte[] {1, 0, 2, 5}; + byte[] startKey = new byte[]{1, 0, 2, 4}; + byte[] endKey = new byte[]{1, 0, 2, 5}; int confVer = 1026; int ver = 1027; - leader.addGetRegionResp( + leader.addGetRegionListener(request -> GrpcUtils.makeGetRegionResponse( leader.getClusterId(), GrpcUtils.makeRegion( @@ -110,12 +111,12 @@ public void testGetRegionByKey() throws Exception { @Test public void testGetRegionById() throws Exception { - byte[] startKey = new byte[] {1, 0, 2, 4}; - byte[] endKey = new byte[] {1, 0, 2, 5}; + byte[] startKey = new byte[]{1, 0, 2, 4}; + byte[] endKey = new byte[]{1, 0, 2, 5}; int confVer = 1026; int ver = 1027; - leader.addGetRegionByIDResp( + leader.addGetRegionByIDListener(request -> GrpcUtils.makeGetRegionResponse( leader.getClusterId(), GrpcUtils.makeRegion( @@ -142,7 +143,7 @@ public void testGetRegionById() throws Exception { public void testGetStore() throws Exception { long storeId = 1; String testAddress = "testAddress"; - leader.addGetStoreResp( + leader.addGetStoreListener(request -> GrpcUtils.makeGetStoreResponse( leader.getClusterId(), GrpcUtils.makeStore( @@ -161,7 +162,7 @@ public void testGetStore() throws Exception { assertEquals("v1", r.getLabels(0).getValue()); assertEquals("v2", r.getLabels(1).getValue()); - leader.addGetStoreResp( + leader.addGetStoreListener(request -> GrpcUtils.makeGetStoreResponse( leader.getClusterId(), GrpcUtils.makeStore(storeId, testAddress, Metapb.StoreState.Tombstone))); @@ -177,11 +178,15 @@ private BackOffer defaultBackOff() { public void testRetryPolicy() throws Exception { long storeId = 1024; ExecutorService service = Executors.newCachedThreadPool(); - leader.addGetStoreResp(null); - leader.addGetStoreResp(null); - leader.addGetStoreResp( - GrpcUtils.makeGetStoreResponse( - leader.getClusterId(), GrpcUtils.makeStore(storeId, "", Metapb.StoreState.Up))); + AtomicInteger i = new AtomicInteger(); + leader.addGetStoreListener(request -> { + if (i.getAndIncrement() < 2) { + return null; + } else { + return GrpcUtils.makeGetStoreResponse(leader.getClusterId(), + GrpcUtils.makeStore(storeId, "", Metapb.StoreState.Up)); + } + }); try (PDClient client = session.getPDClient()) { Callable storeCallable = () -> client.getStore(ConcreteBackOffer.newCustomBackOff(5000), 0); @@ -194,16 +199,16 @@ public void testRetryPolicy() throws Exception { } // Should fail - leader.addGetStoreResp(null); - leader.addGetStoreResp(null); - leader.addGetStoreResp(null); - leader.addGetStoreResp(null); - leader.addGetStoreResp(null); - leader.addGetStoreResp(null); - - leader.addGetStoreResp( - GrpcUtils.makeGetStoreResponse( - leader.getClusterId(), GrpcUtils.makeStore(storeId, "", Metapb.StoreState.Up))); + AtomicInteger j = new AtomicInteger(); + leader.addGetStoreListener(request -> { + if (j.getAndIncrement() < 6) { + return null; + } else { + return GrpcUtils.makeGetStoreResponse( + leader.getClusterId(), GrpcUtils.makeStore(storeId, "", Metapb.StoreState.Up)); + } + }); + try { client.getStore(defaultBackOff(), 0); } catch (GrpcException e) { diff --git a/src/test/java/org/tikv/common/PDMockServer.java b/src/test/java/org/tikv/common/PDMockServer.java index 917b1c7fdd6..b8c7e7c5158 100644 --- a/src/test/java/org/tikv/common/PDMockServer.java +++ b/src/test/java/org/tikv/common/PDMockServer.java @@ -23,13 +23,9 @@ import io.grpc.stub.StreamObserver; import java.io.IOException; import java.net.ServerSocket; -import java.util.Deque; import java.util.Optional; -import java.util.concurrent.LinkedBlockingDeque; -import java.util.stream.Stream; +import java.util.function.Function; import org.tikv.kvproto.PDGrpc; -import org.tikv.kvproto.Pdpb.GetAllStoresRequest; -import org.tikv.kvproto.Pdpb.GetAllStoresResponse; import org.tikv.kvproto.Pdpb.GetMembersRequest; import org.tikv.kvproto.Pdpb.GetMembersResponse; import org.tikv.kvproto.Pdpb.GetRegionByIDRequest; @@ -44,20 +40,22 @@ public class PDMockServer extends PDGrpc.PDImplBase { public int port; private long clusterId; - private Server server; - public void addGetMemberResp(GetMembersResponse r) { - getMembersResp.addLast(Optional.ofNullable(r)); - } + private Function getMembersListener; + private Function getStoreListener; + private Function getRegionListener; + private Function getRegionByIDListener; - private final Deque> getMembersResp = - new LinkedBlockingDeque>(); + + public void addGetMembersListener(Function func) { + getMembersListener = func; + } @Override public void getMembers(GetMembersRequest request, StreamObserver resp) { try { - resp.onNext(getMembersResp.getFirst().get()); + resp.onNext(Optional.ofNullable(getMembersListener.apply(request)).get()); resp.onCompleted(); } catch (Exception e) { resp.onError(Status.INTERNAL.asRuntimeException()); @@ -86,48 +84,42 @@ public void onCompleted() { }; } - public void addGetRegionResp(GetRegionResponse r) { - getRegionResp.addLast(r); + public void addGetRegionListener(Function func) { + getRegionListener = func; } - private final Deque getRegionResp = new LinkedBlockingDeque<>(); - @Override public void getRegion(GetRegionRequest request, StreamObserver resp) { try { - resp.onNext(getRegionResp.removeFirst()); + resp.onNext(getRegionListener.apply(request)); resp.onCompleted(); } catch (Exception e) { resp.onError(Status.INTERNAL.asRuntimeException()); } } - public void addGetRegionByIDResp(GetRegionResponse r) { - getRegionByIDResp.addLast(r); + public void addGetRegionByIDListener(Function func) { + getRegionByIDListener = func; } - private final Deque getRegionByIDResp = new LinkedBlockingDeque<>(); - @Override public void getRegionByID(GetRegionByIDRequest request, StreamObserver resp) { try { - resp.onNext(getRegionByIDResp.removeFirst()); + resp.onNext(getRegionByIDListener.apply(request)); resp.onCompleted(); } catch (Exception e) { resp.onError(Status.INTERNAL.asRuntimeException()); } } - public void addGetStoreResp(GetStoreResponse r) { - getStoreResp.addLast(Optional.ofNullable(r)); + public void addGetStoreListener(Function func) { + getStoreListener = func; } - private final Deque> getStoreResp = new LinkedBlockingDeque<>(); - @Override public void getStore(GetStoreRequest request, StreamObserver resp) { try { - resp.onNext(getStoreResp.removeFirst().get()); + resp.onNext(Optional.ofNullable(getStoreListener.apply(request)).get()); resp.onCompleted(); } catch (Exception e) { resp.onError(Status.INTERNAL.asRuntimeException()); diff --git a/src/test/java/org/tikv/common/PDMockServerTest.java b/src/test/java/org/tikv/common/PDMockServerTest.java index ba097d8a4a3..4794ff6e507 100644 --- a/src/test/java/org/tikv/common/PDMockServerTest.java +++ b/src/test/java/org/tikv/common/PDMockServerTest.java @@ -39,15 +39,15 @@ public void setup() throws IOException { void setup(String addr) throws IOException { int basePort; - try (ServerSocket s = new ServerSocket(0)) { + try (ServerSocket s = new ServerSocket(51820)) { basePort = s.getLocalPort(); } for (int i = 0; i < 3; i++) { PDMockServer server = new PDMockServer(); server.start(CLUSTER_ID, basePort + i); - server.addGetMemberResp( - GrpcUtils.makeGetMembersResponse( + server.addGetMembersListener( + (request) -> GrpcUtils.makeGetMembersResponse( server.getClusterId(), GrpcUtils.makeMember(1, "http://" + addr + ":" + basePort), GrpcUtils.makeMember(2, "http://" + addr + ":" + (basePort + 1)), diff --git a/src/test/java/org/tikv/common/RegionManagerTest.java b/src/test/java/org/tikv/common/RegionManagerTest.java index 0729abf43a7..c34e48363ff 100644 --- a/src/test/java/org/tikv/common/RegionManagerTest.java +++ b/src/test/java/org/tikv/common/RegionManagerTest.java @@ -24,6 +24,7 @@ import com.google.common.collect.TreeRangeMap; import com.google.protobuf.ByteString; import java.io.IOException; +import java.util.concurrent.atomic.AtomicInteger; import org.junit.Before; import org.junit.Test; import org.tikv.common.key.Key; @@ -36,6 +37,7 @@ import org.tikv.kvproto.Metapb.StoreState; public class RegionManagerTest extends PDMockServerTest { + private RegionManager mgr; @Before @@ -57,14 +59,14 @@ public void testRegionBorder() { @Test public void getRegionByKey() { - ByteString startKey = ByteString.copyFrom(new byte[] {1}); - ByteString endKey = ByteString.copyFrom(new byte[] {10}); - ByteString searchKey = ByteString.copyFrom(new byte[] {5}); + ByteString startKey = ByteString.copyFrom(new byte[]{1}); + ByteString endKey = ByteString.copyFrom(new byte[]{10}); + ByteString searchKey = ByteString.copyFrom(new byte[]{5}); int confVer = 1026; int ver = 1027; long regionId = 233; String testAddress = "127.0.0.1"; - leader.addGetRegionResp( + leader.addGetRegionListener(request -> GrpcUtils.makeGetRegionResponse( leader.getClusterId(), GrpcUtils.makeRegion( @@ -74,17 +76,17 @@ public void getRegionByKey() { GrpcUtils.makeRegionEpoch(confVer, ver), GrpcUtils.makePeer(1, 10), GrpcUtils.makePeer(2, 20)))); - for (long id : new long[] {10, 20}) { - leader.addGetStoreResp( - GrpcUtils.makeGetStoreResponse( - leader.getClusterId(), - GrpcUtils.makeStore( - id, - testAddress, - Metapb.StoreState.Up, - GrpcUtils.makeStoreLabel("k1", "v1"), - GrpcUtils.makeStoreLabel("k2", "v2")))); - } + + AtomicInteger i = new AtomicInteger(0); + long[] ids = new long[]{10, 20}; + leader.addGetStoreListener((request -> GrpcUtils.makeGetStoreResponse( + leader.getClusterId(), + GrpcUtils.makeStore( + ids[i.getAndIncrement()], + testAddress, + StoreState.Up, + GrpcUtils.makeStoreLabel("k1", "v1"), + GrpcUtils.makeStoreLabel("k2", "v2"))))); TiRegion region = mgr.getRegionByKey(startKey); assertEquals(region.getId(), regionId); @@ -95,15 +97,15 @@ public void getRegionByKey() { @Test public void getStoreByKey() { - ByteString startKey = ByteString.copyFrom(new byte[] {1}); - ByteString endKey = ByteString.copyFrom(new byte[] {10}); - ByteString searchKey = ByteString.copyFrom(new byte[] {5}); + ByteString startKey = ByteString.copyFrom(new byte[]{1}); + ByteString endKey = ByteString.copyFrom(new byte[]{10}); + ByteString searchKey = ByteString.copyFrom(new byte[]{5}); String testAddress = "testAddress"; long storeId = 233; int confVer = 1026; int ver = 1027; long regionId = 233; - leader.addGetRegionResp( + leader.addGetRegionListener(request -> GrpcUtils.makeGetRegionResponse( leader.getClusterId(), GrpcUtils.makeRegion( @@ -113,17 +115,17 @@ public void getStoreByKey() { GrpcUtils.makeRegionEpoch(confVer, ver), GrpcUtils.makePeer(storeId, 10), GrpcUtils.makePeer(storeId + 1, 20)))); - for (long id : new long[] {10, 20}) { - leader.addGetStoreResp( - GrpcUtils.makeGetStoreResponse( - leader.getClusterId(), - GrpcUtils.makeStore( - id, - testAddress, - Metapb.StoreState.Up, - GrpcUtils.makeStoreLabel("k1", "v1"), - GrpcUtils.makeStoreLabel("k2", "v2")))); - } + + AtomicInteger i = new AtomicInteger(0); + long[] ids = new long[]{10, 20}; + leader.addGetStoreListener((request -> GrpcUtils.makeGetStoreResponse( + leader.getClusterId(), + GrpcUtils.makeStore( + ids[i.getAndIncrement()], + testAddress, + StoreState.Up, + GrpcUtils.makeStoreLabel("k1", "v1"), + GrpcUtils.makeStoreLabel("k2", "v2"))))); Pair pair = mgr.getRegionStorePairByKey(searchKey); assertEquals(pair.first.getId(), regionId); @@ -134,7 +136,7 @@ public void getStoreByKey() { public void getStoreById() { long storeId = 234; String testAddress = "testAddress"; - leader.addGetStoreResp( + leader.addGetStoreListener(request -> GrpcUtils.makeGetStoreResponse( leader.getClusterId(), GrpcUtils.makeStore( @@ -146,7 +148,7 @@ public void getStoreById() { TiStore store = mgr.getStoreById(storeId); assertEquals(store.getStore().getId(), storeId); - leader.addGetStoreResp( + leader.addGetStoreListener(request -> GrpcUtils.makeGetStoreResponse( leader.getClusterId(), GrpcUtils.makeStore( diff --git a/src/test/java/org/tikv/common/SeekLeaderStoreTest.java b/src/test/java/org/tikv/common/SeekLeaderStoreTest.java index d29fbc1d41e..3eedf19dabd 100644 --- a/src/test/java/org/tikv/common/SeekLeaderStoreTest.java +++ b/src/test/java/org/tikv/common/SeekLeaderStoreTest.java @@ -23,7 +23,7 @@ public void testSeekLeader() { client.put(key, value); Assert.assertEquals(value, client.get(key).get()); servers.get(0).setState(State.Fail); - // Assert.assertEquals(State.Normal, servers.get(1).getState()); - Assert.assertEquals(value, client.get(key).get()); + servers.get(1).setRegion(region.switchPeer(stores.get(1).getId())); + client.get(key); } } From e4856950469d6f47154b2216c0e528528dcefb4c Mon Sep 17 00:00:00 2001 From: iosmanthus Date: Wed, 16 Feb 2022 14:46:48 +0800 Subject: [PATCH 03/12] ./dev/javafmt and fix failed test Signed-off-by: iosmanthus --- .../region/AbstractRegionStoreClient.java | 3 +- .../java/org/tikv/common/region/TiRegion.java | 24 ++-- .../java/org/tikv/common/KVMockServer.java | 25 +--- .../org/tikv/common/MockThreeStoresTest.java | 30 ++-- .../org/tikv/common/PDClientMockTest.java | 115 +++++++-------- .../java/org/tikv/common/PDMockServer.java | 7 +- .../org/tikv/common/PDMockServerTest.java | 11 +- .../org/tikv/common/RegionManagerTest.java | 132 ++++++++++-------- .../org/tikv/common/SeekLeaderStoreTest.java | 19 ++- 9 files changed, 193 insertions(+), 173 deletions(-) diff --git a/src/main/java/org/tikv/common/region/AbstractRegionStoreClient.java b/src/main/java/org/tikv/common/region/AbstractRegionStoreClient.java index 846aac5ebbd..0c89687d257 100644 --- a/src/main/java/org/tikv/common/region/AbstractRegionStoreClient.java +++ b/src/main/java/org/tikv/common/region/AbstractRegionStoreClient.java @@ -101,8 +101,7 @@ protected TikvGrpc.TikvFutureStub getAsyncStub() { } @Override - public void close() throws GrpcException { - } + public void close() throws GrpcException {} /** * onNotLeader deals with NotLeaderError and returns whether re-splitting key range is needed diff --git a/src/main/java/org/tikv/common/region/TiRegion.java b/src/main/java/org/tikv/common/region/TiRegion.java index 048101f94e8..b24c5f66eb4 100644 --- a/src/main/java/org/tikv/common/region/TiRegion.java +++ b/src/main/java/org/tikv/common/region/TiRegion.java @@ -204,23 +204,23 @@ public TiRegion switchPeer(long leaderStoreID) { public boolean isMoreThan(ByteString key) { return FastByteComparisons.compareTo( - meta.getStartKey().toByteArray(), - 0, - meta.getStartKey().size(), - key.toByteArray(), - 0, - key.size()) + meta.getStartKey().toByteArray(), + 0, + meta.getStartKey().size(), + key.toByteArray(), + 0, + key.size()) > 0; } public boolean isLessThan(ByteString key) { return FastByteComparisons.compareTo( - meta.getEndKey().toByteArray(), - 0, - meta.getEndKey().size(), - key.toByteArray(), - 0, - key.size()) + meta.getEndKey().toByteArray(), + 0, + meta.getEndKey().size(), + key.toByteArray(), + 0, + key.size()) <= 0; } diff --git a/src/test/java/org/tikv/common/KVMockServer.java b/src/test/java/org/tikv/common/KVMockServer.java index 607ff22f16e..412d9dcfb43 100644 --- a/src/test/java/org/tikv/common/KVMockServer.java +++ b/src/test/java/org/tikv/common/KVMockServer.java @@ -57,7 +57,7 @@ public class KVMockServer extends TikvGrpc.TikvImplBase { - private final static Logger logger = LoggerFactory.getLogger(KVMockServer.class); + private static final Logger logger = LoggerFactory.getLogger(KVMockServer.class); private int port; private Server server; private TiRegion region; @@ -87,10 +87,6 @@ public void setState(State state) { this.state = state; } - public State getState() { - return state; - } - public int getPort() { return port; } @@ -166,9 +162,7 @@ public void rawGet( } } - /** - * - */ + /** */ public void rawPut( org.tikv.kvproto.Kvrpcpb.RawPutRequest request, io.grpc.stub.StreamObserver responseObserver) { @@ -210,9 +204,7 @@ private void setErrorInfo(int errorCode, Errorpb.Error.Builder errBuilder) { } } - /** - * - */ + /** */ public void rawDelete( org.tikv.kvproto.Kvrpcpb.RawDeleteRequest request, io.grpc.stub.StreamObserver responseObserver) { @@ -404,8 +396,8 @@ public int start(TiRegion region) throws IOException { private static class HealCheck extends HealthImplBase { @Override - public void check(HealthCheckRequest request, - StreamObserver responseObserver) { + public void check( + HealthCheckRequest request, StreamObserver responseObserver) { responseObserver.onNext( HealthCheckResponse.newBuilder().setStatus(ServingStatus.SERVING).build()); responseObserver.onCompleted(); @@ -417,11 +409,8 @@ public void start(TiRegion region, int port) throws IOException { this.region = region; logger.info("start mock server on port: " + port); - server = ServerBuilder.forPort(port) - .addService(new HealCheck()) - .addService(this) - .build() - .start(); + server = + ServerBuilder.forPort(port).addService(new HealCheck()).addService(this).build().start(); Runtime.getRuntime().addShutdownHook(new Thread(KVMockServer.this::stop)); } diff --git a/src/test/java/org/tikv/common/MockThreeStoresTest.java b/src/test/java/org/tikv/common/MockThreeStoresTest.java index 20eaa053586..73af0089855 100644 --- a/src/test/java/org/tikv/common/MockThreeStoresTest.java +++ b/src/test/java/org/tikv/common/MockThreeStoresTest.java @@ -30,11 +30,11 @@ public void setup() throws IOException { basePort = s.getLocalPort(); } - ImmutableList peers = ImmutableList.of( - Metapb.Peer.newBuilder().setId(0x1).setStoreId(0x1).build(), - Metapb.Peer.newBuilder().setId(0x2).setStoreId(0x2).build(), - Metapb.Peer.newBuilder().setId(0x3).setStoreId(0x3).build() - ); + ImmutableList peers = + ImmutableList.of( + Metapb.Peer.newBuilder().setId(0x1).setStoreId(0x1).build(), + Metapb.Peer.newBuilder().setId(0x2).setStoreId(0x2).build(), + Metapb.Peer.newBuilder().setId(0x3).setStoreId(0x3).build()); Metapb.Region region = Metapb.Region.newBuilder() @@ -61,16 +61,20 @@ public void setup() throws IOException { .setAddress("127.0.0.1:" + (basePort + 2)) .setVersion("5.0.0") .setId(0x3) - .build() - ); + .build()); for (PDMockServer server : pdServers) { - server.addGetRegionListener(request -> - Pdpb.GetRegionResponse.newBuilder().setLeader(peers.get(0)).setRegion(region).build()); - server.addGetStoreListener((request) -> { - int i = (int) request.getStoreId() - 1; - return Pdpb.GetStoreResponse.newBuilder().setStore(stores.get(i)).build(); - }); + server.addGetRegionListener( + request -> + Pdpb.GetRegionResponse.newBuilder() + .setLeader(peers.get(0)) + .setRegion(region) + .build()); + server.addGetStoreListener( + (request) -> { + int i = (int) request.getStoreId() - 1; + return Pdpb.GetStoreResponse.newBuilder().setStore(stores.get(i)).build(); + }); } this.region = diff --git a/src/test/java/org/tikv/common/PDClientMockTest.java b/src/test/java/org/tikv/common/PDClientMockTest.java index e1961f529c9..d1608e05a4f 100644 --- a/src/test/java/org/tikv/common/PDClientMockTest.java +++ b/src/test/java/org/tikv/common/PDClientMockTest.java @@ -57,8 +57,7 @@ public void testSwitchLeader() throws Exception { try (PDClient client = session.getPDClient()) { client.trySwitchLeader(HTTP + LOCAL_ADDR + ":" + (leader.port + 1)); assertEquals( - client.getPdClientWrapper().getLeaderInfo(), - HTTP + LOCAL_ADDR + ":" + (leader.port + 1)); + client.getPdClientWrapper().getLeaderInfo(), HTTP + LOCAL_ADDR + ":" + (leader.port + 1)); } tearDown(); setup(LOCAL_ADDR_IPV6); @@ -81,20 +80,21 @@ public void testTso() throws Exception { @Test public void testGetRegionByKey() throws Exception { - byte[] startKey = new byte[]{1, 0, 2, 4}; - byte[] endKey = new byte[]{1, 0, 2, 5}; + byte[] startKey = new byte[] {1, 0, 2, 4}; + byte[] endKey = new byte[] {1, 0, 2, 5}; int confVer = 1026; int ver = 1027; - leader.addGetRegionListener(request -> - GrpcUtils.makeGetRegionResponse( - leader.getClusterId(), - GrpcUtils.makeRegion( - 1, - ByteString.copyFrom(startKey), - ByteString.copyFrom(endKey), - GrpcUtils.makeRegionEpoch(confVer, ver), - GrpcUtils.makePeer(1, 10), - GrpcUtils.makePeer(2, 20)))); + leader.addGetRegionListener( + request -> + GrpcUtils.makeGetRegionResponse( + leader.getClusterId(), + GrpcUtils.makeRegion( + 1, + ByteString.copyFrom(startKey), + ByteString.copyFrom(endKey), + GrpcUtils.makeRegionEpoch(confVer, ver), + GrpcUtils.makePeer(1, 10), + GrpcUtils.makePeer(2, 20)))); try (PDClient client = session.getPDClient()) { Pair rl = client.getRegionByKey(defaultBackOff(), ByteString.EMPTY); @@ -111,21 +111,22 @@ public void testGetRegionByKey() throws Exception { @Test public void testGetRegionById() throws Exception { - byte[] startKey = new byte[]{1, 0, 2, 4}; - byte[] endKey = new byte[]{1, 0, 2, 5}; + byte[] startKey = new byte[] {1, 0, 2, 4}; + byte[] endKey = new byte[] {1, 0, 2, 5}; int confVer = 1026; int ver = 1027; - leader.addGetRegionByIDListener(request -> - GrpcUtils.makeGetRegionResponse( - leader.getClusterId(), - GrpcUtils.makeRegion( - 1, - ByteString.copyFrom(startKey), - ByteString.copyFrom(endKey), - GrpcUtils.makeRegionEpoch(confVer, ver), - GrpcUtils.makePeer(1, 10), - GrpcUtils.makePeer(2, 20)))); + leader.addGetRegionByIDListener( + request -> + GrpcUtils.makeGetRegionResponse( + leader.getClusterId(), + GrpcUtils.makeRegion( + 1, + ByteString.copyFrom(startKey), + ByteString.copyFrom(endKey), + GrpcUtils.makeRegionEpoch(confVer, ver), + GrpcUtils.makePeer(1, 10), + GrpcUtils.makePeer(2, 20)))); try (PDClient client = session.getPDClient()) { Pair rl = client.getRegionByID(defaultBackOff(), 0); Metapb.Region r = rl.first; @@ -143,15 +144,16 @@ public void testGetRegionById() throws Exception { public void testGetStore() throws Exception { long storeId = 1; String testAddress = "testAddress"; - leader.addGetStoreListener(request -> - GrpcUtils.makeGetStoreResponse( - leader.getClusterId(), - GrpcUtils.makeStore( - storeId, - testAddress, - Metapb.StoreState.Up, - GrpcUtils.makeStoreLabel("k1", "v1"), - GrpcUtils.makeStoreLabel("k2", "v2")))); + leader.addGetStoreListener( + request -> + GrpcUtils.makeGetStoreResponse( + leader.getClusterId(), + GrpcUtils.makeStore( + storeId, + testAddress, + Metapb.StoreState.Up, + GrpcUtils.makeStoreLabel("k1", "v1"), + GrpcUtils.makeStoreLabel("k2", "v2")))); try (PDClient client = session.getPDClient()) { Store r = client.getStore(defaultBackOff(), storeId); assertEquals(storeId, r.getId()); @@ -162,10 +164,11 @@ public void testGetStore() throws Exception { assertEquals("v1", r.getLabels(0).getValue()); assertEquals("v2", r.getLabels(1).getValue()); - leader.addGetStoreListener(request -> - GrpcUtils.makeGetStoreResponse( - leader.getClusterId(), - GrpcUtils.makeStore(storeId, testAddress, Metapb.StoreState.Tombstone))); + leader.addGetStoreListener( + request -> + GrpcUtils.makeGetStoreResponse( + leader.getClusterId(), + GrpcUtils.makeStore(storeId, testAddress, Metapb.StoreState.Tombstone))); assertEquals(StoreState.Tombstone, client.getStore(defaultBackOff(), storeId).getState()); } } @@ -179,14 +182,15 @@ public void testRetryPolicy() throws Exception { long storeId = 1024; ExecutorService service = Executors.newCachedThreadPool(); AtomicInteger i = new AtomicInteger(); - leader.addGetStoreListener(request -> { - if (i.getAndIncrement() < 2) { - return null; - } else { - return GrpcUtils.makeGetStoreResponse(leader.getClusterId(), - GrpcUtils.makeStore(storeId, "", Metapb.StoreState.Up)); - } - }); + leader.addGetStoreListener( + request -> { + if (i.getAndIncrement() < 2) { + return null; + } else { + return GrpcUtils.makeGetStoreResponse( + leader.getClusterId(), GrpcUtils.makeStore(storeId, "", Metapb.StoreState.Up)); + } + }); try (PDClient client = session.getPDClient()) { Callable storeCallable = () -> client.getStore(ConcreteBackOffer.newCustomBackOff(5000), 0); @@ -200,14 +204,15 @@ public void testRetryPolicy() throws Exception { // Should fail AtomicInteger j = new AtomicInteger(); - leader.addGetStoreListener(request -> { - if (j.getAndIncrement() < 6) { - return null; - } else { - return GrpcUtils.makeGetStoreResponse( - leader.getClusterId(), GrpcUtils.makeStore(storeId, "", Metapb.StoreState.Up)); - } - }); + leader.addGetStoreListener( + request -> { + if (j.getAndIncrement() < 6) { + return null; + } else { + return GrpcUtils.makeGetStoreResponse( + leader.getClusterId(), GrpcUtils.makeStore(storeId, "", Metapb.StoreState.Up)); + } + }); try { client.getStore(defaultBackOff(), 0); diff --git a/src/test/java/org/tikv/common/PDMockServer.java b/src/test/java/org/tikv/common/PDMockServer.java index b8c7e7c5158..3d64624f448 100644 --- a/src/test/java/org/tikv/common/PDMockServer.java +++ b/src/test/java/org/tikv/common/PDMockServer.java @@ -47,7 +47,6 @@ public class PDMockServer extends PDGrpc.PDImplBase { private Function getRegionListener; private Function getRegionByIDListener; - public void addGetMembersListener(Function func) { getMembersListener = func; } @@ -69,12 +68,10 @@ public StreamObserver tso(StreamObserver resp) { private int logical = 0; @Override - public void onNext(TsoRequest value) { - } + public void onNext(TsoRequest value) {} @Override - public void onError(Throwable t) { - } + public void onError(Throwable t) {} @Override public void onCompleted() { diff --git a/src/test/java/org/tikv/common/PDMockServerTest.java b/src/test/java/org/tikv/common/PDMockServerTest.java index 4794ff6e507..c1ecc637dca 100644 --- a/src/test/java/org/tikv/common/PDMockServerTest.java +++ b/src/test/java/org/tikv/common/PDMockServerTest.java @@ -47,11 +47,12 @@ void setup(String addr) throws IOException { PDMockServer server = new PDMockServer(); server.start(CLUSTER_ID, basePort + i); server.addGetMembersListener( - (request) -> GrpcUtils.makeGetMembersResponse( - server.getClusterId(), - GrpcUtils.makeMember(1, "http://" + addr + ":" + basePort), - GrpcUtils.makeMember(2, "http://" + addr + ":" + (basePort + 1)), - GrpcUtils.makeMember(3, "http://" + addr + ":" + (basePort + 2)))); + (request) -> + GrpcUtils.makeGetMembersResponse( + server.getClusterId(), + GrpcUtils.makeMember(1, "http://" + addr + ":" + basePort), + GrpcUtils.makeMember(2, "http://" + addr + ":" + (basePort + 1)), + GrpcUtils.makeMember(3, "http://" + addr + ":" + (basePort + 2)))); pdServers.add(server); } diff --git a/src/test/java/org/tikv/common/RegionManagerTest.java b/src/test/java/org/tikv/common/RegionManagerTest.java index c34e48363ff..6052640f9bc 100644 --- a/src/test/java/org/tikv/common/RegionManagerTest.java +++ b/src/test/java/org/tikv/common/RegionManagerTest.java @@ -59,34 +59,37 @@ public void testRegionBorder() { @Test public void getRegionByKey() { - ByteString startKey = ByteString.copyFrom(new byte[]{1}); - ByteString endKey = ByteString.copyFrom(new byte[]{10}); - ByteString searchKey = ByteString.copyFrom(new byte[]{5}); + ByteString startKey = ByteString.copyFrom(new byte[] {1}); + ByteString endKey = ByteString.copyFrom(new byte[] {10}); + ByteString searchKey = ByteString.copyFrom(new byte[] {5}); int confVer = 1026; int ver = 1027; long regionId = 233; String testAddress = "127.0.0.1"; - leader.addGetRegionListener(request -> - GrpcUtils.makeGetRegionResponse( - leader.getClusterId(), - GrpcUtils.makeRegion( - regionId, - GrpcUtils.encodeKey(startKey.toByteArray()), - GrpcUtils.encodeKey(endKey.toByteArray()), - GrpcUtils.makeRegionEpoch(confVer, ver), - GrpcUtils.makePeer(1, 10), - GrpcUtils.makePeer(2, 20)))); + leader.addGetRegionListener( + request -> + GrpcUtils.makeGetRegionResponse( + leader.getClusterId(), + GrpcUtils.makeRegion( + regionId, + GrpcUtils.encodeKey(startKey.toByteArray()), + GrpcUtils.encodeKey(endKey.toByteArray()), + GrpcUtils.makeRegionEpoch(confVer, ver), + GrpcUtils.makePeer(1, 10), + GrpcUtils.makePeer(2, 20)))); AtomicInteger i = new AtomicInteger(0); - long[] ids = new long[]{10, 20}; - leader.addGetStoreListener((request -> GrpcUtils.makeGetStoreResponse( - leader.getClusterId(), - GrpcUtils.makeStore( - ids[i.getAndIncrement()], - testAddress, - StoreState.Up, - GrpcUtils.makeStoreLabel("k1", "v1"), - GrpcUtils.makeStoreLabel("k2", "v2"))))); + long[] ids = new long[] {10, 20}; + leader.addGetStoreListener( + (request -> + GrpcUtils.makeGetStoreResponse( + leader.getClusterId(), + GrpcUtils.makeStore( + ids[i.getAndIncrement()], + testAddress, + StoreState.Up, + GrpcUtils.makeStoreLabel("k1", "v1"), + GrpcUtils.makeStoreLabel("k2", "v2"))))); TiRegion region = mgr.getRegionByKey(startKey); assertEquals(region.getId(), regionId); @@ -97,35 +100,38 @@ public void getRegionByKey() { @Test public void getStoreByKey() { - ByteString startKey = ByteString.copyFrom(new byte[]{1}); - ByteString endKey = ByteString.copyFrom(new byte[]{10}); - ByteString searchKey = ByteString.copyFrom(new byte[]{5}); + ByteString startKey = ByteString.copyFrom(new byte[] {1}); + ByteString endKey = ByteString.copyFrom(new byte[] {10}); + ByteString searchKey = ByteString.copyFrom(new byte[] {5}); String testAddress = "testAddress"; long storeId = 233; int confVer = 1026; int ver = 1027; long regionId = 233; - leader.addGetRegionListener(request -> - GrpcUtils.makeGetRegionResponse( - leader.getClusterId(), - GrpcUtils.makeRegion( - regionId, - GrpcUtils.encodeKey(startKey.toByteArray()), - GrpcUtils.encodeKey(endKey.toByteArray()), - GrpcUtils.makeRegionEpoch(confVer, ver), - GrpcUtils.makePeer(storeId, 10), - GrpcUtils.makePeer(storeId + 1, 20)))); + leader.addGetRegionListener( + request -> + GrpcUtils.makeGetRegionResponse( + leader.getClusterId(), + GrpcUtils.makeRegion( + regionId, + GrpcUtils.encodeKey(startKey.toByteArray()), + GrpcUtils.encodeKey(endKey.toByteArray()), + GrpcUtils.makeRegionEpoch(confVer, ver), + GrpcUtils.makePeer(storeId, 10), + GrpcUtils.makePeer(storeId + 1, 20)))); AtomicInteger i = new AtomicInteger(0); - long[] ids = new long[]{10, 20}; - leader.addGetStoreListener((request -> GrpcUtils.makeGetStoreResponse( - leader.getClusterId(), - GrpcUtils.makeStore( - ids[i.getAndIncrement()], - testAddress, - StoreState.Up, - GrpcUtils.makeStoreLabel("k1", "v1"), - GrpcUtils.makeStoreLabel("k2", "v2"))))); + long[] ids = new long[] {10, 20}; + leader.addGetStoreListener( + (request -> + GrpcUtils.makeGetStoreResponse( + leader.getClusterId(), + GrpcUtils.makeStore( + ids[i.getAndIncrement()], + testAddress, + StoreState.Up, + GrpcUtils.makeStoreLabel("k1", "v1"), + GrpcUtils.makeStoreLabel("k2", "v2"))))); Pair pair = mgr.getRegionStorePairByKey(searchKey); assertEquals(pair.first.getId(), regionId); @@ -136,27 +142,29 @@ public void getStoreByKey() { public void getStoreById() { long storeId = 234; String testAddress = "testAddress"; - leader.addGetStoreListener(request -> - GrpcUtils.makeGetStoreResponse( - leader.getClusterId(), - GrpcUtils.makeStore( - storeId, - testAddress, - Metapb.StoreState.Up, - GrpcUtils.makeStoreLabel("k1", "v1"), - GrpcUtils.makeStoreLabel("k2", "v2")))); + leader.addGetStoreListener( + request -> + GrpcUtils.makeGetStoreResponse( + leader.getClusterId(), + GrpcUtils.makeStore( + storeId, + testAddress, + Metapb.StoreState.Up, + GrpcUtils.makeStoreLabel("k1", "v1"), + GrpcUtils.makeStoreLabel("k2", "v2")))); TiStore store = mgr.getStoreById(storeId); assertEquals(store.getStore().getId(), storeId); - leader.addGetStoreListener(request -> - GrpcUtils.makeGetStoreResponse( - leader.getClusterId(), - GrpcUtils.makeStore( - storeId + 1, - testAddress, - StoreState.Tombstone, - GrpcUtils.makeStoreLabel("k1", "v1"), - GrpcUtils.makeStoreLabel("k2", "v2")))); + leader.addGetStoreListener( + request -> + GrpcUtils.makeGetStoreResponse( + leader.getClusterId(), + GrpcUtils.makeStore( + storeId + 1, + testAddress, + StoreState.Tombstone, + GrpcUtils.makeStoreLabel("k1", "v1"), + GrpcUtils.makeStoreLabel("k2", "v2")))); try { mgr.getStoreById(storeId + 1); diff --git a/src/test/java/org/tikv/common/SeekLeaderStoreTest.java b/src/test/java/org/tikv/common/SeekLeaderStoreTest.java index 3eedf19dabd..d4e83fc423f 100644 --- a/src/test/java/org/tikv/common/SeekLeaderStoreTest.java +++ b/src/test/java/org/tikv/common/SeekLeaderStoreTest.java @@ -1,3 +1,20 @@ +/* + * Copyright 2017 TiKV Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + package org.tikv.common; import com.google.protobuf.ByteString; @@ -24,6 +41,6 @@ public void testSeekLeader() { Assert.assertEquals(value, client.get(key).get()); servers.get(0).setState(State.Fail); servers.get(1).setRegion(region.switchPeer(stores.get(1).getId())); - client.get(key); + Assert.assertEquals(value, client.get(key).get()); } } From 0aff000e0f9ade963a6a67f0a2ae8e4b0e919901 Mon Sep 17 00:00:00 2001 From: iosmanthus Date: Wed, 16 Feb 2022 14:50:23 +0800 Subject: [PATCH 04/12] add license header in MockThreeStoresTest Signed-off-by: iosmanthus --- .../org/tikv/common/MockThreeStoresTest.java | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/src/test/java/org/tikv/common/MockThreeStoresTest.java b/src/test/java/org/tikv/common/MockThreeStoresTest.java index 73af0089855..14ce38299f8 100644 --- a/src/test/java/org/tikv/common/MockThreeStoresTest.java +++ b/src/test/java/org/tikv/common/MockThreeStoresTest.java @@ -1,3 +1,20 @@ +/* + * Copyright 2017 TiKV Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + package org.tikv.common; import com.google.common.collect.ImmutableList; From fa7ca882b76410ea338a393ba40cf4ce234745ae Mon Sep 17 00:00:00 2001 From: iosmanthus Date: Wed, 16 Feb 2022 15:05:58 +0800 Subject: [PATCH 05/12] remove ignored exception log Signed-off-by: iosmanthus --- .../org/tikv/common/region/AbstractRegionStoreClient.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/main/java/org/tikv/common/region/AbstractRegionStoreClient.java b/src/main/java/org/tikv/common/region/AbstractRegionStoreClient.java index 6241410e030..ffdaed83e5e 100644 --- a/src/main/java/org/tikv/common/region/AbstractRegionStoreClient.java +++ b/src/main/java/org/tikv/common/region/AbstractRegionStoreClient.java @@ -103,7 +103,8 @@ protected TikvGrpc.TikvFutureStub getAsyncStub() { } @Override - public void close() throws GrpcException {} + public void close() throws GrpcException { + } /** * onNotLeader deals with NotLeaderError and returns whether re-splitting key range is needed @@ -314,7 +315,6 @@ private Metapb.Peer switchLeaderStore(BackOffer backOffer) { } } } catch (Exception ignored) { - logger.error(ignored.getMessage() + " peer is " + task.peer.getId()); } } if (unfinished.isEmpty()) { From a374f01a16fdc111874400ac9bfb1869640f0b44 Mon Sep 17 00:00:00 2001 From: iosmanthus Date: Wed, 16 Feb 2022 15:51:14 +0800 Subject: [PATCH 06/12] ./dev/javafmt Signed-off-by: iosmanthus --- .../java/org/tikv/common/region/AbstractRegionStoreClient.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/main/java/org/tikv/common/region/AbstractRegionStoreClient.java b/src/main/java/org/tikv/common/region/AbstractRegionStoreClient.java index ffdaed83e5e..a9c910fcb40 100644 --- a/src/main/java/org/tikv/common/region/AbstractRegionStoreClient.java +++ b/src/main/java/org/tikv/common/region/AbstractRegionStoreClient.java @@ -103,8 +103,7 @@ protected TikvGrpc.TikvFutureStub getAsyncStub() { } @Override - public void close() throws GrpcException { - } + public void close() throws GrpcException {} /** * onNotLeader deals with NotLeaderError and returns whether re-splitting key range is needed From 838a1b76902fea38797813cc42c6ddde3884088c Mon Sep 17 00:00:00 2001 From: iosmanthus Date: Thu, 17 Feb 2022 19:36:02 +0800 Subject: [PATCH 07/12] add unit tests for seekProxyStore Signed-off-by: iosmanthus --- .../org/tikv/common/PDMockServerTest.java | 3 +- .../org/tikv/common/SeekProxyStoreTest.java | 28 +++++++++++++++++++ 2 files changed, 30 insertions(+), 1 deletion(-) create mode 100644 src/test/java/org/tikv/common/SeekProxyStoreTest.java diff --git a/src/test/java/org/tikv/common/PDMockServerTest.java b/src/test/java/org/tikv/common/PDMockServerTest.java index c1ecc637dca..d9d3eecb27d 100644 --- a/src/test/java/org/tikv/common/PDMockServerTest.java +++ b/src/test/java/org/tikv/common/PDMockServerTest.java @@ -61,7 +61,8 @@ void setup(String addr) throws IOException { conf.setEnableGrpcForward(false); conf.setKvMode("RAW"); conf.setTest(true); - conf.setTimeout(20000000); + conf.setTimeout(2000); + conf.setEnableGrpcForward(true); session = TiSession.create(conf); } diff --git a/src/test/java/org/tikv/common/SeekProxyStoreTest.java b/src/test/java/org/tikv/common/SeekProxyStoreTest.java new file mode 100644 index 00000000000..d9994c6b295 --- /dev/null +++ b/src/test/java/org/tikv/common/SeekProxyStoreTest.java @@ -0,0 +1,28 @@ +package org.tikv.common; + +import com.google.protobuf.ByteString; +import org.junit.Assert; +import org.junit.Test; +import org.tikv.common.KVMockServer.State; +import org.tikv.raw.RawKVClient; + +public class SeekProxyStoreTest extends MockThreeStoresTest { + + private RawKVClient createClient() { + RawKVClient client = session.createRawClient(); + return client; + } + + @Test + public void testSeekProxyStore() { + RawKVClient client = createClient(); + ByteString key = ByteString.copyFromUtf8("key"); + ByteString value = ByteString.copyFromUtf8("value"); + put(key, value); + + client.put(key, value); + Assert.assertEquals(value, client.get(key).get()); + servers.get(0).setState(State.Fail); + Assert.assertEquals(value, client.get(key).get()); + } +} From 05e108963045a31c7fa02c5531c00778c4e32e04 Mon Sep 17 00:00:00 2001 From: iosmanthus Date: Thu, 17 Feb 2022 19:39:56 +0800 Subject: [PATCH 08/12] add comments to SeekProxyStoreTest Signed-off-by: iosmanthus --- src/test/java/org/tikv/common/SeekProxyStoreTest.java | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/src/test/java/org/tikv/common/SeekProxyStoreTest.java b/src/test/java/org/tikv/common/SeekProxyStoreTest.java index d9994c6b295..b0717664627 100644 --- a/src/test/java/org/tikv/common/SeekProxyStoreTest.java +++ b/src/test/java/org/tikv/common/SeekProxyStoreTest.java @@ -22,7 +22,11 @@ public void testSeekProxyStore() { client.put(key, value); Assert.assertEquals(value, client.get(key).get()); + // Set the leader to state Fail, the request will route to peer 0x2, which is not the leader. + // The state of three peers is the same. + // Thus, with the correct context, the peer 0x2 will return normally. servers.get(0).setState(State.Fail); + Assert.assertEquals(value, client.get(key).get()); } } From 1175bde9f26472a87052deb4bd9c9b2a8832308b Mon Sep 17 00:00:00 2001 From: iosmanthus Date: Thu, 17 Feb 2022 19:41:28 +0800 Subject: [PATCH 09/12] fix license error Signed-off-by: iosmanthus --- .../org/tikv/common/MockThreeStoresTest.java | 2 +- .../org/tikv/common/SeekLeaderStoreTest.java | 17 +++++++++++++++++ 2 files changed, 18 insertions(+), 1 deletion(-) diff --git a/src/test/java/org/tikv/common/MockThreeStoresTest.java b/src/test/java/org/tikv/common/MockThreeStoresTest.java index 14ce38299f8..fa4dbe41e35 100644 --- a/src/test/java/org/tikv/common/MockThreeStoresTest.java +++ b/src/test/java/org/tikv/common/MockThreeStoresTest.java @@ -1,5 +1,5 @@ /* - * Copyright 2017 TiKV Project Authors. + * Copyright 2022 TiKV Project Authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/src/test/java/org/tikv/common/SeekLeaderStoreTest.java b/src/test/java/org/tikv/common/SeekLeaderStoreTest.java index d4e83fc423f..2469fc5111b 100644 --- a/src/test/java/org/tikv/common/SeekLeaderStoreTest.java +++ b/src/test/java/org/tikv/common/SeekLeaderStoreTest.java @@ -1,3 +1,20 @@ +/* + * Copyright 2022 TiKV Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + /* * Copyright 2017 TiKV Project Authors. * From 81058ebc2a8e930b7cdc262b5745bf6453fd4079 Mon Sep 17 00:00:00 2001 From: iosmanthus Date: Thu, 17 Feb 2022 19:43:48 +0800 Subject: [PATCH 10/12] fix license error Signed-off-by: iosmanthus --- .../org/tikv/common/SeekLeaderStoreTest.java | 17 ----------------- .../org/tikv/common/SeekProxyStoreTest.java | 17 +++++++++++++++++ 2 files changed, 17 insertions(+), 17 deletions(-) diff --git a/src/test/java/org/tikv/common/SeekLeaderStoreTest.java b/src/test/java/org/tikv/common/SeekLeaderStoreTest.java index 2469fc5111b..9fa115bfac4 100644 --- a/src/test/java/org/tikv/common/SeekLeaderStoreTest.java +++ b/src/test/java/org/tikv/common/SeekLeaderStoreTest.java @@ -15,23 +15,6 @@ * */ -/* - * Copyright 2017 TiKV Project Authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - package org.tikv.common; import com.google.protobuf.ByteString; diff --git a/src/test/java/org/tikv/common/SeekProxyStoreTest.java b/src/test/java/org/tikv/common/SeekProxyStoreTest.java index b0717664627..6016781a648 100644 --- a/src/test/java/org/tikv/common/SeekProxyStoreTest.java +++ b/src/test/java/org/tikv/common/SeekProxyStoreTest.java @@ -1,3 +1,20 @@ +/* + * Copyright 2022 TiKV Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + package org.tikv.common; import com.google.protobuf.ByteString; From ffeaa1f83616f7061f8531fda496e5ecb94b1dc5 Mon Sep 17 00:00:00 2001 From: iosmanthus Date: Fri, 18 Feb 2022 16:06:24 +0800 Subject: [PATCH 11/12] enable grpc forward in PDMockServerTest Signed-off-by: iosmanthus --- src/test/java/org/tikv/common/PDMockServerTest.java | 1 - 1 file changed, 1 deletion(-) diff --git a/src/test/java/org/tikv/common/PDMockServerTest.java b/src/test/java/org/tikv/common/PDMockServerTest.java index d9d3eecb27d..457fff3feb3 100644 --- a/src/test/java/org/tikv/common/PDMockServerTest.java +++ b/src/test/java/org/tikv/common/PDMockServerTest.java @@ -58,7 +58,6 @@ void setup(String addr) throws IOException { leader = pdServers.get(0); TiConfiguration conf = TiConfiguration.createDefault(addr + ":" + leader.port); - conf.setEnableGrpcForward(false); conf.setKvMode("RAW"); conf.setTest(true); conf.setTimeout(2000); From 152e3d2a3118c3f462870bb6196e0b42d24404b5 Mon Sep 17 00:00:00 2001 From: iosmanthus Date: Fri, 18 Feb 2022 16:42:52 +0800 Subject: [PATCH 12/12] remove ignore warmup for tests Signed-off-by: iosmanthus --- src/main/java/org/tikv/common/TiSession.java | 4 ---- src/test/java/org/tikv/common/PDMockServerTest.java | 2 +- src/test/java/org/tikv/common/SeekLeaderStoreTest.java | 3 +-- src/test/java/org/tikv/common/SeekProxyStoreTest.java | 3 +-- 4 files changed, 3 insertions(+), 9 deletions(-) diff --git a/src/main/java/org/tikv/common/TiSession.java b/src/main/java/org/tikv/common/TiSession.java index 31a8e5f4f86..8dd1793cbca 100644 --- a/src/main/java/org/tikv/common/TiSession.java +++ b/src/main/java/org/tikv/common/TiSession.java @@ -178,10 +178,6 @@ private static VersionInfo getVersionInfo() { } private synchronized void warmUp() { - if (conf.isTest()) { - logger.info("skip warm up in test mode"); - return; - } long warmUpStartTime = System.nanoTime(); BackOffer backOffer = ConcreteBackOffer.newRawKVBackOff(); try { diff --git a/src/test/java/org/tikv/common/PDMockServerTest.java b/src/test/java/org/tikv/common/PDMockServerTest.java index 457fff3feb3..30f4f9dd651 100644 --- a/src/test/java/org/tikv/common/PDMockServerTest.java +++ b/src/test/java/org/tikv/common/PDMockServerTest.java @@ -59,7 +59,7 @@ void setup(String addr) throws IOException { leader = pdServers.get(0); TiConfiguration conf = TiConfiguration.createDefault(addr + ":" + leader.port); conf.setKvMode("RAW"); - conf.setTest(true); + conf.setWarmUpEnable(false); conf.setTimeout(2000); conf.setEnableGrpcForward(true); session = TiSession.create(conf); diff --git a/src/test/java/org/tikv/common/SeekLeaderStoreTest.java b/src/test/java/org/tikv/common/SeekLeaderStoreTest.java index 9fa115bfac4..08dbe01dd3a 100644 --- a/src/test/java/org/tikv/common/SeekLeaderStoreTest.java +++ b/src/test/java/org/tikv/common/SeekLeaderStoreTest.java @@ -26,8 +26,7 @@ public class SeekLeaderStoreTest extends MockThreeStoresTest { private RawKVClient createClient() { - RawKVClient client = session.createRawClient(); - return client; + return session.createRawClient(); } @Test diff --git a/src/test/java/org/tikv/common/SeekProxyStoreTest.java b/src/test/java/org/tikv/common/SeekProxyStoreTest.java index 6016781a648..e4b34c4fa3f 100644 --- a/src/test/java/org/tikv/common/SeekProxyStoreTest.java +++ b/src/test/java/org/tikv/common/SeekProxyStoreTest.java @@ -26,8 +26,7 @@ public class SeekProxyStoreTest extends MockThreeStoresTest { private RawKVClient createClient() { - RawKVClient client = session.createRawClient(); - return client; + return session.createRawClient(); } @Test