Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 8 additions & 10 deletions src/main/java/org/tikv/common/TiSession.java
Original file line number Diff line number Diff line change
Expand Up @@ -161,34 +161,31 @@ private static VersionInfo getVersionInfo() {
}

private synchronized void warmUp() {
long warmUpStartTime = System.currentTimeMillis();
long warmUpStartTime = System.nanoTime();
BackOffer backOffer = ConcreteBackOffer.newRawKVBackOff();
try {
this.client = getPDClient();
this.regionManager = getRegionManager();
List<Metapb.Store> stores = this.client.getAllStores(ConcreteBackOffer.newGetBackOff());
List<Metapb.Store> stores = this.client.getAllStores(backOffer);
// warm up store cache
for (Metapb.Store store : stores) {
this.regionManager.updateStore(
null,
new TiStore(this.client.getStore(ConcreteBackOffer.newGetBackOff(), store.getId())));
null, new TiStore(this.client.getStore(backOffer, store.getId())));
}

// use scan region to load region cache with limit
ByteString startKey = ByteString.EMPTY;
do {
List<Pdpb.Region> regions =
regionManager.scanRegions(
ConcreteBackOffer.newGetBackOff(),
startKey,
ByteString.EMPTY,
conf.getScanRegionsLimit());
backOffer, startKey, ByteString.EMPTY, conf.getScanRegionsLimit());
if (regions == null || regions.isEmpty()) {
// something went wrong, but the warm-up process could continue
break;
}
for (Pdpb.Region region : regions) {
regionManager.insertRegionToCache(
regionManager.createRegion(region.getRegion(), ConcreteBackOffer.newGetBackOff()));
regionManager.createRegion(region.getRegion(), backOffer));
}
startKey = regions.get(regions.size() - 1).getRegion().getEndKey();
} while (!startKey.isEmpty());
Expand All @@ -211,7 +208,8 @@ private synchronized void warmUp() {
logger.info("warm up fails, ignored ", e);
} finally {
logger.info(
String.format("warm up duration %d ms", System.currentTimeMillis() - warmUpStartTime));
String.format(
"warm up duration %d ms", (System.nanoTime() - warmUpStartTime) / 1_000_000));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ public boolean handleRegionError(BackOffer backOffer, Errorpb.Error error) {
// onNotLeader is only needed when updateLeader succeeds, thus switch
// to a new store address.
TiRegion newRegion = this.regionManager.updateLeader(recv.getRegion(), newStoreId);
retry = newRegion != null && recv.onNotLeader(newRegion);
retry = newRegion != null && recv.onNotLeader(newRegion, backOffer);

backOffFuncType = BackOffFunction.BackOffFuncType.BoUpdateLeader;
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ public void close() throws GrpcException {}
* @return false when re-split is needed.
*/
@Override
public boolean onNotLeader(TiRegion newRegion) {
public boolean onNotLeader(TiRegion newRegion, BackOffer backOffer) {
if (logger.isDebugEnabled()) {
logger.debug(region + ", new leader = " + newRegion.getLeader().getStoreId());
}
Expand All @@ -123,7 +123,7 @@ public boolean onNotLeader(TiRegion newRegion) {
store = null;
}
region = newRegion;
store = regionManager.getStoreById(region.getLeader().getStoreId());
store = regionManager.getStoreById(region.getLeader().getStoreId(), backOffer);
updateClientStub();
return true;
}
Expand Down Expand Up @@ -193,10 +193,10 @@ private Boolean seekLeaderStore(BackOffer backOffer) {

logger.info(String.format("try switch leader: region[%d]", region.getId()));

Metapb.Peer peer = switchLeaderStore();
Metapb.Peer peer = switchLeaderStore(backOffer);
if (peer != null) {
// we found a leader
TiStore currentLeaderStore = regionManager.getStoreById(peer.getStoreId());
TiStore currentLeaderStore = regionManager.getStoreById(peer.getStoreId(), backOffer);
if (currentLeaderStore.isReachable()) {
logger.info(
String.format(
Expand Down Expand Up @@ -232,7 +232,7 @@ private boolean seekProxyStore(BackOffer backOffer) {
try {
logger.info(String.format("try grpc forward: region[%d]", region.getId()));
// when current leader cannot be reached
TiStore storeWithProxy = switchProxyStore();
TiStore storeWithProxy = switchProxyStore(backOffer);
if (storeWithProxy == null) {
// no store available, retry
logger.warn(String.format("No store available, retry: region[%d]", region.getId()));
Expand All @@ -250,11 +250,11 @@ private boolean seekProxyStore(BackOffer backOffer) {
}

// first: leader peer, second: true if any responses returned with grpc error
private Metapb.Peer switchLeaderStore() {
private Metapb.Peer switchLeaderStore(BackOffer backOffer) {
List<SwitchLeaderTask> responses = new LinkedList<>();
for (Metapb.Peer peer : region.getFollowerList()) {
ByteString key = region.getStartKey();
TiStore peerStore = regionManager.getStoreById(peer.getStoreId());
TiStore peerStore = regionManager.getStoreById(peer.getStoreId(), backOffer);
ManagedChannel channel =
channelFactory.getChannel(
peerStore.getAddress(), regionManager.getPDClient().getHostMapping());
Expand Down Expand Up @@ -300,12 +300,12 @@ private Metapb.Peer switchLeaderStore() {
}
}

private TiStore switchProxyStore() {
private TiStore switchProxyStore(BackOffer backOffer) {
long forwardTimeout = conf.getForwardTimeout();
List<ForwardCheckTask> responses = new LinkedList<>();
for (Metapb.Peer peer : region.getFollowerList()) {
ByteString key = region.getStartKey();
TiStore peerStore = regionManager.getStoreById(peer.getStoreId());
TiStore peerStore = regionManager.getStoreById(peer.getStoreId(), backOffer);
ManagedChannel channel =
channelFactory.getChannel(
peerStore.getAddress(), regionManager.getPDClient().getHostMapping());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
import org.tikv.common.util.BackOffer;

public interface RegionErrorReceiver {
boolean onNotLeader(TiRegion region);
boolean onNotLeader(TiRegion region, BackOffer backOffer);

/// return whether we need to retry this request.
boolean onStoreUnreachable(BackOffer backOffer);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -274,7 +274,7 @@ public List<KvPair> scan(
boolean forWrite = false;
while (true) {
// we should refresh region
region = regionManager.getRegionByKey(startKey);
region = regionManager.getRegionByKey(startKey, backOffer);

Supplier<ScanRequest> request =
() ->
Expand Down
4 changes: 2 additions & 2 deletions src/main/java/org/tikv/common/util/ConcreteBackOffer.java
Original file line number Diff line number Diff line change
Expand Up @@ -168,8 +168,6 @@ public boolean canRetryAfterSleep(BackOffFunction.BackOffFuncType funcType) {
}

public boolean canRetryAfterSleep(BackOffFunction.BackOffFuncType funcType, long maxSleepMs) {
SlowLogSpan slowLogSpan = getSlowLog().start("backoff " + funcType.name());
Histogram.Timer backOffTimer = BACKOFF_DURATION.labels(funcType.name()).startTimer();
BackOffFunction backOffFunction =
backOffFunctionMap.computeIfAbsent(funcType, this::createBackOffFunc);

Expand All @@ -185,6 +183,8 @@ public boolean canRetryAfterSleep(BackOffFunction.BackOffFuncType funcType, long
}
}

Histogram.Timer backOffTimer = BACKOFF_DURATION.labels(funcType.name()).startTimer();
SlowLogSpan slowLogSpan = getSlowLog().start("backoff " + funcType.name());
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why move it here? should BACKOFF_DURATION also be moved?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

there is a return in line 184, which will cause "end":"N/A",.
it's hard to understand it.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

got. then why not move BACKOFF_DURATION to here as well?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good idea

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Moving these lines to here might cause we can't see the backoff in slow log.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We may end this span while returning though we will see this backoff end shortly but this span is still there.

try {
Thread.sleep(sleep);
} catch (InterruptedException e) {
Expand Down
1 change: 1 addition & 0 deletions src/test/java/org/tikv/BaseRawKVTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ protected TiConfiguration createTiConfiguration() {
conf.setTest(true);
conf.setEnableAtomicForCAS(true);
conf.setEnableGrpcForward(false);
conf.setEnableAtomicForCAS(true);
return conf;
}
}