From 899703d4238aeef034e816e41084e2b1f1459d15 Mon Sep 17 00:00:00 2001 From: iosmanthus Date: Fri, 25 Mar 2022 16:34:17 +0800 Subject: [PATCH 1/2] cherry pick #571 to release-3.2 Signed-off-by: ti-srebot --- .../org/tikv/common/MockThreeStoresTest.java | 131 ++++++++++++++++++ .../org/tikv/common/PDClientMockTest.java | 20 +++ .../java/org/tikv/common/PDMockServer.java | 8 ++ .../org/tikv/common/PDMockServerTest.java | 28 ++++ .../org/tikv/common/SeekLeaderStoreTest.java | 77 ++++++++++ .../org/tikv/common/SeekProxyStoreTest.java | 47 +++++++ 6 files changed, 311 insertions(+) create mode 100644 src/test/java/org/tikv/common/MockThreeStoresTest.java create mode 100644 src/test/java/org/tikv/common/SeekLeaderStoreTest.java create mode 100644 src/test/java/org/tikv/common/SeekProxyStoreTest.java diff --git a/src/test/java/org/tikv/common/MockThreeStoresTest.java b/src/test/java/org/tikv/common/MockThreeStoresTest.java new file mode 100644 index 00000000000..43063be7d17 --- /dev/null +++ b/src/test/java/org/tikv/common/MockThreeStoresTest.java @@ -0,0 +1,131 @@ +/* + * Copyright 2022 TiKV Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.tikv.common; + +import com.google.common.collect.ImmutableList; +import com.google.protobuf.ByteString; +import java.io.IOException; +import java.net.ServerSocket; +import java.util.ArrayList; +import java.util.List; +import java.util.stream.Collectors; +import org.junit.After; +import org.junit.Before; +import org.tikv.common.region.TiRegion; +import org.tikv.common.region.TiStore; +import org.tikv.kvproto.Metapb; +import org.tikv.kvproto.Pdpb; + +public class MockThreeStoresTest extends PDMockServerTest { + + protected TiRegion region; + protected List servers = new ArrayList<>(); + protected List stores; + + @Before + @Override + public void setup() throws IOException { + super.setup(); + + int[] ports = new int[3]; + for (int i = 0; i < ports.length; i++) { + try (ServerSocket s = new ServerSocket(0)) { + ports[i] = s.getLocalPort(); + } + } + + ImmutableList peers = + ImmutableList.of( + Metapb.Peer.newBuilder().setId(0x1).setStoreId(0x1).build(), + Metapb.Peer.newBuilder().setId(0x2).setStoreId(0x2).build(), + Metapb.Peer.newBuilder().setId(0x3).setStoreId(0x3).build()); + + Metapb.Region region = + Metapb.Region.newBuilder() + .setRegionEpoch(Metapb.RegionEpoch.newBuilder().setConfVer(1).setVersion(2)) + .setId(0xff) + .setStartKey(ByteString.EMPTY) + .setEndKey(ByteString.EMPTY) + .addAllPeers(peers) + .build(); + + stores = + ImmutableList.of( + Metapb.Store.newBuilder() + .setAddress("127.0.0.1:" + ports[0]) + .setVersion("5.0.0") + .setId(0x1) + .build(), + Metapb.Store.newBuilder() + .setAddress("127.0.0.1:" + ports[1]) + .setVersion("5.0.0") + .setId(0x2) + .build(), + Metapb.Store.newBuilder() + .setAddress("127.0.0.1:" + ports[2]) + .setVersion("5.0.0") + .setId(0x3) + .build()); + + for (PDMockServer server : pdServers) { + server.addGetRegionListener( + request -> + Pdpb.GetRegionResponse.newBuilder() + .setLeader(peers.get(0)) + .setRegion(region) + .build()); + server.addGetStoreListener( + (request) -> { + int i = (int) request.getStoreId() - 1; + return Pdpb.GetStoreResponse.newBuilder().setStore(stores.get(i)).build(); + }); + } + + this.region = + new TiRegion( + session.getConf(), + region, + region.getPeers(0), + region.getPeersList(), + stores.stream().map(TiStore::new).collect(Collectors.toList())); + for (int port : ports) { + KVMockServer server = new KVMockServer(); + server.start(this.region, port); + servers.add(server); + } + } + + public void put(ByteString key, ByteString value) { + for (KVMockServer server : servers) { + server.put(key, value); + } + } + + public void remove(ByteString key, ByteString value) { + for (KVMockServer server : servers) { + server.remove(key); + } + } + + @After + public void tearDown() { + for (KVMockServer server : servers) { + server.stop(); + } + } +} diff --git a/src/test/java/org/tikv/common/PDClientMockTest.java b/src/test/java/org/tikv/common/PDClientMockTest.java index 67d6bd64ad2..24278892579 100644 --- a/src/test/java/org/tikv/common/PDClientMockTest.java +++ b/src/test/java/org/tikv/common/PDClientMockTest.java @@ -46,7 +46,12 @@ public class PDClientMockTest extends PDMockServerTest { @Test public void testCreate() throws Exception { try (PDClient client = session.getPDClient()) { +<<<<<<< HEAD assertEquals(LOCAL_ADDR + ":" + pdServer.port, client.getPdClientWrapper().getLeaderInfo()); +======= + assertEquals( + LOCAL_ADDR + ":" + leader.getPort(), client.getPdClientWrapper().getLeaderInfo()); +>>>>>>> f4e7c302a... [close #570] mockserver: fix unstable mock server cluster setup (#571) assertEquals(CLUSTER_ID, client.getHeader().getClusterId()); } } @@ -54,18 +59,33 @@ public void testCreate() throws Exception { @Test public void testSwitchLeader() throws Exception { try (PDClient client = session.getPDClient()) { +<<<<<<< HEAD client.trySwitchLeader(HTTP + LOCAL_ADDR + ":" + (pdServer.port + 1)); assertEquals( client.getPdClientWrapper().getLeaderInfo(), HTTP + LOCAL_ADDR + ":" + (pdServer.port + 1)); +======= + // Switch leader to server 1 + client.trySwitchLeader(HTTP + LOCAL_ADDR + ":" + pdServers.get(1).getPort()); + assertEquals( + client.getPdClientWrapper().getLeaderInfo(), + HTTP + LOCAL_ADDR + ":" + pdServers.get(1).getPort()); +>>>>>>> f4e7c302a... [close #570] mockserver: fix unstable mock server cluster setup (#571) } tearDown(); setUp(LOCAL_ADDR_IPV6); try (PDClient client = session.getPDClient()) { +<<<<<<< HEAD client.trySwitchLeader(HTTP + LOCAL_ADDR_IPV6 + ":" + (pdServer.port + 2)); assertEquals( client.getPdClientWrapper().getLeaderInfo(), HTTP + LOCAL_ADDR_IPV6 + ":" + (pdServer.port + 2)); +======= + client.trySwitchLeader(HTTP + LOCAL_ADDR_IPV6 + ":" + pdServers.get(2).getPort()); + assertEquals( + client.getPdClientWrapper().getLeaderInfo(), + HTTP + LOCAL_ADDR_IPV6 + ":" + pdServers.get(2).getPort()); +>>>>>>> f4e7c302a... [close #570] mockserver: fix unstable mock server cluster setup (#571) } } diff --git a/src/test/java/org/tikv/common/PDMockServer.java b/src/test/java/org/tikv/common/PDMockServer.java index 78ac6304077..7a8f69fa5aa 100644 --- a/src/test/java/org/tikv/common/PDMockServer.java +++ b/src/test/java/org/tikv/common/PDMockServer.java @@ -30,7 +30,11 @@ import org.tikv.kvproto.Pdpb.*; public class PDMockServer extends PDGrpc.PDImplBase { +<<<<<<< HEAD public int port; +======= + private int port; +>>>>>>> f4e7c302a... [close #570] mockserver: fix unstable mock server cluster setup (#571) private long clusterId; private Server server; @@ -138,4 +142,8 @@ public void stop() { public long getClusterId() { return clusterId; } + + public long getPort() { + return port; + } } diff --git a/src/test/java/org/tikv/common/PDMockServerTest.java b/src/test/java/org/tikv/common/PDMockServerTest.java index cd07935fdd9..f4a56df8ced 100644 --- a/src/test/java/org/tikv/common/PDMockServerTest.java +++ b/src/test/java/org/tikv/common/PDMockServerTest.java @@ -32,6 +32,7 @@ public void setUp() throws IOException { setUp(LOCAL_ADDR); } +<<<<<<< HEAD void setUp(String addr) throws IOException { pdServer = new PDMockServer(); pdServer.start(CLUSTER_ID); @@ -43,6 +44,33 @@ void setUp(String addr) throws IOException { GrpcUtils.makeMember(3, "http://" + addr + ":" + (pdServer.port + 2)))); TiConfiguration conf = TiConfiguration.createDefault(addr + ":" + pdServer.port); conf.setEnableGrpcForward(false); +======= + void setup(String addr) throws IOException { + int[] ports = new int[3]; + for (int i = 0; i < ports.length; i++) { + try (ServerSocket s = new ServerSocket(0)) { + ports[i] = s.getLocalPort(); + } + } + + for (int i = 0; i < ports.length; i++) { + PDMockServer server = new PDMockServer(); + server.start(CLUSTER_ID, ports[i]); + server.addGetMembersListener( + (request) -> + GrpcUtils.makeGetMembersResponse( + server.getClusterId(), + GrpcUtils.makeMember(1, "http://" + addr + ":" + ports[0]), + GrpcUtils.makeMember(2, "http://" + addr + ":" + ports[1]), + GrpcUtils.makeMember(3, "http://" + addr + ":" + ports[2]))); + pdServers.add(server); + if (i == 0) { + leader = server; + } + } + + TiConfiguration conf = TiConfiguration.createDefault(addr + ":" + ports[0]); +>>>>>>> f4e7c302a... [close #570] mockserver: fix unstable mock server cluster setup (#571) conf.setKvMode("RAW"); conf.setTest(true); conf.setTimeout(2000); diff --git a/src/test/java/org/tikv/common/SeekLeaderStoreTest.java b/src/test/java/org/tikv/common/SeekLeaderStoreTest.java new file mode 100644 index 00000000000..891e676ee96 --- /dev/null +++ b/src/test/java/org/tikv/common/SeekLeaderStoreTest.java @@ -0,0 +1,77 @@ +/* + * Copyright 2022 TiKV Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.tikv.common; + +import com.google.protobuf.ByteString; +import java.util.concurrent.atomic.AtomicInteger; +import org.junit.Assert; +import org.junit.Test; +import org.tikv.common.KVMockServer.State; +import org.tikv.kvproto.Metapb; +import org.tikv.kvproto.Metapb.StoreState; +import org.tikv.kvproto.Pdpb; +import org.tikv.raw.RawKVClient; + +public class SeekLeaderStoreTest extends MockThreeStoresTest { + private RawKVClient createClient() { + return session.createRawClient(); + } + + @Test + public void testSeekLeader() { + RawKVClient client = createClient(); + ByteString key = ByteString.copyFromUtf8("key"); + ByteString value = ByteString.copyFromUtf8("value"); + + put(key, value); + + Assert.assertEquals(value, client.get(key).get()); + servers.get(0).setState(State.Fail); + servers.get(1).setRegion(region.switchPeer(stores.get(1).getId())); + Assert.assertEquals(value, client.get(key).get()); + + remove(key, value); + } + + @Test + public void testSeekLeaderMeetInvalidStore() { + RawKVClient client = createClient(); + ByteString key = ByteString.copyFromUtf8("key"); + ByteString value = ByteString.copyFromUtf8("value"); + + put(key, value); + + servers.get(0).setState(State.Fail); + servers.get(2).setRegion(region.switchPeer(stores.get(2).getId())); + + AtomicInteger i = new AtomicInteger(0); + leader.addGetStoreListener( + request -> { + Metapb.Store.Builder storeBuilder = + Metapb.Store.newBuilder().mergeFrom(stores.get((int) request.getStoreId() - 1)); + if (request.getStoreId() == 0x2 && i.incrementAndGet() > 0) { + storeBuilder.setState(StoreState.Tombstone); + } + return Pdpb.GetStoreResponse.newBuilder().setStore(storeBuilder.build()).build(); + }); + + Assert.assertEquals(value, client.get(key).get()); + + remove(key, value); + } +} diff --git a/src/test/java/org/tikv/common/SeekProxyStoreTest.java b/src/test/java/org/tikv/common/SeekProxyStoreTest.java new file mode 100644 index 00000000000..6149a591cc1 --- /dev/null +++ b/src/test/java/org/tikv/common/SeekProxyStoreTest.java @@ -0,0 +1,47 @@ +/* + * Copyright 2022 TiKV Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.tikv.common; + +import com.google.protobuf.ByteString; +import org.junit.Assert; +import org.junit.Test; +import org.tikv.common.KVMockServer.State; +import org.tikv.raw.RawKVClient; + +public class SeekProxyStoreTest extends MockThreeStoresTest { + private RawKVClient createClient() { + return session.createRawClient(); + } + + @Test + public void testSeekProxyStore() { + RawKVClient client = createClient(); + ByteString key = ByteString.copyFromUtf8("key"); + ByteString value = ByteString.copyFromUtf8("value"); + put(key, value); + + client.put(key, value); + Assert.assertEquals(value, client.get(key).get()); + // Set the leader to state Fail, the request will route to peer 0x2, which is not the leader. + // The state of three peers is the same. + // Thus, with the correct context, the peer 0x2 will return normally. + servers.get(0).setState(State.Fail); + + Assert.assertEquals(value, client.get(key).get()); + } +} From dcfd1685817bffd114f3b7359172a6fce4d681e6 Mon Sep 17 00:00:00 2001 From: iosmanthus Date: Mon, 28 Mar 2022 22:12:07 +0800 Subject: [PATCH 2/2] fix conflict for rebase Signed-off-by: iosmanthus --- .../region/AbstractRegionStoreClient.java | 78 ++++++---- .../java/org/tikv/common/KVMockServer.java | 67 ++++++++- .../java/org/tikv/common/MockServerTest.java | 10 +- .../tikv/common/PDClientIntegrationTest.java | 1 - .../org/tikv/common/PDClientMockTest.java | 130 ++++++++--------- .../java/org/tikv/common/PDMockServer.java | 63 +++++---- .../org/tikv/common/PDMockServerTest.java | 33 ++--- .../org/tikv/common/RegionManagerTest.java | 133 ++++++++++-------- 8 files changed, 294 insertions(+), 221 deletions(-) diff --git a/src/main/java/org/tikv/common/region/AbstractRegionStoreClient.java b/src/main/java/org/tikv/common/region/AbstractRegionStoreClient.java index 5983b73cb1c..f5ac181c15d 100644 --- a/src/main/java/org/tikv/common/region/AbstractRegionStoreClient.java +++ b/src/main/java/org/tikv/common/region/AbstractRegionStoreClient.java @@ -29,7 +29,7 @@ import java.util.LinkedList; import java.util.List; import java.util.Set; -import java.util.concurrent.*; +import java.util.concurrent.TimeUnit; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.tikv.common.AbstractGRPCClient; @@ -230,6 +230,7 @@ private Boolean seekLeaderStore(BackOffer backOffer) { // switch to leader store store = currentLeaderStore; updateClientStub(); + return true; } return false; } @@ -275,19 +276,27 @@ private Metapb.Peer switchLeaderStore(BackOffer backOffer) { List responses = new LinkedList<>(); for (Metapb.Peer peer : region.getFollowerList()) { ByteString key = region.getStartKey(); - TiStore peerStore = regionManager.getStoreById(peer.getStoreId(), backOffer); - ManagedChannel channel = - channelFactory.getChannel( - peerStore.getAddress(), regionManager.getPDClient().getHostMapping()); - TikvGrpc.TikvFutureStub stub = - TikvGrpc.newFutureStub(channel).withDeadlineAfter(timeout, TimeUnit.MILLISECONDS); - Kvrpcpb.RawGetRequest rawGetRequest = - Kvrpcpb.RawGetRequest.newBuilder() - .setContext(region.getReplicaContext(peer)) - .setKey(key) - .build(); - ListenableFuture task = stub.rawGet(rawGetRequest); - responses.add(new SwitchLeaderTask(task, peer)); + try { + TiStore peerStore = regionManager.getStoreById(peer.getStoreId(), backOffer); + ManagedChannel channel = + channelFactory.getChannel( + peerStore.getAddress(), regionManager.getPDClient().getHostMapping()); + TikvGrpc.TikvFutureStub stub = + TikvGrpc.newFutureStub(channel).withDeadlineAfter(timeout, TimeUnit.MILLISECONDS); + Kvrpcpb.RawGetRequest rawGetRequest = + Kvrpcpb.RawGetRequest.newBuilder() + .setContext(region.getReplicaContext(peer)) + .setKey(key) + .build(); + ListenableFuture task = stub.rawGet(rawGetRequest); + responses.add(new SwitchLeaderTask(task, peer)); + } catch (Exception e) { + logger.warn( + "switch region[{}] leader store to {} failed: {}", + region.getId(), + peer.getStoreId(), + e); + } } while (true) { try { @@ -326,22 +335,31 @@ private TiStore switchProxyStore(BackOffer backOffer) { List responses = new LinkedList<>(); for (Metapb.Peer peer : region.getFollowerList()) { ByteString key = region.getStartKey(); - TiStore peerStore = regionManager.getStoreById(peer.getStoreId(), backOffer); - ManagedChannel channel = - channelFactory.getChannel( - peerStore.getAddress(), regionManager.getPDClient().getHostMapping()); - TikvGrpc.TikvFutureStub stub = - TikvGrpc.newFutureStub(channel).withDeadlineAfter(forwardTimeout, TimeUnit.MILLISECONDS); - Metadata header = new Metadata(); - header.put(TiConfiguration.FORWARD_META_DATA_KEY, store.getStore().getAddress()); - Kvrpcpb.RawGetRequest rawGetRequest = - Kvrpcpb.RawGetRequest.newBuilder() - .setContext(region.getReplicaContext(region.getLeader())) - .setKey(key) - .build(); - ListenableFuture task = - MetadataUtils.attachHeaders(stub, header).rawGet(rawGetRequest); - responses.add(new ForwardCheckTask(task, peerStore.getStore())); + try { + TiStore peerStore = regionManager.getStoreById(peer.getStoreId(), backOffer); + ManagedChannel channel = + channelFactory.getChannel( + peerStore.getAddress(), regionManager.getPDClient().getHostMapping()); + TikvGrpc.TikvFutureStub stub = + TikvGrpc.newFutureStub(channel) + .withDeadlineAfter(forwardTimeout, TimeUnit.MILLISECONDS); + Metadata header = new Metadata(); + header.put(TiConfiguration.FORWARD_META_DATA_KEY, store.getStore().getAddress()); + Kvrpcpb.RawGetRequest rawGetRequest = + Kvrpcpb.RawGetRequest.newBuilder() + .setContext(region.getReplicaContext(region.getLeader())) + .setKey(key) + .build(); + ListenableFuture task = + MetadataUtils.attachHeaders(stub, header).rawGet(rawGetRequest); + responses.add(new ForwardCheckTask(task, peerStore.getStore())); + } catch (Exception e) { + logger.warn( + "switch region[{}] leader store to {} failed: {}", + region.getId(), + peer.getStoreId(), + e); + } } while (true) { try { diff --git a/src/test/java/org/tikv/common/KVMockServer.java b/src/test/java/org/tikv/common/KVMockServer.java index 8032c9e613f..412d9dcfb43 100644 --- a/src/test/java/org/tikv/common/KVMockServer.java +++ b/src/test/java/org/tikv/common/KVMockServer.java @@ -27,10 +27,22 @@ import io.grpc.Server; import io.grpc.ServerBuilder; import io.grpc.Status; +import io.grpc.health.v1.HealthCheckRequest; +import io.grpc.health.v1.HealthCheckResponse; +import io.grpc.health.v1.HealthCheckResponse.ServingStatus; +import io.grpc.health.v1.HealthGrpc.HealthImplBase; +import io.grpc.stub.StreamObserver; import java.io.IOException; import java.net.ServerSocket; -import java.util.*; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.SortedMap; +import java.util.TreeMap; import java.util.stream.Collectors; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.tikv.common.key.Key; import org.tikv.common.region.TiRegion; import org.tikv.kvproto.Coprocessor; @@ -45,9 +57,11 @@ public class KVMockServer extends TikvGrpc.TikvImplBase { + private static final Logger logger = LoggerFactory.getLogger(KVMockServer.class); private int port; private Server server; private TiRegion region; + private State state = State.Normal; private final TreeMap dataMap = new TreeMap<>(); private final Map errorMap = new HashMap<>(); @@ -64,10 +78,23 @@ public class KVMockServer extends TikvGrpc.TikvImplBase { public static final int STORE_NOT_MATCH = 9; public static final int RAFT_ENTRY_TOO_LARGE = 10; + public enum State { + Normal, + Fail + } + + public void setState(State state) { + this.state = state; + } + public int getPort() { return port; } + public void setRegion(TiRegion region) { + this.region = region; + } + public void put(ByteString key, ByteString value) { dataMap.put(toRawKey(key), value); } @@ -97,7 +124,7 @@ private void verifyContext(Context context) throws Exception { if (context.getRegionId() != region.getId() || !context.getRegionEpoch().equals(region.getRegionEpoch()) || !context.getPeer().equals(region.getLeader())) { - throw new Exception(); + throw new Exception("context doesn't match"); } } @@ -106,6 +133,11 @@ public void rawGet( org.tikv.kvproto.Kvrpcpb.RawGetRequest request, io.grpc.stub.StreamObserver responseObserver) { try { + switch (state) { + case Fail: + throw new Exception(State.Fail.toString()); + default: + } verifyContext(request.getContext()); ByteString key = request.getKey(); @@ -116,7 +148,12 @@ public void rawGet( setErrorInfo(errorCode, errBuilder); builder.setRegionError(errBuilder.build()); } else { - builder.setValue(dataMap.get(toRawKey(key))); + Key rawKey = toRawKey(key); + ByteString value = dataMap.get(rawKey); + if (value == null) { + value = ByteString.EMPTY; + } + builder.setValue(value); } responseObserver.onNext(builder.build()); responseObserver.onCompleted(); @@ -139,7 +176,6 @@ public void rawPut( if (errorCode != null) { setErrorInfo(errorCode, errBuilder); builder.setRegionError(errBuilder.build()); - // builder.setError(""); } responseObserver.onNext(builder.build()); responseObserver.onCompleted(); @@ -349,14 +385,33 @@ public void coprocessor( } public int start(TiRegion region) throws IOException { + int port; try (ServerSocket s = new ServerSocket(0)) { port = s.getLocalPort(); } - server = ServerBuilder.forPort(port).addService(this).build().start(); + start(region, port); + return port; + } + + private static class HealCheck extends HealthImplBase { + @Override + public void check( + HealthCheckRequest request, StreamObserver responseObserver) { + responseObserver.onNext( + HealthCheckResponse.newBuilder().setStatus(ServingStatus.SERVING).build()); + responseObserver.onCompleted(); + } + } + + public void start(TiRegion region, int port) throws IOException { + this.port = port; this.region = region; + + logger.info("start mock server on port: " + port); + server = + ServerBuilder.forPort(port).addService(new HealCheck()).addService(this).build().start(); Runtime.getRuntime().addShutdownHook(new Thread(KVMockServer.this::stop)); - return port; } public void stop() { diff --git a/src/test/java/org/tikv/common/MockServerTest.java b/src/test/java/org/tikv/common/MockServerTest.java index de861622475..14edcdcd4ca 100644 --- a/src/test/java/org/tikv/common/MockServerTest.java +++ b/src/test/java/org/tikv/common/MockServerTest.java @@ -35,8 +35,8 @@ public class MockServerTest extends PDMockServerTest { @Before @Override - public void setUp() throws IOException { - super.setUp(); + public void setup() throws IOException { + super.setup(); Metapb.Region r = Metapb.Region.newBuilder() @@ -62,9 +62,11 @@ public void setUp() throws IOException { r.getPeers(0), r.getPeersList(), s.stream().map(TiStore::new).collect(Collectors.toList())); - pdServer.addGetRegionResp(Pdpb.GetRegionResponse.newBuilder().setRegion(r).build()); + leader.addGetRegionListener( + request -> Pdpb.GetRegionResponse.newBuilder().setRegion(r).build()); for (Metapb.Store store : s) { - pdServer.addGetStoreResp(Pdpb.GetStoreResponse.newBuilder().setStore(store).build()); + leader.addGetStoreListener( + (request) -> Pdpb.GetStoreResponse.newBuilder().setStore(store).build()); } server = new KVMockServer(); port = server.start(region); diff --git a/src/test/java/org/tikv/common/PDClientIntegrationTest.java b/src/test/java/org/tikv/common/PDClientIntegrationTest.java index a78ad2b411c..fe0c82d74ad 100644 --- a/src/test/java/org/tikv/common/PDClientIntegrationTest.java +++ b/src/test/java/org/tikv/common/PDClientIntegrationTest.java @@ -27,7 +27,6 @@ import org.tikv.BaseRawKVTest; public class PDClientIntegrationTest extends BaseRawKVTest { - private TiSession session; @Before diff --git a/src/test/java/org/tikv/common/PDClientMockTest.java b/src/test/java/org/tikv/common/PDClientMockTest.java index 24278892579..a8074d94572 100644 --- a/src/test/java/org/tikv/common/PDClientMockTest.java +++ b/src/test/java/org/tikv/common/PDClientMockTest.java @@ -28,6 +28,7 @@ import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicInteger; import org.junit.Test; import org.tikv.common.exception.GrpcException; import org.tikv.common.meta.TiTimestamp; @@ -39,19 +40,14 @@ import org.tikv.kvproto.Metapb.StoreState; public class PDClientMockTest extends PDMockServerTest { - - private static final String LOCAL_ADDR_IPV6 = "[::]"; + private static final String LOCAL_ADDR_IPV6 = "[::1]"; public static final String HTTP = "http://"; @Test public void testCreate() throws Exception { try (PDClient client = session.getPDClient()) { -<<<<<<< HEAD - assertEquals(LOCAL_ADDR + ":" + pdServer.port, client.getPdClientWrapper().getLeaderInfo()); -======= assertEquals( LOCAL_ADDR + ":" + leader.getPort(), client.getPdClientWrapper().getLeaderInfo()); ->>>>>>> f4e7c302a... [close #570] mockserver: fix unstable mock server cluster setup (#571) assertEquals(CLUSTER_ID, client.getHeader().getClusterId()); } } @@ -59,33 +55,19 @@ public void testCreate() throws Exception { @Test public void testSwitchLeader() throws Exception { try (PDClient client = session.getPDClient()) { -<<<<<<< HEAD - client.trySwitchLeader(HTTP + LOCAL_ADDR + ":" + (pdServer.port + 1)); - assertEquals( - client.getPdClientWrapper().getLeaderInfo(), - HTTP + LOCAL_ADDR + ":" + (pdServer.port + 1)); -======= // Switch leader to server 1 client.trySwitchLeader(HTTP + LOCAL_ADDR + ":" + pdServers.get(1).getPort()); assertEquals( client.getPdClientWrapper().getLeaderInfo(), HTTP + LOCAL_ADDR + ":" + pdServers.get(1).getPort()); ->>>>>>> f4e7c302a... [close #570] mockserver: fix unstable mock server cluster setup (#571) } tearDown(); - setUp(LOCAL_ADDR_IPV6); + setup(LOCAL_ADDR_IPV6); try (PDClient client = session.getPDClient()) { -<<<<<<< HEAD - client.trySwitchLeader(HTTP + LOCAL_ADDR_IPV6 + ":" + (pdServer.port + 2)); - assertEquals( - client.getPdClientWrapper().getLeaderInfo(), - HTTP + LOCAL_ADDR_IPV6 + ":" + (pdServer.port + 2)); -======= client.trySwitchLeader(HTTP + LOCAL_ADDR_IPV6 + ":" + pdServers.get(2).getPort()); assertEquals( client.getPdClientWrapper().getLeaderInfo(), HTTP + LOCAL_ADDR_IPV6 + ":" + pdServers.get(2).getPort()); ->>>>>>> f4e7c302a... [close #570] mockserver: fix unstable mock server cluster setup (#571) } } @@ -104,16 +86,17 @@ public void testGetRegionByKey() throws Exception { byte[] endKey = new byte[] {1, 0, 2, 5}; int confVer = 1026; int ver = 1027; - pdServer.addGetRegionResp( - GrpcUtils.makeGetRegionResponse( - pdServer.getClusterId(), - GrpcUtils.makeRegion( - 1, - ByteString.copyFrom(startKey), - ByteString.copyFrom(endKey), - GrpcUtils.makeRegionEpoch(confVer, ver), - GrpcUtils.makePeer(1, 10), - GrpcUtils.makePeer(2, 20)))); + leader.addGetRegionListener( + request -> + GrpcUtils.makeGetRegionResponse( + leader.getClusterId(), + GrpcUtils.makeRegion( + 1, + ByteString.copyFrom(startKey), + ByteString.copyFrom(endKey), + GrpcUtils.makeRegionEpoch(confVer, ver), + GrpcUtils.makePeer(1, 10), + GrpcUtils.makePeer(2, 20)))); try (PDClient client = session.getPDClient()) { Pair rl = client.getRegionByKey(defaultBackOff(), ByteString.EMPTY); @@ -135,16 +118,17 @@ public void testGetRegionById() throws Exception { int confVer = 1026; int ver = 1027; - pdServer.addGetRegionByIDResp( - GrpcUtils.makeGetRegionResponse( - pdServer.getClusterId(), - GrpcUtils.makeRegion( - 1, - ByteString.copyFrom(startKey), - ByteString.copyFrom(endKey), - GrpcUtils.makeRegionEpoch(confVer, ver), - GrpcUtils.makePeer(1, 10), - GrpcUtils.makePeer(2, 20)))); + leader.addGetRegionByIDListener( + request -> + GrpcUtils.makeGetRegionResponse( + leader.getClusterId(), + GrpcUtils.makeRegion( + 1, + ByteString.copyFrom(startKey), + ByteString.copyFrom(endKey), + GrpcUtils.makeRegionEpoch(confVer, ver), + GrpcUtils.makePeer(1, 10), + GrpcUtils.makePeer(2, 20)))); try (PDClient client = session.getPDClient()) { Pair rl = client.getRegionByID(defaultBackOff(), 0); Metapb.Region r = rl.first; @@ -162,15 +146,16 @@ public void testGetRegionById() throws Exception { public void testGetStore() throws Exception { long storeId = 1; String testAddress = "testAddress"; - pdServer.addGetStoreResp( - GrpcUtils.makeGetStoreResponse( - pdServer.getClusterId(), - GrpcUtils.makeStore( - storeId, - testAddress, - Metapb.StoreState.Up, - GrpcUtils.makeStoreLabel("k1", "v1"), - GrpcUtils.makeStoreLabel("k2", "v2")))); + leader.addGetStoreListener( + request -> + GrpcUtils.makeGetStoreResponse( + leader.getClusterId(), + GrpcUtils.makeStore( + storeId, + testAddress, + Metapb.StoreState.Up, + GrpcUtils.makeStoreLabel("k1", "v1"), + GrpcUtils.makeStoreLabel("k2", "v2")))); try (PDClient client = session.getPDClient()) { Store r = client.getStore(defaultBackOff(), storeId); assertEquals(storeId, r.getId()); @@ -181,10 +166,11 @@ public void testGetStore() throws Exception { assertEquals("v1", r.getLabels(0).getValue()); assertEquals("v2", r.getLabels(1).getValue()); - pdServer.addGetStoreResp( - GrpcUtils.makeGetStoreResponse( - pdServer.getClusterId(), - GrpcUtils.makeStore(storeId, testAddress, Metapb.StoreState.Tombstone))); + leader.addGetStoreListener( + request -> + GrpcUtils.makeGetStoreResponse( + leader.getClusterId(), + GrpcUtils.makeStore(storeId, testAddress, Metapb.StoreState.Tombstone))); assertEquals(StoreState.Tombstone, client.getStore(defaultBackOff(), storeId).getState()); } } @@ -197,11 +183,16 @@ private BackOffer defaultBackOff() { public void testRetryPolicy() throws Exception { long storeId = 1024; ExecutorService service = Executors.newCachedThreadPool(); - pdServer.addGetStoreResp(null); - pdServer.addGetStoreResp(null); - pdServer.addGetStoreResp( - GrpcUtils.makeGetStoreResponse( - pdServer.getClusterId(), GrpcUtils.makeStore(storeId, "", Metapb.StoreState.Up))); + AtomicInteger i = new AtomicInteger(); + leader.addGetStoreListener( + request -> { + if (i.getAndIncrement() < 2) { + return null; + } else { + return GrpcUtils.makeGetStoreResponse( + leader.getClusterId(), GrpcUtils.makeStore(storeId, "", Metapb.StoreState.Up)); + } + }); try (PDClient client = session.getPDClient()) { Callable storeCallable = () -> client.getStore(ConcreteBackOffer.newCustomBackOff(5000), 0); @@ -214,16 +205,17 @@ public void testRetryPolicy() throws Exception { } // Should fail - pdServer.addGetStoreResp(null); - pdServer.addGetStoreResp(null); - pdServer.addGetStoreResp(null); - pdServer.addGetStoreResp(null); - pdServer.addGetStoreResp(null); - pdServer.addGetStoreResp(null); - - pdServer.addGetStoreResp( - GrpcUtils.makeGetStoreResponse( - pdServer.getClusterId(), GrpcUtils.makeStore(storeId, "", Metapb.StoreState.Up))); + AtomicInteger j = new AtomicInteger(); + leader.addGetStoreListener( + request -> { + if (j.getAndIncrement() < 6) { + return null; + } else { + return GrpcUtils.makeGetStoreResponse( + leader.getClusterId(), GrpcUtils.makeStore(storeId, "", Metapb.StoreState.Up)); + } + }); + try { client.getStore(defaultBackOff(), 0); } catch (GrpcException e) { diff --git a/src/test/java/org/tikv/common/PDMockServer.java b/src/test/java/org/tikv/common/PDMockServer.java index 7a8f69fa5aa..96c4970af91 100644 --- a/src/test/java/org/tikv/common/PDMockServer.java +++ b/src/test/java/org/tikv/common/PDMockServer.java @@ -23,33 +23,37 @@ import io.grpc.stub.StreamObserver; import java.io.IOException; import java.net.ServerSocket; -import java.util.Deque; import java.util.Optional; -import java.util.concurrent.LinkedBlockingDeque; +import java.util.function.Function; import org.tikv.kvproto.PDGrpc; -import org.tikv.kvproto.Pdpb.*; +import org.tikv.kvproto.Pdpb.GetMembersRequest; +import org.tikv.kvproto.Pdpb.GetMembersResponse; +import org.tikv.kvproto.Pdpb.GetRegionByIDRequest; +import org.tikv.kvproto.Pdpb.GetRegionRequest; +import org.tikv.kvproto.Pdpb.GetRegionResponse; +import org.tikv.kvproto.Pdpb.GetStoreRequest; +import org.tikv.kvproto.Pdpb.GetStoreResponse; +import org.tikv.kvproto.Pdpb.TsoRequest; +import org.tikv.kvproto.Pdpb.TsoResponse; public class PDMockServer extends PDGrpc.PDImplBase { -<<<<<<< HEAD - public int port; -======= private int port; ->>>>>>> f4e7c302a... [close #570] mockserver: fix unstable mock server cluster setup (#571) private long clusterId; - private Server server; - public void addGetMemberResp(GetMembersResponse r) { - getMembersResp.addLast(Optional.ofNullable(r)); - } + private Function getMembersListener; + private Function getStoreListener; + private Function getRegionListener; + private Function getRegionByIDListener; - private final Deque> getMembersResp = - new LinkedBlockingDeque>(); + public void addGetMembersListener(Function func) { + getMembersListener = func; + } @Override public void getMembers(GetMembersRequest request, StreamObserver resp) { try { - resp.onNext(getMembersResp.getFirst().get()); + resp.onNext(Optional.ofNullable(getMembersListener.apply(request)).get()); resp.onCompleted(); } catch (Exception e) { resp.onError(Status.INTERNAL.asRuntimeException()); @@ -76,47 +80,42 @@ public void onCompleted() { }; } - public void addGetRegionResp(GetRegionResponse r) { - getRegionResp.addLast(r); + public void addGetRegionListener(Function func) { + getRegionListener = func; } - private final Deque getRegionResp = new LinkedBlockingDeque<>(); - @Override public void getRegion(GetRegionRequest request, StreamObserver resp) { try { - resp.onNext(getRegionResp.removeFirst()); + resp.onNext(getRegionListener.apply(request)); resp.onCompleted(); } catch (Exception e) { resp.onError(Status.INTERNAL.asRuntimeException()); } } - public void addGetRegionByIDResp(GetRegionResponse r) { - getRegionByIDResp.addLast(r); + public void addGetRegionByIDListener(Function func) { + getRegionByIDListener = func; } - private final Deque getRegionByIDResp = new LinkedBlockingDeque<>(); - @Override public void getRegionByID(GetRegionByIDRequest request, StreamObserver resp) { try { - resp.onNext(getRegionByIDResp.removeFirst()); + resp.onNext(getRegionByIDListener.apply(request)); resp.onCompleted(); } catch (Exception e) { resp.onError(Status.INTERNAL.asRuntimeException()); } } - public void addGetStoreResp(GetStoreResponse r) { - getStoreResp.addLast(Optional.ofNullable(r)); + public void addGetStoreListener(Function func) { + getStoreListener = func; } - private final Deque> getStoreResp = new LinkedBlockingDeque<>(); - + @Override public void getStore(GetStoreRequest request, StreamObserver resp) { try { - resp.onNext(getStoreResp.removeFirst().get()); + resp.onNext(Optional.ofNullable(getStoreListener.apply(request)).get()); resp.onCompleted(); } catch (Exception e) { resp.onError(Status.INTERNAL.asRuntimeException()); @@ -124,10 +123,16 @@ public void getStore(GetStoreRequest request, StreamObserver r } public void start(long clusterId) throws IOException { + int port; try (ServerSocket s = new ServerSocket(0)) { port = s.getLocalPort(); } + start(clusterId, port); + } + + public void start(long clusterId, int port) throws IOException { this.clusterId = clusterId; + this.port = port; server = ServerBuilder.forPort(port).addService(this).build().start(); Runtime.getRuntime().addShutdownHook(new Thread(PDMockServer.this::stop)); diff --git a/src/test/java/org/tikv/common/PDMockServerTest.java b/src/test/java/org/tikv/common/PDMockServerTest.java index f4a56df8ced..c43af58032c 100644 --- a/src/test/java/org/tikv/common/PDMockServerTest.java +++ b/src/test/java/org/tikv/common/PDMockServerTest.java @@ -18,33 +18,24 @@ package org.tikv.common; import java.io.IOException; +import java.net.ServerSocket; +import java.util.ArrayList; +import java.util.List; import org.junit.After; import org.junit.Before; public abstract class PDMockServerTest { protected static final String LOCAL_ADDR = "127.0.0.1"; static final long CLUSTER_ID = 1024; - protected static TiSession session; - protected PDMockServer pdServer; + protected TiSession session; + protected PDMockServer leader; + protected List pdServers = new ArrayList<>(); @Before - public void setUp() throws IOException { - setUp(LOCAL_ADDR); + public void setup() throws IOException { + setup(LOCAL_ADDR); } -<<<<<<< HEAD - void setUp(String addr) throws IOException { - pdServer = new PDMockServer(); - pdServer.start(CLUSTER_ID); - pdServer.addGetMemberResp( - GrpcUtils.makeGetMembersResponse( - pdServer.getClusterId(), - GrpcUtils.makeMember(1, "http://" + addr + ":" + pdServer.port), - GrpcUtils.makeMember(2, "http://" + addr + ":" + (pdServer.port + 1)), - GrpcUtils.makeMember(3, "http://" + addr + ":" + (pdServer.port + 2)))); - TiConfiguration conf = TiConfiguration.createDefault(addr + ":" + pdServer.port); - conf.setEnableGrpcForward(false); -======= void setup(String addr) throws IOException { int[] ports = new int[3]; for (int i = 0; i < ports.length; i++) { @@ -70,16 +61,18 @@ void setup(String addr) throws IOException { } TiConfiguration conf = TiConfiguration.createDefault(addr + ":" + ports[0]); ->>>>>>> f4e7c302a... [close #570] mockserver: fix unstable mock server cluster setup (#571) conf.setKvMode("RAW"); - conf.setTest(true); + conf.setWarmUpEnable(false); conf.setTimeout(2000); + conf.setEnableGrpcForward(true); session = TiSession.create(conf); } @After public void tearDown() throws Exception { session.close(); - pdServer.stop(); + for (PDMockServer server : pdServers) { + server.stop(); + } } } diff --git a/src/test/java/org/tikv/common/RegionManagerTest.java b/src/test/java/org/tikv/common/RegionManagerTest.java index 58687ef1059..1c9f7235936 100644 --- a/src/test/java/org/tikv/common/RegionManagerTest.java +++ b/src/test/java/org/tikv/common/RegionManagerTest.java @@ -24,6 +24,7 @@ import com.google.common.collect.TreeRangeMap; import com.google.protobuf.ByteString; import java.io.IOException; +import java.util.concurrent.atomic.AtomicInteger; import org.junit.Before; import org.junit.Test; import org.tikv.common.key.Key; @@ -40,8 +41,8 @@ public class RegionManagerTest extends PDMockServerTest { @Before @Override - public void setUp() throws IOException { - super.setUp(); + public void setup() throws IOException { + super.setup(); mgr = session.getRegionManager(); } @@ -64,27 +65,30 @@ public void getRegionByKey() { int ver = 1027; long regionId = 233; String testAddress = "127.0.0.1"; - pdServer.addGetRegionResp( - GrpcUtils.makeGetRegionResponse( - pdServer.getClusterId(), - GrpcUtils.makeRegion( - regionId, - GrpcUtils.encodeKey(startKey.toByteArray()), - GrpcUtils.encodeKey(endKey.toByteArray()), - GrpcUtils.makeRegionEpoch(confVer, ver), - GrpcUtils.makePeer(1, 10), - GrpcUtils.makePeer(2, 20)))); - for (long id : new long[] {10, 20}) { - pdServer.addGetStoreResp( - GrpcUtils.makeGetStoreResponse( - pdServer.getClusterId(), - GrpcUtils.makeStore( - id, - testAddress, - Metapb.StoreState.Up, - GrpcUtils.makeStoreLabel("k1", "v1"), - GrpcUtils.makeStoreLabel("k2", "v2")))); - } + leader.addGetRegionListener( + request -> + GrpcUtils.makeGetRegionResponse( + leader.getClusterId(), + GrpcUtils.makeRegion( + regionId, + GrpcUtils.encodeKey(startKey.toByteArray()), + GrpcUtils.encodeKey(endKey.toByteArray()), + GrpcUtils.makeRegionEpoch(confVer, ver), + GrpcUtils.makePeer(1, 10), + GrpcUtils.makePeer(2, 20)))); + + AtomicInteger i = new AtomicInteger(0); + long[] ids = new long[] {10, 20}; + leader.addGetStoreListener( + (request -> + GrpcUtils.makeGetStoreResponse( + leader.getClusterId(), + GrpcUtils.makeStore( + ids[i.getAndIncrement()], + testAddress, + StoreState.Up, + GrpcUtils.makeStoreLabel("k1", "v1"), + GrpcUtils.makeStoreLabel("k2", "v2"))))); TiRegion region = mgr.getRegionByKey(startKey); assertEquals(region.getId(), regionId); @@ -103,27 +107,30 @@ public void getStoreByKey() { int confVer = 1026; int ver = 1027; long regionId = 233; - pdServer.addGetRegionResp( - GrpcUtils.makeGetRegionResponse( - pdServer.getClusterId(), - GrpcUtils.makeRegion( - regionId, - GrpcUtils.encodeKey(startKey.toByteArray()), - GrpcUtils.encodeKey(endKey.toByteArray()), - GrpcUtils.makeRegionEpoch(confVer, ver), - GrpcUtils.makePeer(storeId, 10), - GrpcUtils.makePeer(storeId + 1, 20)))); - for (long id : new long[] {10, 20}) { - pdServer.addGetStoreResp( - GrpcUtils.makeGetStoreResponse( - pdServer.getClusterId(), - GrpcUtils.makeStore( - id, - testAddress, - Metapb.StoreState.Up, - GrpcUtils.makeStoreLabel("k1", "v1"), - GrpcUtils.makeStoreLabel("k2", "v2")))); - } + leader.addGetRegionListener( + request -> + GrpcUtils.makeGetRegionResponse( + leader.getClusterId(), + GrpcUtils.makeRegion( + regionId, + GrpcUtils.encodeKey(startKey.toByteArray()), + GrpcUtils.encodeKey(endKey.toByteArray()), + GrpcUtils.makeRegionEpoch(confVer, ver), + GrpcUtils.makePeer(storeId, 10), + GrpcUtils.makePeer(storeId + 1, 20)))); + + AtomicInteger i = new AtomicInteger(0); + long[] ids = new long[] {10, 20}; + leader.addGetStoreListener( + (request -> + GrpcUtils.makeGetStoreResponse( + leader.getClusterId(), + GrpcUtils.makeStore( + ids[i.getAndIncrement()], + testAddress, + StoreState.Up, + GrpcUtils.makeStoreLabel("k1", "v1"), + GrpcUtils.makeStoreLabel("k2", "v2"))))); Pair pair = mgr.getRegionStorePairByKey(searchKey); assertEquals(pair.first.getId(), regionId); @@ -134,27 +141,29 @@ public void getStoreByKey() { public void getStoreById() { long storeId = 234; String testAddress = "testAddress"; - pdServer.addGetStoreResp( - GrpcUtils.makeGetStoreResponse( - pdServer.getClusterId(), - GrpcUtils.makeStore( - storeId, - testAddress, - Metapb.StoreState.Up, - GrpcUtils.makeStoreLabel("k1", "v1"), - GrpcUtils.makeStoreLabel("k2", "v2")))); + leader.addGetStoreListener( + request -> + GrpcUtils.makeGetStoreResponse( + leader.getClusterId(), + GrpcUtils.makeStore( + storeId, + testAddress, + Metapb.StoreState.Up, + GrpcUtils.makeStoreLabel("k1", "v1"), + GrpcUtils.makeStoreLabel("k2", "v2")))); TiStore store = mgr.getStoreById(storeId); assertEquals(store.getStore().getId(), storeId); - pdServer.addGetStoreResp( - GrpcUtils.makeGetStoreResponse( - pdServer.getClusterId(), - GrpcUtils.makeStore( - storeId + 1, - testAddress, - StoreState.Tombstone, - GrpcUtils.makeStoreLabel("k1", "v1"), - GrpcUtils.makeStoreLabel("k2", "v2")))); + leader.addGetStoreListener( + request -> + GrpcUtils.makeGetStoreResponse( + leader.getClusterId(), + GrpcUtils.makeStore( + storeId + 1, + testAddress, + StoreState.Tombstone, + GrpcUtils.makeStoreLabel("k1", "v1"), + GrpcUtils.makeStoreLabel("k2", "v2")))); try { mgr.getStoreById(storeId + 1);