diff --git a/src/test/java/org/tikv/common/MockThreeStoresTest.java b/src/test/java/org/tikv/common/MockThreeStoresTest.java index fa4dbe41e35..43063be7d17 100644 --- a/src/test/java/org/tikv/common/MockThreeStoresTest.java +++ b/src/test/java/org/tikv/common/MockThreeStoresTest.java @@ -42,9 +42,11 @@ public class MockThreeStoresTest extends PDMockServerTest { public void setup() throws IOException { super.setup(); - int basePort; - try (ServerSocket s = new ServerSocket(0)) { - basePort = s.getLocalPort(); + int[] ports = new int[3]; + for (int i = 0; i < ports.length; i++) { + try (ServerSocket s = new ServerSocket(0)) { + ports[i] = s.getLocalPort(); + } } ImmutableList peers = @@ -65,17 +67,17 @@ public void setup() throws IOException { stores = ImmutableList.of( Metapb.Store.newBuilder() - .setAddress("127.0.0.1:" + basePort) + .setAddress("127.0.0.1:" + ports[0]) .setVersion("5.0.0") .setId(0x1) .build(), Metapb.Store.newBuilder() - .setAddress("127.0.0.1:" + (basePort + 1)) + .setAddress("127.0.0.1:" + ports[1]) .setVersion("5.0.0") .setId(0x2) .build(), Metapb.Store.newBuilder() - .setAddress("127.0.0.1:" + (basePort + 2)) + .setAddress("127.0.0.1:" + ports[2]) .setVersion("5.0.0") .setId(0x3) .build()); @@ -101,9 +103,9 @@ public void setup() throws IOException { region.getPeers(0), region.getPeersList(), stores.stream().map(TiStore::new).collect(Collectors.toList())); - for (int i = 0; i < 3; i++) { + for (int port : ports) { KVMockServer server = new KVMockServer(); - server.start(this.region, basePort + i); + server.start(this.region, port); servers.add(server); } } diff --git a/src/test/java/org/tikv/common/PDClientMockTest.java b/src/test/java/org/tikv/common/PDClientMockTest.java index 3ba0e374392..a8074d94572 100644 --- a/src/test/java/org/tikv/common/PDClientMockTest.java +++ b/src/test/java/org/tikv/common/PDClientMockTest.java @@ -40,14 +40,14 @@ import org.tikv.kvproto.Metapb.StoreState; public class PDClientMockTest extends PDMockServerTest { - private static final String LOCAL_ADDR_IPV6 = "[::1]"; public static final String HTTP = "http://"; @Test public void testCreate() throws Exception { try (PDClient client = session.getPDClient()) { - assertEquals(LOCAL_ADDR + ":" + leader.port, client.getPdClientWrapper().getLeaderInfo()); + assertEquals( + LOCAL_ADDR + ":" + leader.getPort(), client.getPdClientWrapper().getLeaderInfo()); assertEquals(CLUSTER_ID, client.getHeader().getClusterId()); } } @@ -55,17 +55,19 @@ public void testCreate() throws Exception { @Test public void testSwitchLeader() throws Exception { try (PDClient client = session.getPDClient()) { - client.trySwitchLeader(HTTP + LOCAL_ADDR + ":" + (leader.port + 1)); + // Switch leader to server 1 + client.trySwitchLeader(HTTP + LOCAL_ADDR + ":" + pdServers.get(1).getPort()); assertEquals( - client.getPdClientWrapper().getLeaderInfo(), HTTP + LOCAL_ADDR + ":" + (leader.port + 1)); + client.getPdClientWrapper().getLeaderInfo(), + HTTP + LOCAL_ADDR + ":" + pdServers.get(1).getPort()); } tearDown(); setup(LOCAL_ADDR_IPV6); try (PDClient client = session.getPDClient()) { - client.trySwitchLeader(HTTP + LOCAL_ADDR_IPV6 + ":" + (leader.port + 2)); + client.trySwitchLeader(HTTP + LOCAL_ADDR_IPV6 + ":" + pdServers.get(2).getPort()); assertEquals( client.getPdClientWrapper().getLeaderInfo(), - HTTP + LOCAL_ADDR_IPV6 + ":" + (leader.port + 2)); + HTTP + LOCAL_ADDR_IPV6 + ":" + pdServers.get(2).getPort()); } } diff --git a/src/test/java/org/tikv/common/PDClientTest.java b/src/test/java/org/tikv/common/PDClientTest.java deleted file mode 100644 index 75562308ccc..00000000000 --- a/src/test/java/org/tikv/common/PDClientTest.java +++ /dev/null @@ -1,226 +0,0 @@ -/* - * Copyright 2017 TiKV Project Authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -package org.tikv.common; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; - -import com.google.protobuf.ByteString; -import java.util.concurrent.Callable; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.Future; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; -import java.util.concurrent.atomic.AtomicInteger; -import org.junit.Test; -import org.tikv.common.exception.GrpcException; -import org.tikv.common.meta.TiTimestamp; -import org.tikv.common.util.BackOffer; -import org.tikv.common.util.ConcreteBackOffer; -import org.tikv.common.util.Pair; -import org.tikv.kvproto.Metapb; -import org.tikv.kvproto.Metapb.Store; -import org.tikv.kvproto.Metapb.StoreState; - -public class PDClientTest extends PDMockServerTest { - - private static final String LOCAL_ADDR_IPV6 = "[::1]"; - public static final String HTTP = "http://"; - - @Test - public void testCreate() throws Exception { - try (PDClient client = session.getPDClient()) { - assertEquals(LOCAL_ADDR + ":" + leader.port, client.getPdClientWrapper().getLeaderInfo()); - assertEquals(CLUSTER_ID, client.getHeader().getClusterId()); - } - } - - @Test - public void testSwitchLeader() throws Exception { - try (PDClient client = session.getPDClient()) { - client.trySwitchLeader(HTTP + LOCAL_ADDR + ":" + (leader.port + 1)); - assertEquals( - client.getPdClientWrapper().getLeaderInfo(), HTTP + LOCAL_ADDR + ":" + (leader.port + 1)); - } - tearDown(); - setup(LOCAL_ADDR_IPV6); - try (PDClient client = session.getPDClient()) { - client.trySwitchLeader(HTTP + LOCAL_ADDR_IPV6 + ":" + (leader.port + 2)); - assertEquals( - client.getPdClientWrapper().getLeaderInfo(), - HTTP + LOCAL_ADDR_IPV6 + ":" + (leader.port + 2)); - } - } - - @Test - public void testTso() throws Exception { - try (PDClient client = session.getPDClient()) { - TiTimestamp ts = client.getTimestamp(defaultBackOff()); - // Test pdServer is set to generate physical == logical + 1 - assertEquals(ts.getPhysical(), ts.getLogical() + 1); - } - } - - @Test - public void testGetRegionByKey() throws Exception { - byte[] startKey = new byte[] {1, 0, 2, 4}; - byte[] endKey = new byte[] {1, 0, 2, 5}; - int confVer = 1026; - int ver = 1027; - leader.addGetRegionListener( - request -> - GrpcUtils.makeGetRegionResponse( - leader.getClusterId(), - GrpcUtils.makeRegion( - 1, - ByteString.copyFrom(startKey), - ByteString.copyFrom(endKey), - GrpcUtils.makeRegionEpoch(confVer, ver), - GrpcUtils.makePeer(1, 10), - GrpcUtils.makePeer(2, 20)))); - try (PDClient client = session.getPDClient()) { - Pair rl = - client.getRegionByKey(defaultBackOff(), ByteString.EMPTY); - Metapb.Region r = rl.first; - Metapb.Peer l = rl.second; - assertEquals(r.getStartKey(), ByteString.copyFrom(startKey)); - assertEquals(r.getEndKey(), ByteString.copyFrom(endKey)); - assertEquals(r.getRegionEpoch().getConfVer(), confVer); - assertEquals(r.getRegionEpoch().getVersion(), ver); - assertEquals(1, l.getId()); - assertEquals(10, l.getStoreId()); - } - } - - @Test - public void testGetRegionById() throws Exception { - byte[] startKey = new byte[] {1, 0, 2, 4}; - byte[] endKey = new byte[] {1, 0, 2, 5}; - int confVer = 1026; - int ver = 1027; - - leader.addGetRegionByIDListener( - request -> - GrpcUtils.makeGetRegionResponse( - leader.getClusterId(), - GrpcUtils.makeRegion( - 1, - ByteString.copyFrom(startKey), - ByteString.copyFrom(endKey), - GrpcUtils.makeRegionEpoch(confVer, ver), - GrpcUtils.makePeer(1, 10), - GrpcUtils.makePeer(2, 20)))); - try (PDClient client = session.getPDClient()) { - Pair rl = client.getRegionByID(defaultBackOff(), 0); - Metapb.Region r = rl.first; - Metapb.Peer l = rl.second; - assertEquals(ByteString.copyFrom(startKey), r.getStartKey()); - assertEquals(ByteString.copyFrom(endKey), r.getEndKey()); - assertEquals(confVer, r.getRegionEpoch().getConfVer()); - assertEquals(ver, r.getRegionEpoch().getVersion()); - assertEquals(1, l.getId()); - assertEquals(10, l.getStoreId()); - } - } - - @Test - public void testGetStore() throws Exception { - long storeId = 1; - String testAddress = "testAddress"; - leader.addGetStoreListener( - request -> - GrpcUtils.makeGetStoreResponse( - leader.getClusterId(), - GrpcUtils.makeStore( - storeId, - testAddress, - Metapb.StoreState.Up, - GrpcUtils.makeStoreLabel("k1", "v1"), - GrpcUtils.makeStoreLabel("k2", "v2")))); - try (PDClient client = session.getPDClient()) { - Store r = client.getStore(defaultBackOff(), storeId); - assertEquals(storeId, r.getId()); - assertEquals(testAddress, r.getAddress()); - assertEquals(Metapb.StoreState.Up, r.getState()); - assertEquals("k1", r.getLabels(0).getKey()); - assertEquals("k2", r.getLabels(1).getKey()); - assertEquals("v1", r.getLabels(0).getValue()); - assertEquals("v2", r.getLabels(1).getValue()); - - leader.addGetStoreListener( - request -> - GrpcUtils.makeGetStoreResponse( - leader.getClusterId(), - GrpcUtils.makeStore(storeId, testAddress, Metapb.StoreState.Tombstone))); - assertEquals(StoreState.Tombstone, client.getStore(defaultBackOff(), storeId).getState()); - } - } - - private BackOffer defaultBackOff() { - return ConcreteBackOffer.newCustomBackOff(1000); - } - - @Test - public void testRetryPolicy() throws Exception { - long storeId = 1024; - ExecutorService service = Executors.newCachedThreadPool(); - AtomicInteger i = new AtomicInteger(); - leader.addGetStoreListener( - request -> { - if (i.getAndIncrement() < 2) { - return null; - } else { - return GrpcUtils.makeGetStoreResponse( - leader.getClusterId(), GrpcUtils.makeStore(storeId, "", Metapb.StoreState.Up)); - } - }); - try (PDClient client = session.getPDClient()) { - Callable storeCallable = - () -> client.getStore(ConcreteBackOffer.newCustomBackOff(5000), 0); - Future storeFuture = service.submit(storeCallable); - try { - Store r = storeFuture.get(50, TimeUnit.SECONDS); - assertEquals(r.getId(), storeId); - } catch (TimeoutException e) { - fail(); - } - - // Should fail - AtomicInteger j = new AtomicInteger(); - leader.addGetStoreListener( - request -> { - if (j.getAndIncrement() < 6) { - return null; - } else { - return GrpcUtils.makeGetStoreResponse( - leader.getClusterId(), GrpcUtils.makeStore(storeId, "", Metapb.StoreState.Up)); - } - }); - - try { - client.getStore(defaultBackOff(), 0); - } catch (GrpcException e) { - assertTrue(true); - return; - } - fail(); - } - } -} diff --git a/src/test/java/org/tikv/common/PDMockServer.java b/src/test/java/org/tikv/common/PDMockServer.java index 3d64624f448..96c4970af91 100644 --- a/src/test/java/org/tikv/common/PDMockServer.java +++ b/src/test/java/org/tikv/common/PDMockServer.java @@ -37,8 +37,7 @@ import org.tikv.kvproto.Pdpb.TsoResponse; public class PDMockServer extends PDGrpc.PDImplBase { - - public int port; + private int port; private long clusterId; private Server server; @@ -148,4 +147,8 @@ public void stop() { public long getClusterId() { return clusterId; } + + public long getPort() { + return port; + } } diff --git a/src/test/java/org/tikv/common/PDMockServerTest.java b/src/test/java/org/tikv/common/PDMockServerTest.java index 1fd84f5ddbe..c43af58032c 100644 --- a/src/test/java/org/tikv/common/PDMockServerTest.java +++ b/src/test/java/org/tikv/common/PDMockServerTest.java @@ -25,7 +25,6 @@ import org.junit.Before; public abstract class PDMockServerTest { - protected static final String LOCAL_ADDR = "127.0.0.1"; static final long CLUSTER_ID = 1024; protected TiSession session; @@ -38,28 +37,30 @@ public void setup() throws IOException { } void setup(String addr) throws IOException { - int basePort; - try (ServerSocket s = new ServerSocket(0)) { - basePort = s.getLocalPort(); + int[] ports = new int[3]; + for (int i = 0; i < ports.length; i++) { + try (ServerSocket s = new ServerSocket(0)) { + ports[i] = s.getLocalPort(); + } } - for (int i = 0; i < 3; i++) { + for (int i = 0; i < ports.length; i++) { PDMockServer server = new PDMockServer(); - server.start(CLUSTER_ID, basePort + i); + server.start(CLUSTER_ID, ports[i]); server.addGetMembersListener( (request) -> GrpcUtils.makeGetMembersResponse( server.getClusterId(), - GrpcUtils.makeMember(1, "http://" + addr + ":" + basePort), - GrpcUtils.makeMember(2, "http://" + addr + ":" + (basePort + 1)), - GrpcUtils.makeMember(3, "http://" + addr + ":" + (basePort + 2)))); + GrpcUtils.makeMember(1, "http://" + addr + ":" + ports[0]), + GrpcUtils.makeMember(2, "http://" + addr + ":" + ports[1]), + GrpcUtils.makeMember(3, "http://" + addr + ":" + ports[2]))); pdServers.add(server); if (i == 0) { leader = server; } } - TiConfiguration conf = TiConfiguration.createDefault(addr + ":" + leader.port); + TiConfiguration conf = TiConfiguration.createDefault(addr + ":" + ports[0]); conf.setKvMode("RAW"); conf.setWarmUpEnable(false); conf.setTimeout(2000); diff --git a/src/test/java/org/tikv/common/SeekLeaderStoreTest.java b/src/test/java/org/tikv/common/SeekLeaderStoreTest.java index 64836c8f027..6a4ada598e2 100644 --- a/src/test/java/org/tikv/common/SeekLeaderStoreTest.java +++ b/src/test/java/org/tikv/common/SeekLeaderStoreTest.java @@ -28,7 +28,6 @@ import org.tikv.raw.RawKVClient; public class SeekLeaderStoreTest extends MockThreeStoresTest { - private RawKVClient createClient() { return session.createRawClient(); } diff --git a/src/test/java/org/tikv/common/SeekProxyStoreTest.java b/src/test/java/org/tikv/common/SeekProxyStoreTest.java new file mode 100644 index 00000000000..09f2a6dbeeb --- /dev/null +++ b/src/test/java/org/tikv/common/SeekProxyStoreTest.java @@ -0,0 +1,47 @@ +/* + * Copyright 2022 TiKV Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.tikv.common; + +import com.google.protobuf.ByteString; +import org.junit.Assert; +import org.junit.Test; +import org.tikv.common.KVMockServer.State; +import org.tikv.raw.RawKVClient; + +public class SeekProxyStoreTest extends MockThreeStoresTest { + private RawKVClient createClient() { + return session.createRawClient(); + } + + @Test + public void testSeekProxyStore() { + RawKVClient client = createClient(); + ByteString key = ByteString.copyFromUtf8("key"); + ByteString value = ByteString.copyFromUtf8("value"); + put(key, value); + + client.put(key, value); + Assert.assertEquals(value, client.get(key)); + // Set the leader to state Fail, the request will route to peer 0x2, which is not the leader. + // The state of three peers is the same. + // Thus, with the correct context, the peer 0x2 will return normally. + servers.get(0).setState(State.Fail); + + Assert.assertEquals(value, client.get(key)); + } +}