Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions .ci/integration_test.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -61,13 +61,13 @@ def call(ghprbActualCommit, ghprbPullId, ghprbPullTitle, ghprbPullLink, ghprbPul
killall -9 pd-server || true
killall -9 java || true
sleep 10
bin/pd-server --name=pd --data-dir=pd --config=../.ci/config/pd.toml &>pd.log &
bin/pd-server --name=pd --data-dir=pd --config=../config/pd.toml &>pd.log &
sleep 10
bin/tikv-server --pd=127.0.0.1:2379 -s tikv --addr=0.0.0.0:20160 --advertise-addr=127.0.0.1:20160 --config=../.ci/config/tikv.toml &>tikv.log &
bin/tikv-server --pd=127.0.0.1:2379 -s tikv --addr=0.0.0.0:20160 --advertise-addr=127.0.0.1:20160 --config=../config/tikv.toml &>tikv.log &
sleep 10
ps aux | grep '-server' || true
curl -s 127.0.0.1:2379/pd/api/v1/status || true
bin/tidb-server --store=tikv --path="127.0.0.1:2379" --config=../.ci/config/tidb.toml &>tidb.log &
bin/tidb-server --store=tikv --path="127.0.0.1:2379" --config=../config/tidb.toml &>tidb.log &
sleep 60
"""
}
Expand Down
4 changes: 4 additions & 0 deletions config/pd.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
# PD Configuration.
[replication]
enable-placement-rules = true
max-replicas = 1
1 change: 1 addition & 0 deletions config/tidb.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
# TiDB Configuration.
5 changes: 5 additions & 0 deletions config/tikv.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
# TiKV Configuration.

[raftstore]
# set store capacity, if no set, use disk capacity.
capacity = "8G"
8 changes: 6 additions & 2 deletions src/main/java/org/tikv/common/ConfigUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ public class ConfigUtils {
public static final String TIKV_KV_CLIENT_CONCURRENCY = "tikv.kv_client_concurrency";

public static final String TIKV_KV_MODE = "tikv.kv_mode";
public static final String TIKV_IS_REPLICA_READ = "tikv.is_replica_read";
public static final String TIKV_REPLICA_READ = "tikv.replica_read";

public static final String TIKV_METRICS_ENABLE = "tikv.metrics.enable";
public static final String TIKV_METRICS_PORT = "tikv.metrics.port";
Expand Down Expand Up @@ -72,7 +72,7 @@ public class ConfigUtils {
public static final String DEF_DB_PREFIX = "";
public static final int DEF_KV_CLIENT_CONCURRENCY = 10;
public static final TiConfiguration.KVMode DEF_KV_MODE = TiConfiguration.KVMode.TXN;
public static final boolean DEF_IS_REPLICA_READ = false;
public static final String DEF_REPLICA_READ = "LEADER";
public static final boolean DEF_METRICS_ENABLE = false;
public static final int DEF_METRICS_PORT = 3140;
public static final String DEF_TIKV_NETWORK_MAPPING_NAME = "";
Expand All @@ -86,4 +86,8 @@ public class ConfigUtils {

public static final String RAW_KV_MODE = "RAW";
public static final String TXN_KV_MODE = "TXN";

public static final String LEADER = "LEADER";
public static final String FOLLOWER = "FOLLOWER";
public static final String LEADER_AND_FOLLOWER = "LEADER_AND_FOLLOWER";
}
12 changes: 6 additions & 6 deletions src/main/java/org/tikv/common/PDClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,7 @@ public TiRegion getRegionByKey(BackOffer backOffer, ByteString key) {
conf.getIsolationLevel(),
conf.getCommandPriority(),
conf.getKvMode(),
conf.isReplicaRead());
conf.getReplicaRead());
} finally {
requestTimer.observeDuration();
}
Expand All @@ -261,7 +261,7 @@ public Future<TiRegion> getRegionByKeyAsync(BackOffer backOffer, ByteString key)
conf.getIsolationLevel(),
conf.getCommandPriority(),
conf.getKvMode(),
conf.isReplicaRead()));
conf.getReplicaRead()));
Supplier<GetRegionRequest> request =
() -> GetRegionRequest.newBuilder().setHeader(header).setRegionKey(key).build();

Expand All @@ -288,7 +288,7 @@ public TiRegion getRegionByID(BackOffer backOffer, long id) {
conf.getIsolationLevel(),
conf.getCommandPriority(),
conf.getKvMode(),
conf.isReplicaRead());
conf.getReplicaRead());
}

@Override
Expand All @@ -302,7 +302,7 @@ public Future<TiRegion> getRegionByIDAsync(BackOffer backOffer, long id) {
conf.getIsolationLevel(),
conf.getCommandPriority(),
conf.getKvMode(),
conf.isReplicaRead()));
conf.getReplicaRead()));

Supplier<GetRegionByIDRequest> request =
() -> GetRegionByIDRequest.newBuilder().setHeader(header).setRegionId(id).build();
Expand Down Expand Up @@ -361,8 +361,8 @@ public List<Store> getAllStores(BackOffer backOffer) {
}

@Override
public boolean isReplicaRead() {
return conf.isReplicaRead();
public TiConfiguration.ReplicaRead getReplicaRead() {
return conf.getReplicaRead();
}

@Override
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/org/tikv/common/ReadOnlyPDClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -66,5 +66,5 @@ public interface ReadOnlyPDClient {

List<Store> getAllStores(BackOffer backOffer);

boolean isReplicaRead();
TiConfiguration.ReplicaRead getReplicaRead();
}
29 changes: 23 additions & 6 deletions src/main/java/org/tikv/common/TiConfiguration.java
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ private static void loadFromDefaultProperties() {
setIfMissing(TIKV_DB_PREFIX, DEF_DB_PREFIX);
setIfMissing(TIKV_KV_CLIENT_CONCURRENCY, DEF_KV_CLIENT_CONCURRENCY);
setIfMissing(TIKV_KV_MODE, TXN_KV_MODE);
setIfMissing(TIKV_IS_REPLICA_READ, DEF_IS_REPLICA_READ);
setIfMissing(TIKV_REPLICA_READ, DEF_REPLICA_READ);
setIfMissing(TIKV_METRICS_ENABLE, DEF_METRICS_ENABLE);
setIfMissing(TIKV_METRICS_PORT, DEF_METRICS_PORT);
setIfMissing(TIKV_NETWORK_MAPPING_NAME, DEF_TIKV_NETWORK_MAPPING_NAME);
Expand Down Expand Up @@ -216,6 +216,17 @@ private static KVMode getKvMode(String key) {
}
}

private static ReplicaRead getReplicaRead(String key) {
String value = get(key).toUpperCase(Locale.ROOT);
if (FOLLOWER.equals(value)) {
return ReplicaRead.FOLLOWER;
} else if (LEADER_AND_FOLLOWER.equals(value)) {
return ReplicaRead.LEADER_AND_FOLLOWER;
} else {
return ReplicaRead.LEADER;
}
}

private long timeout = getTimeAsMs(TIKV_GRPC_TIMEOUT);
private long scanTimeout = getTimeAsMs(TIKV_GRPC_SCAN_TIMEOUT);
private int maxFrameSize = getInt(TIKV_GRPC_MAX_FRAME_SIZE);
Expand All @@ -235,7 +246,7 @@ private static KVMode getKvMode(String key) {
private KVMode kvMode = getKvMode(TIKV_KV_MODE);

private int kvClientConcurrency = getInt(TIKV_KV_CLIENT_CONCURRENCY);
private boolean isReplicaRead = getBoolean(TIKV_IS_REPLICA_READ);
private ReplicaRead replicaRead = getReplicaRead(TIKV_REPLICA_READ);

private boolean metricsEnable = getBoolean(TIKV_METRICS_ENABLE);
private int metricsPort = getInt(TIKV_METRICS_PORT);
Expand All @@ -247,6 +258,12 @@ public enum KVMode {
RAW
}

public enum ReplicaRead {
LEADER,
FOLLOWER,
LEADER_AND_FOLLOWER
}

public static TiConfiguration createDefault() {
return new TiConfiguration();
}
Expand Down Expand Up @@ -457,12 +474,12 @@ public TiConfiguration setKvClientConcurrency(int kvClientConcurrency) {
return this;
}

public boolean isReplicaRead() {
return isReplicaRead;
public ReplicaRead getReplicaRead() {
return replicaRead;
}

public TiConfiguration setReplicaRead(boolean isReplicaRead) {
this.isReplicaRead = isReplicaRead;
public TiConfiguration setReplicaRead(ReplicaRead replicaRead) {
this.replicaRead = replicaRead;
return this;
}

Expand Down
12 changes: 2 additions & 10 deletions src/main/java/org/tikv/common/region/RegionManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,6 @@ public class RegionManager {
// TODO: the region cache logic need rewrite.
// https://github.com/pingcap/tispark/issues/1170
private final RegionCache cache;
private final boolean isReplicaRead;

private final Function<CacheInvalidateEvent, Void> cacheInvalidateCallback;

Expand All @@ -65,13 +64,11 @@ public class RegionManager {
public RegionManager(
ReadOnlyPDClient pdClient, Function<CacheInvalidateEvent, Void> cacheInvalidateCallback) {
this.cache = new RegionCache(pdClient);
this.isReplicaRead = pdClient.isReplicaRead();
this.cacheInvalidateCallback = cacheInvalidateCallback;
}

public RegionManager(ReadOnlyPDClient pdClient) {
this.cache = new RegionCache(pdClient);
this.isReplicaRead = pdClient.isReplicaRead();
this.cacheInvalidateCallback = null;
}

Expand Down Expand Up @@ -126,13 +123,8 @@ public Pair<TiRegion, Store> getRegionStorePairByKey(

Store store = null;
if (storeType == TiStoreType.TiKV) {
if (isReplicaRead) {
Peer peer = region.getCurrentFollower();
store = cache.getStoreById(peer.getStoreId(), backOffer);
} else {
Peer leader = region.getLeader();
store = cache.getStoreById(leader.getStoreId(), backOffer);
}
Peer peer = region.getCurrentReplica();
store = cache.getStoreById(peer.getStoreId(), backOffer);
} else {
outerLoop:
for (Peer peer : region.getLearnerList()) {
Expand Down
Loading