Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 3 additions & 5 deletions src/main/java/org/tikv/common/AbstractGRPCClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,8 @@ public long getTimeout() {

private boolean doCheckHealth(BackOffer backOffer, String addressStr, HostMapping hostMapping) {
while (true) {
backOffer.checkTimeout();

try {
ManagedChannel channel = channelFactory.getChannel(addressStr, hostMapping);
HealthGrpc.HealthBlockingStub stub =
Expand All @@ -198,10 +200,6 @@ private boolean doCheckHealth(BackOffer backOffer, String addressStr, HostMappin
}

protected boolean checkHealth(BackOffer backOffer, String addressStr, HostMapping hostMapping) {
try {
return doCheckHealth(backOffer, addressStr, hostMapping);
} catch (Exception e) {
return false;
}
return doCheckHealth(backOffer, addressStr, hostMapping);
}
}
34 changes: 28 additions & 6 deletions src/main/java/org/tikv/common/PDClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -50,9 +50,12 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.apache.http.client.methods.CloseableHttpResponse;
Expand Down Expand Up @@ -122,6 +125,8 @@ public class PDClient extends AbstractGRPCClient<PDBlockingStub, PDFutureStub>
private ConcurrentMap<Long, Double> tiflashReplicaMap;
private HostMapping hostMapping;
private long lastUpdateLeaderTime;
private final ExecutorService updateLeaderService = Executors.newSingleThreadExecutor();
private final AtomicBoolean updateLeaderNotify = new AtomicBoolean();

public static final Histogram PD_GET_REGION_BY_KEY_REQUEST_LATENCY =
HistogramUtils.buildDuration()
Expand Down Expand Up @@ -426,6 +431,8 @@ public void close() throws InterruptedException {
if (channelFactory != null) {
channelFactory.close();
}

updateLeaderService.shutdownNow();
}

@VisibleForTesting
Expand Down Expand Up @@ -462,11 +469,7 @@ private GetMembersResponse doGetMembers(BackOffer backOffer, URI uri) {
}

private GetMembersResponse getMembers(BackOffer backOffer, URI uri) {
try {
return doGetMembers(backOffer, uri);
} catch (Exception e) {
return null;
}
return doGetMembers(backOffer, uri);
}

// return whether the leader has changed to target address `leaderUrlStr`.
Expand Down Expand Up @@ -518,7 +521,26 @@ synchronized boolean createFollowerClientWrapper(
return true;
}

public synchronized void updateLeaderOrForwardFollower(BackOffer backOffer) {
public void tryUpdateLeaderOrForwardFollower() {
if (updateLeaderNotify.compareAndSet(false, true)) {
try {
BackOffer backOffer = defaultBackOffer();
updateLeaderService.submit(
() -> {
try {
updateLeaderOrForwardFollower(backOffer);
} finally {
updateLeaderNotify.set(false);
}
});
} catch (RejectedExecutionException e) {
logger.error("PDClient is shutdown", e);
updateLeaderNotify.set(false);
}
}
}

private synchronized void updateLeaderOrForwardFollower(BackOffer backOffer) {
if (System.currentTimeMillis() - lastUpdateLeaderTime < MIN_TRY_UPDATE_DURATION) {
return;
}
Expand Down
15 changes: 13 additions & 2 deletions src/main/java/org/tikv/common/operation/PDErrorHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.tikv.common.PDClient;
import org.tikv.common.exception.GrpcException;
import org.tikv.common.exception.TiClientInternalException;
import org.tikv.common.log.SlowLogSpan;
import org.tikv.common.pd.PDError;
import org.tikv.common.util.BackOffFunction;
import org.tikv.common.util.BackOffer;
Expand Down Expand Up @@ -59,7 +60,12 @@ public boolean handleResponseError(BackOffer backOffer, RespT resp) {
case PD_ERROR:
backOffer.doBackOff(
BackOffFunction.BackOffFuncType.BoPDRPC, new GrpcException(error.toString()));
client.updateLeaderOrForwardFollower(backOffer);
SlowLogSpan tryUpdateLeaderSpan = backOffer.getSlowLog().start("try_update_leader");
try {
client.tryUpdateLeaderOrForwardFollower();
} finally {
tryUpdateLeaderSpan.end();
}
return true;
case REGION_PEER_NOT_ELECTED:
logger.debug(error.getMessage());
Expand All @@ -80,7 +86,12 @@ public boolean handleRequestError(BackOffer backOffer, Exception e) {
return false;
}
backOffer.doBackOff(BackOffFunction.BackOffFuncType.BoPDRPC, e);
client.updateLeaderOrForwardFollower(backOffer);
SlowLogSpan updateLeaderSpan = backOffer.getSlowLog().start("try_update_leader");
try {
client.tryUpdateLeaderOrForwardFollower();
} finally {
updateLeaderSpan.end();
}
return true;
}
}
2 changes: 1 addition & 1 deletion src/main/java/org/tikv/common/region/RegionManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@ public Pair<TiRegion, TiStore> getRegionStorePairByKey(ByteString key, TiStoreTy
public Pair<TiRegion, TiStore> getRegionStorePairByKey(
ByteString key, TiStoreType storeType, BackOffer backOffer) {
TiRegion region = getRegionByKey(key, backOffer);
if (!region.isValid()) {
if (region == null || !region.isValid()) {
throw new TiClientInternalException("Region invalid: " + region);
}

Expand Down
20 changes: 8 additions & 12 deletions src/main/java/org/tikv/common/region/RegionStoreClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -1408,38 +1408,34 @@ public RegionStoreClient build(TiRegion region, TiStore store, TiStoreType store
this);
}

public synchronized RegionStoreClient build(TiRegion region, TiStore store)
throws GrpcException {
public RegionStoreClient build(TiRegion region, TiStore store) throws GrpcException {
return build(region, store, TiStoreType.TiKV);
}

public synchronized RegionStoreClient build(ByteString key) throws GrpcException {
public RegionStoreClient build(ByteString key) throws GrpcException {
return build(key, TiStoreType.TiKV);
}

public synchronized RegionStoreClient build(ByteString key, BackOffer backOffer)
throws GrpcException {
public RegionStoreClient build(ByteString key, BackOffer backOffer) throws GrpcException {
return build(key, TiStoreType.TiKV, backOffer);
}

public synchronized RegionStoreClient build(ByteString key, TiStoreType storeType)
throws GrpcException {
public RegionStoreClient build(ByteString key, TiStoreType storeType) throws GrpcException {
return build(key, storeType, defaultBackOff());
}

public synchronized RegionStoreClient build(
ByteString key, TiStoreType storeType, BackOffer backOffer) throws GrpcException {
public RegionStoreClient build(ByteString key, TiStoreType storeType, BackOffer backOffer)
throws GrpcException {
Pair<TiRegion, TiStore> pair =
regionManager.getRegionStorePairByKey(key, storeType, backOffer);
return build(pair.first, pair.second, storeType);
}

public synchronized RegionStoreClient build(TiRegion region) throws GrpcException {
public RegionStoreClient build(TiRegion region) throws GrpcException {
return build(region, defaultBackOff());
}

public synchronized RegionStoreClient build(TiRegion region, BackOffer backOffer)
throws GrpcException {
public RegionStoreClient build(TiRegion region, BackOffer backOffer) throws GrpcException {
TiStore store = regionManager.getStoreById(region.getLeader().getStoreId(), backOffer);
return build(region, store, TiStoreType.TiKV);
}
Expand Down
33 changes: 15 additions & 18 deletions src/test/java/org/tikv/common/PDClientV2MockTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -88,18 +88,16 @@ public void testGetRegionById() throws Exception {
String start = "getRegionById";
String end = "getRegionByIdEnd";
leader.addGetRegionByIDListener(request -> makeGetRegionResponse(start, end));
try (PDClient client = createClient()) {
Metapb.Region r = client.getRegionByID(ConcreteBackOffer.newRawKVBackOff(), 1).first;
Assert.assertEquals(start, r.getStartKey().toStringUtf8());
Assert.assertEquals(end, r.getEndKey().toStringUtf8());
}
PDClient client = createClient();
Metapb.Region r = client.getRegionByID(ConcreteBackOffer.newRawKVBackOff(), 1).first;
Assert.assertEquals(start, r.getStartKey().toStringUtf8());
Assert.assertEquals(end, r.getEndKey().toStringUtf8());

leader.addGetRegionByIDListener(request -> makeGetRegionResponse(start, ""));
try (PDClient client = createClient()) {
Metapb.Region r = client.getRegionByID(ConcreteBackOffer.newRawKVBackOff(), 1).first;
Assert.assertEquals(start, r.getStartKey().toStringUtf8());
Assert.assertEquals("", r.getEndKey().toStringUtf8());
}

r = client.getRegionByID(ConcreteBackOffer.newRawKVBackOff(), 1).first;
Assert.assertEquals(start, r.getStartKey().toStringUtf8());
Assert.assertEquals("", r.getEndKey().toStringUtf8());
}

@Test
Expand All @@ -113,15 +111,14 @@ public void testScanRegions() throws Exception {
.addRegions(Pdpb.Region.newBuilder().setRegion(makeRegion(start, end)).build())
.build());

try (PDClient client = createClient()) {
List<Region> regions =
client.scanRegions(
ConcreteBackOffer.newRawKVBackOff(), ByteString.EMPTY, ByteString.EMPTY, 1);
PDClient client = createClient();
List<Region> regions =
client.scanRegions(
ConcreteBackOffer.newRawKVBackOff(), ByteString.EMPTY, ByteString.EMPTY, 1);

for (Region r : regions) {
Assert.assertEquals(start, r.getRegion().getStartKey().toStringUtf8());
Assert.assertEquals(end, r.getRegion().getEndKey().toStringUtf8());
}
for (Region r : regions) {
Assert.assertEquals(start, r.getRegion().getStartKey().toStringUtf8());
Assert.assertEquals(end, r.getRegion().getEndKey().toStringUtf8());
}
}
}
7 changes: 5 additions & 2 deletions src/test/java/org/tikv/common/TimeoutTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,12 @@ public void testTimeoutInTime() {
try (RawKVClient client = createClient()) {
pdServers.get(0).stop();
long start = System.currentTimeMillis();
client.get(ByteString.copyFromUtf8("key"));
try {
client.get(ByteString.copyFromUtf8("key"));
} catch (Exception ignore) {
}
long end = System.currentTimeMillis();
Assert.assertTrue(end - start < session.getConf().getRawKVReadTimeoutInMS() * 2L);
Assert.assertTrue(end - start < (session.getConf().getRawKVReadTimeoutInMS() * 1.5));
}
}
}