Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions src/main/java/org/tikv/common/ConfigUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -51,14 +51,17 @@ public class ConfigUtils {
public static final String TIKV_NETWORK_MAPPING_NAME = "tikv.network.mapping";
public static final String TIKV_ENABLE_GRPC_FORWARD = "tikv.enable_grpc_forward";
public static final String TIKV_GRPC_HEALTH_CHECK_TIMEOUT = "tikv.grpc.health_check_timeout";
public static final String TIKV_HEALTH_CHECK_PERIOD_DURATION =
"tikv.health_check_period_duration";

public static final String TIKV_ENABLE_ATOMIC_FOR_CAS = "tikv.enable_atomic_for_cas";

public static final String DEF_PD_ADDRESSES = "127.0.0.1:2379";
public static final String DEF_TIMEOUT = "300ms";
public static final String DEF_FORWARD_TIMEOUT = "600ms";
public static final String DEF_TIMEOUT = "200ms";
public static final String DEF_FORWARD_TIMEOUT = "300ms";
public static final String DEF_SCAN_TIMEOUT = "20s";
public static final int DEF_CHECK_HEALTH_TIMEOUT = 100;
public static final int DEF_HEALTH_CHECK_PERIOD_DURATION = 300;
public static final int DEF_SCAN_BATCH_SIZE = 10240;
public static final int DEF_MAX_FRAME_SIZE = 268435456 * 2; // 256 * 2 MB
public static final int DEF_INDEX_SCAN_BATCH_SIZE = 20000;
Expand Down
6 changes: 6 additions & 0 deletions src/main/java/org/tikv/common/TiConfiguration.java
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ private static void loadFromDefaultProperties() {
setIfMissing(TIKV_NETWORK_MAPPING_NAME, DEF_TIKV_NETWORK_MAPPING_NAME);
setIfMissing(TIKV_ENABLE_GRPC_FORWARD, DEF_GRPC_FORWARD_ENABLE);
setIfMissing(TIKV_GRPC_HEALTH_CHECK_TIMEOUT, DEF_CHECK_HEALTH_TIMEOUT);
setIfMissing(TIKV_HEALTH_CHECK_PERIOD_DURATION, DEF_HEALTH_CHECK_PERIOD_DURATION);
setIfMissing(TIKV_ENABLE_ATOMIC_FOR_CAS, DEF_TIKV_ENABLE_ATOMIC_FOR_CAS);
}

Expand Down Expand Up @@ -265,6 +266,7 @@ private static ReplicaRead getReplicaRead(String key) {
private boolean metricsEnable = getBoolean(TIKV_METRICS_ENABLE);
private int metricsPort = getInt(TIKV_METRICS_PORT);
private int grpcHealthCheckTimeout = getInt(TIKV_GRPC_HEALTH_CHECK_TIMEOUT);
private int healthCheckPeriodDuration = getInt(TIKV_HEALTH_CHECK_PERIOD_DURATION);

private final String networkMappingName = get(TIKV_NETWORK_MAPPING_NAME);
private HostMapping hostMapping = null;
Expand Down Expand Up @@ -573,6 +575,10 @@ public long getGrpcHealthCheckTimeout() {
return this.grpcHealthCheckTimeout;
}

/**
 * Returns the configured period of the store health check.
 *
 * <p>The backing field is an {@code int} read from {@code tikv.health_check_period_duration}; it
 * is widened to {@code long} on return. {@code RegionManager} passes this value to
 * {@code scheduleAtFixedRate} with {@code TimeUnit.MILLISECONDS}, so it is interpreted as
 * milliseconds there.
 *
 * @return the health-check period
 */
public long getHealthCheckPeriodDuration() {
  return healthCheckPeriodDuration;
}

/**
 * Reports the {@code enableAtomicForCAS} flag held by this configuration.
 *
 * <p>NOTE(review): presumably initialised from {@code tikv.enable_atomic_for_cas}; the field
 * initializer is outside this view, so confirm.
 *
 * @return the current value of the flag
 */
public boolean isEnableAtomicForCAS() {
  return this.enableAtomicForCAS;
}
Expand Down
20 changes: 1 addition & 19 deletions src/main/java/org/tikv/common/TiSession.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,10 @@
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.tikv.common.catalog.Catalog;
import org.tikv.common.event.CacheInvalidateEvent;
import org.tikv.common.exception.TiKVException;
import org.tikv.common.key.Key;
import org.tikv.common.meta.TiTimestamp;
Expand All @@ -56,7 +54,6 @@ public class TiSession implements AutoCloseable {
private static final Map<String, TiSession> sessionCachedMap = new HashMap<>();
private final TiConfiguration conf;
private final ChannelFactory channelFactory;
private Function<CacheInvalidateEvent, Void> cacheInvalidateCallback;
// below object creation is either heavy or making connection (pd), pending for lazy loading
private volatile PDClient client;
private volatile Catalog catalog;
Expand Down Expand Up @@ -182,13 +179,7 @@ public RegionManager getRegionManager() {
if (res == null) {
synchronized (this) {
if (regionManager == null) {
regionManager =
new RegionManager(
getConf(),
getPDClient(),
this.cacheInvalidateCallback,
this.channelFactory,
this.enableGrpcForward);
regionManager = new RegionManager(getConf(), getPDClient(), this.channelFactory);
}
res = regionManager;
}
Expand Down Expand Up @@ -331,15 +322,6 @@ public ChannelFactory getChannelFactory() {
return channelFactory;
}

/**
 * Stores the function this session uses as its cache-invalidation callback.
 *
 * <p>Any previously injected callback is overwritten.
 *
 * @param callBackFunc function that receives a {@link CacheInvalidateEvent}
 */
public void injectCallBackFunc(Function<CacheInvalidateEvent, Void> callBackFunc) {
  cacheInvalidateCallback = callBackFunc;
}

/**
* split region and scatter
*
Expand Down
32 changes: 18 additions & 14 deletions src/main/java/org/tikv/common/region/AbstractRegionStoreClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,14 @@ public boolean onNotLeader(TiRegion newRegion) {

@Override
public boolean onStoreUnreachable() {
if (!targetStore.isValid()) {
  // Report the id of the invalidated store. The format string has a single %d, so only the
  // first argument is consumed; passing region.getId() first made the message show the region
  // id labelled as a store id.
  logger.warn(String.format("store [%d] has been invalid", targetStore.getId()));
  // Re-fetch the store by id from the region manager and rebuild the gRPC stubs against it.
  // NOTE(review): getStoreById may return null (e.g. for a tombstone store); confirm that
  // updateClientStub() cannot be reached with a null targetStore.
  targetStore = regionManager.getStoreById(targetStore.getId());
  updateClientStub();
  return true;
}

if (targetStore.getProxyStore() == null) {
if (targetStore.isReachable()) {
return true;
Expand Down Expand Up @@ -238,20 +246,22 @@ private boolean retryOtherStoreLeader() {

/**
 * Rebuilds {@code blockingStub} and {@code asyncStub} for the current {@code targetStore}.
 *
 * <p>Without a proxy store the channel is opened to the store's own address. With a proxy store
 * the channel is opened to the proxy's address, and both stubs carry a
 * {@code TiConfiguration.FORWARD_META_DATA_KEY} header holding the target store's address.
 */
private void updateClientStub() {
  final String storeAddress = targetStore.getStore().getAddress();
  final boolean viaProxy = targetStore.getProxyStore() != null;
  final String dialAddress = viaProxy ? targetStore.getProxyStore().getAddress() : storeAddress;

  final ManagedChannel channel =
      channelFactory.getChannel(dialAddress, regionManager.getPDClient().getHostMapping());

  if (!viaProxy) {
    // Direct connection: plain stubs, no forwarding header.
    blockingStub = TikvGrpc.newBlockingStub(channel);
    asyncStub = TikvGrpc.newStub(channel);
    return;
  }

  // Proxy connection: the header carries the address of the store the request is meant for.
  final Metadata forwardHeader = new Metadata();
  forwardHeader.put(TiConfiguration.FORWARD_META_DATA_KEY, storeAddress);
  blockingStub = MetadataUtils.attachHeaders(TikvGrpc.newBlockingStub(channel), forwardHeader);
  asyncStub = MetadataUtils.attachHeaders(TikvGrpc.newStub(channel), forwardHeader);
}

private boolean retryOtherStoreByProxyForward() {
if (!targetStore.isValid()) {
  // Log before re-fetching: the lookup below can yield null (getStoreById returns null for a
  // tombstone store), and dereferencing it inside the log call would throw. Only the store id
  // is passed because the format string has a single %d; the previous first argument,
  // region.getId(), was printed in its place.
  logger.warn(String.format("store [%d] has been invalid", targetStore.getId()));
  targetStore = regionManager.getStoreById(targetStore.getId());
  return true;
}

TiStore proxyStore = switchProxyStore();
if (proxyStore == null) {
logger.warn(
Expand All @@ -268,19 +278,13 @@ private boolean retryOtherStoreByProxyForward() {
}
targetStore = proxyStore;
retryForwardTimes += 1;
updateClientStub();
logger.warn(
String.format(
"forward request to store [%s] by store [%s] for region[%d]",
targetStore.getStore().getAddress(),
targetStore.getProxyStore().getAddress(),
region.getId()));
String addressStr = targetStore.getProxyStore().getAddress();
ManagedChannel channel =
channelFactory.getChannel(addressStr, regionManager.getPDClient().getHostMapping());
Metadata header = new Metadata();
header.put(TiConfiguration.FORWARD_META_DATA_KEY, targetStore.getStore().getAddress());
blockingStub = MetadataUtils.attachHeaders(TikvGrpc.newBlockingStub(channel), header);
asyncStub = MetadataUtils.attachHeaders(TikvGrpc.newStub(channel), header);
return true;
}

Expand Down
70 changes: 10 additions & 60 deletions src/main/java/org/tikv/common/region/RegionManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,11 @@
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.tikv.common.ReadOnlyPDClient;
import org.tikv.common.TiConfiguration;
import org.tikv.common.event.CacheInvalidateEvent;
import org.tikv.common.exception.GrpcException;
import org.tikv.common.exception.TiClientInternalException;
import org.tikv.common.util.BackOffer;
Expand Down Expand Up @@ -59,50 +57,24 @@ public class RegionManager {
private final ScheduledExecutorService executor;
private final StoreHealthyChecker storeChecker;

private final Function<CacheInvalidateEvent, Void> cacheInvalidateCallback;

// To avoid double retrieval, we use the async version of gRPC:
// while an RPC has not returned yet, wait for the in-flight call to finish instead of calling again.
public RegionManager(
TiConfiguration conf,
ReadOnlyPDClient pdClient,
Function<CacheInvalidateEvent, Void> cacheInvalidateCallback) {
this.cache = new RegionCache();
this.pdClient = pdClient;
this.conf = conf;
this.cacheInvalidateCallback = cacheInvalidateCallback;
this.executor = null;
this.storeChecker = null;
}

public RegionManager(
TiConfiguration conf,
ReadOnlyPDClient pdClient,
Function<CacheInvalidateEvent, Void> cacheInvalidateCallback,
ChannelFactory channelFactory,
boolean enableGrpcForward) {
TiConfiguration conf, ReadOnlyPDClient pdClient, ChannelFactory channelFactory) {
this.cache = new RegionCache();
this.cacheInvalidateCallback = cacheInvalidateCallback;
this.pdClient = pdClient;
this.conf = conf;

if (enableGrpcForward) {
StoreHealthyChecker storeChecker =
new StoreHealthyChecker(channelFactory, pdClient, this.cache);
this.storeChecker = storeChecker;
this.executor = Executors.newScheduledThreadPool(1);
this.executor.scheduleAtFixedRate(storeChecker, 1, 1, TimeUnit.SECONDS);
} else {
this.storeChecker = null;
this.executor = null;
}
long period = conf.getHealthCheckPeriodDuration();
StoreHealthyChecker storeChecker =
new StoreHealthyChecker(
channelFactory, pdClient, this.cache, conf.getGrpcHealthCheckTimeout());
this.storeChecker = storeChecker;
this.executor = Executors.newScheduledThreadPool(1);
this.executor.scheduleAtFixedRate(storeChecker, period, period, TimeUnit.MILLISECONDS);
}

public RegionManager(TiConfiguration conf, ReadOnlyPDClient pdClient) {
this.cache = new RegionCache();
this.pdClient = pdClient;
this.conf = conf;
this.cacheInvalidateCallback = null;
this.storeChecker = null;
this.executor = null;
}
Expand All @@ -113,10 +85,6 @@ public synchronized void close() {
}
}

/**
 * Returns the cache-invalidation callback this manager was constructed with.
 *
 * @return the callback, or {@code null} when the manager was built by a constructor that does
 *     not take one
 */
public Function<CacheInvalidateEvent, Void> getCacheInvalidateCallback() {
  return this.cacheInvalidateCallback;
}

/**
 * Returns the PD client supplied to this manager at construction time.
 *
 * @return the read-only PD client
 */
public ReadOnlyPDClient getPDClient() {
  return pdClient;
}
Expand Down Expand Up @@ -234,7 +202,7 @@ public TiStore getStoreById(long id, BackOffer backOffer) {
if (store.getStore().getState().equals(StoreState.Tombstone)) {
return null;
}
if (cache.putStore(id, store)) {
if (cache.putStore(id, store) && storeChecker != null) {
storeChecker.scheduleStoreHealthCheck(store);
}
return store;
Expand Down Expand Up @@ -266,7 +234,7 @@ public TiRegion updateLeader(TiRegion region, long storeId) {
}

public synchronized void updateStore(TiStore oldStore, TiStore newStore) {
if (cache.updateStore(oldStore, newStore)) {
if (cache.updateStore(oldStore, newStore) && storeChecker != null) {
storeChecker.scheduleStoreHealthCheck(newStore);
}
}
Expand All @@ -285,24 +253,6 @@ public synchronized void onRequestFail(TiRegion region) {
cache.invalidateRegion(region);
}

/**
 * Re-reads a region's metadata from PD by region id, bypassing the cache.
 *
 * <p>If the metadata returned by PD differs from {@code region.getMeta()}, a new region is built
 * from it, put into the cache, and the result of {@code cache.putRegion} is returned.
 *
 * @param region the region to refresh
 * @return the refreshed region, or {@code null} when PD reports identical metadata or when the
 *     lookup fails for any reason (the failure is logged, not propagated)
 */
public TiRegion getRegionSkipCache(TiRegion region) {
  BackOffer backOffer = ConcreteBackOffer.newGetBackOff();
  try {
    Pair<Metapb.Region, Metapb.Peer> regionAndLeader =
        pdClient.getRegionByID(backOffer, region.getId());
    if (regionAndLeader.first.equals(region.getMeta())) {
      // PD answered, but nothing changed; say so instead of claiming the lookup failed.
      logger.warn("Region from PD is unchanged for region id: " + region.getId());
      return null;
    }
    TiRegion refreshed = createRegion(regionAndLeader.first, regionAndLeader.second, backOffer);
    return cache.putRegion(refreshed);
  } catch (Exception e) {
    // Best-effort refresh: callers still get null, but the cause is no longer lost.
    // `region` is deliberately not dereferenced here, since it may be what caused the failure.
    logger.warn("Failed to refresh region from PD", e);
    return null;
  }
}

/**
 * Invalidates the cached entry for the given store by delegating to the region cache.
 *
 * @param storeId id of the store to invalidate
 */
public void invalidateStore(long storeId) {
  this.cache.invalidateStore(storeId);
}
Expand Down
9 changes: 5 additions & 4 deletions src/main/java/org/tikv/common/region/StoreHealthyChecker.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,20 +19,21 @@
public class StoreHealthyChecker implements Runnable {
private static final Logger logger = LoggerFactory.getLogger(StoreHealthyChecker.class);
private static final long MAX_CHECK_STORE_TOMBSTONE_TICK = 60;
private static final long SLEEP_MILLI_SECONDS_AFTER_DOUBLE_CHECK = 500;
private BlockingQueue<TiStore> taskQueue;
private final ChannelFactory channelFactory;
private final ReadOnlyPDClient pdClient;
private final RegionCache cache;
private long checkTombstoneTick;
private long timeout;

public StoreHealthyChecker(
ChannelFactory channelFactory, ReadOnlyPDClient pdClient, RegionCache cache) {
ChannelFactory channelFactory, ReadOnlyPDClient pdClient, RegionCache cache, long timeout) {
this.taskQueue = new LinkedBlockingQueue<>();
this.channelFactory = channelFactory;
this.pdClient = pdClient;
this.cache = cache;
this.checkTombstoneTick = 0;
this.timeout = timeout;
}

public boolean scheduleStoreHealthCheck(TiStore store) {
Expand Down Expand Up @@ -64,7 +65,7 @@ private boolean checkStoreHealth(TiStore store) {
try {
ManagedChannel channel = channelFactory.getChannel(addressStr, pdClient.getHostMapping());
HealthGrpc.HealthBlockingStub stub =
HealthGrpc.newBlockingStub(channel).withDeadlineAfter(200, TimeUnit.MILLISECONDS);
HealthGrpc.newBlockingStub(channel).withDeadlineAfter(timeout, TimeUnit.MILLISECONDS);
HealthCheckRequest req = HealthCheckRequest.newBuilder().build();
HealthCheckResponse resp = stub.check(req);
if (resp.getStatus() == HealthCheckResponse.ServingStatus.SERVING) {
Expand Down Expand Up @@ -133,7 +134,7 @@ public void run() {
}
if (!unreachableStore.isEmpty()) {
try {
Thread.sleep(SLEEP_MILLI_SECONDS_AFTER_DOUBLE_CHECK);
Thread.sleep(timeout);
} catch (Exception e) {
this.taskQueue.addAll(unreachableStore);
return;
Expand Down
8 changes: 2 additions & 6 deletions src/test/java/org/tikv/common/RegionManagerTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ public void getRegionByKey() throws Exception {
int confVer = 1026;
int ver = 1027;
long regionId = 233;
String testAddress = "testAddress";
String testAddress = "127.0.0.1";
pdServer.addGetRegionResp(
GrpcUtils.makeGetRegionResponse(
pdServer.getClusterId(),
Expand Down Expand Up @@ -92,11 +92,7 @@ public void getRegionByKey() throws Exception {

// This will in turn invoke rpc and results in an error
// since we set just one rpc response
try {
mgr.getRegionByKey(searchKeyNotExists);
fail();
} catch (Exception ignored) {
}
assertNull(mgr.getRegionByKey(searchKeyNotExists));
}

@Test
Expand Down