From bda076228a9e0879252b7a93fd2fc606d488aac1 Mon Sep 17 00:00:00 2001 From: iosmanthus Date: Wed, 23 Mar 2022 12:48:48 +0800 Subject: [PATCH 1/4] cherry pick #551 to release-3.1 Signed-off-by: ti-srebot --- .../region/AbstractRegionStoreClient.java | 75 +++--- .../org/tikv/common/region/RegionManager.java | 29 +-- .../org/tikv/common/PDClientMockTest.java | 226 ++++++++++++++++++ .../org/tikv/common/PDMockServerTest.java | 30 +++ .../org/tikv/common/SeekLeaderStoreTest.java | 78 ++++++ 5 files changed, 395 insertions(+), 43 deletions(-) create mode 100644 src/test/java/org/tikv/common/PDClientMockTest.java create mode 100644 src/test/java/org/tikv/common/SeekLeaderStoreTest.java diff --git a/src/main/java/org/tikv/common/region/AbstractRegionStoreClient.java b/src/main/java/org/tikv/common/region/AbstractRegionStoreClient.java index 6b33a9a97c4..6c37d2651fc 100644 --- a/src/main/java/org/tikv/common/region/AbstractRegionStoreClient.java +++ b/src/main/java/org/tikv/common/region/AbstractRegionStoreClient.java @@ -255,19 +255,27 @@ private Metapb.Peer switchLeaderStore(BackOffer backOffer) { List responses = new LinkedList<>(); for (Metapb.Peer peer : region.getFollowerList()) { ByteString key = region.getStartKey(); - TiStore peerStore = regionManager.getStoreById(peer.getStoreId(), backOffer); - ManagedChannel channel = - channelFactory.getChannel( - peerStore.getAddress(), regionManager.getPDClient().getHostMapping()); - TikvGrpc.TikvFutureStub stub = - TikvGrpc.newFutureStub(channel).withDeadlineAfter(timeout, TimeUnit.MILLISECONDS); - Kvrpcpb.RawGetRequest rawGetRequest = - Kvrpcpb.RawGetRequest.newBuilder() - .setContext(region.getReplicaContext(peer)) - .setKey(key) - .build(); - ListenableFuture task = stub.rawGet(rawGetRequest); - responses.add(new SwitchLeaderTask(task, peer)); + try { + TiStore peerStore = regionManager.getStoreById(peer.getStoreId(), backOffer); + ManagedChannel channel = + channelFactory.getChannel( + peerStore.getAddress(), regionManager.getPDClient().getHostMapping()); + TikvGrpc.TikvFutureStub stub = + TikvGrpc.newFutureStub(channel).withDeadlineAfter(timeout, TimeUnit.MILLISECONDS); + Kvrpcpb.RawGetRequest rawGetRequest = + Kvrpcpb.RawGetRequest.newBuilder() + .setContext(region.getReplicaContext(peer)) + .setKey(key) + .build(); + ListenableFuture task = stub.rawGet(rawGetRequest); + responses.add(new SwitchLeaderTask(task, peer)); + } catch (Exception e) { + logger.warn( + "switch region[{}] leader store to {} failed: {}", + region.getId(), + peer.getStoreId(), + e); + } } while (true) { try { @@ -306,22 +314,31 @@ private TiStore switchProxyStore(BackOffer backOffer) { List responses = new LinkedList<>(); for (Metapb.Peer peer : region.getFollowerList()) { ByteString key = region.getStartKey(); - TiStore peerStore = regionManager.getStoreById(peer.getStoreId(), backOffer); - ManagedChannel channel = - channelFactory.getChannel( - peerStore.getAddress(), regionManager.getPDClient().getHostMapping()); - TikvGrpc.TikvFutureStub stub = - TikvGrpc.newFutureStub(channel).withDeadlineAfter(forwardTimeout, TimeUnit.MILLISECONDS); - Metadata header = new Metadata(); - header.put(TiConfiguration.FORWARD_META_DATA_KEY, store.getStore().getAddress()); - Kvrpcpb.RawGetRequest rawGetRequest = - Kvrpcpb.RawGetRequest.newBuilder() - .setContext(region.getReplicaContext(region.getLeader())) - .setKey(key) - .build(); - ListenableFuture task = - MetadataUtils.attachHeaders(stub, header).rawGet(rawGetRequest); - responses.add(new ForwardCheckTask(task, peerStore.getStore())); + try { + TiStore peerStore = regionManager.getStoreById(peer.getStoreId(), backOffer); + ManagedChannel channel = + channelFactory.getChannel( + peerStore.getAddress(), regionManager.getPDClient().getHostMapping()); + TikvGrpc.TikvFutureStub stub = + TikvGrpc.newFutureStub(channel) + .withDeadlineAfter(forwardTimeout, TimeUnit.MILLISECONDS); + Metadata header = new Metadata(); + header.put(TiConfiguration.FORWARD_META_DATA_KEY, store.getStore().getAddress()); + Kvrpcpb.RawGetRequest rawGetRequest = + Kvrpcpb.RawGetRequest.newBuilder() + .setContext(region.getReplicaContext(region.getLeader())) + .setKey(key) + .build(); + ListenableFuture task = + MetadataUtils.attachHeaders(stub, header).rawGet(rawGetRequest); + responses.add(new ForwardCheckTask(task, peerStore.getStore())); + } catch (Exception e) { + logger.warn( + "switch region[{}] leader store to {} failed: {}", + region.getId(), + peer.getStoreId(), + e); + } } while (true) { try { diff --git a/src/main/java/org/tikv/common/region/RegionManager.java b/src/main/java/org/tikv/common/region/RegionManager.java index fdce6465915..c73725e79d6 100644 --- a/src/main/java/org/tikv/common/region/RegionManager.java +++ b/src/main/java/org/tikv/common/region/RegionManager.java @@ -26,7 +26,6 @@ import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; -import java.util.stream.Collectors; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.tikv.common.ReadOnlyPDClient; @@ -47,6 +46,7 @@ @SuppressWarnings("UnstableApiUsage") public class RegionManager { + private static final Logger logger = LoggerFactory.getLogger(RegionManager.class); public static final Histogram GET_REGION_BY_KEY_REQUEST_LATENCY = HistogramUtils.buildDuration() @@ -200,22 +200,23 @@ public Pair getRegionStorePairByKey( } public TiRegion createRegion(Metapb.Region region, BackOffer backOffer) { - List peers = region.getPeersList(); - List stores = getRegionStore(peers, backOffer); - return new TiRegion(conf, region, null, peers, stores); + return createRegion(region, null, backOffer); } private TiRegion createRegion(Metapb.Region region, Metapb.Peer leader, BackOffer backOffer) { - List peers = region.getPeersList(); - List stores = getRegionStore(peers, backOffer); - return new TiRegion(conf, region, leader, peers, stores); - } - - private List getRegionStore(List peers, BackOffer backOffer) { - return peers - .stream() - .map(p -> getStoreById(p.getStoreId(), backOffer)) - .collect(Collectors.toList()); + List peers = new ArrayList<>(); + List stores = new ArrayList<>(); + for (Metapb.Peer peer : region.getPeersList()) { + try { + stores.add(getStoreById(peer.getStoreId(), backOffer)); + peers.add(peer); + } catch (Exception e) { + logger.warn("Store {} not found: {}", peer.getStoreId(), e.toString()); + } + } + Metapb.Region newRegion = + Metapb.Region.newBuilder().mergeFrom(region).clearPeers().addAllPeers(peers).build(); + return new TiRegion(conf, newRegion, leader, peers, stores); } private TiStore getStoreByIdWithBackOff(long id, BackOffer backOffer) { diff --git a/src/test/java/org/tikv/common/PDClientMockTest.java b/src/test/java/org/tikv/common/PDClientMockTest.java new file mode 100644 index 00000000000..3ba0e374392 --- /dev/null +++ b/src/test/java/org/tikv/common/PDClientMockTest.java @@ -0,0 +1,226 @@ +/* + * Copyright 2017 TiKV Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.tikv.common; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +import com.google.protobuf.ByteString; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicInteger; +import org.junit.Test; +import org.tikv.common.exception.GrpcException; +import org.tikv.common.meta.TiTimestamp; +import org.tikv.common.util.BackOffer; +import org.tikv.common.util.ConcreteBackOffer; +import org.tikv.common.util.Pair; +import org.tikv.kvproto.Metapb; +import org.tikv.kvproto.Metapb.Store; +import org.tikv.kvproto.Metapb.StoreState; + +public class PDClientMockTest extends PDMockServerTest { + + private static final String LOCAL_ADDR_IPV6 = "[::1]"; + public static final String HTTP = "http://"; + + @Test + public void testCreate() throws Exception { + try (PDClient client = session.getPDClient()) { + assertEquals(LOCAL_ADDR + ":" + leader.port, client.getPdClientWrapper().getLeaderInfo()); + assertEquals(CLUSTER_ID, client.getHeader().getClusterId()); + } + } + + @Test + public void testSwitchLeader() throws Exception { + try (PDClient client = session.getPDClient()) { + client.trySwitchLeader(HTTP + LOCAL_ADDR + ":" + (leader.port + 1)); + assertEquals( + client.getPdClientWrapper().getLeaderInfo(), HTTP + LOCAL_ADDR + ":" + (leader.port + 1)); + } + tearDown(); + setup(LOCAL_ADDR_IPV6); + try (PDClient client = session.getPDClient()) { + client.trySwitchLeader(HTTP + LOCAL_ADDR_IPV6 + ":" + (leader.port + 2)); + assertEquals( + client.getPdClientWrapper().getLeaderInfo(), + HTTP + LOCAL_ADDR_IPV6 + ":" + (leader.port + 2)); + } + } + + @Test + public void testTso() throws Exception { + try (PDClient client = session.getPDClient()) { + TiTimestamp ts = client.getTimestamp(defaultBackOff()); + // Test pdServer is set to generate physical == logical + 1 + assertEquals(ts.getPhysical(), ts.getLogical() + 1); + } + } + + @Test + public void testGetRegionByKey() throws Exception { + byte[] startKey = new byte[] {1, 0, 2, 4}; + byte[] endKey = new byte[] {1, 0, 2, 5}; + int confVer = 1026; + int ver = 1027; + leader.addGetRegionListener( + request -> + GrpcUtils.makeGetRegionResponse( + leader.getClusterId(), + GrpcUtils.makeRegion( + 1, + ByteString.copyFrom(startKey), + ByteString.copyFrom(endKey), + GrpcUtils.makeRegionEpoch(confVer, ver), + GrpcUtils.makePeer(1, 10), + GrpcUtils.makePeer(2, 20)))); + try (PDClient client = session.getPDClient()) { + Pair rl = + client.getRegionByKey(defaultBackOff(), ByteString.EMPTY); + Metapb.Region r = rl.first; + Metapb.Peer l = rl.second; + assertEquals(r.getStartKey(), ByteString.copyFrom(startKey)); + assertEquals(r.getEndKey(), ByteString.copyFrom(endKey)); + assertEquals(r.getRegionEpoch().getConfVer(), confVer); + assertEquals(r.getRegionEpoch().getVersion(), ver); + assertEquals(1, l.getId()); + assertEquals(10, l.getStoreId()); + } + } + + @Test + public void testGetRegionById() throws Exception { + byte[] startKey = new byte[] {1, 0, 2, 4}; + byte[] endKey = new byte[] {1, 0, 2, 5}; + int confVer = 1026; + int ver = 1027; + + leader.addGetRegionByIDListener( + request -> + GrpcUtils.makeGetRegionResponse( + leader.getClusterId(), + GrpcUtils.makeRegion( + 1, + ByteString.copyFrom(startKey), + ByteString.copyFrom(endKey), + GrpcUtils.makeRegionEpoch(confVer, ver), + GrpcUtils.makePeer(1, 10), + GrpcUtils.makePeer(2, 20)))); + try (PDClient client = session.getPDClient()) { + Pair rl = client.getRegionByID(defaultBackOff(), 0); + Metapb.Region r = rl.first; + Metapb.Peer l = rl.second; + assertEquals(ByteString.copyFrom(startKey), r.getStartKey()); + assertEquals(ByteString.copyFrom(endKey), r.getEndKey()); + assertEquals(confVer, r.getRegionEpoch().getConfVer()); + assertEquals(ver, r.getRegionEpoch().getVersion()); + assertEquals(1, l.getId()); + assertEquals(10, l.getStoreId()); + } + } + + @Test + public void testGetStore() throws Exception { + long storeId = 1; + String testAddress = "testAddress"; + leader.addGetStoreListener( + request -> + GrpcUtils.makeGetStoreResponse( + leader.getClusterId(), + GrpcUtils.makeStore( + storeId, + testAddress, + Metapb.StoreState.Up, + GrpcUtils.makeStoreLabel("k1", "v1"), + GrpcUtils.makeStoreLabel("k2", "v2")))); + try (PDClient client = session.getPDClient()) { + Store r = client.getStore(defaultBackOff(), storeId); + assertEquals(storeId, r.getId()); + assertEquals(testAddress, r.getAddress()); + assertEquals(Metapb.StoreState.Up, r.getState()); + assertEquals("k1", r.getLabels(0).getKey()); + assertEquals("k2", r.getLabels(1).getKey()); + assertEquals("v1", r.getLabels(0).getValue()); + assertEquals("v2", r.getLabels(1).getValue()); + + leader.addGetStoreListener( + request -> + GrpcUtils.makeGetStoreResponse( + leader.getClusterId(), + GrpcUtils.makeStore(storeId, testAddress, Metapb.StoreState.Tombstone))); + assertEquals(StoreState.Tombstone, client.getStore(defaultBackOff(), storeId).getState()); + } + } + + private BackOffer defaultBackOff() { + return ConcreteBackOffer.newCustomBackOff(1000); + } + + @Test + public void testRetryPolicy() throws Exception { + long storeId = 1024; + ExecutorService service = Executors.newCachedThreadPool(); + AtomicInteger i = new AtomicInteger(); + leader.addGetStoreListener( + request -> { + if (i.getAndIncrement() < 2) { + return null; + } else { + return GrpcUtils.makeGetStoreResponse( + leader.getClusterId(), GrpcUtils.makeStore(storeId, "", Metapb.StoreState.Up)); + } + }); + try (PDClient client = session.getPDClient()) { + Callable storeCallable = + () -> client.getStore(ConcreteBackOffer.newCustomBackOff(5000), 0); + Future storeFuture = service.submit(storeCallable); + try { + Store r = storeFuture.get(50, TimeUnit.SECONDS); + assertEquals(r.getId(), storeId); + } catch (TimeoutException e) { + fail(); + } + + // Should fail + AtomicInteger j = new AtomicInteger(); + leader.addGetStoreListener( + request -> { + if (j.getAndIncrement() < 6) { + return null; + } else { + return GrpcUtils.makeGetStoreResponse( + leader.getClusterId(), GrpcUtils.makeStore(storeId, "", Metapb.StoreState.Up)); + } + }); + + try { + client.getStore(defaultBackOff(), 0); + } catch (GrpcException e) { + assertTrue(true); + return; + } + fail(); + } + } +} diff --git a/src/test/java/org/tikv/common/PDMockServerTest.java b/src/test/java/org/tikv/common/PDMockServerTest.java index 6469c38731f..67bb61a2cea 100644 --- a/src/test/java/org/tikv/common/PDMockServerTest.java +++ b/src/test/java/org/tikv/common/PDMockServerTest.java @@ -30,6 +30,7 @@ public void setUp() throws IOException { setUp(LOCAL_ADDR); } +<<<<<<< HEAD void setUp(String addr) throws IOException { pdServer = new PDMockServer(); pdServer.start(CLUSTER_ID); @@ -40,6 +41,35 @@ void setUp(String addr) throws IOException { GrpcUtils.makeMember(2, "http://" + addr + ":" + (pdServer.port + 1)), GrpcUtils.makeMember(3, "http://" + addr + ":" + (pdServer.port + 2)))); TiConfiguration conf = TiConfiguration.createDefault(addr + ":" + pdServer.port); +======= + void setup(String addr) throws IOException { + int basePort; + try (ServerSocket s = new ServerSocket(0)) { + basePort = s.getLocalPort(); + } + + for (int i = 0; i < 3; i++) { + PDMockServer server = new PDMockServer(); + server.start(CLUSTER_ID, basePort + i); + server.addGetMembersListener( + (request) -> + GrpcUtils.makeGetMembersResponse( + server.getClusterId(), + GrpcUtils.makeMember(1, "http://" + addr + ":" + basePort), + GrpcUtils.makeMember(2, "http://" + addr + ":" + (basePort + 1)), + GrpcUtils.makeMember(3, "http://" + addr + ":" + (basePort + 2)))); + pdServers.add(server); + if (i == 0) { + leader = server; + } + } + + TiConfiguration conf = TiConfiguration.createDefault(addr + ":" + leader.port); + conf.setKvMode("RAW"); + conf.setWarmUpEnable(false); + conf.setTimeout(2000); + conf.setEnableGrpcForward(true); +>>>>>>> e89ca5f37... [close #550] rawkv: fix seek leader/proxy store early abort (#551) session = TiSession.create(conf); } diff --git a/src/test/java/org/tikv/common/SeekLeaderStoreTest.java b/src/test/java/org/tikv/common/SeekLeaderStoreTest.java new file mode 100644 index 00000000000..201f10986f5 --- /dev/null +++ b/src/test/java/org/tikv/common/SeekLeaderStoreTest.java @@ -0,0 +1,78 @@ +/* + * Copyright 2022 TiKV Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.tikv.common; + +import com.google.protobuf.ByteString; +import java.util.concurrent.atomic.AtomicInteger; +import org.junit.Assert; +import org.junit.Test; +import org.tikv.common.KVMockServer.State; +import org.tikv.kvproto.Metapb; +import org.tikv.kvproto.Metapb.StoreState; +import org.tikv.kvproto.Pdpb; +import org.tikv.raw.RawKVClient; + +public class SeekLeaderStoreTest extends MockThreeStoresTest { + + private RawKVClient createClient() { + return session.createRawClient(); + } + + @Test + public void testSeekLeader() { + RawKVClient client = createClient(); + ByteString key = ByteString.copyFromUtf8("key"); + ByteString value = ByteString.copyFromUtf8("value"); + + put(key, value); + + Assert.assertEquals(value, client.get(key).get()); + servers.get(0).setState(State.Fail); + servers.get(1).setRegion(region.switchPeer(stores.get(1).getId())); + Assert.assertEquals(value, client.get(key).get()); + + remove(key, value); + } + + @Test + public void testSeekLeaderMeetInvalidStore() { + RawKVClient client = createClient(); + ByteString key = ByteString.copyFromUtf8("key"); + ByteString value = ByteString.copyFromUtf8("value"); + + put(key, value); + + servers.get(0).setState(State.Fail); + servers.get(2).setRegion(region.switchPeer(stores.get(2).getId())); + + AtomicInteger i = new AtomicInteger(0); + leader.addGetStoreListener( + request -> { + Metapb.Store.Builder storeBuilder = + Metapb.Store.newBuilder().mergeFrom(stores.get((int) request.getStoreId() - 1)); + if (request.getStoreId() == 0x2 && i.incrementAndGet() > 0) { + storeBuilder.setState(StoreState.Tombstone); + } + return Pdpb.GetStoreResponse.newBuilder().setStore(storeBuilder.build()).build(); + }); + + Assert.assertEquals(value, client.get(key).get()); + + remove(key, value); + } +} From a886865d353234858cae3ca17c5a9674b71aa721 Mon Sep 17 00:00:00 2001 From: iosmanthus Date: Wed, 23 Mar 2022 14:54:54 +0800 Subject: [PATCH 2/4] mockpd: fix conflict of PDMockServerTest Signed-off-by: iosmanthus --- src/test/java/org/tikv/common/PDMockServerTest.java | 13 ------------- 1 file changed, 13 deletions(-) diff --git a/src/test/java/org/tikv/common/PDMockServerTest.java b/src/test/java/org/tikv/common/PDMockServerTest.java index 67bb61a2cea..2a370fa982a 100644 --- a/src/test/java/org/tikv/common/PDMockServerTest.java +++ b/src/test/java/org/tikv/common/PDMockServerTest.java @@ -30,18 +30,6 @@ public void setUp() throws IOException { setUp(LOCAL_ADDR); } -<<<<<<< HEAD - void setUp(String addr) throws IOException { - pdServer = new PDMockServer(); - pdServer.start(CLUSTER_ID); - pdServer.addGetMemberResp( - GrpcUtils.makeGetMembersResponse( - pdServer.getClusterId(), - GrpcUtils.makeMember(1, "http://" + addr + ":" + pdServer.port), - GrpcUtils.makeMember(2, "http://" + addr + ":" + (pdServer.port + 1)), - GrpcUtils.makeMember(3, "http://" + addr + ":" + (pdServer.port + 2)))); - TiConfiguration conf = TiConfiguration.createDefault(addr + ":" + pdServer.port); -======= void setup(String addr) throws IOException { int basePort; try (ServerSocket s = new ServerSocket(0)) { @@ -69,7 +57,6 @@ void setup(String addr) throws IOException { conf.setWarmUpEnable(false); conf.setTimeout(2000); conf.setEnableGrpcForward(true); ->>>>>>> e89ca5f37... [close #550] rawkv: fix seek leader/proxy store early abort (#551) session = TiSession.create(conf); } From 49dfda762e675de65bca530ca4fb043bb3a82fdf Mon Sep 17 00:00:00 2001 From: iosmanthus Date: Wed, 23 Mar 2022 16:52:10 +0800 Subject: [PATCH 3/4] pick mock server tests Signed-off-by: iosmanthus --- .../java/org/tikv/common/KVMockServer.java | 73 ++++++- .../java/org/tikv/common/MockServerTest.java | 28 ++- .../org/tikv/common/MockThreeStoresTest.java | 129 +++++++++++++ .../java/org/tikv/common/PDClientTest.java | 182 ++++++++++-------- .../java/org/tikv/common/PDMockServer.java | 66 ++++--- .../org/tikv/common/PDMockServerTest.java | 23 ++- .../org/tikv/common/RegionManagerTest.java | 160 +++++++-------- .../org/tikv/common/SeekLeaderStoreTest.java | 6 +- 8 files changed, 459 insertions(+), 208 deletions(-) create mode 100644 src/test/java/org/tikv/common/MockThreeStoresTest.java diff --git a/src/test/java/org/tikv/common/KVMockServer.java b/src/test/java/org/tikv/common/KVMockServer.java index 370802ad2a8..412d9dcfb43 100644 --- a/src/test/java/org/tikv/common/KVMockServer.java +++ b/src/test/java/org/tikv/common/KVMockServer.java @@ -1,16 +1,18 @@ /* - * Copyright 2017 PingCAP, Inc. + * Copyright 2017 TiKV Project Authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. + * */ package org.tikv.common; @@ -25,10 +27,22 @@ import io.grpc.Server; import io.grpc.ServerBuilder; import io.grpc.Status; +import io.grpc.health.v1.HealthCheckRequest; +import io.grpc.health.v1.HealthCheckResponse; +import io.grpc.health.v1.HealthCheckResponse.ServingStatus; +import io.grpc.health.v1.HealthGrpc.HealthImplBase; +import io.grpc.stub.StreamObserver; import java.io.IOException; import java.net.ServerSocket; -import java.util.*; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.SortedMap; +import java.util.TreeMap; import java.util.stream.Collectors; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.tikv.common.key.Key; import org.tikv.common.region.TiRegion; import org.tikv.kvproto.Coprocessor; @@ -43,9 +57,11 @@ public class KVMockServer extends TikvGrpc.TikvImplBase { + private static final Logger logger = LoggerFactory.getLogger(KVMockServer.class); private int port; private Server server; private TiRegion region; + private State state = State.Normal; private final TreeMap dataMap = new TreeMap<>(); private final Map errorMap = new HashMap<>(); @@ -62,10 +78,23 @@ public class KVMockServer extends TikvGrpc.TikvImplBase { public static final int STORE_NOT_MATCH = 9; public static final int RAFT_ENTRY_TOO_LARGE = 10; + public enum State { + Normal, + Fail + } + + public void setState(State state) { + this.state = state; + } + public int getPort() { return port; } + public void setRegion(TiRegion region) { + this.region = region; + } + public void put(ByteString key, ByteString value) { dataMap.put(toRawKey(key), value); } @@ -95,7 +124,7 @@ private void verifyContext(Context context) throws Exception { if (context.getRegionId() != region.getId() || !context.getRegionEpoch().equals(region.getRegionEpoch()) || !context.getPeer().equals(region.getLeader())) { - throw new Exception(); + throw new Exception("context doesn't match"); } } @@ -104,6 +133,11 @@ public void rawGet( org.tikv.kvproto.Kvrpcpb.RawGetRequest request, io.grpc.stub.StreamObserver responseObserver) { try { + switch (state) { + case Fail: + throw new Exception(State.Fail.toString()); + default: + } verifyContext(request.getContext()); ByteString key = request.getKey(); @@ -114,7 +148,12 @@ public void rawGet( setErrorInfo(errorCode, errBuilder); builder.setRegionError(errBuilder.build()); } else { - builder.setValue(dataMap.get(toRawKey(key))); + Key rawKey = toRawKey(key); + ByteString value = dataMap.get(rawKey); + if (value == null) { + value = ByteString.EMPTY; + } + builder.setValue(value); } responseObserver.onNext(builder.build()); responseObserver.onCompleted(); @@ -137,7 +176,6 @@ public void rawPut( if (errorCode != null) { setErrorInfo(errorCode, errBuilder); builder.setRegionError(errBuilder.build()); - // builder.setError(""); } responseObserver.onNext(builder.build()); responseObserver.onCompleted(); @@ -347,14 +385,33 @@ public void coprocessor( } public int start(TiRegion region) throws IOException { + int port; try (ServerSocket s = new ServerSocket(0)) { port = s.getLocalPort(); } - server = ServerBuilder.forPort(port).addService(this).build().start(); + start(region, port); + return port; + } + + private static class HealCheck extends HealthImplBase { + + @Override + public void check( + HealthCheckRequest request, StreamObserver responseObserver) { + responseObserver.onNext( + HealthCheckResponse.newBuilder().setStatus(ServingStatus.SERVING).build()); + responseObserver.onCompleted(); + } + } + public void start(TiRegion region, int port) throws IOException { + this.port = port; this.region = region; + + logger.info("start mock server on port: " + port); + server = + ServerBuilder.forPort(port).addService(new HealCheck()).addService(this).build().start(); Runtime.getRuntime().addShutdownHook(new Thread(KVMockServer.this::stop)); - return port; } public void stop() { diff --git a/src/test/java/org/tikv/common/MockServerTest.java b/src/test/java/org/tikv/common/MockServerTest.java index c99688729d2..02cab4c46f4 100644 --- a/src/test/java/org/tikv/common/MockServerTest.java +++ b/src/test/java/org/tikv/common/MockServerTest.java @@ -1,3 +1,20 @@ +/* + * Copyright 2021 TiKV Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + package org.tikv.common; import com.google.common.collect.ImmutableList; @@ -12,14 +29,15 @@ import org.tikv.kvproto.Pdpb; public class MockServerTest extends PDMockServerTest { + public KVMockServer server; public int port; public TiRegion region; @Before @Override - public void setUp() throws IOException { - super.setUp(); + public void setup() throws IOException { + super.setup(); Metapb.Region r = Metapb.Region.newBuilder() @@ -45,9 +63,11 @@ public void setUp() throws IOException { r.getPeers(0), r.getPeersList(), s.stream().map(TiStore::new).collect(Collectors.toList())); - pdServer.addGetRegionResp(Pdpb.GetRegionResponse.newBuilder().setRegion(r).build()); + leader.addGetRegionListener( + request -> Pdpb.GetRegionResponse.newBuilder().setRegion(r).build()); for (Metapb.Store store : s) { - pdServer.addGetStoreResp(Pdpb.GetStoreResponse.newBuilder().setStore(store).build()); + leader.addGetStoreListener( + (request) -> Pdpb.GetStoreResponse.newBuilder().setStore(store).build()); } server = new KVMockServer(); port = server.start(region); diff --git a/src/test/java/org/tikv/common/MockThreeStoresTest.java b/src/test/java/org/tikv/common/MockThreeStoresTest.java new file mode 100644 index 00000000000..fa4dbe41e35 --- /dev/null +++ b/src/test/java/org/tikv/common/MockThreeStoresTest.java @@ -0,0 +1,129 @@ +/* + * Copyright 2022 TiKV Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.tikv.common; + +import com.google.common.collect.ImmutableList; +import com.google.protobuf.ByteString; +import java.io.IOException; +import java.net.ServerSocket; +import java.util.ArrayList; +import java.util.List; +import java.util.stream.Collectors; +import org.junit.After; +import org.junit.Before; +import org.tikv.common.region.TiRegion; +import org.tikv.common.region.TiStore; +import org.tikv.kvproto.Metapb; +import org.tikv.kvproto.Pdpb; + +public class MockThreeStoresTest extends PDMockServerTest { + + protected TiRegion region; + protected List servers = new ArrayList<>(); + protected List stores; + + @Before + @Override + public void setup() throws IOException { + super.setup(); + + int basePort; + try (ServerSocket s = new ServerSocket(0)) { + basePort = s.getLocalPort(); + } + + ImmutableList peers = + ImmutableList.of( + Metapb.Peer.newBuilder().setId(0x1).setStoreId(0x1).build(), + Metapb.Peer.newBuilder().setId(0x2).setStoreId(0x2).build(), + Metapb.Peer.newBuilder().setId(0x3).setStoreId(0x3).build()); + + Metapb.Region region = + Metapb.Region.newBuilder() + .setRegionEpoch(Metapb.RegionEpoch.newBuilder().setConfVer(1).setVersion(2)) + .setId(0xff) + .setStartKey(ByteString.EMPTY) + .setEndKey(ByteString.EMPTY) + .addAllPeers(peers) + .build(); + + stores = + ImmutableList.of( + Metapb.Store.newBuilder() + .setAddress("127.0.0.1:" + basePort) + .setVersion("5.0.0") + .setId(0x1) + .build(), + Metapb.Store.newBuilder() + .setAddress("127.0.0.1:" + (basePort + 1)) + .setVersion("5.0.0") + .setId(0x2) + .build(), + Metapb.Store.newBuilder() + .setAddress("127.0.0.1:" + (basePort + 2)) + .setVersion("5.0.0") + .setId(0x3) + .build()); + + for (PDMockServer server : pdServers) { + server.addGetRegionListener( + request -> + Pdpb.GetRegionResponse.newBuilder() + .setLeader(peers.get(0)) + .setRegion(region) + .build()); + server.addGetStoreListener( + (request) -> { + int i = (int) request.getStoreId() - 1; + return Pdpb.GetStoreResponse.newBuilder().setStore(stores.get(i)).build(); + }); + } + + this.region = + new TiRegion( + session.getConf(), + region, + region.getPeers(0), + region.getPeersList(), + stores.stream().map(TiStore::new).collect(Collectors.toList())); + for (int i = 0; i < 3; i++) { + KVMockServer server = new KVMockServer(); + server.start(this.region, basePort + i); + servers.add(server); + } + } + + public void put(ByteString key, ByteString value) { + for (KVMockServer server : servers) { + server.put(key, value); + } + } + + public void remove(ByteString key, ByteString value) { + for (KVMockServer server : servers) { + server.remove(key); + } + } + + @After + public void tearDown() { + for (KVMockServer server : servers) { + server.stop(); + } + } +} diff --git a/src/test/java/org/tikv/common/PDClientTest.java b/src/test/java/org/tikv/common/PDClientTest.java index bd49dce39e1..75562308ccc 100644 --- a/src/test/java/org/tikv/common/PDClientTest.java +++ b/src/test/java/org/tikv/common/PDClientTest.java @@ -1,25 +1,34 @@ /* - * Copyright 2017 PingCAP, Inc. + * Copyright 2017 TiKV Project Authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. + * */ package org.tikv.common; -import static org.junit.Assert.*; -import static org.tikv.common.GrpcUtils.encodeKey; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; import com.google.protobuf.ByteString; -import java.util.concurrent.*; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicInteger; import org.junit.Test; import org.tikv.common.exception.GrpcException; import org.tikv.common.meta.TiTimestamp; @@ -31,31 +40,32 @@ import org.tikv.kvproto.Metapb.StoreState; public class PDClientTest extends PDMockServerTest { - private static final String LOCAL_ADDR_IPV6 = "[::]"; + + private static final String LOCAL_ADDR_IPV6 = "[::1]"; + public static final String HTTP = "http://"; @Test public void testCreate() throws Exception { try (PDClient client = session.getPDClient()) { - assertEquals(client.getPdClientWrapper().getLeaderInfo(), LOCAL_ADDR + ":" + pdServer.port); - assertEquals(client.getHeader().getClusterId(), CLUSTER_ID); + assertEquals(LOCAL_ADDR + ":" + leader.port, client.getPdClientWrapper().getLeaderInfo()); + assertEquals(CLUSTER_ID, client.getHeader().getClusterId()); } } @Test public void testSwitchLeader() throws Exception { try (PDClient client = session.getPDClient()) { - client.trySwitchLeader("http://" + LOCAL_ADDR + ":" + (pdServer.port + 1)); + client.trySwitchLeader(HTTP + LOCAL_ADDR + ":" + (leader.port + 1)); assertEquals( - "http://" + LOCAL_ADDR + ":" + (pdServer.port + 1), - client.getPdClientWrapper().getLeaderInfo()); + client.getPdClientWrapper().getLeaderInfo(), HTTP + LOCAL_ADDR + ":" + (leader.port + 1)); } tearDown(); - setUp(LOCAL_ADDR_IPV6); + setup(LOCAL_ADDR_IPV6); try (PDClient client = session.getPDClient()) { - client.trySwitchLeader("http://" + LOCAL_ADDR_IPV6 + ":" + (pdServer.port + 2)); + client.trySwitchLeader(HTTP + LOCAL_ADDR_IPV6 + ":" + (leader.port + 2)); assertEquals( - "http://" + LOCAL_ADDR_IPV6 + ":" + (pdServer.port + 2), - client.getPdClientWrapper().getLeaderInfo()); + client.getPdClientWrapper().getLeaderInfo(), + HTTP + LOCAL_ADDR_IPV6 + ":" + (leader.port + 2)); } } @@ -74,16 +84,17 @@ public void testGetRegionByKey() throws Exception { byte[] endKey = new byte[] {1, 0, 2, 5}; int confVer = 1026; int ver = 1027; - pdServer.addGetRegionResp( - GrpcUtils.makeGetRegionResponse( - pdServer.getClusterId(), - GrpcUtils.makeRegion( - 1, - encodeKey(startKey), - encodeKey(endKey), - GrpcUtils.makeRegionEpoch(confVer, ver), - GrpcUtils.makePeer(1, 10), - GrpcUtils.makePeer(2, 20)))); + leader.addGetRegionListener( + request -> + GrpcUtils.makeGetRegionResponse( + leader.getClusterId(), + GrpcUtils.makeRegion( + 1, + ByteString.copyFrom(startKey), + ByteString.copyFrom(endKey), + GrpcUtils.makeRegionEpoch(confVer, ver), + GrpcUtils.makePeer(1, 10), + GrpcUtils.makePeer(2, 20)))); try (PDClient client = session.getPDClient()) { Pair rl = client.getRegionByKey(defaultBackOff(), ByteString.EMPTY); @@ -93,8 +104,8 @@ public void testGetRegionByKey() throws Exception { assertEquals(r.getEndKey(), ByteString.copyFrom(endKey)); assertEquals(r.getRegionEpoch().getConfVer(), confVer); assertEquals(r.getRegionEpoch().getVersion(), ver); - assertEquals(l.getId(), 1); - assertEquals(l.getStoreId(), 10); + assertEquals(1, l.getId()); + assertEquals(10, l.getStoreId()); } } @@ -105,26 +116,27 @@ public void testGetRegionById() throws Exception { int confVer = 1026; int ver = 1027; - pdServer.addGetRegionByIDResp( - GrpcUtils.makeGetRegionResponse( - pdServer.getClusterId(), - GrpcUtils.makeRegion( - 1, - encodeKey(startKey), - encodeKey(endKey), - GrpcUtils.makeRegionEpoch(confVer, ver), - GrpcUtils.makePeer(1, 10), - GrpcUtils.makePeer(2, 20)))); + leader.addGetRegionByIDListener( + request -> + GrpcUtils.makeGetRegionResponse( + leader.getClusterId(), + GrpcUtils.makeRegion( + 1, + ByteString.copyFrom(startKey), + ByteString.copyFrom(endKey), + GrpcUtils.makeRegionEpoch(confVer, ver), + GrpcUtils.makePeer(1, 10), + GrpcUtils.makePeer(2, 20)))); try (PDClient client = session.getPDClient()) { Pair rl = client.getRegionByID(defaultBackOff(), 0); Metapb.Region r = rl.first; Metapb.Peer l = rl.second; - assertEquals(r.getStartKey(), ByteString.copyFrom(startKey)); - assertEquals(r.getEndKey(), ByteString.copyFrom(endKey)); - assertEquals(r.getRegionEpoch().getConfVer(), confVer); - assertEquals(r.getRegionEpoch().getVersion(), ver); - assertEquals(l.getId(), 1); - assertEquals(l.getStoreId(), 10); + assertEquals(ByteString.copyFrom(startKey), r.getStartKey()); + assertEquals(ByteString.copyFrom(endKey), r.getEndKey()); + assertEquals(confVer, r.getRegionEpoch().getConfVer()); + assertEquals(ver, r.getRegionEpoch().getVersion()); + assertEquals(1, l.getId()); + assertEquals(10, l.getStoreId()); } } @@ -132,30 +144,32 @@ public void testGetRegionById() throws Exception { public void testGetStore() throws Exception { long storeId = 1; String testAddress = "testAddress"; - pdServer.addGetStoreResp( - GrpcUtils.makeGetStoreResponse( - pdServer.getClusterId(), - GrpcUtils.makeStore( - storeId, - testAddress, - Metapb.StoreState.Up, - GrpcUtils.makeStoreLabel("k1", "v1"), - GrpcUtils.makeStoreLabel("k2", "v2")))); + leader.addGetStoreListener( + request -> + GrpcUtils.makeGetStoreResponse( + leader.getClusterId(), + GrpcUtils.makeStore( + storeId, + testAddress, + Metapb.StoreState.Up, + GrpcUtils.makeStoreLabel("k1", "v1"), + GrpcUtils.makeStoreLabel("k2", "v2")))); try (PDClient client = session.getPDClient()) { - Store r = client.getStore(defaultBackOff(), 0); - assertEquals(r.getId(), storeId); - assertEquals(r.getAddress(), testAddress); - assertEquals(r.getState(), Metapb.StoreState.Up); - assertEquals(r.getLabels(0).getKey(), "k1"); - assertEquals(r.getLabels(1).getKey(), "k2"); - assertEquals(r.getLabels(0).getValue(), "v1"); - assertEquals(r.getLabels(1).getValue(), "v2"); - - pdServer.addGetStoreResp( - GrpcUtils.makeGetStoreResponse( - pdServer.getClusterId(), - GrpcUtils.makeStore(storeId, testAddress, Metapb.StoreState.Tombstone))); - assertEquals(StoreState.Tombstone, client.getStore(defaultBackOff(), 0).getState()); + Store r = client.getStore(defaultBackOff(), storeId); + assertEquals(storeId, r.getId()); + assertEquals(testAddress, r.getAddress()); + assertEquals(Metapb.StoreState.Up, r.getState()); + assertEquals("k1", r.getLabels(0).getKey()); + assertEquals("k2", r.getLabels(1).getKey()); + assertEquals("v1", r.getLabels(0).getValue()); + assertEquals("v2", r.getLabels(1).getValue()); + + leader.addGetStoreListener( + request -> + GrpcUtils.makeGetStoreResponse( + leader.getClusterId(), + GrpcUtils.makeStore(storeId, testAddress, Metapb.StoreState.Tombstone))); + assertEquals(StoreState.Tombstone, client.getStore(defaultBackOff(), storeId).getState()); } } @@ -167,11 +181,16 @@ private BackOffer defaultBackOff() { public void testRetryPolicy() throws Exception { long storeId = 1024; ExecutorService service = Executors.newCachedThreadPool(); - pdServer.addGetStoreResp(null); - pdServer.addGetStoreResp(null); - pdServer.addGetStoreResp( - GrpcUtils.makeGetStoreResponse( - pdServer.getClusterId(), GrpcUtils.makeStore(storeId, "", Metapb.StoreState.Up))); + AtomicInteger i = new AtomicInteger(); + leader.addGetStoreListener( + request -> { + if (i.getAndIncrement() < 2) { + return null; + } else { + return GrpcUtils.makeGetStoreResponse( + leader.getClusterId(), GrpcUtils.makeStore(storeId, "", Metapb.StoreState.Up)); + } + }); try (PDClient client = session.getPDClient()) { Callable storeCallable = () -> client.getStore(ConcreteBackOffer.newCustomBackOff(5000), 0); @@ -184,16 +203,17 @@ public void testRetryPolicy() throws Exception { } // Should fail - pdServer.addGetStoreResp(null); - pdServer.addGetStoreResp(null); - pdServer.addGetStoreResp(null); - pdServer.addGetStoreResp(null); - pdServer.addGetStoreResp(null); - pdServer.addGetStoreResp(null); - - pdServer.addGetStoreResp( - GrpcUtils.makeGetStoreResponse( - pdServer.getClusterId(), GrpcUtils.makeStore(storeId, "", Metapb.StoreState.Up))); + AtomicInteger j = new AtomicInteger(); + leader.addGetStoreListener( + request -> { + if (j.getAndIncrement() < 6) { + return null; + } else { + return GrpcUtils.makeGetStoreResponse( + leader.getClusterId(), GrpcUtils.makeStore(storeId, "", Metapb.StoreState.Up)); + } + }); + try { client.getStore(defaultBackOff(), 0); } catch (GrpcException e) { diff --git a/src/test/java/org/tikv/common/PDMockServer.java b/src/test/java/org/tikv/common/PDMockServer.java index 521a286741f..3d64624f448 100644 --- a/src/test/java/org/tikv/common/PDMockServer.java +++ b/src/test/java/org/tikv/common/PDMockServer.java @@ -1,16 +1,18 @@ /* - * Copyright 2017 PingCAP, Inc. + * Copyright 2017 TiKV Project Authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. + * */ package org.tikv.common; @@ -21,29 +23,38 @@ import io.grpc.stub.StreamObserver; import java.io.IOException; import java.net.ServerSocket; -import java.util.Deque; import java.util.Optional; -import java.util.concurrent.LinkedBlockingDeque; +import java.util.function.Function; import org.tikv.kvproto.PDGrpc; -import org.tikv.kvproto.Pdpb.*; +import org.tikv.kvproto.Pdpb.GetMembersRequest; +import org.tikv.kvproto.Pdpb.GetMembersResponse; +import org.tikv.kvproto.Pdpb.GetRegionByIDRequest; +import org.tikv.kvproto.Pdpb.GetRegionRequest; +import org.tikv.kvproto.Pdpb.GetRegionResponse; +import org.tikv.kvproto.Pdpb.GetStoreRequest; +import org.tikv.kvproto.Pdpb.GetStoreResponse; +import org.tikv.kvproto.Pdpb.TsoRequest; +import org.tikv.kvproto.Pdpb.TsoResponse; public class PDMockServer extends PDGrpc.PDImplBase { + public int port; private long clusterId; - private Server server; - public void addGetMemberResp(GetMembersResponse r) { - getMembersResp.addLast(Optional.ofNullable(r)); - } + private Function getMembersListener; + private Function getStoreListener; + private Function getRegionListener; + private Function getRegionByIDListener; - private final Deque> getMembersResp = - new LinkedBlockingDeque>(); + public void addGetMembersListener(Function func) { + getMembersListener = func; + } @Override public void getMembers(GetMembersRequest request, StreamObserver resp) { try { - resp.onNext(getMembersResp.getFirst().get()); + resp.onNext(Optional.ofNullable(getMembersListener.apply(request)).get()); resp.onCompleted(); } catch (Exception e) { resp.onError(Status.INTERNAL.asRuntimeException()); @@ -70,47 +81,42 @@ public void onCompleted() { }; } - public void addGetRegionResp(GetRegionResponse r) { - getRegionResp.addLast(r); + public void addGetRegionListener(Function func) { + getRegionListener = func; } - private final Deque getRegionResp = new LinkedBlockingDeque<>(); - @Override public void getRegion(GetRegionRequest request, StreamObserver resp) { try { - resp.onNext(getRegionResp.removeFirst()); + resp.onNext(getRegionListener.apply(request)); resp.onCompleted(); } catch (Exception e) { resp.onError(Status.INTERNAL.asRuntimeException()); } } - public void addGetRegionByIDResp(GetRegionResponse r) { - getRegionByIDResp.addLast(r); + public void addGetRegionByIDListener(Function func) { + getRegionByIDListener = func; } - private final Deque getRegionByIDResp = new LinkedBlockingDeque<>(); - @Override public void getRegionByID(GetRegionByIDRequest request, StreamObserver resp) { try { - resp.onNext(getRegionByIDResp.removeFirst()); + resp.onNext(getRegionByIDListener.apply(request)); resp.onCompleted(); } catch (Exception e) { resp.onError(Status.INTERNAL.asRuntimeException()); } } - public void addGetStoreResp(GetStoreResponse r) { - getStoreResp.addLast(Optional.ofNullable(r)); + public void addGetStoreListener(Function func) { + getStoreListener = func; } - private final Deque> getStoreResp = new LinkedBlockingDeque<>(); - + @Override public void getStore(GetStoreRequest request, StreamObserver resp) { try { - resp.onNext(getStoreResp.removeFirst().get()); + resp.onNext(Optional.ofNullable(getStoreListener.apply(request)).get()); resp.onCompleted(); } catch (Exception e) { resp.onError(Status.INTERNAL.asRuntimeException()); @@ -118,10 +124,16 @@ public void getStore(GetStoreRequest request, StreamObserver r } public void start(long clusterId) throws IOException { + int port; try (ServerSocket s = new ServerSocket(0)) { port = s.getLocalPort(); } + start(clusterId, port); + } + + public void start(long clusterId, int port) throws IOException { this.clusterId = clusterId; + this.port = port; server = ServerBuilder.forPort(port).addService(this).build().start(); Runtime.getRuntime().addShutdownHook(new Thread(PDMockServer.this::stop)); diff --git a/src/test/java/org/tikv/common/PDMockServerTest.java b/src/test/java/org/tikv/common/PDMockServerTest.java index 2a370fa982a..1fd84f5ddbe 100644 --- a/src/test/java/org/tikv/common/PDMockServerTest.java +++ b/src/test/java/org/tikv/common/PDMockServerTest.java @@ -1,33 +1,40 @@ /* - * Copyright 2020 PingCAP, Inc. + * Copyright 2020 TiKV Project Authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. + * */ package org.tikv.common; import java.io.IOException; +import java.net.ServerSocket; +import java.util.ArrayList; +import java.util.List; import org.junit.After; import org.junit.Before; public abstract class PDMockServerTest { + protected static final String LOCAL_ADDR = "127.0.0.1"; static final long CLUSTER_ID = 1024; - protected static TiSession session; - protected PDMockServer pdServer; + protected TiSession session; + protected PDMockServer leader; + protected List pdServers = new ArrayList<>(); @Before - public void setUp() throws IOException { - setUp(LOCAL_ADDR); + public void setup() throws IOException { + setup(LOCAL_ADDR); } void setup(String addr) throws IOException { @@ -63,6 +70,8 @@ void setup(String addr) throws IOException { @After public void tearDown() throws Exception { session.close(); - pdServer.stop(); + for (PDMockServer server : pdServers) { + server.stop(); + } } } diff --git a/src/test/java/org/tikv/common/RegionManagerTest.java b/src/test/java/org/tikv/common/RegionManagerTest.java index 483f8a9e6fb..6052640f9bc 100644 --- a/src/test/java/org/tikv/common/RegionManagerTest.java +++ b/src/test/java/org/tikv/common/RegionManagerTest.java @@ -1,26 +1,30 @@ /* - * Copyright 2017 PingCAP, Inc. + * Copyright 2017 TiKV Project Authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. + * */ package org.tikv.common; -import static org.junit.Assert.*; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; import com.google.common.collect.RangeMap; import com.google.common.collect.TreeRangeMap; import com.google.protobuf.ByteString; import java.io.IOException; +import java.util.concurrent.atomic.AtomicInteger; import org.junit.Before; import org.junit.Test; import org.tikv.common.key.Key; @@ -33,12 +37,13 @@ import org.tikv.kvproto.Metapb.StoreState; public class RegionManagerTest extends PDMockServerTest { + private RegionManager mgr; @Before @Override - public void setUp() throws IOException { - super.setUp(); + public void setup() throws IOException { + super.setup(); mgr = session.getRegionManager(); } @@ -53,54 +58,48 @@ public void testRegionBorder() { } @Test - public void getRegionByKey() throws Exception { + public void getRegionByKey() { ByteString startKey = ByteString.copyFrom(new byte[] {1}); ByteString endKey = ByteString.copyFrom(new byte[] {10}); ByteString searchKey = ByteString.copyFrom(new byte[] {5}); - ByteString searchKeyNotExists = ByteString.copyFrom(new byte[] {11}); int confVer = 1026; int ver = 1027; long regionId = 233; - String testAddress = "testAddress"; - pdServer.addGetRegionResp( - GrpcUtils.makeGetRegionResponse( - pdServer.getClusterId(), - GrpcUtils.makeRegion( - regionId, - GrpcUtils.encodeKey(startKey.toByteArray()), - GrpcUtils.encodeKey(endKey.toByteArray()), - GrpcUtils.makeRegionEpoch(confVer, ver), - GrpcUtils.makePeer(1, 10), - GrpcUtils.makePeer(2, 20)))); - for (long id : new long[] {10, 20}) { - pdServer.addGetStoreResp( - GrpcUtils.makeGetStoreResponse( - pdServer.getClusterId(), - GrpcUtils.makeStore( - id, - testAddress, - Metapb.StoreState.Up, - GrpcUtils.makeStoreLabel("k1", "v1"), - GrpcUtils.makeStoreLabel("k2", "v2")))); - } + String testAddress = "127.0.0.1"; + leader.addGetRegionListener( + request -> + GrpcUtils.makeGetRegionResponse( + leader.getClusterId(), + GrpcUtils.makeRegion( + regionId, + GrpcUtils.encodeKey(startKey.toByteArray()), + GrpcUtils.encodeKey(endKey.toByteArray()), + GrpcUtils.makeRegionEpoch(confVer, ver), + GrpcUtils.makePeer(1, 10), + GrpcUtils.makePeer(2, 20)))); + + AtomicInteger i = new AtomicInteger(0); + long[] ids = new long[] {10, 20}; + leader.addGetStoreListener( + (request -> + GrpcUtils.makeGetStoreResponse( + leader.getClusterId(), + GrpcUtils.makeStore( + ids[i.getAndIncrement()], + testAddress, + StoreState.Up, + GrpcUtils.makeStoreLabel("k1", "v1"), + GrpcUtils.makeStoreLabel("k2", "v2"))))); TiRegion region = mgr.getRegionByKey(startKey); assertEquals(region.getId(), regionId); TiRegion regionToSearch = mgr.getRegionByKey(searchKey); assertEquals(region, regionToSearch); - - // This will in turn invoke rpc and results in an error - // since we set just one rpc response - try { - mgr.getRegionByKey(searchKeyNotExists); - fail(); - } catch (Exception ignored) { - } } @Test - public void getStoreByKey() throws Exception { + public void getStoreByKey() { ByteString startKey = ByteString.copyFrom(new byte[] {1}); ByteString endKey = ByteString.copyFrom(new byte[] {10}); ByteString searchKey = ByteString.copyFrom(new byte[] {5}); @@ -109,27 +108,30 @@ public void getStoreByKey() throws Exception { int confVer = 1026; int ver = 1027; long regionId = 233; - pdServer.addGetRegionResp( - GrpcUtils.makeGetRegionResponse( - pdServer.getClusterId(), - GrpcUtils.makeRegion( - regionId, - GrpcUtils.encodeKey(startKey.toByteArray()), - GrpcUtils.encodeKey(endKey.toByteArray()), - GrpcUtils.makeRegionEpoch(confVer, ver), - GrpcUtils.makePeer(storeId, 10), - GrpcUtils.makePeer(storeId + 1, 20)))); - for (long id : new long[] {10, 20}) { - pdServer.addGetStoreResp( - GrpcUtils.makeGetStoreResponse( - pdServer.getClusterId(), - GrpcUtils.makeStore( - id, - testAddress, - Metapb.StoreState.Up, - GrpcUtils.makeStoreLabel("k1", "v1"), - GrpcUtils.makeStoreLabel("k2", "v2")))); - } + leader.addGetRegionListener( + request -> + GrpcUtils.makeGetRegionResponse( + leader.getClusterId(), + GrpcUtils.makeRegion( + regionId, + GrpcUtils.encodeKey(startKey.toByteArray()), + GrpcUtils.encodeKey(endKey.toByteArray()), + GrpcUtils.makeRegionEpoch(confVer, ver), + GrpcUtils.makePeer(storeId, 10), + GrpcUtils.makePeer(storeId + 1, 20)))); + + AtomicInteger i = new AtomicInteger(0); + long[] ids = new long[] {10, 20}; + leader.addGetStoreListener( + (request -> + GrpcUtils.makeGetStoreResponse( + leader.getClusterId(), + GrpcUtils.makeStore( + ids[i.getAndIncrement()], + testAddress, + StoreState.Up, + GrpcUtils.makeStoreLabel("k1", "v1"), + GrpcUtils.makeStoreLabel("k2", "v2"))))); Pair pair = mgr.getRegionStorePairByKey(searchKey); assertEquals(pair.first.getId(), regionId); @@ -137,30 +139,32 @@ public void getStoreByKey() throws Exception { } @Test - public void getStoreById() throws Exception { + public void getStoreById() { long storeId = 234; String testAddress = "testAddress"; - pdServer.addGetStoreResp( - GrpcUtils.makeGetStoreResponse( - pdServer.getClusterId(), - GrpcUtils.makeStore( - storeId, - testAddress, - Metapb.StoreState.Up, - GrpcUtils.makeStoreLabel("k1", "v1"), - GrpcUtils.makeStoreLabel("k2", "v2")))); + leader.addGetStoreListener( + request -> + GrpcUtils.makeGetStoreResponse( + leader.getClusterId(), + GrpcUtils.makeStore( + storeId, + testAddress, + Metapb.StoreState.Up, + GrpcUtils.makeStoreLabel("k1", "v1"), + GrpcUtils.makeStoreLabel("k2", "v2")))); TiStore store = mgr.getStoreById(storeId); assertEquals(store.getStore().getId(), storeId); - pdServer.addGetStoreResp( - GrpcUtils.makeGetStoreResponse( - pdServer.getClusterId(), - GrpcUtils.makeStore( - storeId + 1, - testAddress, - StoreState.Tombstone, - GrpcUtils.makeStoreLabel("k1", "v1"), - GrpcUtils.makeStoreLabel("k2", "v2")))); + leader.addGetStoreListener( + request -> + GrpcUtils.makeGetStoreResponse( + leader.getClusterId(), + GrpcUtils.makeStore( + storeId + 1, + testAddress, + StoreState.Tombstone, + GrpcUtils.makeStoreLabel("k1", "v1"), + GrpcUtils.makeStoreLabel("k2", "v2")))); try { mgr.getStoreById(storeId + 1); diff --git a/src/test/java/org/tikv/common/SeekLeaderStoreTest.java b/src/test/java/org/tikv/common/SeekLeaderStoreTest.java index 201f10986f5..64836c8f027 100644 --- a/src/test/java/org/tikv/common/SeekLeaderStoreTest.java +++ b/src/test/java/org/tikv/common/SeekLeaderStoreTest.java @@ -41,10 +41,10 @@ public void testSeekLeader() { put(key, value); - Assert.assertEquals(value, client.get(key).get()); + Assert.assertEquals(value, client.get(key)); servers.get(0).setState(State.Fail); servers.get(1).setRegion(region.switchPeer(stores.get(1).getId())); - Assert.assertEquals(value, client.get(key).get()); + Assert.assertEquals(value, client.get(key)); remove(key, value); } @@ -71,7 +71,7 @@ public void testSeekLeaderMeetInvalidStore() { return Pdpb.GetStoreResponse.newBuilder().setStore(storeBuilder.build()).build(); }); - Assert.assertEquals(value, client.get(key).get()); + Assert.assertEquals(value, client.get(key)); remove(key, value); } From b7aad99271c466a75137bb1284ca2fc734641eb8 Mon Sep 17 00:00:00 2001 From: iosmanthus Date: Wed, 23 Mar 2022 17:38:47 +0800 Subject: [PATCH 4/4] fix seek leader store issue Signed-off-by: iosmanthus --- .../java/org/tikv/common/region/AbstractRegionStoreClient.java | 1 + 1 file changed, 1 insertion(+) diff --git a/src/main/java/org/tikv/common/region/AbstractRegionStoreClient.java b/src/main/java/org/tikv/common/region/AbstractRegionStoreClient.java index 6c37d2651fc..ce13be471ad 100644 --- a/src/main/java/org/tikv/common/region/AbstractRegionStoreClient.java +++ b/src/main/java/org/tikv/common/region/AbstractRegionStoreClient.java @@ -210,6 +210,7 @@ private Boolean seekLeaderStore(BackOffer backOffer) { // switch to leader store store = currentLeaderStore; updateClientStub(); + return true; } return false; }