diff --git a/src/main/java/org/tikv/common/PDClient.java b/src/main/java/org/tikv/common/PDClient.java index e7270bff913..17e9d0460fe 100644 --- a/src/main/java/org/tikv/common/PDClient.java +++ b/src/main/java/org/tikv/common/PDClient.java @@ -244,7 +244,7 @@ public TiRegion getRegionByKey(BackOffer backOffer, ByteString key) { conf.getIsolationLevel(), conf.getCommandPriority(), conf.getKvMode(), - conf.getReplicaRead()); + conf.getReplicaSelector()); } finally { requestTimer.observeDuration(); } @@ -261,7 +261,7 @@ public Future getRegionByKeyAsync(BackOffer backOffer, ByteString key) conf.getIsolationLevel(), conf.getCommandPriority(), conf.getKvMode(), - conf.getReplicaRead())); + conf.getReplicaSelector())); Supplier request = () -> GetRegionRequest.newBuilder().setHeader(header).setRegionKey(key).build(); @@ -288,7 +288,7 @@ public TiRegion getRegionByID(BackOffer backOffer, long id) { conf.getIsolationLevel(), conf.getCommandPriority(), conf.getKvMode(), - conf.getReplicaRead()); + conf.getReplicaSelector()); } @Override @@ -302,7 +302,7 @@ public Future getRegionByIDAsync(BackOffer backOffer, long id) { conf.getIsolationLevel(), conf.getCommandPriority(), conf.getKvMode(), - conf.getReplicaRead())); + conf.getReplicaSelector())); Supplier request = () -> GetRegionByIDRequest.newBuilder().setHeader(header).setRegionId(id).build(); diff --git a/src/main/java/org/tikv/common/TiConfiguration.java b/src/main/java/org/tikv/common/TiConfiguration.java index 5af96a6d48e..0e97028d06f 100644 --- a/src/main/java/org/tikv/common/TiConfiguration.java +++ b/src/main/java/org/tikv/common/TiConfiguration.java @@ -24,6 +24,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.tikv.common.pd.PDUtils; +import org.tikv.common.replica.ReplicaSelector; import org.tikv.kvproto.Kvrpcpb.CommandPri; import org.tikv.kvproto.Kvrpcpb.IsolationLevel; @@ -247,6 +248,8 @@ private static ReplicaRead getReplicaRead(String key) { private int kvClientConcurrency = getInt(TIKV_KV_CLIENT_CONCURRENCY); private ReplicaRead replicaRead = getReplicaRead(TIKV_REPLICA_READ); + private ReplicaSelector internalReplicaSelector = getReplicaSelector(replicaRead); + private ReplicaSelector replicaSelector; private boolean metricsEnable = getBoolean(TIKV_METRICS_ENABLE); private int metricsPort = getInt(TIKV_METRICS_PORT); @@ -480,9 +483,34 @@ public ReplicaRead getReplicaRead() { public TiConfiguration setReplicaRead(ReplicaRead replicaRead) { this.replicaRead = replicaRead; + this.internalReplicaSelector = getReplicaSelector(this.replicaRead); return this; } + private ReplicaSelector getReplicaSelector(ReplicaRead replicaRead) { + if (TiConfiguration.ReplicaRead.LEADER.equals(replicaRead)) { + return ReplicaSelector.LEADER; + } else if (TiConfiguration.ReplicaRead.FOLLOWER.equals(replicaRead)) { + return ReplicaSelector.FOLLOWER; + } else if (TiConfiguration.ReplicaRead.LEADER_AND_FOLLOWER.equals(replicaRead)) { + return ReplicaSelector.LEADER_AND_FOLLOWER; + } else { + return null; + } + } + + public ReplicaSelector getReplicaSelector() { + if (replicaSelector != null) { + return replicaSelector; + } else { + return internalReplicaSelector; + } + } + + public void setReplicaSelector(ReplicaSelector replicaSelector) { + this.replicaSelector = replicaSelector; + } + public boolean isMetricsEnable() { return metricsEnable; } diff --git a/src/main/java/org/tikv/common/region/RegionStoreClient.java b/src/main/java/org/tikv/common/region/RegionStoreClient.java index 7e5d500f412..6ea51aa9cb8 100644 --- a/src/main/java/org/tikv/common/region/RegionStoreClient.java +++ b/src/main/java/org/tikv/common/region/RegionStoreClient.java @@ -790,7 +790,7 @@ public List splitRegion(Iterable splitKeys) { conf.getIsolationLevel(), conf.getCommandPriority(), conf.getKvMode(), - conf.getReplicaRead())) + conf.getReplicaSelector())) .collect(Collectors.toList()); } diff --git a/src/main/java/org/tikv/common/region/TiRegion.java b/src/main/java/org/tikv/common/region/TiRegion.java index 19dac68f1a2..6170d0bbc92 100644 --- a/src/main/java/org/tikv/common/region/TiRegion.java +++ b/src/main/java/org/tikv/common/region/TiRegion.java @@ -20,19 +20,18 @@ import com.google.protobuf.ByteString; import java.io.Serializable; import java.util.ArrayList; -import java.util.Collections; import java.util.List; import java.util.Objects; import java.util.Set; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.tikv.common.TiConfiguration; import org.tikv.common.TiConfiguration.KVMode; import org.tikv.common.codec.Codec.BytesCodec; import org.tikv.common.codec.CodecDataInput; import org.tikv.common.codec.KeyUtils; import org.tikv.common.exception.TiClientInternalException; import org.tikv.common.key.Key; +import org.tikv.common.replica.ReplicaSelector; import org.tikv.common.util.FastByteComparisons; import org.tikv.common.util.KeyRangeUtils; import org.tikv.kvproto.Kvrpcpb; @@ -49,32 +48,23 @@ public class TiRegion implements Serializable { private final IsolationLevel isolationLevel; private final Kvrpcpb.CommandPri commandPri; private final Peer leader; - private final TiConfiguration.ReplicaRead replicaRead; + private final ReplicaSelector replicaSelector; private final List replicaList; private int replicaIdx; - public TiRegion( - Region meta, - Peer leader, - IsolationLevel isolationLevel, - Kvrpcpb.CommandPri commandPri, - KVMode kvMode) { - this(meta, leader, isolationLevel, commandPri, kvMode, TiConfiguration.ReplicaRead.LEADER); - } - public TiRegion( Region meta, Peer leader, IsolationLevel isolationLevel, Kvrpcpb.CommandPri commandPri, KVMode kvMode, - TiConfiguration.ReplicaRead replicaRead) { + ReplicaSelector replicaSelector) { Objects.requireNonNull(meta, "meta is null"); this.meta = decodeRegion(meta, kvMode == KVMode.RAW); this.kvMode = kvMode; this.isolationLevel = isolationLevel; this.commandPri = commandPri; - this.replicaRead = replicaRead; + this.replicaSelector = replicaSelector; if (leader == null || leader.getId() == 0) { if (meta.getPeersCount() == 0) { throw new TiClientInternalException("Empty peer list for region " + meta.getId()); @@ -86,18 +76,7 @@ public TiRegion( } // init replicaList - List followerList = getFollowerList(); - replicaList = new ArrayList<>(); - if (TiConfiguration.ReplicaRead.LEADER.equals(replicaRead)) { - replicaList.add(this.leader); - } else if (TiConfiguration.ReplicaRead.FOLLOWER.equals(replicaRead)) { - replicaList.addAll(followerList); - Collections.shuffle(replicaList); - } else if (TiConfiguration.ReplicaRead.LEADER_AND_FOLLOWER.equals(replicaRead)) { - replicaList.addAll(followerList); - Collections.shuffle(replicaList); - replicaList.add(this.leader); - } + replicaList = replicaSelector.select(this.leader, getFollowerList(), getLearnerList()); replicaIdx = 0; } @@ -230,7 +209,7 @@ public TiRegion switchPeer(long leaderStoreID) { for (Peer p : peers) { if (p.getStoreId() == leaderStoreID) { return new TiRegion( - this.meta, p, this.isolationLevel, this.commandPri, this.kvMode, this.replicaRead); + this.meta, p, this.isolationLevel, this.commandPri, this.kvMode, this.replicaSelector); } } return null; diff --git a/src/main/java/org/tikv/common/replica/FollowerReplicaSelector.java b/src/main/java/org/tikv/common/replica/FollowerReplicaSelector.java new file mode 100644 index 00000000000..64aa5cdfe54 --- /dev/null +++ b/src/main/java/org/tikv/common/replica/FollowerReplicaSelector.java @@ -0,0 +1,31 @@ +/* + * Copyright 2021 PingCAP, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.tikv.common.replica; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import org.tikv.kvproto.Metapb; + +public class FollowerReplicaSelector implements ReplicaSelector { + @Override + public List select( + Metapb.Peer leader, List followers, List learners) { + List list = new ArrayList<>(followers); + Collections.shuffle(list); + return list; + } +} diff --git a/src/main/java/org/tikv/common/replica/LeaderFollowerReplicaSelector.java b/src/main/java/org/tikv/common/replica/LeaderFollowerReplicaSelector.java new file mode 100644 index 00000000000..52845d11ae2 --- /dev/null +++ b/src/main/java/org/tikv/common/replica/LeaderFollowerReplicaSelector.java @@ -0,0 +1,32 @@ +/* + * Copyright 2021 PingCAP, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.tikv.common.replica; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import org.tikv.kvproto.Metapb; + +public class LeaderFollowerReplicaSelector implements ReplicaSelector { + @Override + public List select( + Metapb.Peer leader, List followers, List learners) { + List list = new ArrayList<>(followers); + Collections.shuffle(list); + list.add(leader); + return list; + } +} diff --git a/src/main/java/org/tikv/common/replica/LeaderReplicaSelector.java b/src/main/java/org/tikv/common/replica/LeaderReplicaSelector.java new file mode 100644 index 00000000000..0dc2bfc3a50 --- /dev/null +++ b/src/main/java/org/tikv/common/replica/LeaderReplicaSelector.java @@ -0,0 +1,30 @@ +/* + * Copyright 2021 PingCAP, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.tikv.common.replica; + +import java.util.ArrayList; +import java.util.List; +import org.tikv.kvproto.Metapb; + +public class LeaderReplicaSelector implements ReplicaSelector { + @Override + public List select( + Metapb.Peer leader, List followers, List learners) { + List list = new ArrayList<>(); + list.add(leader); + return list; + } +} diff --git a/src/main/java/org/tikv/common/replica/ReplicaSelector.java b/src/main/java/org/tikv/common/replica/ReplicaSelector.java new file mode 100644 index 00000000000..144a6956df4 --- /dev/null +++ b/src/main/java/org/tikv/common/replica/ReplicaSelector.java @@ -0,0 +1,29 @@ +/* + * Copyright 2021 PingCAP, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.tikv.common.replica; + +import java.io.Serializable; +import java.util.List; +import org.tikv.kvproto.Metapb; + +public interface ReplicaSelector extends Serializable { + public static final ReplicaSelector LEADER = new LeaderReplicaSelector(); + public static final ReplicaSelector FOLLOWER = new FollowerReplicaSelector(); + public static final ReplicaSelector LEADER_AND_FOLLOWER = new LeaderFollowerReplicaSelector(); + + List select( + Metapb.Peer leader, List followers, List learners); +} diff --git a/src/test/java/org/tikv/common/MockServerTest.java b/src/test/java/org/tikv/common/MockServerTest.java index 7109a886dee..bfce6db50cd 100644 --- a/src/test/java/org/tikv/common/MockServerTest.java +++ b/src/test/java/org/tikv/common/MockServerTest.java @@ -5,6 +5,7 @@ import org.junit.Before; import org.tikv.common.TiConfiguration.KVMode; import org.tikv.common.region.TiRegion; +import org.tikv.common.replica.ReplicaSelector; import org.tikv.kvproto.Metapb; import org.tikv.kvproto.Pdpb; @@ -33,7 +34,8 @@ public void setUp() throws IOException { r.getPeers(0), session.getConf().getIsolationLevel(), session.getConf().getCommandPriority(), - KVMode.TXN); + KVMode.TXN, + ReplicaSelector.LEADER); pdServer.addGetRegionResp(Pdpb.GetRegionResponse.newBuilder().setRegion(r).build()); server = new KVMockServer(); port = server.start(region); diff --git a/src/test/java/org/tikv/txn/ReplicaReadTest.java b/src/test/java/org/tikv/txn/ReplicaReadTest.java index 8a150eaab5a..1baaea2cc66 100644 --- a/src/test/java/org/tikv/txn/ReplicaReadTest.java +++ b/src/test/java/org/tikv/txn/ReplicaReadTest.java @@ -1,6 +1,8 @@ package org.tikv.txn; import com.google.protobuf.ByteString; +import java.util.ArrayList; +import java.util.List; import org.junit.After; import org.junit.Assert; import org.junit.Before; @@ -8,6 +10,8 @@ import org.junit.Test; import org.tikv.common.TiConfiguration; import org.tikv.common.TiSession; +import org.tikv.common.replica.ReplicaSelector; +import org.tikv.kvproto.Metapb; public class ReplicaReadTest extends TXNTest { private TiSession session; @@ -30,6 +34,29 @@ public void leadAndFollowerReadTest() { doTest(TiConfiguration.ReplicaRead.LEADER_AND_FOLLOWER); } + @Test + public void replicaSelectorTest() { + TiConfiguration conf = TiConfiguration.createDefault(); + + conf.setReplicaSelector( + new ReplicaSelector() { + @Override + public List select( + Metapb.Peer leader, List followers, List learners) { + List list = new ArrayList<>(); + list.addAll(followers); + list.addAll(learners); + list.add(leader); + return list; + } + }); + session = TiSession.create(conf); + + putKV(key, value); + ByteString v = session.createSnapshot().get(ByteString.copyFromUtf8(key)); + Assert.assertEquals(value, v.toStringUtf8()); + } + private void doTest(TiConfiguration.ReplicaRead replicaRead) { TiConfiguration conf = TiConfiguration.createDefault(); conf.setReplicaRead(replicaRead);