Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions src/main/java/org/tikv/common/PDClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,7 @@ public TiRegion getRegionByKey(BackOffer backOffer, ByteString key) {
conf.getIsolationLevel(),
conf.getCommandPriority(),
conf.getKvMode(),
conf.getReplicaRead());
conf.getReplicaSelector());
} finally {
requestTimer.observeDuration();
}
Expand All @@ -261,7 +261,7 @@ public Future<TiRegion> getRegionByKeyAsync(BackOffer backOffer, ByteString key)
conf.getIsolationLevel(),
conf.getCommandPriority(),
conf.getKvMode(),
conf.getReplicaRead()));
conf.getReplicaSelector()));
Supplier<GetRegionRequest> request =
() -> GetRegionRequest.newBuilder().setHeader(header).setRegionKey(key).build();

Expand All @@ -288,7 +288,7 @@ public TiRegion getRegionByID(BackOffer backOffer, long id) {
conf.getIsolationLevel(),
conf.getCommandPriority(),
conf.getKvMode(),
conf.getReplicaRead());
conf.getReplicaSelector());
}

@Override
Expand All @@ -302,7 +302,7 @@ public Future<TiRegion> getRegionByIDAsync(BackOffer backOffer, long id) {
conf.getIsolationLevel(),
conf.getCommandPriority(),
conf.getKvMode(),
conf.getReplicaRead()));
conf.getReplicaSelector()));

Supplier<GetRegionByIDRequest> request =
() -> GetRegionByIDRequest.newBuilder().setHeader(header).setRegionId(id).build();
Expand Down
28 changes: 28 additions & 0 deletions src/main/java/org/tikv/common/TiConfiguration.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.tikv.common.pd.PDUtils;
import org.tikv.common.replica.ReplicaSelector;
import org.tikv.kvproto.Kvrpcpb.CommandPri;
import org.tikv.kvproto.Kvrpcpb.IsolationLevel;

Expand Down Expand Up @@ -247,6 +248,8 @@ private static ReplicaRead getReplicaRead(String key) {

private int kvClientConcurrency = getInt(TIKV_KV_CLIENT_CONCURRENCY);
private ReplicaRead replicaRead = getReplicaRead(TIKV_REPLICA_READ);
private ReplicaSelector internalReplicaSelector = getReplicaSelector(replicaRead);
private ReplicaSelector replicaSelector;

private boolean metricsEnable = getBoolean(TIKV_METRICS_ENABLE);
private int metricsPort = getInt(TIKV_METRICS_PORT);
Expand Down Expand Up @@ -480,9 +483,34 @@ public ReplicaRead getReplicaRead() {

public TiConfiguration setReplicaRead(ReplicaRead replicaRead) {
this.replicaRead = replicaRead;
this.internalReplicaSelector = getReplicaSelector(this.replicaRead);
return this;
}

private ReplicaSelector getReplicaSelector(ReplicaRead replicaRead) {
if (TiConfiguration.ReplicaRead.LEADER.equals(replicaRead)) {
return ReplicaSelector.LEADER;
} else if (TiConfiguration.ReplicaRead.FOLLOWER.equals(replicaRead)) {
return ReplicaSelector.FOLLOWER;
} else if (TiConfiguration.ReplicaRead.LEADER_AND_FOLLOWER.equals(replicaRead)) {
return ReplicaSelector.LEADER_AND_FOLLOWER;
} else {
return null;
}
}

public ReplicaSelector getReplicaSelector() {
if (replicaSelector != null) {
return replicaSelector;
} else {
return internalReplicaSelector;
}
}

public void setReplicaSelector(ReplicaSelector replicaSelector) {
this.replicaSelector = replicaSelector;
}

public boolean isMetricsEnable() {
return metricsEnable;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -790,7 +790,7 @@ public List<TiRegion> splitRegion(Iterable<ByteString> splitKeys) {
conf.getIsolationLevel(),
conf.getCommandPriority(),
conf.getKvMode(),
conf.getReplicaRead()))
conf.getReplicaSelector()))
.collect(Collectors.toList());
}

