From c8d856df03f3a9110312bd997e57a1d16e9399e2 Mon Sep 17 00:00:00 2001 From: iosmanthus Date: Fri, 25 Mar 2022 16:34:17 +0800 Subject: [PATCH] cherry pick #571 to release-3.1 Signed-off-by: ti-srebot --- .../org/tikv/common/MockThreeStoresTest.java | 131 ++++++++++ .../org/tikv/common/PDClientMockTest.java | 229 ++++++++++++++++++ .../java/org/tikv/common/PDMockServer.java | 8 + .../org/tikv/common/PDMockServerTest.java | 32 +++ .../org/tikv/common/SeekLeaderStoreTest.java | 77 ++++++ .../org/tikv/common/SeekProxyStoreTest.java | 47 ++++ 6 files changed, 524 insertions(+) create mode 100644 src/test/java/org/tikv/common/MockThreeStoresTest.java create mode 100644 src/test/java/org/tikv/common/PDClientMockTest.java create mode 100644 src/test/java/org/tikv/common/SeekLeaderStoreTest.java create mode 100644 src/test/java/org/tikv/common/SeekProxyStoreTest.java diff --git a/src/test/java/org/tikv/common/MockThreeStoresTest.java b/src/test/java/org/tikv/common/MockThreeStoresTest.java new file mode 100644 index 00000000000..43063be7d17 --- /dev/null +++ b/src/test/java/org/tikv/common/MockThreeStoresTest.java @@ -0,0 +1,131 @@ +/* + * Copyright 2022 TiKV Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.tikv.common; + +import com.google.common.collect.ImmutableList; +import com.google.protobuf.ByteString; +import java.io.IOException; +import java.net.ServerSocket; +import java.util.ArrayList; +import java.util.List; +import java.util.stream.Collectors; +import org.junit.After; +import org.junit.Before; +import org.tikv.common.region.TiRegion; +import org.tikv.common.region.TiStore; +import org.tikv.kvproto.Metapb; +import org.tikv.kvproto.Pdpb; + +public class MockThreeStoresTest extends PDMockServerTest { + + protected TiRegion region; + protected List servers = new ArrayList<>(); + protected List stores; + + @Before + @Override + public void setup() throws IOException { + super.setup(); + + int[] ports = new int[3]; + for (int i = 0; i < ports.length; i++) { + try (ServerSocket s = new ServerSocket(0)) { + ports[i] = s.getLocalPort(); + } + } + + ImmutableList peers = + ImmutableList.of( + Metapb.Peer.newBuilder().setId(0x1).setStoreId(0x1).build(), + Metapb.Peer.newBuilder().setId(0x2).setStoreId(0x2).build(), + Metapb.Peer.newBuilder().setId(0x3).setStoreId(0x3).build()); + + Metapb.Region region = + Metapb.Region.newBuilder() + .setRegionEpoch(Metapb.RegionEpoch.newBuilder().setConfVer(1).setVersion(2)) + .setId(0xff) + .setStartKey(ByteString.EMPTY) + .setEndKey(ByteString.EMPTY) + .addAllPeers(peers) + .build(); + + stores = + ImmutableList.of( + Metapb.Store.newBuilder() + .setAddress("127.0.0.1:" + ports[0]) + .setVersion("5.0.0") + .setId(0x1) + .build(), + Metapb.Store.newBuilder() + .setAddress("127.0.0.1:" + ports[1]) + .setVersion("5.0.0") + .setId(0x2) + .build(), + Metapb.Store.newBuilder() + .setAddress("127.0.0.1:" + ports[2]) + .setVersion("5.0.0") + .setId(0x3) + .build()); + + for (PDMockServer server : pdServers) { + server.addGetRegionListener( + request -> + Pdpb.GetRegionResponse.newBuilder() + .setLeader(peers.get(0)) + .setRegion(region) + .build()); + server.addGetStoreListener( + (request) -> { + int i = (int) request.getStoreId() - 1; + return Pdpb.GetStoreResponse.newBuilder().setStore(stores.get(i)).build(); + }); + } + + this.region = + new TiRegion( + session.getConf(), + region, + region.getPeers(0), + region.getPeersList(), + stores.stream().map(TiStore::new).collect(Collectors.toList())); + for (int port : ports) { + KVMockServer server = new KVMockServer(); + server.start(this.region, port); + servers.add(server); + } + } + + public void put(ByteString key, ByteString value) { + for (KVMockServer server : servers) { + server.put(key, value); + } + } + + public void remove(ByteString key, ByteString value) { + for (KVMockServer server : servers) { + server.remove(key); + } + } + + @After + public void tearDown() { + for (KVMockServer server : servers) { + server.stop(); + } + } +} diff --git a/src/test/java/org/tikv/common/PDClientMockTest.java b/src/test/java/org/tikv/common/PDClientMockTest.java new file mode 100644 index 00000000000..10e0901d342 --- /dev/null +++ b/src/test/java/org/tikv/common/PDClientMockTest.java @@ -0,0 +1,229 @@ +/* + * Copyright 2017 TiKV Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.tikv.common; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +import com.google.protobuf.ByteString; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicInteger; +import org.junit.Test; +import org.tikv.common.exception.GrpcException; +import org.tikv.common.meta.TiTimestamp; +import org.tikv.common.util.BackOffer; +import org.tikv.common.util.ConcreteBackOffer; +import org.tikv.common.util.Pair; +import org.tikv.kvproto.Metapb; +import org.tikv.kvproto.Metapb.Store; +import org.tikv.kvproto.Metapb.StoreState; + +public class PDClientMockTest extends PDMockServerTest { + + private static final String LOCAL_ADDR_IPV6 = "[::1]"; + public static final String HTTP = "http://"; + + @Test + public void testCreate() throws Exception { + try (PDClient client = session.getPDClient()) { + assertEquals( + LOCAL_ADDR + ":" + leader.getPort(), client.getPdClientWrapper().getLeaderInfo()); + assertEquals(CLUSTER_ID, client.getHeader().getClusterId()); + } + } + + @Test + public void testSwitchLeader() throws Exception { + try (PDClient client = session.getPDClient()) { + // Switch leader to server 1 + client.trySwitchLeader(HTTP + LOCAL_ADDR + ":" + pdServers.get(1).getPort()); + assertEquals( + client.getPdClientWrapper().getLeaderInfo(), + HTTP + LOCAL_ADDR + ":" + pdServers.get(1).getPort()); + } + tearDown(); + setup(LOCAL_ADDR_IPV6); + try (PDClient client = session.getPDClient()) { + client.trySwitchLeader(HTTP + LOCAL_ADDR_IPV6 + ":" + pdServers.get(2).getPort()); + assertEquals( + client.getPdClientWrapper().getLeaderInfo(), + HTTP + LOCAL_ADDR_IPV6 + ":" + pdServers.get(2).getPort()); + } + } + + @Test + public void testTso() throws Exception { + try (PDClient client = session.getPDClient()) { + TiTimestamp ts = client.getTimestamp(defaultBackOff()); + // Test pdServer is set to generate physical == logical + 1 + assertEquals(ts.getPhysical(), ts.getLogical() + 1); + } + } + + @Test + public void testGetRegionByKey() throws Exception { + byte[] startKey = new byte[] {1, 0, 2, 4}; + byte[] endKey = new byte[] {1, 0, 2, 5}; + int confVer = 1026; + int ver = 1027; + leader.addGetRegionListener( + request -> + GrpcUtils.makeGetRegionResponse( + leader.getClusterId(), + GrpcUtils.makeRegion( + 1, + ByteString.copyFrom(startKey), + ByteString.copyFrom(endKey), + GrpcUtils.makeRegionEpoch(confVer, ver), + GrpcUtils.makePeer(1, 10), + GrpcUtils.makePeer(2, 20)))); + try (PDClient client = session.getPDClient()) { + Pair rl = + client.getRegionByKey(defaultBackOff(), ByteString.EMPTY); + Metapb.Region r = rl.first; + Metapb.Peer l = rl.second; + assertEquals(r.getStartKey(), ByteString.copyFrom(startKey)); + assertEquals(r.getEndKey(), ByteString.copyFrom(endKey)); + assertEquals(r.getRegionEpoch().getConfVer(), confVer); + assertEquals(r.getRegionEpoch().getVersion(), ver); + assertEquals(1, l.getId()); + assertEquals(10, l.getStoreId()); + } + } + + @Test + public void testGetRegionById() throws Exception { + byte[] startKey = new byte[] {1, 0, 2, 4}; + byte[] endKey = new byte[] {1, 0, 2, 5}; + int confVer = 1026; + int ver = 1027; + + leader.addGetRegionByIDListener( + request -> + GrpcUtils.makeGetRegionResponse( + leader.getClusterId(), + GrpcUtils.makeRegion( + 1, + ByteString.copyFrom(startKey), + ByteString.copyFrom(endKey), + GrpcUtils.makeRegionEpoch(confVer, ver), + GrpcUtils.makePeer(1, 10), + GrpcUtils.makePeer(2, 20)))); + try (PDClient client = session.getPDClient()) { + Pair rl = client.getRegionByID(defaultBackOff(), 0); + Metapb.Region r = rl.first; + Metapb.Peer l = rl.second; + assertEquals(ByteString.copyFrom(startKey), r.getStartKey()); + assertEquals(ByteString.copyFrom(endKey), r.getEndKey()); + assertEquals(confVer, r.getRegionEpoch().getConfVer()); + assertEquals(ver, r.getRegionEpoch().getVersion()); + assertEquals(1, l.getId()); + assertEquals(10, l.getStoreId()); + } + } + + @Test + public void testGetStore() throws Exception { + long storeId = 1; + String testAddress = "testAddress"; + leader.addGetStoreListener( + request -> + GrpcUtils.makeGetStoreResponse( + leader.getClusterId(), + GrpcUtils.makeStore( + storeId, + testAddress, + Metapb.StoreState.Up, + GrpcUtils.makeStoreLabel("k1", "v1"), + GrpcUtils.makeStoreLabel("k2", "v2")))); + try (PDClient client = session.getPDClient()) { + Store r = client.getStore(defaultBackOff(), storeId); + assertEquals(storeId, r.getId()); + assertEquals(testAddress, r.getAddress()); + assertEquals(Metapb.StoreState.Up, r.getState()); + assertEquals("k1", r.getLabels(0).getKey()); + assertEquals("k2", r.getLabels(1).getKey()); + assertEquals("v1", r.getLabels(0).getValue()); + assertEquals("v2", r.getLabels(1).getValue()); + + leader.addGetStoreListener( + request -> + GrpcUtils.makeGetStoreResponse( + leader.getClusterId(), + GrpcUtils.makeStore(storeId, testAddress, Metapb.StoreState.Tombstone))); + assertEquals(StoreState.Tombstone, client.getStore(defaultBackOff(), storeId).getState()); + } + } + + private BackOffer defaultBackOff() { + return ConcreteBackOffer.newCustomBackOff(1000); + } + + @Test + public void testRetryPolicy() throws Exception { + long storeId = 1024; + ExecutorService service = Executors.newCachedThreadPool(); + AtomicInteger i = new AtomicInteger(); + leader.addGetStoreListener( + request -> { + if (i.getAndIncrement() < 2) { + return null; + } else { + return GrpcUtils.makeGetStoreResponse( + leader.getClusterId(), GrpcUtils.makeStore(storeId, "", Metapb.StoreState.Up)); + } + }); + try (PDClient client = session.getPDClient()) { + Callable storeCallable = + () -> client.getStore(ConcreteBackOffer.newCustomBackOff(5000), 0); + Future storeFuture = service.submit(storeCallable); + try { + Store r = storeFuture.get(50, TimeUnit.SECONDS); + assertEquals(r.getId(), storeId); + } catch (TimeoutException e) { + fail(); + } + + // Should fail + AtomicInteger j = new AtomicInteger(); + leader.addGetStoreListener( + request -> { + if (j.getAndIncrement() < 6) { + return null; + } else { + return GrpcUtils.makeGetStoreResponse( + leader.getClusterId(), GrpcUtils.makeStore(storeId, "", Metapb.StoreState.Up)); + } + }); + + try { + client.getStore(defaultBackOff(), 0); + } catch (GrpcException e) { + assertTrue(true); + return; + } + fail(); + } + } +} diff --git a/src/test/java/org/tikv/common/PDMockServer.java b/src/test/java/org/tikv/common/PDMockServer.java index 521a286741f..11abbd11e7a 100644 --- a/src/test/java/org/tikv/common/PDMockServer.java +++ b/src/test/java/org/tikv/common/PDMockServer.java @@ -28,7 +28,11 @@ import org.tikv.kvproto.Pdpb.*; public class PDMockServer extends PDGrpc.PDImplBase { +<<<<<<< HEAD public int port; +======= + private int port; +>>>>>>> f4e7c302a... [close #570] mockserver: fix unstable mock server cluster setup (#571) private long clusterId; private Server server; @@ -136,4 +140,8 @@ public void stop() { public long getClusterId() { return clusterId; } + + public long getPort() { + return port; + } } diff --git a/src/test/java/org/tikv/common/PDMockServerTest.java b/src/test/java/org/tikv/common/PDMockServerTest.java index 6469c38731f..7f35e4fda8d 100644 --- a/src/test/java/org/tikv/common/PDMockServerTest.java +++ b/src/test/java/org/tikv/common/PDMockServerTest.java @@ -30,6 +30,7 @@ public void setUp() throws IOException { setUp(LOCAL_ADDR); } +<<<<<<< HEAD void setUp(String addr) throws IOException { pdServer = new PDMockServer(); pdServer.start(CLUSTER_ID); @@ -40,6 +41,37 @@ void setUp(String addr) throws IOException { GrpcUtils.makeMember(2, "http://" + addr + ":" + (pdServer.port + 1)), GrpcUtils.makeMember(3, "http://" + addr + ":" + (pdServer.port + 2)))); TiConfiguration conf = TiConfiguration.createDefault(addr + ":" + pdServer.port); +======= + void setup(String addr) throws IOException { + int[] ports = new int[3]; + for (int i = 0; i < ports.length; i++) { + try (ServerSocket s = new ServerSocket(0)) { + ports[i] = s.getLocalPort(); + } + } + + for (int i = 0; i < ports.length; i++) { + PDMockServer server = new PDMockServer(); + server.start(CLUSTER_ID, ports[i]); + server.addGetMembersListener( + (request) -> + GrpcUtils.makeGetMembersResponse( + server.getClusterId(), + GrpcUtils.makeMember(1, "http://" + addr + ":" + ports[0]), + GrpcUtils.makeMember(2, "http://" + addr + ":" + ports[1]), + GrpcUtils.makeMember(3, "http://" + addr + ":" + ports[2]))); + pdServers.add(server); + if (i == 0) { + leader = server; + } + } + + TiConfiguration conf = TiConfiguration.createDefault(addr + ":" + ports[0]); + conf.setKvMode("RAW"); + conf.setWarmUpEnable(false); + conf.setTimeout(2000); + conf.setEnableGrpcForward(true); +>>>>>>> f4e7c302a... [close #570] mockserver: fix unstable mock server cluster setup (#571) session = TiSession.create(conf); } diff --git a/src/test/java/org/tikv/common/SeekLeaderStoreTest.java b/src/test/java/org/tikv/common/SeekLeaderStoreTest.java new file mode 100644 index 00000000000..891e676ee96 --- /dev/null +++ b/src/test/java/org/tikv/common/SeekLeaderStoreTest.java @@ -0,0 +1,77 @@ +/* + * Copyright 2022 TiKV Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.tikv.common; + +import com.google.protobuf.ByteString; +import java.util.concurrent.atomic.AtomicInteger; +import org.junit.Assert; +import org.junit.Test; +import org.tikv.common.KVMockServer.State; +import org.tikv.kvproto.Metapb; +import org.tikv.kvproto.Metapb.StoreState; +import org.tikv.kvproto.Pdpb; +import org.tikv.raw.RawKVClient; + +public class SeekLeaderStoreTest extends MockThreeStoresTest { + private RawKVClient createClient() { + return session.createRawClient(); + } + + @Test + public void testSeekLeader() { + RawKVClient client = createClient(); + ByteString key = ByteString.copyFromUtf8("key"); + ByteString value = ByteString.copyFromUtf8("value"); + + put(key, value); + + Assert.assertEquals(value, client.get(key).get()); + servers.get(0).setState(State.Fail); + servers.get(1).setRegion(region.switchPeer(stores.get(1).getId())); + Assert.assertEquals(value, client.get(key).get()); + + remove(key, value); + } + + @Test + public void testSeekLeaderMeetInvalidStore() { + RawKVClient client = createClient(); + ByteString key = ByteString.copyFromUtf8("key"); + ByteString value = ByteString.copyFromUtf8("value"); + + put(key, value); + + servers.get(0).setState(State.Fail); + servers.get(2).setRegion(region.switchPeer(stores.get(2).getId())); + + AtomicInteger i = new AtomicInteger(0); + leader.addGetStoreListener( + request -> { + Metapb.Store.Builder storeBuilder = + Metapb.Store.newBuilder().mergeFrom(stores.get((int) request.getStoreId() - 1)); + if (request.getStoreId() == 0x2 && i.incrementAndGet() > 0) { + storeBuilder.setState(StoreState.Tombstone); + } + return Pdpb.GetStoreResponse.newBuilder().setStore(storeBuilder.build()).build(); + }); + + Assert.assertEquals(value, client.get(key).get()); + + remove(key, value); + } +} diff --git a/src/test/java/org/tikv/common/SeekProxyStoreTest.java b/src/test/java/org/tikv/common/SeekProxyStoreTest.java new file mode 100644 index 00000000000..6149a591cc1 --- /dev/null +++ b/src/test/java/org/tikv/common/SeekProxyStoreTest.java @@ -0,0 +1,47 @@ +/* + * Copyright 2022 TiKV Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.tikv.common; + +import com.google.protobuf.ByteString; +import org.junit.Assert; +import org.junit.Test; +import org.tikv.common.KVMockServer.State; +import org.tikv.raw.RawKVClient; + +public class SeekProxyStoreTest extends MockThreeStoresTest { + private RawKVClient createClient() { + return session.createRawClient(); + } + + @Test + public void testSeekProxyStore() { + RawKVClient client = createClient(); + ByteString key = ByteString.copyFromUtf8("key"); + ByteString value = ByteString.copyFromUtf8("value"); + put(key, value); + + client.put(key, value); + Assert.assertEquals(value, client.get(key).get()); + // Set the leader to state Fail, the request will route to peer 0x2, which is not the leader. + // The state of three peers is the same. + // Thus, with the correct context, the peer 0x2 will return normally. + servers.get(0).setState(State.Fail); + + Assert.assertEquals(value, client.get(key).get()); + } +}