Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 11 additions & 35 deletions src/main/java/org/tikv/common/region/AbstractRegionStoreClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@
import org.tikv.common.log.SlowLogEmptyImpl;
import org.tikv.common.log.SlowLogSpan;
import org.tikv.common.util.ChannelFactory;
import org.tikv.common.util.Pair;
import org.tikv.kvproto.Kvrpcpb;
import org.tikv.kvproto.Metapb;
import org.tikv.kvproto.TikvGrpc;
Expand Down Expand Up @@ -143,15 +142,6 @@ public boolean onStoreUnreachable(SlowLog slowLog) {
return true;
}

if (store.getProxyStore() == null && store.isReachable()) {
if (store.isReachable()) {
logger.info(
String.format(
"store[%d] for region[%d] is reachable, retry", store.getId(), region.getId()));
return true;
}
}

// seek an available leader store to send request
Boolean result = seekLeaderStore(slowLog);
if (result != null) {
Expand Down Expand Up @@ -206,26 +196,8 @@ private Boolean seekLeaderStore(SlowLog slowLog) {

logger.info(String.format("try switch leader: region[%d]", region.getId()));

Pair<Metapb.Peer, Boolean> pair = switchLeaderStore();
Metapb.Peer peer = pair.first;
boolean exceptionEncountered = pair.second;
if (peer == null) {
if (!exceptionEncountered) {
// all response returned normally, the leader is not elected, just wait until it is ready.
logger.info(
String.format(
"leader for region[%d] is not elected, just wait until it is ready",
region.getId()));
return true;
} else {
// no leader found, some response does not return normally, there may be network
// partition.
logger.warn(
String.format(
"leader for region[%d] is not found, it is possible that network partition occurred",
region.getId()));
}
} else {
Metapb.Peer peer = switchLeaderStore();
if (peer != null) {
// we found a leader
TiStore currentLeaderStore = regionManager.getStoreById(peer.getStoreId());
if (currentLeaderStore.isReachable()) {
Expand All @@ -240,6 +212,12 @@ private Boolean seekLeaderStore(SlowLog slowLog) {
updateClientStub();
return true;
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should we add a log here to indicate that the leader is unreachable?

} else {
// no leader found, some response does not return normally, there may be network partition.
logger.warn(
String.format(
"leader for region[%d] is not found, it is possible that network partition occurred",
region.getId()));
}
} finally {
switchLeaderDurationTimer.observeDuration();
Expand Down Expand Up @@ -272,7 +250,7 @@ private boolean seekProxyStore(SlowLog slowLog) {
}

// first: leader peer, second: true if any responses returned with grpc error
private Pair<Metapb.Peer, Boolean> switchLeaderStore() {
private Metapb.Peer switchLeaderStore() {
List<SwitchLeaderTask> responses = new LinkedList<>();
for (Metapb.Peer peer : region.getFollowerList()) {
ByteString key = region.getStartKey();
Expand All @@ -290,7 +268,6 @@ private Pair<Metapb.Peer, Boolean> switchLeaderStore() {
ListenableFuture<Kvrpcpb.RawGetResponse> task = stub.rawGet(rawGetRequest);
responses.add(new SwitchLeaderTask(task, peer));
}
boolean exceptionEncountered = false;
while (true) {
try {
Thread.sleep(2);
Expand All @@ -310,15 +287,14 @@ private Pair<Metapb.Peer, Boolean> switchLeaderStore() {
// the peer is leader
logger.info(
String.format("rawGet response indicates peer[%d] is leader", task.peer.getId()));
return Pair.create(task.peer, exceptionEncountered);
return task.peer;
}
}
} catch (Exception ignored) {
exceptionEncountered = true;
}
}
if (unfinished.isEmpty()) {
return Pair.create(null, exceptionEncountered);
return null;
}
responses = unfinished;
}
Expand Down