Expand Down
33 changes: 6 additions & 27 deletions src/main/java/org/tikv/common/region/TiRegion.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,19 +20,18 @@
import com.google.protobuf.ByteString;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.tikv.common.TiConfiguration;
import org.tikv.common.TiConfiguration.KVMode;
import org.tikv.common.codec.Codec.BytesCodec;
import org.tikv.common.codec.CodecDataInput;
import org.tikv.common.codec.KeyUtils;
import org.tikv.common.exception.TiClientInternalException;
import org.tikv.common.key.Key;
import org.tikv.common.replica.ReplicaSelector;
import org.tikv.common.util.FastByteComparisons;
import org.tikv.common.util.KeyRangeUtils;
import org.tikv.kvproto.Kvrpcpb;
Expand All @@ -49,32 +48,23 @@ public class TiRegion implements Serializable {
private final IsolationLevel isolationLevel;
private final Kvrpcpb.CommandPri commandPri;
private final Peer leader;
private final TiConfiguration.ReplicaRead replicaRead;
private final ReplicaSelector replicaSelector;
private final List<Peer> replicaList;
private int replicaIdx;

public TiRegion(
Region meta,
Peer leader,
IsolationLevel isolationLevel,
Kvrpcpb.CommandPri commandPri,
KVMode kvMode) {
this(meta, leader, isolationLevel, commandPri, kvMode, TiConfiguration.ReplicaRead.LEADER);
}

public TiRegion(
Region meta,
Peer leader,
IsolationLevel isolationLevel,
Kvrpcpb.CommandPri commandPri,
KVMode kvMode,
TiConfiguration.ReplicaRead replicaRead) {
ReplicaSelector replicaSelector) {
Objects.requireNonNull(meta, "meta is null");
this.meta = decodeRegion(meta, kvMode == KVMode.RAW);
this.kvMode = kvMode;
this.isolationLevel = isolationLevel;
this.commandPri = commandPri;
this.replicaRead = replicaRead;
this.replicaSelector = replicaSelector;
if (leader == null || leader.getId() == 0) {
if (meta.getPeersCount() == 0) {
throw new TiClientInternalException("Empty peer list for region " + meta.getId());
Expand All @@ -86,18 +76,7 @@ public TiRegion(
}

// init replicaList
List<Peer> followerList = getFollowerList();
replicaList = new ArrayList<>();
if (TiConfiguration.ReplicaRead.LEADER.equals(replicaRead)) {
replicaList.add(this.leader);
} else if (TiConfiguration.ReplicaRead.FOLLOWER.equals(replicaRead)) {
replicaList.addAll(followerList);
Collections.shuffle(replicaList);
} else if (TiConfiguration.ReplicaRead.LEADER_AND_FOLLOWER.equals(replicaRead)) {
replicaList.addAll(followerList);
Collections.shuffle(replicaList);
replicaList.add(this.leader);
}
replicaList = replicaSelector.select(this.leader, getFollowerList(), getLearnerList());
replicaIdx = 0;
}

Expand Down Expand Up @@ -230,7 +209,7 @@ public TiRegion switchPeer(long leaderStoreID) {
for (Peer p : peers) {
if (p.getStoreId() == leaderStoreID) {
return new TiRegion(
this.meta, p, this.isolationLevel, this.commandPri, this.kvMode, this.replicaRead);
this.meta, p, this.isolationLevel, this.commandPri, this.kvMode, this.replicaSelector);
}
}
return null;
Expand Down
31 changes: 31 additions & 0 deletions src/main/java/org/tikv/common/replica/FollowerReplicaSelector.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Copyright 2021 PingCAP, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.tikv.common.replica;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import org.tikv.kvproto.Metapb;

public class FollowerReplicaSelector implements ReplicaSelector {
@Override
public List<Metapb.Peer> select(
Metapb.Peer leader, List<Metapb.Peer> followers, List<Metapb.Peer> learners) {
List<Metapb.Peer> list = new ArrayList<>(followers);
Collections.shuffle(list);
return list;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
* Copyright 2021 PingCAP, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.tikv.common.replica;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import org.tikv.kvproto.Metapb;

public class LeaderFollowerReplicaSelector implements ReplicaSelector {
@Override
public List<Metapb.Peer> select(
Metapb.Peer leader, List<Metapb.Peer> followers, List<Metapb.Peer> learners) {
List<Metapb.Peer> list = new ArrayList<>(followers);
Collections.shuffle(list);
list.add(leader);
return list;
}
}
30 changes: 30 additions & 0 deletions src/main/java/org/tikv/common/replica/LeaderReplicaSelector.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/*
* Copyright 2021 PingCAP, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.tikv.common.replica;

import java.util.ArrayList;
import java.util.List;
import org.tikv.kvproto.Metapb;

public class LeaderReplicaSelector implements ReplicaSelector {
@Override
public List<Metapb.Peer> select(
Metapb.Peer leader, List<Metapb.Peer> followers, List<Metapb.Peer> learners) {
List<Metapb.Peer> list = new ArrayList<>();
list.add(leader);
return list;
}
}
29 changes: 29 additions & 0 deletions src/main/java/org/tikv/common/replica/ReplicaSelector.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
* Copyright 2021 PingCAP, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.tikv.common.replica;

import java.io.Serializable;
import java.util.List;
import org.tikv.kvproto.Metapb;

public interface ReplicaSelector extends Serializable {
public static final ReplicaSelector LEADER = new LeaderReplicaSelector();
public static final ReplicaSelector FOLLOWER = new FollowerReplicaSelector();
public static final ReplicaSelector LEADER_AND_FOLLOWER = new LeaderFollowerReplicaSelector();

List<Metapb.Peer> select(
Metapb.Peer leader, List<Metapb.Peer> followers, List<Metapb.Peer> learners);
}
4 changes: 3 additions & 1 deletion src/test/java/org/tikv/common/MockServerTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import org.junit.Before;
import org.tikv.common.TiConfiguration.KVMode;
import org.tikv.common.region.TiRegion;
import org.tikv.common.replica.ReplicaSelector;
import org.tikv.kvproto.Metapb;
import org.tikv.kvproto.Pdpb;

Expand Down Expand Up @@ -33,7 +34,8 @@ public void setUp() throws IOException {
r.getPeers(0),
session.getConf().getIsolationLevel(),
session.getConf().getCommandPriority(),
KVMode.TXN);
KVMode.TXN,
ReplicaSelector.LEADER);
pdServer.addGetRegionResp(Pdpb.GetRegionResponse.newBuilder().setRegion(r).build());
server = new KVMockServer();
port = server.start(region);
Expand Down
27 changes: 27 additions & 0 deletions src/test/java/org/tikv/txn/ReplicaReadTest.java
Original file line number Diff line number Diff line change
@@ -1,13 +1,17 @@
package org.tikv.txn;

import com.google.protobuf.ByteString;
import java.util.ArrayList;
import java.util.List;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
import org.tikv.common.TiConfiguration;
import org.tikv.common.TiSession;
import org.tikv.common.replica.ReplicaSelector;
import org.tikv.kvproto.Metapb;

public class ReplicaReadTest extends TXNTest {
private TiSession session;
Expand All @@ -30,6 +34,29 @@ public void leadAndFollowerReadTest() {
doTest(TiConfiguration.ReplicaRead.LEADER_AND_FOLLOWER);
}

@Test
public void replicaSelectorTest() {
TiConfiguration conf = TiConfiguration.createDefault();

conf.setReplicaSelector(
new ReplicaSelector() {
@Override
public List<Metapb.Peer> select(
Metapb.Peer leader, List<Metapb.Peer> followers, List<Metapb.Peer> learners) {
List<Metapb.Peer> list = new ArrayList<>();
list.addAll(followers);
list.addAll(learners);
list.add(leader);
return list;
}
});
session = TiSession.create(conf);

putKV(key, value);
ByteString v = session.createSnapshot().get(ByteString.copyFromUtf8(key));
Assert.assertEquals(value, v.toStringUtf8());
}

private void doTest(TiConfiguration.ReplicaRead replicaRead) {
TiConfiguration conf = TiConfiguration.createDefault();
conf.setReplicaRead(replicaRead);
Expand Down