diff --git a/src/main/java/org/tikv/common/region/AbstractRegionStoreClient.java b/src/main/java/org/tikv/common/region/AbstractRegionStoreClient.java index ae90127f049..91c6c847f68 100644 --- a/src/main/java/org/tikv/common/region/AbstractRegionStoreClient.java +++ b/src/main/java/org/tikv/common/region/AbstractRegionStoreClient.java @@ -29,7 +29,7 @@ import java.util.LinkedList; import java.util.List; import java.util.Set; -import java.util.concurrent.*; +import java.util.concurrent.TimeUnit; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.tikv.common.AbstractGRPCClient; diff --git a/src/test/java/org/tikv/common/MockThreeStoresTest.java b/src/test/java/org/tikv/common/MockThreeStoresTest.java index fa4dbe41e35..43063be7d17 100644 --- a/src/test/java/org/tikv/common/MockThreeStoresTest.java +++ b/src/test/java/org/tikv/common/MockThreeStoresTest.java @@ -42,9 +42,11 @@ public class MockThreeStoresTest extends PDMockServerTest { public void setup() throws IOException { super.setup(); - int basePort; - try (ServerSocket s = new ServerSocket(0)) { - basePort = s.getLocalPort(); + int[] ports = new int[3]; + for (int i = 0; i < ports.length; i++) { + try (ServerSocket s = new ServerSocket(0)) { + ports[i] = s.getLocalPort(); + } } ImmutableList peers = @@ -65,17 +67,17 @@ public void setup() throws IOException { stores = ImmutableList.of( Metapb.Store.newBuilder() - .setAddress("127.0.0.1:" + basePort) + .setAddress("127.0.0.1:" + ports[0]) .setVersion("5.0.0") .setId(0x1) .build(), Metapb.Store.newBuilder() - .setAddress("127.0.0.1:" + (basePort + 1)) + .setAddress("127.0.0.1:" + ports[1]) .setVersion("5.0.0") .setId(0x2) .build(), Metapb.Store.newBuilder() - .setAddress("127.0.0.1:" + (basePort + 2)) + .setAddress("127.0.0.1:" + ports[2]) .setVersion("5.0.0") .setId(0x3) .build()); @@ -101,9 +103,9 @@ public void setup() throws IOException { region.getPeers(0), region.getPeersList(), stores.stream().map(TiStore::new).collect(Collectors.toList())); - for (int i = 0; i < 3; i++) { + for (int port : ports) { KVMockServer server = new KVMockServer(); - server.start(this.region, basePort + i); + server.start(this.region, port); servers.add(server); } } diff --git a/src/test/java/org/tikv/common/PDClientIntegrationTest.java b/src/test/java/org/tikv/common/PDClientIntegrationTest.java index a78ad2b411c..fe0c82d74ad 100644 --- a/src/test/java/org/tikv/common/PDClientIntegrationTest.java +++ b/src/test/java/org/tikv/common/PDClientIntegrationTest.java @@ -27,7 +27,6 @@ import org.tikv.BaseRawKVTest; public class PDClientIntegrationTest extends BaseRawKVTest { - private TiSession session; @Before diff --git a/src/test/java/org/tikv/common/PDClientMockTest.java b/src/test/java/org/tikv/common/PDClientMockTest.java index 3ba0e374392..a8074d94572 100644 --- a/src/test/java/org/tikv/common/PDClientMockTest.java +++ b/src/test/java/org/tikv/common/PDClientMockTest.java @@ -40,14 +40,14 @@ import org.tikv.kvproto.Metapb.StoreState; public class PDClientMockTest extends PDMockServerTest { - private static final String LOCAL_ADDR_IPV6 = "[::1]"; public static final String HTTP = "http://"; @Test public void testCreate() throws Exception { try (PDClient client = session.getPDClient()) { - assertEquals(LOCAL_ADDR + ":" + leader.port, client.getPdClientWrapper().getLeaderInfo()); + assertEquals( + LOCAL_ADDR + ":" + leader.getPort(), client.getPdClientWrapper().getLeaderInfo()); assertEquals(CLUSTER_ID, client.getHeader().getClusterId()); } } @@ -55,17 +55,19 @@ public void testCreate() throws Exception { @Test public void testSwitchLeader() throws Exception { try (PDClient client = session.getPDClient()) { - client.trySwitchLeader(HTTP + LOCAL_ADDR + ":" + (leader.port + 1)); + // Switch leader to server 1 + client.trySwitchLeader(HTTP + LOCAL_ADDR + ":" + pdServers.get(1).getPort()); assertEquals( - client.getPdClientWrapper().getLeaderInfo(), HTTP + LOCAL_ADDR + ":" + (leader.port + 1)); + client.getPdClientWrapper().getLeaderInfo(), + HTTP + LOCAL_ADDR + ":" + pdServers.get(1).getPort()); } tearDown(); setup(LOCAL_ADDR_IPV6); try (PDClient client = session.getPDClient()) { - client.trySwitchLeader(HTTP + LOCAL_ADDR_IPV6 + ":" + (leader.port + 2)); + client.trySwitchLeader(HTTP + LOCAL_ADDR_IPV6 + ":" + pdServers.get(2).getPort()); assertEquals( client.getPdClientWrapper().getLeaderInfo(), - HTTP + LOCAL_ADDR_IPV6 + ":" + (leader.port + 2)); + HTTP + LOCAL_ADDR_IPV6 + ":" + pdServers.get(2).getPort()); } } diff --git a/src/test/java/org/tikv/common/PDMockServer.java b/src/test/java/org/tikv/common/PDMockServer.java index 103d643d83f..96c4970af91 100644 --- a/src/test/java/org/tikv/common/PDMockServer.java +++ b/src/test/java/org/tikv/common/PDMockServer.java @@ -37,7 +37,7 @@ import org.tikv.kvproto.Pdpb.TsoResponse; public class PDMockServer extends PDGrpc.PDImplBase { - public int port; + private int port; private long clusterId; private Server server; @@ -147,4 +147,8 @@ public void stop() { public long getClusterId() { return clusterId; } + + public long getPort() { + return port; + } } diff --git a/src/test/java/org/tikv/common/PDMockServerTest.java b/src/test/java/org/tikv/common/PDMockServerTest.java index dab114b6c05..c43af58032c 100644 --- a/src/test/java/org/tikv/common/PDMockServerTest.java +++ b/src/test/java/org/tikv/common/PDMockServerTest.java @@ -37,28 +37,30 @@ public void setup() throws IOException { } void setup(String addr) throws IOException { - int basePort; - try (ServerSocket s = new ServerSocket(0)) { - basePort = s.getLocalPort(); + int[] ports = new int[3]; + for (int i = 0; i < ports.length; i++) { + try (ServerSocket s = new ServerSocket(0)) { + ports[i] = s.getLocalPort(); + } } - for (int i = 0; i < 3; i++) { + for (int i = 0; i < ports.length; i++) { PDMockServer server = new PDMockServer(); - server.start(CLUSTER_ID, basePort + i); + server.start(CLUSTER_ID, ports[i]); server.addGetMembersListener( (request) -> GrpcUtils.makeGetMembersResponse( server.getClusterId(), - GrpcUtils.makeMember(1, "http://" + addr + ":" + basePort), - GrpcUtils.makeMember(2, "http://" + addr + ":" + (basePort + 1)), - GrpcUtils.makeMember(3, "http://" + addr + ":" + (basePort + 2)))); + GrpcUtils.makeMember(1, "http://" + addr + ":" + ports[0]), + GrpcUtils.makeMember(2, "http://" + addr + ":" + ports[1]), + GrpcUtils.makeMember(3, "http://" + addr + ":" + ports[2]))); pdServers.add(server); if (i == 0) { leader = server; } } - TiConfiguration conf = TiConfiguration.createDefault(addr + ":" + leader.port); + TiConfiguration conf = TiConfiguration.createDefault(addr + ":" + ports[0]); conf.setKvMode("RAW"); conf.setWarmUpEnable(false); conf.setTimeout(2000); diff --git a/src/test/java/org/tikv/common/SeekLeaderStoreTest.java b/src/test/java/org/tikv/common/SeekLeaderStoreTest.java index 201f10986f5..891e676ee96 100644 --- a/src/test/java/org/tikv/common/SeekLeaderStoreTest.java +++ b/src/test/java/org/tikv/common/SeekLeaderStoreTest.java @@ -28,7 +28,6 @@ import org.tikv.raw.RawKVClient; public class SeekLeaderStoreTest extends MockThreeStoresTest { - private RawKVClient createClient() { return session.createRawClient(); } diff --git a/src/test/java/org/tikv/common/SeekProxyStoreTest.java b/src/test/java/org/tikv/common/SeekProxyStoreTest.java new file mode 100644 index 00000000000..6149a591cc1 --- /dev/null +++ b/src/test/java/org/tikv/common/SeekProxyStoreTest.java @@ -0,0 +1,47 @@ +/* + * Copyright 2022 TiKV Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.tikv.common; + +import com.google.protobuf.ByteString; +import org.junit.Assert; +import org.junit.Test; +import org.tikv.common.KVMockServer.State; +import org.tikv.raw.RawKVClient; + +public class SeekProxyStoreTest extends MockThreeStoresTest { + private RawKVClient createClient() { + return session.createRawClient(); + } + + @Test + public void testSeekProxyStore() { + RawKVClient client = createClient(); + ByteString key = ByteString.copyFromUtf8("key"); + ByteString value = ByteString.copyFromUtf8("value"); + put(key, value); + + client.put(key, value); + Assert.assertEquals(value, client.get(key).get()); + // Set the leader to state Fail, the request will route to peer 0x2, which is not the leader. + // The state of three peers is the same. + // Thus, with the correct context, the peer 0x2 will return normally. + servers.get(0).setState(State.Fail); + + Assert.assertEquals(value, client.get(key).get()); + } +}