From e9a0f59b6ecda1b69196bb0d345d150c62ba6393 Mon Sep 17 00:00:00 2001 From: marsishandsome Date: Fri, 26 Mar 2021 13:34:57 +0800 Subject: [PATCH 1/2] add replica selector Signed-off-by: marsishandsome --- .../tikv/common/InternalReplicaSelector.java | 48 +++++++++++++++++++ src/main/java/org/tikv/common/PDClient.java | 8 ++-- .../java/org/tikv/common/ReplicaSelector.java | 25 ++++++++++ .../java/org/tikv/common/TiConfiguration.java | 15 ++++++ .../tikv/common/region/RegionStoreClient.java | 2 +- .../java/org/tikv/common/region/TiRegion.java | 33 +++---------- .../java/org/tikv/common/MockServerTest.java | 3 +- .../java/org/tikv/txn/ReplicaReadTest.java | 27 +++++++++++ 8 files changed, 128 insertions(+), 33 deletions(-) create mode 100644 src/main/java/org/tikv/common/InternalReplicaSelector.java create mode 100644 src/main/java/org/tikv/common/ReplicaSelector.java diff --git a/src/main/java/org/tikv/common/InternalReplicaSelector.java b/src/main/java/org/tikv/common/InternalReplicaSelector.java new file mode 100644 index 00000000000..8dcbb69b297 --- /dev/null +++ b/src/main/java/org/tikv/common/InternalReplicaSelector.java @@ -0,0 +1,48 @@ +/* + * Copyright 2021 PingCAP, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.tikv.common; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import org.tikv.kvproto.Metapb; + +public class InternalReplicaSelector implements ReplicaSelector { + private final TiConfiguration.ReplicaRead replicaRead; + + public InternalReplicaSelector(TiConfiguration.ReplicaRead replicaRead) { + this.replicaRead = replicaRead; + } + + @Override + public List select( + Metapb.Peer leader, List followers, List learners) { + List list = new ArrayList<>(); + + if (TiConfiguration.ReplicaRead.LEADER.equals(replicaRead)) { + list.add(leader); + } else if (TiConfiguration.ReplicaRead.FOLLOWER.equals(replicaRead)) { + list.addAll(followers); + Collections.shuffle(list); + } else if (TiConfiguration.ReplicaRead.LEADER_AND_FOLLOWER.equals(replicaRead)) { + list.addAll(followers); + Collections.shuffle(list); + list.add(leader); + } + + return list; + } +} diff --git a/src/main/java/org/tikv/common/PDClient.java b/src/main/java/org/tikv/common/PDClient.java index e7270bff913..17e9d0460fe 100644 --- a/src/main/java/org/tikv/common/PDClient.java +++ b/src/main/java/org/tikv/common/PDClient.java @@ -244,7 +244,7 @@ public TiRegion getRegionByKey(BackOffer backOffer, ByteString key) { conf.getIsolationLevel(), conf.getCommandPriority(), conf.getKvMode(), - conf.getReplicaRead()); + conf.getReplicaSelector()); } finally { requestTimer.observeDuration(); } @@ -261,7 +261,7 @@ public Future getRegionByKeyAsync(BackOffer backOffer, ByteString key) conf.getIsolationLevel(), conf.getCommandPriority(), conf.getKvMode(), - conf.getReplicaRead())); + conf.getReplicaSelector())); Supplier request = () -> GetRegionRequest.newBuilder().setHeader(header).setRegionKey(key).build(); @@ -288,7 +288,7 @@ public TiRegion getRegionByID(BackOffer backOffer, long id) { conf.getIsolationLevel(), conf.getCommandPriority(), conf.getKvMode(), - conf.getReplicaRead()); + conf.getReplicaSelector()); } @Override @@ -302,7 +302,7 @@ public Future getRegionByIDAsync(BackOffer backOffer, long id) { conf.getIsolationLevel(), conf.getCommandPriority(), conf.getKvMode(), - conf.getReplicaRead())); + conf.getReplicaSelector())); Supplier request = () -> GetRegionByIDRequest.newBuilder().setHeader(header).setRegionId(id).build(); diff --git a/src/main/java/org/tikv/common/ReplicaSelector.java b/src/main/java/org/tikv/common/ReplicaSelector.java new file mode 100644 index 00000000000..dbe5712b0d4 --- /dev/null +++ b/src/main/java/org/tikv/common/ReplicaSelector.java @@ -0,0 +1,25 @@ +/* + * Copyright 2021 PingCAP, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.tikv.common; + +import java.io.Serializable; +import java.util.List; +import org.tikv.kvproto.Metapb; + +public interface ReplicaSelector extends Serializable { + List select( + Metapb.Peer leader, List followers, List learners); +} diff --git a/src/main/java/org/tikv/common/TiConfiguration.java b/src/main/java/org/tikv/common/TiConfiguration.java index 5af96a6d48e..4d9960a8cc7 100644 --- a/src/main/java/org/tikv/common/TiConfiguration.java +++ b/src/main/java/org/tikv/common/TiConfiguration.java @@ -247,6 +247,8 @@ private static ReplicaRead getReplicaRead(String key) { private int kvClientConcurrency = getInt(TIKV_KV_CLIENT_CONCURRENCY); private ReplicaRead replicaRead = getReplicaRead(TIKV_REPLICA_READ); + private ReplicaSelector internalReplicaSelector = new InternalReplicaSelector(replicaRead); + private ReplicaSelector replicaSelector; private boolean metricsEnable = getBoolean(TIKV_METRICS_ENABLE); private int metricsPort = getInt(TIKV_METRICS_PORT); @@ -480,9 +482,22 @@ public ReplicaRead getReplicaRead() { public TiConfiguration setReplicaRead(ReplicaRead replicaRead) { this.replicaRead = replicaRead; + this.internalReplicaSelector = new InternalReplicaSelector(this.replicaRead); return this; } + public ReplicaSelector getReplicaSelector() { + if (replicaSelector != null) { + return replicaSelector; + } else { + return internalReplicaSelector; + } + } + + public void setReplicaSelector(ReplicaSelector replicaSelector) { + this.replicaSelector = replicaSelector; + } + public boolean isMetricsEnable() { return metricsEnable; } diff --git a/src/main/java/org/tikv/common/region/RegionStoreClient.java b/src/main/java/org/tikv/common/region/RegionStoreClient.java index 7e5d500f412..6ea51aa9cb8 100644 --- a/src/main/java/org/tikv/common/region/RegionStoreClient.java +++ b/src/main/java/org/tikv/common/region/RegionStoreClient.java @@ -790,7 +790,7 @@ public List splitRegion(Iterable splitKeys) { conf.getIsolationLevel(), conf.getCommandPriority(), conf.getKvMode(), - conf.getReplicaRead())) + conf.getReplicaSelector())) .collect(Collectors.toList()); } diff --git a/src/main/java/org/tikv/common/region/TiRegion.java b/src/main/java/org/tikv/common/region/TiRegion.java index 19dac68f1a2..20e9169740a 100644 --- a/src/main/java/org/tikv/common/region/TiRegion.java +++ b/src/main/java/org/tikv/common/region/TiRegion.java @@ -20,13 +20,12 @@ import com.google.protobuf.ByteString; import java.io.Serializable; import java.util.ArrayList; -import java.util.Collections; import java.util.List; import java.util.Objects; import java.util.Set; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.tikv.common.TiConfiguration; +import org.tikv.common.ReplicaSelector; import org.tikv.common.TiConfiguration.KVMode; import org.tikv.common.codec.Codec.BytesCodec; import org.tikv.common.codec.CodecDataInput; @@ -49,32 +48,23 @@ public class TiRegion implements Serializable { private final IsolationLevel isolationLevel; private final Kvrpcpb.CommandPri commandPri; private final Peer leader; - private final TiConfiguration.ReplicaRead replicaRead; + private final ReplicaSelector replicaSelector; private final List replicaList; private int replicaIdx; - public TiRegion( - Region meta, - Peer leader, - IsolationLevel isolationLevel, - Kvrpcpb.CommandPri commandPri, - KVMode kvMode) { - this(meta, leader, isolationLevel, commandPri, kvMode, TiConfiguration.ReplicaRead.LEADER); - } - public TiRegion( Region meta, Peer leader, IsolationLevel isolationLevel, Kvrpcpb.CommandPri commandPri, KVMode kvMode, - TiConfiguration.ReplicaRead replicaRead) { + ReplicaSelector replicaSelector) { Objects.requireNonNull(meta, "meta is null"); this.meta = decodeRegion(meta, kvMode == KVMode.RAW); this.kvMode = kvMode; this.isolationLevel = isolationLevel; this.commandPri = commandPri; - this.replicaRead = replicaRead; + this.replicaSelector = replicaSelector; if (leader == null || leader.getId() == 0) { if (meta.getPeersCount() == 0) { throw new TiClientInternalException("Empty peer list for region " + meta.getId()); @@ -86,18 +76,7 @@ public TiRegion( } // init replicaList - List followerList = getFollowerList(); - replicaList = new ArrayList<>(); - if (TiConfiguration.ReplicaRead.LEADER.equals(replicaRead)) { - replicaList.add(this.leader); - } else if (TiConfiguration.ReplicaRead.FOLLOWER.equals(replicaRead)) { - replicaList.addAll(followerList); - Collections.shuffle(replicaList); - } else if (TiConfiguration.ReplicaRead.LEADER_AND_FOLLOWER.equals(replicaRead)) { - replicaList.addAll(followerList); - Collections.shuffle(replicaList); - replicaList.add(this.leader); - } + replicaList = replicaSelector.select(this.leader, getFollowerList(), getLearnerList()); replicaIdx = 0; } @@ -230,7 +209,7 @@ public TiRegion switchPeer(long leaderStoreID) { for (Peer p : peers) { if (p.getStoreId() == leaderStoreID) { return new TiRegion( - this.meta, p, this.isolationLevel, this.commandPri, this.kvMode, this.replicaRead); + this.meta, p, this.isolationLevel, this.commandPri, this.kvMode, this.replicaSelector); } } return null; diff --git a/src/test/java/org/tikv/common/MockServerTest.java b/src/test/java/org/tikv/common/MockServerTest.java index 7109a886dee..1e40a4a3b44 100644 --- a/src/test/java/org/tikv/common/MockServerTest.java +++ b/src/test/java/org/tikv/common/MockServerTest.java @@ -33,7 +33,8 @@ public void setUp() throws IOException { r.getPeers(0), session.getConf().getIsolationLevel(), session.getConf().getCommandPriority(), - KVMode.TXN); + KVMode.TXN, + new InternalReplicaSelector(TiConfiguration.ReplicaRead.LEADER)); pdServer.addGetRegionResp(Pdpb.GetRegionResponse.newBuilder().setRegion(r).build()); server = new KVMockServer(); port = server.start(region); diff --git a/src/test/java/org/tikv/txn/ReplicaReadTest.java b/src/test/java/org/tikv/txn/ReplicaReadTest.java index 8a150eaab5a..6395e5d7692 100644 --- a/src/test/java/org/tikv/txn/ReplicaReadTest.java +++ b/src/test/java/org/tikv/txn/ReplicaReadTest.java @@ -1,13 +1,17 @@ package org.tikv.txn; import com.google.protobuf.ByteString; +import java.util.ArrayList; +import java.util.List; import org.junit.After; import org.junit.Assert; import org.junit.Before; import org.junit.Ignore; import org.junit.Test; +import org.tikv.common.ReplicaSelector; import org.tikv.common.TiConfiguration; import org.tikv.common.TiSession; +import org.tikv.kvproto.Metapb; public class ReplicaReadTest extends TXNTest { private TiSession session; @@ -30,6 +34,29 @@ public void leadAndFollowerReadTest() { doTest(TiConfiguration.ReplicaRead.LEADER_AND_FOLLOWER); } + @Test + public void replicaSelectorTest() { + TiConfiguration conf = TiConfiguration.createDefault(); + + conf.setReplicaSelector( + new ReplicaSelector() { + @Override + public List select( + Metapb.Peer leader, List followers, List learners) { + List list = new ArrayList<>(); + list.addAll(followers); + list.addAll(learners); + list.add(leader); + return list; + } + }); + session = TiSession.create(conf); + + putKV(key, value); + ByteString v = session.createSnapshot().get(ByteString.copyFromUtf8(key)); + Assert.assertEquals(value, v.toStringUtf8()); + } + private void doTest(TiConfiguration.ReplicaRead replicaRead) { TiConfiguration conf = TiConfiguration.createDefault(); conf.setReplicaRead(replicaRead); From ee1cc833530f8757de3c62cf7f584ea1f51ea47d Mon Sep 17 00:00:00 2001 From: marsishandsome Date: Fri, 26 Mar 2021 15:22:56 +0800 Subject: [PATCH 2/2] address code review Signed-off-by: marsishandsome --- .../java/org/tikv/common/TiConfiguration.java | 17 ++++++++-- .../java/org/tikv/common/region/TiRegion.java | 2 +- .../replica/FollowerReplicaSelector.java | 31 ++++++++++++++++++ .../LeaderFollowerReplicaSelector.java | 32 +++++++++++++++++++ .../LeaderReplicaSelector.java} | 24 ++------------ .../common/{ => replica}/ReplicaSelector.java | 6 +++- .../java/org/tikv/common/MockServerTest.java | 3 +- .../java/org/tikv/txn/ReplicaReadTest.java | 2 +- 8 files changed, 90 insertions(+), 27 deletions(-) create mode 100644 src/main/java/org/tikv/common/replica/FollowerReplicaSelector.java create mode 100644 src/main/java/org/tikv/common/replica/LeaderFollowerReplicaSelector.java rename src/main/java/org/tikv/common/{InternalReplicaSelector.java => replica/LeaderReplicaSelector.java} (53%) rename src/main/java/org/tikv/common/{ => replica}/ReplicaSelector.java (73%) diff --git a/src/main/java/org/tikv/common/TiConfiguration.java b/src/main/java/org/tikv/common/TiConfiguration.java index 4d9960a8cc7..0e97028d06f 100644 --- a/src/main/java/org/tikv/common/TiConfiguration.java +++ b/src/main/java/org/tikv/common/TiConfiguration.java @@ -24,6 +24,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.tikv.common.pd.PDUtils; +import org.tikv.common.replica.ReplicaSelector; import org.tikv.kvproto.Kvrpcpb.CommandPri; import org.tikv.kvproto.Kvrpcpb.IsolationLevel; @@ -247,7 +248,7 @@ private static ReplicaRead getReplicaRead(String key) { private int kvClientConcurrency = getInt(TIKV_KV_CLIENT_CONCURRENCY); private ReplicaRead replicaRead = getReplicaRead(TIKV_REPLICA_READ); - private ReplicaSelector internalReplicaSelector = new InternalReplicaSelector(replicaRead); + private ReplicaSelector internalReplicaSelector = getReplicaSelector(replicaRead); private ReplicaSelector replicaSelector; private boolean metricsEnable = getBoolean(TIKV_METRICS_ENABLE); @@ -482,10 +483,22 @@ public ReplicaRead getReplicaRead() { public TiConfiguration setReplicaRead(ReplicaRead replicaRead) { this.replicaRead = replicaRead; - this.internalReplicaSelector = new InternalReplicaSelector(this.replicaRead); + this.internalReplicaSelector = getReplicaSelector(this.replicaRead); return this; } + private ReplicaSelector getReplicaSelector(ReplicaRead replicaRead) { + if (TiConfiguration.ReplicaRead.LEADER.equals(replicaRead)) { + return ReplicaSelector.LEADER; + } else if (TiConfiguration.ReplicaRead.FOLLOWER.equals(replicaRead)) { + return ReplicaSelector.FOLLOWER; + } else if (TiConfiguration.ReplicaRead.LEADER_AND_FOLLOWER.equals(replicaRead)) { + return ReplicaSelector.LEADER_AND_FOLLOWER; + } else { + return null; + } + } + public ReplicaSelector getReplicaSelector() { if (replicaSelector != null) { return replicaSelector; diff --git a/src/main/java/org/tikv/common/region/TiRegion.java b/src/main/java/org/tikv/common/region/TiRegion.java index 20e9169740a..6170d0bbc92 100644 --- a/src/main/java/org/tikv/common/region/TiRegion.java +++ b/src/main/java/org/tikv/common/region/TiRegion.java @@ -25,13 +25,13 @@ import java.util.Set; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.tikv.common.ReplicaSelector; import org.tikv.common.TiConfiguration.KVMode; import org.tikv.common.codec.Codec.BytesCodec; import org.tikv.common.codec.CodecDataInput; import org.tikv.common.codec.KeyUtils; import org.tikv.common.exception.TiClientInternalException; import org.tikv.common.key.Key; +import org.tikv.common.replica.ReplicaSelector; import org.tikv.common.util.FastByteComparisons; import org.tikv.common.util.KeyRangeUtils; import org.tikv.kvproto.Kvrpcpb; diff --git a/src/main/java/org/tikv/common/replica/FollowerReplicaSelector.java b/src/main/java/org/tikv/common/replica/FollowerReplicaSelector.java new file mode 100644 index 00000000000..64aa5cdfe54 --- /dev/null +++ b/src/main/java/org/tikv/common/replica/FollowerReplicaSelector.java @@ -0,0 +1,31 @@ +/* + * Copyright 2021 PingCAP, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.tikv.common.replica; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import org.tikv.kvproto.Metapb; + +public class FollowerReplicaSelector implements ReplicaSelector { + @Override + public List select( + Metapb.Peer leader, List followers, List learners) { + List list = new ArrayList<>(followers); + Collections.shuffle(list); + return list; + } +} diff --git a/src/main/java/org/tikv/common/replica/LeaderFollowerReplicaSelector.java b/src/main/java/org/tikv/common/replica/LeaderFollowerReplicaSelector.java new file mode 100644 index 00000000000..52845d11ae2 --- /dev/null +++ b/src/main/java/org/tikv/common/replica/LeaderFollowerReplicaSelector.java @@ -0,0 +1,32 @@ +/* + * Copyright 2021 PingCAP, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.tikv.common.replica; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import org.tikv.kvproto.Metapb; + +public class LeaderFollowerReplicaSelector implements ReplicaSelector { + @Override + public List select( + Metapb.Peer leader, List followers, List learners) { + List list = new ArrayList<>(followers); + Collections.shuffle(list); + list.add(leader); + return list; + } +} diff --git a/src/main/java/org/tikv/common/InternalReplicaSelector.java b/src/main/java/org/tikv/common/replica/LeaderReplicaSelector.java similarity index 53% rename from src/main/java/org/tikv/common/InternalReplicaSelector.java rename to src/main/java/org/tikv/common/replica/LeaderReplicaSelector.java index 8dcbb69b297..0dc2bfc3a50 100644 --- a/src/main/java/org/tikv/common/InternalReplicaSelector.java +++ b/src/main/java/org/tikv/common/replica/LeaderReplicaSelector.java @@ -13,36 +13,18 @@ * limitations under the License. */ -package org.tikv.common; +package org.tikv.common.replica; import java.util.ArrayList; -import java.util.Collections; import java.util.List; import org.tikv.kvproto.Metapb; -public class InternalReplicaSelector implements ReplicaSelector { - private final TiConfiguration.ReplicaRead replicaRead; - - public InternalReplicaSelector(TiConfiguration.ReplicaRead replicaRead) { - this.replicaRead = replicaRead; - } - +public class LeaderReplicaSelector implements ReplicaSelector { @Override public List select( Metapb.Peer leader, List followers, List learners) { List list = new ArrayList<>(); - - if (TiConfiguration.ReplicaRead.LEADER.equals(replicaRead)) { - list.add(leader); - } else if (TiConfiguration.ReplicaRead.FOLLOWER.equals(replicaRead)) { - list.addAll(followers); - Collections.shuffle(list); - } else if (TiConfiguration.ReplicaRead.LEADER_AND_FOLLOWER.equals(replicaRead)) { - list.addAll(followers); - Collections.shuffle(list); - list.add(leader); - } - + list.add(leader); return list; } } diff --git a/src/main/java/org/tikv/common/ReplicaSelector.java b/src/main/java/org/tikv/common/replica/ReplicaSelector.java similarity index 73% rename from src/main/java/org/tikv/common/ReplicaSelector.java rename to src/main/java/org/tikv/common/replica/ReplicaSelector.java index dbe5712b0d4..144a6956df4 100644 --- a/src/main/java/org/tikv/common/ReplicaSelector.java +++ b/src/main/java/org/tikv/common/replica/ReplicaSelector.java @@ -13,13 +13,17 @@ * limitations under the License. */ -package org.tikv.common; +package org.tikv.common.replica; import java.io.Serializable; import java.util.List; import org.tikv.kvproto.Metapb; public interface ReplicaSelector extends Serializable { + public static final ReplicaSelector LEADER = new LeaderReplicaSelector(); + public static final ReplicaSelector FOLLOWER = new FollowerReplicaSelector(); + public static final ReplicaSelector LEADER_AND_FOLLOWER = new LeaderFollowerReplicaSelector(); + List select( Metapb.Peer leader, List followers, List learners); } diff --git a/src/test/java/org/tikv/common/MockServerTest.java b/src/test/java/org/tikv/common/MockServerTest.java index 1e40a4a3b44..bfce6db50cd 100644 --- a/src/test/java/org/tikv/common/MockServerTest.java +++ b/src/test/java/org/tikv/common/MockServerTest.java @@ -5,6 +5,7 @@ import org.junit.Before; import org.tikv.common.TiConfiguration.KVMode; import org.tikv.common.region.TiRegion; +import org.tikv.common.replica.ReplicaSelector; import org.tikv.kvproto.Metapb; import org.tikv.kvproto.Pdpb; @@ -34,7 +35,7 @@ public void setUp() throws IOException { session.getConf().getIsolationLevel(), session.getConf().getCommandPriority(), KVMode.TXN, - new InternalReplicaSelector(TiConfiguration.ReplicaRead.LEADER)); + ReplicaSelector.LEADER); pdServer.addGetRegionResp(Pdpb.GetRegionResponse.newBuilder().setRegion(r).build()); server = new KVMockServer(); port = server.start(region); diff --git a/src/test/java/org/tikv/txn/ReplicaReadTest.java b/src/test/java/org/tikv/txn/ReplicaReadTest.java index 6395e5d7692..1baaea2cc66 100644 --- a/src/test/java/org/tikv/txn/ReplicaReadTest.java +++ b/src/test/java/org/tikv/txn/ReplicaReadTest.java @@ -8,9 +8,9 @@ import org.junit.Before; import org.junit.Ignore; import org.junit.Test; -import org.tikv.common.ReplicaSelector; import org.tikv.common.TiConfiguration; import org.tikv.common.TiSession; +import org.tikv.common.replica.ReplicaSelector; import org.tikv.kvproto.Metapb; public class ReplicaReadTest extends TXNTest {