Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 8 additions & 6 deletions src/main/java/org/tikv/common/TiSession.java
Original file line number Diff line number Diff line change
Expand Up @@ -92,21 +92,22 @@ public TiSession(TiConfiguration conf) {
}

private synchronized void warmUp() {
long warmUpStartTime = System.currentTimeMillis();
long warmUpStartTime = System.nanoTime();
BackOffer backOffer = ConcreteBackOffer.newRawKVBackOff();

try {
this.client = getPDClient();
this.regionManager = getRegionManager();
List<Metapb.Store> stores = this.client.getAllStores(ConcreteBackOffer.newGetBackOff());
List<Metapb.Store> stores = this.client.getAllStores(backOffer);
// warm up store cache
for (Metapb.Store store : stores) {
this.regionManager.updateStore(
null,
new TiStore(this.client.getStore(ConcreteBackOffer.newGetBackOff(), store.getId())));
null, new TiStore(this.client.getStore(backOffer, store.getId())));
}
ByteString startKey = ByteString.EMPTY;

do {
TiRegion region = regionManager.getRegionByKey(startKey);
TiRegion region = regionManager.getRegionByKey(startKey, backOffer);
startKey = region.getEndKey();
} while (!startKey.isEmpty());

Expand All @@ -121,7 +122,8 @@ private synchronized void warmUp() {
logger.info("warm up fails, ignored ", e);
} finally {
logger.info(
String.format("warm up duration %d ms", System.currentTimeMillis() - warmUpStartTime));
String.format(
"warm up duration %d ms", (System.nanoTime() - warmUpStartTime) / 1_000_000));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ public boolean handleRegionError(BackOffer backOffer, Errorpb.Error error) {
// onNotLeader is only needed when updateLeader succeeds, thus switch
// to a new store address.
TiRegion newRegion = this.regionManager.updateLeader(recv.getRegion(), newStoreId);
retry = newRegion != null && recv.onNotLeader(newRegion);
retry = newRegion != null && recv.onNotLeader(newRegion, backOffer);

backOffFuncType = BackOffFunction.BackOffFuncType.BoUpdateLeader;
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ public void close() throws GrpcException {}
* @return false when re-split is needed.
*/
@Override
public boolean onNotLeader(TiRegion newRegion) {
public boolean onNotLeader(TiRegion newRegion, BackOffer backOffer) {
if (logger.isDebugEnabled()) {
logger.debug(region + ", new leader = " + newRegion.getLeader().getStoreId());
}
Expand All @@ -123,7 +123,7 @@ public boolean onNotLeader(TiRegion newRegion) {
store = null;
}
region = newRegion;
store = regionManager.getStoreById(region.getLeader().getStoreId());
store = regionManager.getStoreById(region.getLeader().getStoreId(), backOffer);
updateClientStub();
return true;
}
Expand Down Expand Up @@ -193,10 +193,10 @@ private Boolean seekLeaderStore(BackOffer backOffer) {

logger.info(String.format("try switch leader: region[%d]", region.getId()));

Metapb.Peer peer = switchLeaderStore();
Metapb.Peer peer = switchLeaderStore(backOffer);
if (peer != null) {
// we found a leader
TiStore currentLeaderStore = regionManager.getStoreById(peer.getStoreId());
TiStore currentLeaderStore = regionManager.getStoreById(peer.getStoreId(), backOffer);
if (currentLeaderStore.isReachable()) {
logger.info(
String.format(
Expand Down Expand Up @@ -232,7 +232,7 @@ private boolean seekProxyStore(BackOffer backOffer) {
try {
logger.info(String.format("try grpc forward: region[%d]", region.getId()));
// when current leader cannot be reached
TiStore storeWithProxy = switchProxyStore();
TiStore storeWithProxy = switchProxyStore(backOffer);
if (storeWithProxy == null) {
// no store available, retry
logger.warn(String.format("No store available, retry: region[%d]", region.getId()));
Expand All @@ -250,11 +250,11 @@ private boolean seekProxyStore(BackOffer backOffer) {
}

// first: leader peer, second: true if any responses returned with grpc error
private Metapb.Peer switchLeaderStore() {
private Metapb.Peer switchLeaderStore(BackOffer backOffer) {
List<SwitchLeaderTask> responses = new LinkedList<>();
for (Metapb.Peer peer : region.getFollowerList()) {
ByteString key = region.getStartKey();
TiStore peerStore = regionManager.getStoreById(peer.getStoreId());
TiStore peerStore = regionManager.getStoreById(peer.getStoreId(), backOffer);
ManagedChannel channel =
channelFactory.getChannel(
peerStore.getAddress(), regionManager.getPDClient().getHostMapping());
Expand Down Expand Up @@ -300,12 +300,12 @@ private Metapb.Peer switchLeaderStore() {
}
}

private TiStore switchProxyStore() {
private TiStore switchProxyStore(BackOffer backOffer) {
long forwardTimeout = conf.getForwardTimeout();
List<ForwardCheckTask> responses = new LinkedList<>();
for (Metapb.Peer peer : region.getFollowerList()) {
ByteString key = region.getStartKey();
TiStore peerStore = regionManager.getStoreById(peer.getStoreId());
TiStore peerStore = regionManager.getStoreById(peer.getStoreId(), backOffer);
ManagedChannel channel =
channelFactory.getChannel(
peerStore.getAddress(), regionManager.getPDClient().getHostMapping());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
import org.tikv.common.util.BackOffer;

public interface RegionErrorReceiver {
boolean onNotLeader(TiRegion region);
boolean onNotLeader(TiRegion region, BackOffer backOffer);

/// return whether we need to retry this request.
boolean onStoreUnreachable(BackOffer backOffer);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -274,7 +274,7 @@ public List<KvPair> scan(
boolean forWrite = false;
while (true) {
// we should refresh region
region = regionManager.getRegionByKey(startKey);
region = regionManager.getRegionByKey(startKey, backOffer);

Supplier<ScanRequest> request =
() ->
Expand Down
4 changes: 2 additions & 2 deletions src/main/java/org/tikv/common/util/ConcreteBackOffer.java
Original file line number Diff line number Diff line change
Expand Up @@ -168,8 +168,6 @@ public boolean canRetryAfterSleep(BackOffFunction.BackOffFuncType funcType) {
}

public boolean canRetryAfterSleep(BackOffFunction.BackOffFuncType funcType, long maxSleepMs) {
SlowLogSpan slowLogSpan = getSlowLog().start("backoff " + funcType.name());
Histogram.Timer backOffTimer = BACKOFF_DURATION.labels(funcType.name()).startTimer();
BackOffFunction backOffFunction =
backOffFunctionMap.computeIfAbsent(funcType, this::createBackOffFunc);

Expand All @@ -185,6 +183,8 @@ public boolean canRetryAfterSleep(BackOffFunction.BackOffFuncType funcType, long
}
}

Histogram.Timer backOffTimer = BACKOFF_DURATION.labels(funcType.name()).startTimer();
SlowLogSpan slowLogSpan = getSlowLog().start("backoff " + funcType.name());
try {
Thread.sleep(sleep);
} catch (InterruptedException e) {
Expand Down