From bfca20b7a81cbbc25e5ef58277fb3486dc0921ae Mon Sep 17 00:00:00 2001 From: Xiaoguang Sun Date: Wed, 19 May 2021 17:35:54 +0800 Subject: [PATCH 01/13] Support select replica with rich meta data Signed-off-by: Xiaoguang Sun --- src/main/java/org/tikv/common/PDClient.java | 82 +++++++------ .../org/tikv/common/ReadOnlyPDClient.java | 11 +- src/main/java/org/tikv/common/TiSession.java | 14 +-- .../org/tikv/common/region/RegionManager.java | 48 ++++++-- .../tikv/common/region/RegionStoreClient.java | 17 +-- .../java/org/tikv/common/region/TiRegion.java | 68 ++++------- .../replica/FollowerReplicaSelector.java | 13 ++- .../LeaderFollowerReplicaSelector.java | 13 ++- .../common/replica/LeaderReplicaSelector.java | 8 +- .../java/org/tikv/common/replica/Region.java | 59 ++++++++++ .../tikv/common/replica/ReplicaSelector.java | 10 +- .../java/org/tikv/common/replica/Store.java | 109 ++++++++++++++++++ .../java/org/tikv/common/MockServerTest.java | 13 ++- .../java/org/tikv/common/PDClientTest.java | 36 +++--- .../tikv/common/RegionStoreClientTest.java | 2 +- .../java/org/tikv/txn/ReplicaReadTest.java | 14 +-- 16 files changed, 358 insertions(+), 159 deletions(-) create mode 100644 src/main/java/org/tikv/common/replica/Region.java create mode 100644 src/main/java/org/tikv/common/replica/Store.java diff --git a/src/main/java/org/tikv/common/PDClient.java b/src/main/java/org/tikv/common/PDClient.java index 9bbda330bad..9dabf90ea8d 100644 --- a/src/main/java/org/tikv/common/PDClient.java +++ b/src/main/java/org/tikv/common/PDClient.java @@ -47,6 +47,7 @@ import org.slf4j.LoggerFactory; import org.tikv.common.TiConfiguration.KVMode; import org.tikv.common.codec.Codec.BytesCodec; +import org.tikv.common.codec.CodecDataInput; import org.tikv.common.codec.CodecDataOutput; import org.tikv.common.codec.KeyUtils; import org.tikv.common.exception.GrpcException; @@ -54,12 +55,13 @@ import org.tikv.common.meta.TiTimestamp; import org.tikv.common.operation.NoopHandler; import org.tikv.common.operation.PDErrorHandler; -import org.tikv.common.region.TiRegion; import org.tikv.common.util.BackOffFunction.BackOffFuncType; import org.tikv.common.util.BackOffer; import org.tikv.common.util.ChannelFactory; import org.tikv.common.util.ConcreteBackOffer; import org.tikv.common.util.FutureObserver; +import org.tikv.common.util.Pair; +import org.tikv.kvproto.Metapb; import org.tikv.kvproto.Metapb.Store; import org.tikv.kvproto.PDGrpc; import org.tikv.kvproto.PDGrpc.PDBlockingStub; @@ -143,7 +145,7 @@ public TiTimestamp getTimestamp(BackOffer backOffer) { * * @param region represents a region info */ - void scatterRegion(TiRegion region, BackOffer backOffer) { + void scatterRegion(Metapb.Region region, BackOffer backOffer) { Supplier request = () -> ScatterRegionRequest.newBuilder().setHeader(header).setRegionId(region.getId()).build(); @@ -167,7 +169,7 @@ void scatterRegion(TiRegion region, BackOffer backOffer) { * * @param region */ - void waitScatterRegionFinish(TiRegion region, BackOffer backOffer) { + void waitScatterRegionFinish(Metapb.Region region, BackOffer backOffer) { for (; ; ) { GetOperatorResponse resp = getOperator(region.getId()); if (resp != null) { @@ -220,7 +222,7 @@ private boolean isScatterRegionFinish(GetOperatorResponse resp) { } @Override - public TiRegion getRegionByKey(BackOffer backOffer, ByteString key) { + public Pair getRegionByKey(BackOffer backOffer, ByteString key) { Histogram.Timer requestTimer = PD_GET_REGION_BY_KEY_REQUEST_LATENCY.startTimer(); try { if (conf.getKvMode() == KVMode.TXN) { @@ -238,30 +240,21 @@ public TiRegion getRegionByKey(BackOffer backOffer, ByteString key) { GetRegionResponse resp = callWithRetry(backOffer, PDGrpc.getGetRegionMethod(), request, handler); - return new TiRegion( - resp.getRegion(), - resp.getLeader(), - conf.getIsolationLevel(), - conf.getCommandPriority(), - conf.getKvMode(), - conf.getReplicaSelector()); + return new Pair(decodeRegion(resp.getRegion()), resp.getLeader()); } finally { requestTimer.observeDuration(); } } @Override - public Future getRegionByKeyAsync(BackOffer backOffer, ByteString key) { - FutureObserver responseObserver = + public Future> getRegionByKeyAsync( + BackOffer backOffer, ByteString key) { + // TODO do not call blocking get store + FutureObserver, GetRegionResponse> responseObserver = new FutureObserver<>( resp -> - new TiRegion( - resp.getRegion(), - resp.getLeader(), - conf.getIsolationLevel(), - conf.getCommandPriority(), - conf.getKvMode(), - conf.getReplicaSelector())); + new Pair( + decodeRegion(resp.getRegion()), resp.getLeader())); Supplier request = () -> GetRegionRequest.newBuilder().setHeader(header).setRegionKey(key).build(); @@ -273,7 +266,7 @@ public Future getRegionByKeyAsync(BackOffer backOffer, ByteString key) } @Override - public TiRegion getRegionByID(BackOffer backOffer, long id) { + public Pair getRegionByID(BackOffer backOffer, long id) { Supplier request = () -> GetRegionByIDRequest.newBuilder().setHeader(header).setRegionId(id).build(); PDErrorHandler handler = @@ -281,28 +274,16 @@ public TiRegion getRegionByID(BackOffer backOffer, long id) { GetRegionResponse resp = callWithRetry(backOffer, PDGrpc.getGetRegionByIDMethod(), request, handler); - // Instead of using default leader instance, explicitly set no leader to null - return new TiRegion( - resp.getRegion(), - resp.getLeader(), - conf.getIsolationLevel(), - conf.getCommandPriority(), - conf.getKvMode(), - conf.getReplicaSelector()); + return new Pair(decodeRegion(resp.getRegion()), resp.getLeader()); } @Override - public Future getRegionByIDAsync(BackOffer backOffer, long id) { - FutureObserver responseObserver = + public Future> getRegionByIDAsync(BackOffer backOffer, long id) { + FutureObserver, GetRegionResponse> responseObserver = new FutureObserver<>( resp -> - new TiRegion( - resp.getRegion(), - resp.getLeader(), - conf.getIsolationLevel(), - conf.getCommandPriority(), - conf.getKvMode(), - conf.getReplicaSelector())); + new Pair( + decodeRegion(resp.getRegion()), resp.getLeader())); Supplier request = () -> GetRegionByIDRequest.newBuilder().setHeader(header).setRegionId(id).build(); @@ -620,4 +601,29 @@ public String toString() { return "[leaderInfo: " + leaderInfo + "]"; } } + + private Metapb.Region decodeRegion(Metapb.Region region) { + boolean isRawRegion = getConf().getKvMode() == KVMode.RAW; + Metapb.Region.Builder builder = + Metapb.Region.newBuilder() + .setId(region.getId()) + .setRegionEpoch(region.getRegionEpoch()) + .addAllPeers(region.getPeersList()); + + if (region.getStartKey().isEmpty() || isRawRegion) { + builder.setStartKey(region.getStartKey()); + } else { + byte[] decodedStartKey = BytesCodec.readBytes(new CodecDataInput(region.getStartKey())); + builder.setStartKey(ByteString.copyFrom(decodedStartKey)); + } + + if (region.getEndKey().isEmpty() || isRawRegion) { + builder.setEndKey(region.getEndKey()); + } else { + byte[] decodedEndKey = BytesCodec.readBytes(new CodecDataInput(region.getEndKey())); + builder.setEndKey(ByteString.copyFrom(decodedEndKey)); + } + + return builder.build(); + } } diff --git a/src/main/java/org/tikv/common/ReadOnlyPDClient.java b/src/main/java/org/tikv/common/ReadOnlyPDClient.java index eab6850abe6..8d1bc144f36 100644 --- a/src/main/java/org/tikv/common/ReadOnlyPDClient.java +++ b/src/main/java/org/tikv/common/ReadOnlyPDClient.java @@ -19,8 +19,9 @@ import java.util.List; import java.util.concurrent.Future; import org.tikv.common.meta.TiTimestamp; -import org.tikv.common.region.TiRegion; import org.tikv.common.util.BackOffer; +import org.tikv.common.util.Pair; +import org.tikv.kvproto.Metapb; import org.tikv.kvproto.Metapb.Store; /** Readonly PD client including only reading related interface Supposed for TiDB-like use cases */ @@ -38,9 +39,9 @@ public interface ReadOnlyPDClient { * @param key key in bytes for locating a region * @return the region whose startKey and endKey range covers the given key */ - TiRegion getRegionByKey(BackOffer backOffer, ByteString key); + Pair getRegionByKey(BackOffer backOffer, ByteString key); - Future getRegionByKeyAsync(BackOffer backOffer, ByteString key); + Future> getRegionByKeyAsync(BackOffer backOffer, ByteString key); /** * Get Region by Region Id @@ -48,9 +49,9 @@ public interface ReadOnlyPDClient { * @param id Region Id * @return the region corresponding to the given Id */ - TiRegion getRegionByID(BackOffer backOffer, long id); + Pair getRegionByID(BackOffer backOffer, long id); - Future getRegionByIDAsync(BackOffer backOffer, long id); + Future> getRegionByIDAsync(BackOffer backOffer, long id); HostMapping getHostMapping(); diff --git a/src/main/java/org/tikv/common/TiSession.java b/src/main/java/org/tikv/common/TiSession.java index 41410d64d3b..8c56396d9b0 100644 --- a/src/main/java/org/tikv/common/TiSession.java +++ b/src/main/java/org/tikv/common/TiSession.java @@ -199,7 +199,7 @@ public synchronized RegionManager getRegionManager() { if (res == null) { synchronized (this) { if (regionManager == null) { - regionManager = new RegionManager(getPDClient(), this.cacheInvalidateCallback); + regionManager = new RegionManager(getConf(), getPDClient(), this.cacheInvalidateCallback); } res = regionManager; } @@ -369,7 +369,7 @@ public void splitRegionAndScatter( long startMS = System.currentTimeMillis(); // split region - List newRegions = + List newRegions = splitRegion( splitKeys .stream() @@ -378,7 +378,7 @@ public void splitRegionAndScatter( ConcreteBackOffer.newCustomBackOff(splitRegionBackoffMS)); // scatter region - for (TiRegion newRegion : newRegions) { + for (Metapb.Region newRegion : newRegions) { try { getPDClient() .scatterRegion(newRegion, ConcreteBackOffer.newCustomBackOff(scatterRegionBackoffMS)); @@ -391,7 +391,7 @@ public void splitRegionAndScatter( if (scatterWaitMS > 0) { logger.info("start to wait scatter region finish"); long scatterRegionStartMS = System.currentTimeMillis(); - for (TiRegion newRegion : newRegions) { + for (Metapb.Region newRegion : newRegions) { long remainMS = (scatterRegionStartMS + scatterWaitMS) - System.currentTimeMillis(); if (remainMS <= 0) { logger.warn("wait scatter region timeout"); @@ -408,8 +408,8 @@ public void splitRegionAndScatter( logger.info("splitRegionAndScatter cost {} seconds", (endMS - startMS) / 1000); } - private List splitRegion(List splitKeys, BackOffer backOffer) { - List regions = new ArrayList<>(); + private List splitRegion(List splitKeys, BackOffer backOffer) { + List regions = new ArrayList<>(); Map> groupKeys = groupKeysByRegion(regionManager, splitKeys, backOffer); @@ -431,7 +431,7 @@ private List splitRegion(List splitKeys, BackOffer backOff "split key equal to region start key or end key. Region splitting is not needed."); } else { logger.info("start to split region id={}, split size={}", region.getId(), splits.size()); - List newRegions; + List newRegions; try { newRegions = getRegionStoreClientBuilder().build(region, store).splitRegion(splits); } catch (final TiKVException e) { diff --git a/src/main/java/org/tikv/common/region/RegionManager.java b/src/main/java/org/tikv/common/region/RegionManager.java index 67eee685cff..31fa9b37898 100644 --- a/src/main/java/org/tikv/common/region/RegionManager.java +++ b/src/main/java/org/tikv/common/region/RegionManager.java @@ -29,9 +29,11 @@ import java.util.List; import java.util.Map; import java.util.function.Function; +import java.util.stream.Collectors; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.tikv.common.ReadOnlyPDClient; +import org.tikv.common.TiConfiguration; import org.tikv.common.event.CacheInvalidateEvent; import org.tikv.common.exception.GrpcException; import org.tikv.common.exception.TiClientInternalException; @@ -53,6 +55,8 @@ public class RegionManager { private final Function cacheInvalidateCallback; + private final TiConfiguration conf; + public static final Histogram GET_REGION_BY_KEY_REQUEST_LATENCY = Histogram.build() .name("client_java_get_region_by_requests_latency") @@ -62,14 +66,16 @@ public class RegionManager { // To avoid double retrieval, we used the async version of grpc // When rpc not returned, instead of call again, it wait for previous one done public RegionManager( - ReadOnlyPDClient pdClient, Function cacheInvalidateCallback) { - this.cache = new RegionCache(pdClient); + TiConfiguration conf, + ReadOnlyPDClient pdClient, + Function cacheInvalidateCallback) { + this.conf = conf; + this.cache = new RegionCache(conf, pdClient); this.cacheInvalidateCallback = cacheInvalidateCallback; } - public RegionManager(ReadOnlyPDClient pdClient) { - this.cache = new RegionCache(pdClient); - this.cacheInvalidateCallback = null; + public RegionManager(TiConfiguration conf, ReadOnlyPDClient pdClient) { + this(conf, pdClient, null); } public Function getCacheInvalidateCallback() { @@ -208,12 +214,14 @@ public static class RegionCache { private final Map storeCache; private final RangeMap keyToRegionIdCache; private final ReadOnlyPDClient pdClient; + private final TiConfiguration conf; - public RegionCache(ReadOnlyPDClient pdClient) { + public RegionCache(TiConfiguration conf, ReadOnlyPDClient pdClient) { regionCache = new HashMap<>(); storeCache = new HashMap<>(); keyToRegionIdCache = TreeRangeMap.create(); + this.conf = conf; this.pdClient = pdClient; } @@ -234,7 +242,9 @@ public synchronized TiRegion getRegionByKey(ByteString key, BackOffer backOffer) if (regionId == null) { logger.debug("Key not found in keyToRegionIdCache:" + formatBytesUTF8(key)); - TiRegion region = pdClient.getRegionByKey(backOffer, key); + Pair regionAndLeader = + pdClient.getRegionByKey(backOffer, key); + TiRegion region = createRegion(regionAndLeader.first, regionAndLeader.second, backOffer); if (!putRegion(region)) { throw new TiClientInternalException("Invalid Region: " + region.toString()); } @@ -268,7 +278,9 @@ private synchronized TiRegion getRegionById(BackOffer backOffer, long regionId) logger.debug(String.format("getRegionByKey ID[%s] -> Region[%s]", regionId, region)); } if (region == null) { - region = pdClient.getRegionByID(backOffer, regionId); + Pair regionAndLeader = + pdClient.getRegionByID(backOffer, regionId); + region = createRegion(regionAndLeader.first, regionAndLeader.second, backOffer); if (!putRegion(region)) { throw new TiClientInternalException("Invalid Region: " + region.toString()); } @@ -332,5 +344,25 @@ public synchronized Store getStoreById(long id, BackOffer backOffer) { throw new GrpcException(e); } } + + private List getRegionStore(List peers, BackOffer backOffer) { + return peers + .stream() + .map(p -> getStoreById(p.getStoreId(), backOffer)) + .collect(Collectors.toList()); + } + + private TiRegion createRegion(Metapb.Region region, Metapb.Peer leader, BackOffer backOffer) { + List peers = region.getPeersList(); + List stores = getRegionStore(peers, backOffer); + return new TiRegion( + region, + leader, + peers, + stores, + conf.getIsolationLevel(), + conf.getCommandPriority(), + conf.getReplicaSelector()); + } } } diff --git a/src/main/java/org/tikv/common/region/RegionStoreClient.java b/src/main/java/org/tikv/common/region/RegionStoreClient.java index 6ea51aa9cb8..14c68049154 100644 --- a/src/main/java/org/tikv/common/region/RegionStoreClient.java +++ b/src/main/java/org/tikv/common/region/RegionStoreClient.java @@ -29,7 +29,6 @@ import io.prometheus.client.Histogram; import java.util.*; import java.util.function.Supplier; -import java.util.stream.Collectors; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.tikv.common.PDClient; @@ -43,6 +42,7 @@ import org.tikv.kvproto.Coprocessor; import org.tikv.kvproto.Errorpb; import org.tikv.kvproto.Kvrpcpb.*; +import org.tikv.kvproto.Metapb; import org.tikv.kvproto.Metapb.Store; import org.tikv.kvproto.TikvGrpc; import org.tikv.kvproto.TikvGrpc.TikvBlockingStub; @@ -745,7 +745,7 @@ public Iterator coprocessStreaming( * @param splitKeys is the split points for a specific region. * @return a split region info. */ - public List splitRegion(Iterable splitKeys) { + public List splitRegion(Iterable splitKeys) { Supplier request = () -> SplitRegionRequest.newBuilder() @@ -780,18 +780,7 @@ public List splitRegion(Iterable splitKeys) { region.getId(), resp.getRegionError().toString())); } - return resp.getRegionsList() - .stream() - .map( - region -> - new TiRegion( - region, - null, - conf.getIsolationLevel(), - conf.getCommandPriority(), - conf.getKvMode(), - conf.getReplicaSelector())) - .collect(Collectors.toList()); + return resp.getRegionsList(); } // APIs for Raw Scan/Put/Get/Delete diff --git a/src/main/java/org/tikv/common/region/TiRegion.java b/src/main/java/org/tikv/common/region/TiRegion.java index 6170d0bbc92..dd07d0d89d3 100644 --- a/src/main/java/org/tikv/common/region/TiRegion.java +++ b/src/main/java/org/tikv/common/region/TiRegion.java @@ -23,11 +23,9 @@ import java.util.List; import java.util.Objects; import java.util.Set; +import java.util.stream.Collectors; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.tikv.common.TiConfiguration.KVMode; -import org.tikv.common.codec.Codec.BytesCodec; -import org.tikv.common.codec.CodecDataInput; import org.tikv.common.codec.KeyUtils; import org.tikv.common.exception.TiClientInternalException; import org.tikv.common.key.Key; @@ -39,31 +37,35 @@ import org.tikv.kvproto.Metapb; import org.tikv.kvproto.Metapb.Peer; import org.tikv.kvproto.Metapb.Region; +import org.tikv.kvproto.Metapb.Store; public class TiRegion implements Serializable { private static final Logger logger = LoggerFactory.getLogger(TiRegion.class); private final Region meta; - private final KVMode kvMode; private final IsolationLevel isolationLevel; private final Kvrpcpb.CommandPri commandPri; private final Peer leader; private final ReplicaSelector replicaSelector; private final List replicaList; private int replicaIdx; + private final List peers; + private final List stores; public TiRegion( Region meta, Peer leader, + List peers, + List stores, IsolationLevel isolationLevel, Kvrpcpb.CommandPri commandPri, - KVMode kvMode, ReplicaSelector replicaSelector) { Objects.requireNonNull(meta, "meta is null"); - this.meta = decodeRegion(meta, kvMode == KVMode.RAW); - this.kvMode = kvMode; + this.meta = meta; this.isolationLevel = isolationLevel; this.commandPri = commandPri; + this.peers = peers; + this.stores = stores; this.replicaSelector = replicaSelector; if (leader == null || leader.getId() == 0) { if (meta.getPeersCount() == 0) { @@ -76,50 +78,20 @@ public TiRegion( } // init replicaList - replicaList = replicaSelector.select(this.leader, getFollowerList(), getLearnerList()); + // replicaList = replicaSelector.select(this.leader, getFollowerList(), getLearnerList()); + replicaList = + replicaSelector + .select(new org.tikv.common.replica.Region(meta, leader, peers, stores)) + .stream() + .map(org.tikv.common.replica.Store::getPeer) + .collect(Collectors.toList()); replicaIdx = 0; } - private Region decodeRegion(Region region, boolean isRawRegion) { - Region.Builder builder = - Region.newBuilder() - .setId(region.getId()) - .setRegionEpoch(region.getRegionEpoch()) - .addAllPeers(region.getPeersList()); - - if (region.getStartKey().isEmpty() || isRawRegion) { - builder.setStartKey(region.getStartKey()); - } else { - byte[] decodedStartKey = BytesCodec.readBytes(new CodecDataInput(region.getStartKey())); - builder.setStartKey(ByteString.copyFrom(decodedStartKey)); - } - - if (region.getEndKey().isEmpty() || isRawRegion) { - builder.setEndKey(region.getEndKey()); - } else { - byte[] decodedEndKey = BytesCodec.readBytes(new CodecDataInput(region.getEndKey())); - builder.setEndKey(ByteString.copyFrom(decodedEndKey)); - } - - return builder.build(); - } - public Peer getLeader() { return leader; } - public List getFollowerList() { - List peers = new ArrayList<>(); - for (Peer peer : getMeta().getPeersList()) { - if (!peer.equals(this.leader)) { - if (peer.getRole().equals(Metapb.PeerRole.Voter)) { - peers.add(peer); - } - } - } - return peers; - } - public List getLearnerList() { List peers = new ArrayList<>(); for (Peer peer : getMeta().getPeersList()) { @@ -209,7 +181,13 @@ public TiRegion switchPeer(long leaderStoreID) { for (Peer p : peers) { if (p.getStoreId() == leaderStoreID) { return new TiRegion( - this.meta, p, this.isolationLevel, this.commandPri, this.kvMode, this.replicaSelector); + this.meta, + p, + peers, + stores, + this.isolationLevel, + this.commandPri, + this.replicaSelector); } } return null; diff --git a/src/main/java/org/tikv/common/replica/FollowerReplicaSelector.java b/src/main/java/org/tikv/common/replica/FollowerReplicaSelector.java index 64aa5cdfe54..c0c5c125424 100644 --- a/src/main/java/org/tikv/common/replica/FollowerReplicaSelector.java +++ b/src/main/java/org/tikv/common/replica/FollowerReplicaSelector.java @@ -18,13 +18,18 @@ import java.util.ArrayList; import java.util.Collections; import java.util.List; -import org.tikv.kvproto.Metapb; public class FollowerReplicaSelector implements ReplicaSelector { @Override - public List select( - Metapb.Peer leader, List followers, List learners) { - List list = new ArrayList<>(followers); + public List select(Region region) { + Store[] stores = region.getStores(); + Store leader = region.getLeader(); + List list = new ArrayList<>(stores.length); + for (Store store : stores) { + if (!store.isLearner() && !leader.equals(store)) { + list.add(store); + } + } Collections.shuffle(list); return list; } diff --git a/src/main/java/org/tikv/common/replica/LeaderFollowerReplicaSelector.java b/src/main/java/org/tikv/common/replica/LeaderFollowerReplicaSelector.java index 52845d11ae2..94b1aee2c89 100644 --- a/src/main/java/org/tikv/common/replica/LeaderFollowerReplicaSelector.java +++ b/src/main/java/org/tikv/common/replica/LeaderFollowerReplicaSelector.java @@ -18,13 +18,18 @@ import java.util.ArrayList; import java.util.Collections; import java.util.List; -import org.tikv.kvproto.Metapb; public class LeaderFollowerReplicaSelector implements ReplicaSelector { @Override - public List select( - Metapb.Peer leader, List followers, List learners) { - List list = new ArrayList<>(followers); + public List select(Region region) { + Store[] stores = region.getStores(); + Store leader = region.getLeader(); + List list = new ArrayList<>(stores.length); + for (Store store : stores) { + if (!store.isLearner() && !leader.equals(store)) { + list.add(store); + } + } Collections.shuffle(list); list.add(leader); return list; diff --git a/src/main/java/org/tikv/common/replica/LeaderReplicaSelector.java b/src/main/java/org/tikv/common/replica/LeaderReplicaSelector.java index 0dc2bfc3a50..e654e621783 100644 --- a/src/main/java/org/tikv/common/replica/LeaderReplicaSelector.java +++ b/src/main/java/org/tikv/common/replica/LeaderReplicaSelector.java @@ -17,14 +17,12 @@ import java.util.ArrayList; import java.util.List; -import org.tikv.kvproto.Metapb; public class LeaderReplicaSelector implements ReplicaSelector { @Override - public List select( - Metapb.Peer leader, List followers, List learners) { - List list = new ArrayList<>(); - list.add(leader); + public List select(Region region) { + List list = new ArrayList<>(1); + list.add(region.getLeader()); return list; } } diff --git a/src/main/java/org/tikv/common/replica/Region.java b/src/main/java/org/tikv/common/replica/Region.java new file mode 100644 index 00000000000..9f3779feec1 --- /dev/null +++ b/src/main/java/org/tikv/common/replica/Region.java @@ -0,0 +1,59 @@ +package org.tikv.common.replica; + +import java.util.Arrays; +import java.util.Iterator; +import java.util.List; +import org.tikv.kvproto.Metapb; + +public class Region { + private final Metapb.Region region; + private final Store[] stores; + private Store leaderStore; + + public Region( + final Metapb.Region region, + final Metapb.Peer leader, + final List peers, + final List stores) { + this.region = region; + this.stores = new Store[stores.size()]; + Iterator peer = peers.iterator(); + Iterator store = stores.iterator(); + for (int idx = 0; idx < peers.size(); idx++) { + Metapb.Peer currentPeer = peer.next(); + boolean isLeader = currentPeer.equals(leader); + this.stores[idx] = new Store(currentPeer, store.next(), isLeader); + if (isLeader) { + leaderStore = this.stores[idx]; + } + } + } + + public Store[] getStores() { + return stores; + } + + public Store getLeader() { + return leaderStore; + } + + public long getId() { + return region.getId(); + } + + public byte[] getStartKey() { + return region.getStartKey().toByteArray(); + } + + public byte[] getEndKey() { + return region.getEndKey().toByteArray(); + } + + public String toString() { + return "{\nregion:{\n" + + region.toString() + + "},\nstores:[\n" + + Arrays.deepToString(stores) + + "]\n}"; + } +} diff --git a/src/main/java/org/tikv/common/replica/ReplicaSelector.java b/src/main/java/org/tikv/common/replica/ReplicaSelector.java index 144a6956df4..ad4d6609b15 100644 --- a/src/main/java/org/tikv/common/replica/ReplicaSelector.java +++ b/src/main/java/org/tikv/common/replica/ReplicaSelector.java @@ -17,13 +17,11 @@ import java.io.Serializable; import java.util.List; -import org.tikv.kvproto.Metapb; public interface ReplicaSelector extends Serializable { - public static final ReplicaSelector LEADER = new LeaderReplicaSelector(); - public static final ReplicaSelector FOLLOWER = new FollowerReplicaSelector(); - public static final ReplicaSelector LEADER_AND_FOLLOWER = new LeaderFollowerReplicaSelector(); + ReplicaSelector LEADER = new LeaderReplicaSelector(); + ReplicaSelector FOLLOWER = new FollowerReplicaSelector(); + ReplicaSelector LEADER_AND_FOLLOWER = new LeaderFollowerReplicaSelector(); - List select( - Metapb.Peer leader, List followers, List learners); + List select(Region region); } diff --git a/src/main/java/org/tikv/common/replica/Store.java b/src/main/java/org/tikv/common/replica/Store.java new file mode 100644 index 00000000000..a57fcbe95ff --- /dev/null +++ b/src/main/java/org/tikv/common/replica/Store.java @@ -0,0 +1,109 @@ +package org.tikv.common.replica; + +import java.util.List; +import org.tikv.kvproto.Metapb; + +public class Store { + public static class Label { + private final org.tikv.kvproto.Metapb.StoreLabel label; + + Label(org.tikv.kvproto.Metapb.StoreLabel label) { + this.label = label; + } + + public String getKey() { + return label.getKey(); + } + + public String getValue() { + return label.getValue(); + } + } + + public enum State { + Unknown, + Up, + Offline, + Tombstone + } + + private static final Label[] EMPTY_LABELS = new Label[0]; + private Label[] labels; + private final Metapb.Peer peer; + private final Metapb.Store store; + private final boolean isLeader; + + Store( + final org.tikv.kvproto.Metapb.Peer peer, + final org.tikv.kvproto.Metapb.Store store, + boolean isLeader) { + this.peer = peer; + this.store = store; + this.isLeader = isLeader; + } + + public Metapb.Peer getPeer() { + return peer; + } + + public Label[] getLabels() { + if (labels == null) { + List labelList = store.getLabelsList(); + if (labelList.isEmpty()) { + labels = EMPTY_LABELS; + } else { + labels = labelList.stream().map(Label::new).toArray(Label[]::new); + } + } + return labels; + } + + public boolean isLearner() { + return peer.getRole() == Metapb.PeerRole.Learner; + } + + public boolean isLeader() { + return isLeader; + } + + public boolean isFollower() { + return peer.getRole() == Metapb.PeerRole.Voter && !isLeader; + } + + public long getId() { + return store.getId(); + } + + public String getAddress() { + return store.getAddress(); + } + + public String getVersion() { + return store.getVersion(); + } + + public State getState() { + switch (store.getState()) { + case Up: + return State.Up; + case Offline: + return State.Offline; + case Tombstone: + return State.Tombstone; + default: + return State.Unknown; + } + } + + public boolean equals(Object o) { + if (!(o instanceof Store)) { + return false; + } + Store other = (Store) o; + return this.peer.equals(other.peer); + } + + public String toString() { + return "{\npeer:{\n" + peer.toString() + "},\nstore:{\n" + store.toString() + "}\n}"; + } +} diff --git a/src/test/java/org/tikv/common/MockServerTest.java b/src/test/java/org/tikv/common/MockServerTest.java index bfce6db50cd..45afddfafd7 100644 --- a/src/test/java/org/tikv/common/MockServerTest.java +++ b/src/test/java/org/tikv/common/MockServerTest.java @@ -2,8 +2,8 @@ import com.google.protobuf.ByteString; import java.io.IOException; +import java.util.List; import org.junit.Before; -import org.tikv.common.TiConfiguration.KVMode; import org.tikv.common.region.TiRegion; import org.tikv.common.replica.ReplicaSelector; import org.tikv.kvproto.Metapb; @@ -28,13 +28,22 @@ public void setUp() throws IOException { .addPeers(Metapb.Peer.newBuilder().setId(11).setStoreId(13)) .build(); + List s = + List.of( + Metapb.Store.newBuilder() + .setAddress("localhost:1234") + .setVersion("5.0.0") + .setId(13) + .build()); + region = new TiRegion( r, r.getPeers(0), + r.getPeersList(), + s, session.getConf().getIsolationLevel(), session.getConf().getCommandPriority(), - KVMode.TXN, ReplicaSelector.LEADER); pdServer.addGetRegionResp(Pdpb.GetRegionResponse.newBuilder().setRegion(r).build()); server = new KVMockServer(); diff --git a/src/test/java/org/tikv/common/PDClientTest.java b/src/test/java/org/tikv/common/PDClientTest.java index 14514601de1..ca7b747df4e 100644 --- a/src/test/java/org/tikv/common/PDClientTest.java +++ b/src/test/java/org/tikv/common/PDClientTest.java @@ -24,9 +24,9 @@ import org.junit.Test; import org.tikv.common.exception.GrpcException; import org.tikv.common.meta.TiTimestamp; -import org.tikv.common.region.TiRegion; import org.tikv.common.util.BackOffer; import org.tikv.common.util.ConcreteBackOffer; +import org.tikv.common.util.Pair; import org.tikv.kvproto.Metapb; import org.tikv.kvproto.Metapb.Store; import org.tikv.kvproto.Metapb.StoreState; @@ -85,13 +85,16 @@ public void testGetRegionByKey() throws Exception { GrpcUtils.makePeer(1, 10), GrpcUtils.makePeer(2, 20)))); try (PDClient client = session.getPDClient()) { - TiRegion r = client.getRegionByKey(defaultBackOff(), ByteString.EMPTY); + Pair rl = + client.getRegionByKey(defaultBackOff(), ByteString.EMPTY); + Metapb.Region r = rl.first; + Metapb.Peer l = rl.second; assertEquals(r.getStartKey(), ByteString.copyFrom(startKey)); assertEquals(r.getEndKey(), ByteString.copyFrom(endKey)); assertEquals(r.getRegionEpoch().getConfVer(), confVer); assertEquals(r.getRegionEpoch().getVersion(), ver); - assertEquals(r.getLeader().getId(), 1); - assertEquals(r.getLeader().getStoreId(), 10); + assertEquals(l.getId(), 1); + assertEquals(l.getStoreId(), 10); } } @@ -112,13 +115,16 @@ public void testGetRegionByKeyAsync() throws Exception { GrpcUtils.makePeer(1, 10), GrpcUtils.makePeer(2, 20)))); try (PDClient client = session.getPDClient()) { - TiRegion r = client.getRegionByKeyAsync(defaultBackOff(), ByteString.EMPTY).get(); + Pair rl = + client.getRegionByKeyAsync(defaultBackOff(), ByteString.EMPTY).get(); + Metapb.Region r = rl.first; + Metapb.Peer l = rl.second; assertEquals(r.getStartKey(), ByteString.copyFrom(startKey)); assertEquals(r.getEndKey(), ByteString.copyFrom(endKey)); assertEquals(r.getRegionEpoch().getConfVer(), confVer); assertEquals(r.getRegionEpoch().getVersion(), ver); - assertEquals(r.getLeader().getId(), 1); - assertEquals(r.getLeader().getStoreId(), 10); + assertEquals(l.getId(), 1); + assertEquals(l.getStoreId(), 10); } } @@ -140,13 +146,15 @@ public void testGetRegionById() throws Exception { GrpcUtils.makePeer(1, 10), GrpcUtils.makePeer(2, 20)))); try (PDClient client = session.getPDClient()) { - TiRegion r = client.getRegionByID(defaultBackOff(), 0); + Pair rl = client.getRegionByID(defaultBackOff(), 0); + Metapb.Region r = rl.first; + Metapb.Peer l = rl.second; assertEquals(r.getStartKey(), ByteString.copyFrom(startKey)); assertEquals(r.getEndKey(), ByteString.copyFrom(endKey)); assertEquals(r.getRegionEpoch().getConfVer(), confVer); assertEquals(r.getRegionEpoch().getVersion(), ver); - assertEquals(r.getLeader().getId(), 1); - assertEquals(r.getLeader().getStoreId(), 10); + assertEquals(l.getId(), 1); + assertEquals(l.getStoreId(), 10); } } @@ -167,13 +175,15 @@ public void testGetRegionByIdAsync() throws Exception { GrpcUtils.makePeer(1, 10), GrpcUtils.makePeer(2, 20)))); try (PDClient client = session.getPDClient()) { - TiRegion r = client.getRegionByIDAsync(defaultBackOff(), 0).get(); + Pair rl = client.getRegionByIDAsync(defaultBackOff(), 0).get(); + Metapb.Region r = rl.first; + Metapb.Peer l = rl.second; assertEquals(r.getStartKey(), ByteString.copyFrom(startKey)); assertEquals(r.getEndKey(), ByteString.copyFrom(endKey)); assertEquals(r.getRegionEpoch().getConfVer(), confVer); assertEquals(r.getRegionEpoch().getVersion(), ver); - assertEquals(r.getLeader().getId(), 1); - assertEquals(r.getLeader().getStoreId(), 10); + assertEquals(l.getId(), 1); + assertEquals(l.getStoreId(), 10); } } diff --git a/src/test/java/org/tikv/common/RegionStoreClientTest.java b/src/test/java/org/tikv/common/RegionStoreClientTest.java index e74c5823aef..28bc90080ce 100644 --- a/src/test/java/org/tikv/common/RegionStoreClientTest.java +++ b/src/test/java/org/tikv/common/RegionStoreClientTest.java @@ -52,7 +52,7 @@ private RegionStoreClient createClient(String version) { new RegionStoreClientBuilder( session.getConf(), session.getChannelFactory(), - new RegionManager(session.getPDClient()), + new RegionManager(session.getConf(), session.getPDClient()), session.getPDClient()); return builder.build(region, store); diff --git a/src/test/java/org/tikv/txn/ReplicaReadTest.java b/src/test/java/org/tikv/txn/ReplicaReadTest.java index 1baaea2cc66..acfa2e97baa 100644 --- a/src/test/java/org/tikv/txn/ReplicaReadTest.java +++ b/src/test/java/org/tikv/txn/ReplicaReadTest.java @@ -10,8 +10,9 @@ import org.junit.Test; import org.tikv.common.TiConfiguration; import org.tikv.common.TiSession; +import org.tikv.common.replica.Region; import org.tikv.common.replica.ReplicaSelector; -import org.tikv.kvproto.Metapb; +import org.tikv.common.replica.Store; public class ReplicaReadTest extends TXNTest { private TiSession session; @@ -41,12 +42,11 @@ public void replicaSelectorTest() { conf.setReplicaSelector( new ReplicaSelector() { @Override - public List select( - Metapb.Peer leader, List followers, List learners) { - List list = new ArrayList<>(); - list.addAll(followers); - list.addAll(learners); - list.add(leader); + public List select(Region region) { + List list = new ArrayList<>(); + for (Store store : region.getStores()) { + list.add(store); + } return list; } }); From a3b56b0426465a262b46bdfb45b5cbb28ae2385e Mon Sep 17 00:00:00 2001 From: Xiaoguang Sun Date: Thu, 20 May 2021 16:48:13 +0800 Subject: [PATCH 02/13] Use string helper Signed-off-by: Xiaoguang Sun --- src/main/java/org/tikv/common/replica/Region.java | 9 +++------ src/main/java/org/tikv/common/replica/Store.java | 4 +++- 2 files changed, 6 insertions(+), 7 deletions(-) diff --git a/src/main/java/org/tikv/common/replica/Region.java b/src/main/java/org/tikv/common/replica/Region.java index 9f3779feec1..b893ea70327 100644 --- a/src/main/java/org/tikv/common/replica/Region.java +++ b/src/main/java/org/tikv/common/replica/Region.java @@ -1,6 +1,7 @@ package org.tikv.common.replica; -import java.util.Arrays; +import static com.google.common.base.MoreObjects.toStringHelper; + import java.util.Iterator; import java.util.List; import org.tikv.kvproto.Metapb; @@ -50,10 +51,6 @@ public byte[] getEndKey() { } public String toString() { - return "{\nregion:{\n" - + region.toString() - + "},\nstores:[\n" - + Arrays.deepToString(stores) - + "]\n}"; + return toStringHelper(this).add("region", region).add("stores", stores).toString(); } } diff --git a/src/main/java/org/tikv/common/replica/Store.java b/src/main/java/org/tikv/common/replica/Store.java index a57fcbe95ff..6881d4be91b 100644 --- a/src/main/java/org/tikv/common/replica/Store.java +++ b/src/main/java/org/tikv/common/replica/Store.java @@ -1,5 +1,7 @@ package org.tikv.common.replica; +import static com.google.common.base.MoreObjects.toStringHelper; + import java.util.List; import org.tikv.kvproto.Metapb; @@ -104,6 +106,6 @@ public boolean equals(Object o) { } public String toString() { - return "{\npeer:{\n" + peer.toString() + "},\nstore:{\n" + store.toString() + "}\n}"; + return toStringHelper(this).add("peer", peer).add("store", store).toString(); } } From 42f10cd9743bae705b4651a7030fda597ccb13c1 Mon Sep 17 00:00:00 2001 From: Xiaoguang Sun Date: Wed, 26 May 2021 21:39:29 +0800 Subject: [PATCH 03/13] Print debug log when failed to get member Signed-off-by: Xiaoguang Sun --- src/main/java/org/tikv/common/PDClient.java | 1 + 1 file changed, 1 insertion(+) diff --git a/src/main/java/org/tikv/common/PDClient.java b/src/main/java/org/tikv/common/PDClient.java index 9dabf90ea8d..7fba1113c43 100644 --- a/src/main/java/org/tikv/common/PDClient.java +++ b/src/main/java/org/tikv/common/PDClient.java @@ -528,6 +528,7 @@ private void initCluster() { if (resp != null) { break; } + logger.info("Could not get leader member with pd: " + u); } checkNotNull(resp, "Failed to init client for PD cluster."); long clusterId = resp.getHeader().getClusterId(); From de050bf98c7bd784540ae8ad2cc72dc5f097dba2 Mon Sep 17 00:00:00 2001 From: Xiaoguang Sun Date: Sat, 12 Jun 2021 00:02:59 +0800 Subject: [PATCH 04/13] Support customized HostMapping implementation Signed-off-by: Xiaoguang Sun --- .../org/tikv/common/DefaultHostMapping.java | 87 +++++++++++++++++++ .../java/org/tikv/common/HostMapping.java | 70 +-------------- src/main/java/org/tikv/common/PDClient.java | 5 +- .../java/org/tikv/common/TiConfiguration.java | 9 ++ 4 files changed, 103 insertions(+), 68 deletions(-) create mode 100644 src/main/java/org/tikv/common/DefaultHostMapping.java diff --git a/src/main/java/org/tikv/common/DefaultHostMapping.java b/src/main/java/org/tikv/common/DefaultHostMapping.java new file mode 100644 index 00000000000..2e53fff37fd --- /dev/null +++ b/src/main/java/org/tikv/common/DefaultHostMapping.java @@ -0,0 +1,87 @@ +/* + * Copyright 2021 PingCAP, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.tikv.common; + +import static org.tikv.common.pd.PDUtils.addrToUri; + +import com.google.common.annotations.Beta; +import io.etcd.jetcd.ByteSequence; +import io.etcd.jetcd.Client; +import io.etcd.jetcd.KeyValue; +import io.etcd.jetcd.kv.GetResponse; +import java.net.URI; +import java.nio.charset.StandardCharsets; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ExecutionException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class DefaultHostMapping implements HostMapping { + private static final String NETWORK_MAPPING_PATH = "/client/url-mapping"; + private final Client etcdClient; + private final String networkMappingName; + private final ConcurrentMap hostMapping; + private final Logger logger = LoggerFactory.getLogger(DefaultHostMapping.class); + + public DefaultHostMapping(Client etcdClient, String networkMappingName) { + this.etcdClient = etcdClient; + this.networkMappingName = networkMappingName; + this.hostMapping = new ConcurrentHashMap<>(); + } + + private ByteSequence hostToNetworkMappingKey(String host) { + String path = NETWORK_MAPPING_PATH + "/" + networkMappingName + "/" + host; + return ByteSequence.from(path, StandardCharsets.UTF_8); + } + + @Beta + private String getMappedHostFromPD(String host) { + ByteSequence hostKey = hostToNetworkMappingKey(host); + for (int i = 0; i < 5; i++) { + CompletableFuture future = etcdClient.getKVClient().get(hostKey); + try { + GetResponse resp = future.get(); + List kvs = resp.getKvs(); + if (kvs.size() != 1) { + break; + } + return kvs.get(0).getValue().toString(StandardCharsets.UTF_8); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } catch (ExecutionException e) { + logger.info("failed to get mapped Host from PD: " + host, e); + break; + } catch (Exception ignore) { + // ignore + break; + } + } + return host; + } + + public URI getMappedURI(URI uri) { + if (networkMappingName.isEmpty()) { + return uri; + } + return addrToUri( + hostMapping.computeIfAbsent(uri.getHost(), this::getMappedHostFromPD) + + ":" + + uri.getPort()); + } +} diff --git a/src/main/java/org/tikv/common/HostMapping.java b/src/main/java/org/tikv/common/HostMapping.java index 08ae8048c7e..e13eda8ff7d 100644 --- a/src/main/java/org/tikv/common/HostMapping.java +++ b/src/main/java/org/tikv/common/HostMapping.java @@ -15,73 +15,9 @@ package org.tikv.common; -import static org.tikv.common.pd.PDUtils.addrToUri; - -import com.google.common.annotations.Beta; -import io.etcd.jetcd.ByteSequence; -import io.etcd.jetcd.Client; -import io.etcd.jetcd.KeyValue; -import io.etcd.jetcd.kv.GetResponse; +import java.io.Serializable; import java.net.URI; -import java.nio.charset.StandardCharsets; -import java.util.List; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.ExecutionException; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class HostMapping { - private static final String NETWORK_MAPPING_PATH = "/client/url-mapping"; - private final Client etcdClient; - private final String networkMappingName; - private final ConcurrentMap hostMapping; - private final Logger logger = LoggerFactory.getLogger(HostMapping.class); - - public HostMapping(Client etcdClient, String networkMappingName) { - this.etcdClient = etcdClient; - this.networkMappingName = networkMappingName; - this.hostMapping = new ConcurrentHashMap<>(); - } - - private ByteSequence hostToNetworkMappingKey(String host) { - String path = NETWORK_MAPPING_PATH + "/" + networkMappingName + "/" + host; - return ByteSequence.from(path, StandardCharsets.UTF_8); - } - - @Beta - private String getMappedHostFromPD(String host) { - ByteSequence hostKey = hostToNetworkMappingKey(host); - for (int i = 0; i < 5; i++) { - CompletableFuture future = etcdClient.getKVClient().get(hostKey); - try { - GetResponse resp = future.get(); - List kvs = resp.getKvs(); - if (kvs.size() != 1) { - break; - } - return kvs.get(0).getValue().toString(StandardCharsets.UTF_8); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - } catch (ExecutionException e) { - logger.info("failed to get mapped Host from PD: " + host, e); - break; - } catch (Exception ignore) { - // ignore - break; - } - } - return host; - } - public URI getMappedURI(URI uri) { - if (networkMappingName.isEmpty()) { - return uri; - } - return addrToUri( - hostMapping.computeIfAbsent(uri.getHost(), this::getMappedHostFromPD) - + ":" - + uri.getPort()); - } +public interface HostMapping extends Serializable { + URI getMappedURI(URI uri); } diff --git a/src/main/java/org/tikv/common/PDClient.java b/src/main/java/org/tikv/common/PDClient.java index 466cef3d59c..d01db9fd5aa 100644 --- a/src/main/java/org/tikv/common/PDClient.java +++ b/src/main/java/org/tikv/common/PDClient.java @@ -34,6 +34,7 @@ import java.net.URI; import java.nio.charset.StandardCharsets; import java.util.List; +import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; @@ -522,7 +523,9 @@ private void initCluster() { .setDaemon(true) .build())) .build(); - this.hostMapping = new HostMapping(this.etcdClient, conf.getNetworkMappingName()); + this.hostMapping = + Optional.ofNullable(getConf().getHostMapping()) + .orElseGet(() -> new DefaultHostMapping(this.etcdClient, conf.getNetworkMappingName())); for (URI u : pdAddrs) { resp = getMembers(u); if (resp != null) { diff --git a/src/main/java/org/tikv/common/TiConfiguration.java b/src/main/java/org/tikv/common/TiConfiguration.java index 0e97028d06f..4f52114fb0b 100644 --- a/src/main/java/org/tikv/common/TiConfiguration.java +++ b/src/main/java/org/tikv/common/TiConfiguration.java @@ -255,6 +255,7 @@ private static ReplicaRead getReplicaRead(String key) { private int metricsPort = getInt(TIKV_METRICS_PORT); private final String networkMappingName = get(TIKV_NETWORK_MAPPING_NAME); + private HostMapping hostMapping = null; public enum KVMode { TXN, @@ -532,4 +533,12 @@ public TiConfiguration setMetricsPort(int metricsPort) { public String getNetworkMappingName() { return this.networkMappingName; } + + public HostMapping getHostMapping() { + return hostMapping; + } + + public void setHostMapping(HostMapping mapping) { + this.hostMapping = mapping; + } } From 58668af07127e28bc6535ddc1dd41cdc608d9d1f Mon Sep 17 00:00:00 2001 From: Xiaoguang Sun Date: Tue, 15 Jun 2021 12:13:45 +0800 Subject: [PATCH 05/13] Use ImmutableList as List.of only exists since Java 9 Signed-off-by: Xiaoguang Sun --- src/test/java/org/tikv/common/MockServerTest.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/test/java/org/tikv/common/MockServerTest.java b/src/test/java/org/tikv/common/MockServerTest.java index 45afddfafd7..620623396ab 100644 --- a/src/test/java/org/tikv/common/MockServerTest.java +++ b/src/test/java/org/tikv/common/MockServerTest.java @@ -1,5 +1,6 @@ package org.tikv.common; +import com.google.common.collect.ImmutableList; import com.google.protobuf.ByteString; import java.io.IOException; import java.util.List; @@ -29,7 +30,7 @@ public void setUp() throws IOException { .build(); List s = - List.of( + ImmutableList.of( Metapb.Store.newBuilder() .setAddress("localhost:1234") .setVersion("5.0.0") From 12512f08f5bcaab3517b4b28b4f9102197395a5d Mon Sep 17 00:00:00 2001 From: Xiaoguang Sun Date: Thu, 17 Jun 2021 20:39:51 +0800 Subject: [PATCH 06/13] Use this.leader in TiRegion Signed-off-by: Xiaoguang Sun --- src/main/java/org/tikv/common/PDClient.java | 1 - src/main/java/org/tikv/common/region/TiRegion.java | 3 +-- 2 files changed, 1 insertion(+), 3 deletions(-) diff --git a/src/main/java/org/tikv/common/PDClient.java b/src/main/java/org/tikv/common/PDClient.java index d01db9fd5aa..e9db3538f6a 100644 --- a/src/main/java/org/tikv/common/PDClient.java +++ b/src/main/java/org/tikv/common/PDClient.java @@ -250,7 +250,6 @@ public Pair getRegionByKey(BackOffer backOffer, Byte @Override public Future> getRegionByKeyAsync( BackOffer backOffer, ByteString key) { - // TODO do not call blocking get store FutureObserver, GetRegionResponse> responseObserver = new FutureObserver<>( resp -> diff --git a/src/main/java/org/tikv/common/region/TiRegion.java b/src/main/java/org/tikv/common/region/TiRegion.java index dd07d0d89d3..5ec271d8ea2 100644 --- a/src/main/java/org/tikv/common/region/TiRegion.java +++ b/src/main/java/org/tikv/common/region/TiRegion.java @@ -78,10 +78,9 @@ public TiRegion( } // init replicaList - // replicaList = replicaSelector.select(this.leader, getFollowerList(), getLearnerList()); replicaList = replicaSelector - .select(new org.tikv.common.replica.Region(meta, leader, peers, stores)) + .select(new org.tikv.common.replica.Region(meta, this.leader, peers, stores)) .stream() .map(org.tikv.common.replica.Store::getPeer) .collect(Collectors.toList()); From 0171bc97e3c77d4ae1a9ef58fcca34510072ca77 Mon Sep 17 00:00:00 2001 From: Xiaoguang Sun Date: Thu, 17 Jun 2021 21:27:02 +0800 Subject: [PATCH 07/13] Fix couple of integration tests Signed-off-by: Xiaoguang Sun --- .../org/tikv/common/RegionManagerTest.java | 34 ++++++++++++++----- 1 file changed, 25 insertions(+), 9 deletions(-) diff --git a/src/test/java/org/tikv/common/RegionManagerTest.java b/src/test/java/org/tikv/common/RegionManagerTest.java index d382e21b520..9a50bd9348f 100644 --- a/src/test/java/org/tikv/common/RegionManagerTest.java +++ b/src/test/java/org/tikv/common/RegionManagerTest.java @@ -61,6 +61,7 @@ public void getRegionByKey() throws Exception { int confVer = 1026; int ver = 1027; long regionId = 233; + String testAddress = "testAddress"; pdServer.addGetRegionResp( GrpcUtils.makeGetRegionResponse( pdServer.getClusterId(), @@ -71,6 +72,18 @@ public void getRegionByKey() throws Exception { GrpcUtils.makeRegionEpoch(confVer, ver), GrpcUtils.makePeer(1, 10), GrpcUtils.makePeer(2, 20)))); + for (long id : new long[] {10, 20}) { + pdServer.addGetStoreResp( + GrpcUtils.makeGetStoreResponse( + pdServer.getClusterId(), + GrpcUtils.makeStore( + id, + testAddress, + Metapb.StoreState.Up, + GrpcUtils.makeStoreLabel("k1", "v1"), + GrpcUtils.makeStoreLabel("k2", "v2")))); + } + TiRegion region = mgr.getRegionByKey(startKey); assertEquals(region.getId(), regionId); @@ -106,15 +119,18 @@ public void getStoreByKey() throws Exception { GrpcUtils.makeRegionEpoch(confVer, ver), GrpcUtils.makePeer(storeId, 10), GrpcUtils.makePeer(storeId + 1, 20)))); - pdServer.addGetStoreResp( - GrpcUtils.makeGetStoreResponse( - pdServer.getClusterId(), - GrpcUtils.makeStore( - storeId, - testAddress, - Metapb.StoreState.Up, - GrpcUtils.makeStoreLabel("k1", "v1"), - GrpcUtils.makeStoreLabel("k2", "v2")))); + for (long id : new long[] {10, 20}) { + pdServer.addGetStoreResp( + GrpcUtils.makeGetStoreResponse( + pdServer.getClusterId(), + GrpcUtils.makeStore( + id, + testAddress, + Metapb.StoreState.Up, + GrpcUtils.makeStoreLabel("k1", "v1"), + GrpcUtils.makeStoreLabel("k2", "v2")))); + } + Pair pair = mgr.getRegionStorePairByKey(searchKey); assertEquals(pair.first.getId(), regionId); assertEquals(pair.first.getId(), storeId); From 26a8f53ea3bbc76a01e0c558865ef670e138e8db Mon Sep 17 00:00:00 2001 From: Xiaoguang Sun Date: Thu, 17 Jun 2021 21:36:51 +0800 Subject: [PATCH 08/13] Fix one more integration test Signed-off-by: Xiaoguang Sun --- src/test/java/org/tikv/common/MockServerTest.java | 3 +++ 1 file changed, 3 insertions(+) diff --git a/src/test/java/org/tikv/common/MockServerTest.java b/src/test/java/org/tikv/common/MockServerTest.java index 620623396ab..ad4327142fe 100644 --- a/src/test/java/org/tikv/common/MockServerTest.java +++ b/src/test/java/org/tikv/common/MockServerTest.java @@ -47,6 +47,9 @@ public void setUp() throws IOException { session.getConf().getCommandPriority(), ReplicaSelector.LEADER); pdServer.addGetRegionResp(Pdpb.GetRegionResponse.newBuilder().setRegion(r).build()); + for (Metapb.Store store : s) { + pdServer.addGetStoreResp(Pdpb.GetStoreResponse.newBuilder().setStore(store).build()); + } server = new KVMockServer(); port = server.start(region); } From 633649297e9c126306c04d3dc73b526b0a50115a Mon Sep 17 00:00:00 2001 From: Xiaoguang Sun Date: Fri, 18 Jun 2021 17:01:15 +0800 Subject: [PATCH 09/13] Fix replica read peer mismatch Signed-off-by: Xiaoguang Sun --- src/main/java/org/tikv/common/region/TiRegion.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/main/java/org/tikv/common/region/TiRegion.java b/src/main/java/org/tikv/common/region/TiRegion.java index 5ec271d8ea2..10847716fde 100644 --- a/src/main/java/org/tikv/common/region/TiRegion.java +++ b/src/main/java/org/tikv/common/region/TiRegion.java @@ -148,7 +148,8 @@ public Kvrpcpb.Context getReplicaContext(Set resolvedLocks, TiStoreType st private Kvrpcpb.Context getContext( Peer currentPeer, Set resolvedLocks, TiStoreType storeType) { - boolean replicaRead = !isLeader(getCurrentReplica()) && TiStoreType.TiKV.equals(storeType); + currentPeer = getCurrentReplica(); + boolean replicaRead = !isLeader(currentPeer) && TiStoreType.TiKV.equals(storeType); Kvrpcpb.Context.Builder builder = Kvrpcpb.Context.newBuilder(); builder From ecb8af1103e96ae2d0c497cbc20c928bbf96fe5b Mon Sep 17 00:00:00 2001 From: Xiaoguang Sun Date: Fri, 18 Jun 2021 22:34:50 +0800 Subject: [PATCH 10/13] Use passed in peer as it already selected peer Signed-off-by: Xiaoguang Sun --- src/main/java/org/tikv/common/region/TiRegion.java | 1 - 1 file changed, 1 deletion(-) diff --git a/src/main/java/org/tikv/common/region/TiRegion.java b/src/main/java/org/tikv/common/region/TiRegion.java index 10847716fde..48e9df20fca 100644 --- a/src/main/java/org/tikv/common/region/TiRegion.java +++ b/src/main/java/org/tikv/common/region/TiRegion.java @@ -148,7 +148,6 @@ public Kvrpcpb.Context getReplicaContext(Set resolvedLocks, TiStoreType st private Kvrpcpb.Context getContext( Peer currentPeer, Set resolvedLocks, TiStoreType storeType) { - currentPeer = getCurrentReplica(); boolean replicaRead = !isLeader(currentPeer) && TiStoreType.TiKV.equals(storeType); Kvrpcpb.Context.Builder builder = Kvrpcpb.Context.newBuilder(); From 0fe131eeac83dba9300f835a7c6bda565448499c Mon Sep 17 00:00:00 2001 From: Xiaoguang Sun Date: Wed, 23 Jun 2021 11:09:07 +0800 Subject: [PATCH 11/13] Refactor after merge master Signed-off-by: Xiaoguang Sun --- src/main/java/org/tikv/common/PDClient.java | 2 +- .../org/tikv/common/region/RegionManager.java | 11 +---- .../java/org/tikv/common/region/TiRegion.java | 44 +++++-------------- .../java/org/tikv/common/MockServerTest.java | 8 +--- 4 files changed, 15 insertions(+), 50 deletions(-) diff --git a/src/main/java/org/tikv/common/PDClient.java b/src/main/java/org/tikv/common/PDClient.java index 5d35e4a1fa4..c22a16d8e09 100644 --- a/src/main/java/org/tikv/common/PDClient.java +++ b/src/main/java/org/tikv/common/PDClient.java @@ -607,7 +607,7 @@ public String toString() { } private Metapb.Region decodeRegion(Metapb.Region region) { - boolean isRawRegion = getConf().getKvMode() == KVMode.RAW; + final boolean isRawRegion = conf.getKvMode() == KVMode.RAW; Metapb.Region.Builder builder = Metapb.Region.newBuilder() .setId(region.getId()) diff --git a/src/main/java/org/tikv/common/region/RegionManager.java b/src/main/java/org/tikv/common/region/RegionManager.java index 3f77682b2bc..fd58c1368ef 100644 --- a/src/main/java/org/tikv/common/region/RegionManager.java +++ b/src/main/java/org/tikv/common/region/RegionManager.java @@ -417,16 +417,7 @@ private List getRegionStore(List peers, BackOffer backOffe private TiRegion createRegion(Metapb.Region region, Metapb.Peer leader, BackOffer backOffer) { List peers = region.getPeersList(); List stores = getRegionStore(peers, backOffer); - return new TiRegion( - region, - leader, - peers, - stores, - null, - conf.getIsolationLevel(), - conf.getCommandPriority(), - conf.getKvMode(), - conf.getReplicaSelector()); + return new TiRegion(conf, region, leader, peers, stores, null); } public synchronized void clearAll() { diff --git a/src/main/java/org/tikv/common/region/TiRegion.java b/src/main/java/org/tikv/common/region/TiRegion.java index 8ef9cde429e..2fbb980612d 100644 --- a/src/main/java/org/tikv/common/region/TiRegion.java +++ b/src/main/java/org/tikv/common/region/TiRegion.java @@ -26,7 +26,7 @@ import java.util.stream.Collectors; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.tikv.common.TiConfiguration.KVMode; +import org.tikv.common.TiConfiguration; import org.tikv.common.codec.KeyUtils; import org.tikv.common.exception.TiClientInternalException; import org.tikv.common.key.Key; @@ -43,9 +43,9 @@ public class TiRegion implements Serializable { private static final Logger logger = LoggerFactory.getLogger(TiRegion.class); private final Region meta; - private final KVMode kvMode; private final IsolationLevel isolationLevel; private final Kvrpcpb.CommandPri commandPri; + private final TiConfiguration conf; private final Peer leader; private final ReplicaSelector replicaSelector; private final List replicaList; @@ -55,23 +55,19 @@ public class TiRegion implements Serializable { private final List stores; public TiRegion( + TiConfiguration conf, Region meta, Peer leader, List peers, List stores, - TiStore proxyStore, - IsolationLevel isolationLevel, - Kvrpcpb.CommandPri commandPri, - KVMode kvMode, - ReplicaSelector replicaSelector) { - Objects.requireNonNull(meta, "meta is null"); - this.meta = meta; - this.kvMode = kvMode; - this.isolationLevel = isolationLevel; - this.commandPri = commandPri; + TiStore proxyStore) { + this.conf = Objects.requireNonNull(conf, "conf is null"); + this.meta = Objects.requireNonNull(meta, "meta is null"); + this.isolationLevel = conf.getIsolationLevel(); + this.commandPri = conf.getCommandPriority(); this.peers = peers; this.stores = stores; - this.replicaSelector = replicaSelector; + this.replicaSelector = conf.getReplicaSelector(); this.proxyStore = proxyStore; if (leader == null || leader.getId() == 0) { if (meta.getPeersCount() == 0) { @@ -201,32 +197,14 @@ public TiRegion switchPeer(long leaderStoreID) { List peers = meta.getPeersList(); for (Peer p : peers) { if (p.getStoreId() == leaderStoreID) { - return new TiRegion( - this.meta, - p, - peers, - this.stores, - this.proxyStore, - this.isolationLevel, - this.commandPri, - this.kvMode, - this.replicaSelector); + return new TiRegion(this.conf, this.meta, p, peers, this.stores, this.proxyStore); } } return null; } public TiRegion switchProxyStore(TiStore store) { - return new TiRegion( - this.meta, - this.leader, - this.peers, - this.stores, - store, - this.isolationLevel, - this.commandPri, - this.kvMode, - this.replicaSelector); + return new TiRegion(this.conf, this.meta, this.leader, this.peers, this.stores, store); } public boolean isMoreThan(ByteString key) { diff --git a/src/test/java/org/tikv/common/MockServerTest.java b/src/test/java/org/tikv/common/MockServerTest.java index 6831227737d..d3fd36f1fe4 100644 --- a/src/test/java/org/tikv/common/MockServerTest.java +++ b/src/test/java/org/tikv/common/MockServerTest.java @@ -8,7 +8,6 @@ import org.junit.Before; import org.tikv.common.region.TiRegion; import org.tikv.common.region.TiStore; -import org.tikv.common.replica.ReplicaSelector; import org.tikv.kvproto.Metapb; import org.tikv.kvproto.Pdpb; @@ -41,15 +40,12 @@ public void setUp() throws IOException { region = new TiRegion( + session.getConf(), r, r.getPeers(0), r.getPeersList(), s.stream().map(TiStore::new).collect(Collectors.toList()), - null, - session.getConf().getIsolationLevel(), - session.getConf().getCommandPriority(), - TiConfiguration.KVMode.TXN, - ReplicaSelector.LEADER); + null); pdServer.addGetRegionResp(Pdpb.GetRegionResponse.newBuilder().setRegion(r).build()); for (Metapb.Store store : s) { pdServer.addGetStoreResp(Pdpb.GetStoreResponse.newBuilder().setStore(store).build()); From c73df3c8693c6fd4100bce62f31751de7f004093 Mon Sep 17 00:00:00 2001 From: Xiaoguang Sun Date: Wed, 23 Jun 2021 11:40:54 +0800 Subject: [PATCH 12/13] Try fix integration test Signed-off-by: Xiaoguang Sun --- src/test/java/org/tikv/common/PDClientTest.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/test/java/org/tikv/common/PDClientTest.java b/src/test/java/org/tikv/common/PDClientTest.java index 33460961b7c..a3989d74132 100644 --- a/src/test/java/org/tikv/common/PDClientTest.java +++ b/src/test/java/org/tikv/common/PDClientTest.java @@ -46,14 +46,14 @@ public void testSwitchLeader() throws Exception { try (PDClient client = session.getPDClient()) { client.trySwitchLeader("http://" + LOCAL_ADDR + ":" + (pdServer.port + 1)); assertEquals( - client.getPdClientWrapper().getLeaderInfo(), LOCAL_ADDR + ":" + (pdServer.port + 1)); + client.getPdClientWrapper().getLeaderInfo(), "http://" + LOCAL_ADDR + ":" + (pdServer.port + 1)); } tearDown(); setUp(LOCAL_ADDR_IPV6); try (PDClient client = session.getPDClient()) { client.trySwitchLeader("http://" + LOCAL_ADDR_IPV6 + ":" + (pdServer.port + 2)); assertEquals( - client.getPdClientWrapper().getLeaderInfo(), LOCAL_ADDR_IPV6 + ":" + (pdServer.port + 2)); + client.getPdClientWrapper().getLeaderInfo(), "http://" + LOCAL_ADDR_IPV6 + ":" + (pdServer.port + 2)); } } From 2a63421ea1bbca85654a8db9ff436ea160badd03 Mon Sep 17 00:00:00 2001 From: Xiaoguang Sun Date: Wed, 23 Jun 2021 12:39:11 +0800 Subject: [PATCH 13/13] Workaround lint issue with duplicated dependency for different scopes Signed-off-by: Xiaoguang Sun --- pom.xml | 2 +- src/test/java/org/tikv/common/PDClientTest.java | 6 ++++-- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/pom.xml b/pom.xml index 82b01cee7b7..d7f8c298a87 100644 --- a/pom.xml +++ b/pom.xml @@ -187,7 +187,7 @@ org.apache.commons commons-lang3 - 3.9 + 3.10 test diff --git a/src/test/java/org/tikv/common/PDClientTest.java b/src/test/java/org/tikv/common/PDClientTest.java index a3989d74132..d26f09e436d 100644 --- a/src/test/java/org/tikv/common/PDClientTest.java +++ b/src/test/java/org/tikv/common/PDClientTest.java @@ -46,14 +46,16 @@ public void testSwitchLeader() throws Exception { try (PDClient client = session.getPDClient()) { client.trySwitchLeader("http://" + LOCAL_ADDR + ":" + (pdServer.port + 1)); assertEquals( - client.getPdClientWrapper().getLeaderInfo(), "http://" + LOCAL_ADDR + ":" + (pdServer.port + 1)); + client.getPdClientWrapper().getLeaderInfo(), + "http://" + LOCAL_ADDR + ":" + (pdServer.port + 1)); } tearDown(); setUp(LOCAL_ADDR_IPV6); try (PDClient client = session.getPDClient()) { client.trySwitchLeader("http://" + LOCAL_ADDR_IPV6 + ":" + (pdServer.port + 2)); assertEquals( - client.getPdClientWrapper().getLeaderInfo(), "http://" + LOCAL_ADDR_IPV6 + ":" + (pdServer.port + 2)); + client.getPdClientWrapper().getLeaderInfo(), + "http://" + LOCAL_ADDR_IPV6 + ":" + (pdServer.port + 2)); } }