Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 29 additions & 9 deletions src/main/java/org/tikv/common/TiSession.java
Original file line number Diff line number Diff line change
Expand Up @@ -93,23 +93,46 @@ public TiSession(TiConfiguration conf) {

private synchronized void warmUp() {
long warmUpStartTime = System.currentTimeMillis();
warmUpStore();
warmUpRegion();
warmUpGrpc();
logger.info(
String.format("warm up duration %d ms", System.currentTimeMillis() - warmUpStartTime));
}

private void warmUpStore() {
try {
BackOffer backOffer = ConcreteBackOffer.newRawKVBackOff();
this.client = getPDClient();
this.regionManager = getRegionManager();
List<Metapb.Store> stores = this.client.getAllStores(ConcreteBackOffer.newGetBackOff());
List<Metapb.Store> stores = this.client.getAllStores(backOffer);
// warm up store cache
for (Metapb.Store store : stores) {
this.regionManager.updateStore(
null,
new TiStore(this.client.getStore(ConcreteBackOffer.newGetBackOff(), store.getId())));
null, new TiStore(this.client.getStore(backOffer, store.getId())));
}
ByteString startKey = ByteString.EMPTY;
} catch (Exception e) {
// ignore error
logger.info("warm up store fails, ignored ", e);
}
}

private void warmUpRegion() {
try {
BackOffer backOffer = ConcreteBackOffer.newRawKVBackOff();
ByteString startKey = ByteString.EMPTY;
do {
TiRegion region = regionManager.getRegionByKey(startKey);
TiRegion region = regionManager.getRegionByKey(startKey, backOffer);
startKey = region.getEndKey();
} while (!startKey.isEmpty());
} catch (Exception e) {
// ignore error
logger.info("warm up region fails, ignored ", e);
}
}

private void warmUpGrpc() {
try {
RawKVClient rawKVClient = createRawClient();
ByteString exampleKey = ByteString.EMPTY;
ByteString prev = rawKVClient.get(exampleKey);
Expand All @@ -118,10 +141,7 @@ private synchronized void warmUp() {
rawKVClient.put(exampleKey, prev);
} catch (Exception e) {
// ignore error
logger.info("warm up fails, ignored ", e);
} finally {
logger.info(
String.format("warm up duration %d ms", System.currentTimeMillis() - warmUpStartTime));
logger.info("warm up grpc fails, ignored ", e);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ public boolean handleRegionError(BackOffer backOffer, Errorpb.Error error) {
// onNotLeader is only needed when updateLeader succeeds, thus switch
// to a new store address.
TiRegion newRegion = this.regionManager.updateLeader(recv.getRegion(), newStoreId);
retry = newRegion != null && recv.onNotLeader(newRegion);
retry = newRegion != null && recv.onNotLeader(newRegion, backOffer);

backOffFuncType = BackOffFunction.BackOffFuncType.BoUpdateLeader;
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ public void close() throws GrpcException {}
* @return false when re-split is needed.
*/
@Override
public boolean onNotLeader(TiRegion newRegion) {
public boolean onNotLeader(TiRegion newRegion, BackOffer backOffer) {
if (logger.isDebugEnabled()) {
logger.debug(region + ", new leader = " + newRegion.getLeader().getStoreId());
}
Expand All @@ -123,7 +123,7 @@ public boolean onNotLeader(TiRegion newRegion) {
store = null;
}
region = newRegion;
store = regionManager.getStoreById(region.getLeader().getStoreId());
store = regionManager.getStoreById(region.getLeader().getStoreId(), backOffer);
updateClientStub();
return true;
}
Expand Down Expand Up @@ -193,10 +193,10 @@ private Boolean seekLeaderStore(BackOffer backOffer) {

logger.info(String.format("try switch leader: region[%d]", region.getId()));

Metapb.Peer peer = switchLeaderStore();
Metapb.Peer peer = switchLeaderStore(backOffer);
if (peer != null) {
// we found a leader
TiStore currentLeaderStore = regionManager.getStoreById(peer.getStoreId());
TiStore currentLeaderStore = regionManager.getStoreById(peer.getStoreId(), backOffer);
if (currentLeaderStore.isReachable()) {
logger.info(
String.format(
Expand Down Expand Up @@ -232,7 +232,7 @@ private boolean seekProxyStore(BackOffer backOffer) {
try {
logger.info(String.format("try grpc forward: region[%d]", region.getId()));
// when current leader cannot be reached
TiStore storeWithProxy = switchProxyStore();
TiStore storeWithProxy = switchProxyStore(backOffer);
if (storeWithProxy == null) {
// no store available, retry
logger.warn(String.format("No store available, retry: region[%d]", region.getId()));
Expand All @@ -250,11 +250,11 @@ private boolean seekProxyStore(BackOffer backOffer) {
}

// first: leader peer, second: true if any responses returned with grpc error
private Metapb.Peer switchLeaderStore() {
private Metapb.Peer switchLeaderStore(BackOffer backOffer) {
List<SwitchLeaderTask> responses = new LinkedList<>();
for (Metapb.Peer peer : region.getFollowerList()) {
ByteString key = region.getStartKey();
TiStore peerStore = regionManager.getStoreById(peer.getStoreId());
TiStore peerStore = regionManager.getStoreById(peer.getStoreId(), backOffer);
ManagedChannel channel =
channelFactory.getChannel(
peerStore.getAddress(), regionManager.getPDClient().getHostMapping());
Expand Down Expand Up @@ -300,12 +300,12 @@ private Metapb.Peer switchLeaderStore() {
}
}

private TiStore switchProxyStore() {
private TiStore switchProxyStore(BackOffer backOffer) {
long forwardTimeout = conf.getForwardTimeout();
List<ForwardCheckTask> responses = new LinkedList<>();
for (Metapb.Peer peer : region.getFollowerList()) {
ByteString key = region.getStartKey();
TiStore peerStore = regionManager.getStoreById(peer.getStoreId());
TiStore peerStore = regionManager.getStoreById(peer.getStoreId(), backOffer);
ManagedChannel channel =
channelFactory.getChannel(
peerStore.getAddress(), regionManager.getPDClient().getHostMapping());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
import org.tikv.common.util.BackOffer;

public interface RegionErrorReceiver {
boolean onNotLeader(TiRegion region);
boolean onNotLeader(TiRegion region, BackOffer backOffer);

/// return whether we need to retry this request.
boolean onStoreUnreachable(BackOffer backOffer);
Expand Down
17 changes: 11 additions & 6 deletions src/main/java/org/tikv/common/region/RegionManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -228,13 +228,18 @@ public TiStore getStoreById(long id) {
}

public TiStore getStoreById(long id, BackOffer backOffer) {
TiStore store = getStoreByIdWithBackOff(id, backOffer);
if (store == null) {
logger.warn(String.format("failed to fetch store %d, the store may be missing", id));
cache.clearAll();
throw new InvalidStoreException(id);
SlowLogSpan span = backOffer.getSlowLog().start("getStoreById");
try {
TiStore store = getStoreByIdWithBackOff(id, backOffer);
if (store == null) {
logger.warn(String.format("failed to fetch store %d, the store may be missing", id));
cache.clearAll();
throw new InvalidStoreException(id);
}
return store;
} finally {
span.end();
}
return store;
}

public void onRegionStale(TiRegion region) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -274,7 +274,7 @@ public List<KvPair> scan(
boolean forWrite = false;
while (true) {
// we should refresh region
region = regionManager.getRegionByKey(startKey);
region = regionManager.getRegionByKey(startKey, backOffer);

Supplier<ScanRequest> request =
() ->
Expand Down