Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 8 additions & 4 deletions src/main/java/org/tikv/common/region/RegionManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ public void invalidateAll() {
}

public TiRegion getRegionByKey(ByteString key) {
return getRegionByKey(key, ConcreteBackOffer.newGetBackOff());
return getRegionByKey(key, defaultBackOff());
}

public TiRegion getRegionByKey(ByteString key, BackOffer backOffer) {
Expand Down Expand Up @@ -124,7 +124,7 @@ public TiRegion getRegionByKey(ByteString key, BackOffer backOffer) {
// Consider region A, B. After merge of (A, B) -> A, region ID B does not exist.
// This request is unrecoverable.
public TiRegion getRegionById(long regionId) {
BackOffer backOffer = ConcreteBackOffer.newGetBackOff();
BackOffer backOffer = defaultBackOff();
TiRegion region = cache.getRegionById(regionId);
if (region == null) {
Pair<Metapb.Region, Metapb.Peer> regionAndLeader =
Expand All @@ -144,7 +144,7 @@ public Pair<TiRegion, TiStore> getRegionStorePairByKey(ByteString key) {
}

public Pair<TiRegion, TiStore> getRegionStorePairByKey(ByteString key, TiStoreType storeType) {
return getRegionStorePairByKey(key, storeType, ConcreteBackOffer.newGetBackOff());
return getRegionStorePairByKey(key, storeType, defaultBackOff());
}

public Pair<TiRegion, TiStore> getRegionStorePairByKey(
Expand Down Expand Up @@ -216,7 +216,7 @@ public TiStore getStoreById(long id, BackOffer backOffer) {
}

public TiStore getStoreById(long id) {
return getStoreById(id, ConcreteBackOffer.newGetBackOff());
return getStoreById(id, defaultBackOff());
}

public void onRegionStale(TiRegion region) {
Expand Down Expand Up @@ -264,4 +264,8 @@ public void invalidateStore(long storeId) {
public void invalidateRegion(TiRegion region) {
cache.invalidateRegion(region);
}

private BackOffer defaultBackOff() {
return ConcreteBackOffer.newCustomBackOff(conf.getRawKVDefaultBackoffInMS());
}
}
24 changes: 22 additions & 2 deletions src/main/java/org/tikv/common/region/RegionStoreClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -1302,19 +1302,39 @@ public synchronized RegionStoreClient build(ByteString key) throws GrpcException
return build(key, TiStoreType.TiKV);
}

public synchronized RegionStoreClient build(ByteString key, BackOffer backOffer)
throws GrpcException {
return build(key, TiStoreType.TiKV, backOffer);
}

public synchronized RegionStoreClient build(ByteString key, TiStoreType storeType)
throws GrpcException {
Pair<TiRegion, TiStore> pair = regionManager.getRegionStorePairByKey(key, storeType);
return build(key, storeType, defaultBackOff());
}

public synchronized RegionStoreClient build(
ByteString key, TiStoreType storeType, BackOffer backOffer) throws GrpcException {
Pair<TiRegion, TiStore> pair =
regionManager.getRegionStorePairByKey(key, storeType, backOffer);
return build(pair.first, pair.second, storeType);
}

public synchronized RegionStoreClient build(TiRegion region) throws GrpcException {
TiStore store = regionManager.getStoreById(region.getLeader().getStoreId());
return build(region, defaultBackOff());
}

public synchronized RegionStoreClient build(TiRegion region, BackOffer backOffer)
throws GrpcException {
TiStore store = regionManager.getStoreById(region.getLeader().getStoreId(), backOffer);
return build(region, store, TiStoreType.TiKV);
}

public RegionManager getRegionManager() {
return regionManager;
}

private BackOffer defaultBackOff() {
return ConcreteBackOffer.newCustomBackOff(conf.getRawKVDefaultBackoffInMS());
}
}
}
1 change: 0 additions & 1 deletion src/main/java/org/tikv/common/util/BackOffer.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
public interface BackOffer {
// Back off types.
int seconds = 1000;
int COP_BUILD_TASK_MAX_BACKOFF = 5 * seconds;
int TSO_MAX_BACKOFF = 5 * seconds;
int SCANNER_NEXT_MAX_BACKOFF = 40 * seconds;
int BATCH_GET_MAX_BACKOFF = 40 * seconds;
Expand Down
16 changes: 8 additions & 8 deletions src/main/java/org/tikv/raw/RawKVClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ public void put(ByteString key, ByteString value, long ttl) {
try {
BackOffer backOffer = defaultBackOff();
while (true) {
RegionStoreClient client = clientBuilder.build(key);
RegionStoreClient client = clientBuilder.build(key, backOffer);
try {
client.rawPut(backOffer, key, value, ttl, atomicForCAS);
RAW_REQUEST_SUCCESS.labels(label).inc();
Expand Down Expand Up @@ -211,7 +211,7 @@ public void compareAndSet(
try {
BackOffer backOffer = defaultBackOff();
while (true) {
RegionStoreClient client = clientBuilder.build(key);
RegionStoreClient client = clientBuilder.build(key, backOffer);
try {
client.rawCompareAndSet(backOffer, key, prevValue, value, ttl);
RAW_REQUEST_SUCCESS.labels(label).inc();
Expand Down Expand Up @@ -269,7 +269,7 @@ public Optional<ByteString> get(ByteString key) {
try {
BackOffer backOffer = defaultBackOff();
while (true) {
RegionStoreClient client = clientBuilder.build(key);
RegionStoreClient client = clientBuilder.build(key, backOffer);
try {
Optional<ByteString> result = client.rawGet(defaultBackOff(), key);
RAW_REQUEST_SUCCESS.labels(label).inc();
Expand Down Expand Up @@ -342,7 +342,7 @@ public Optional<Long> getKeyTTL(ByteString key) {
try {
BackOffer backOffer = defaultBackOff();
while (true) {
RegionStoreClient client = clientBuilder.build(key);
RegionStoreClient client = clientBuilder.build(key, backOffer);
try {
Optional<Long> result = client.rawGetKeyTTL(defaultBackOff(), key);
RAW_REQUEST_SUCCESS.labels(label).inc();
Expand Down Expand Up @@ -590,7 +590,7 @@ public void delete(ByteString key) {
try {
BackOffer backOffer = defaultBackOff();
while (true) {
RegionStoreClient client = clientBuilder.build(key);
RegionStoreClient client = clientBuilder.build(key, backOffer);
try {
client.rawDelete(defaultBackOff(), key, atomicForCAS);
RAW_REQUEST_SUCCESS.labels(label).inc();
Expand Down Expand Up @@ -820,7 +820,7 @@ private List<KvPair> doSendBatchGet(BackOffer backOffer, List<ByteString> keys)

private Pair<List<Batch>, List<KvPair>> doSendBatchGetInBatchesWithRetry(
BackOffer backOffer, Batch batch) {
RegionStoreClient client = clientBuilder.build(batch.getRegion());
RegionStoreClient client = clientBuilder.build(batch.getRegion(), backOffer);
try {
List<KvPair> partialResult = client.rawBatchGet(backOffer, batch.getKeys());
return Pair.create(new ArrayList<>(), partialResult);
Expand Down Expand Up @@ -860,7 +860,7 @@ private void doSendBatchDelete(BackOffer backOffer, List<ByteString> keys) {
}

private List<Batch> doSendBatchDeleteInBatchesWithRetry(BackOffer backOffer, Batch batch) {
RegionStoreClient client = clientBuilder.build(batch.getRegion());
RegionStoreClient client = clientBuilder.build(batch.getRegion(), backOffer);
try {
client.rawBatchDelete(backOffer, batch.getKeys(), atomicForCAS);
return new ArrayList<>();
Expand Down Expand Up @@ -910,7 +910,7 @@ private void doSendDeleteRange(BackOffer backOffer, ByteString startKey, ByteStr
}

private List<DeleteRange> doSendDeleteRangeWithRetry(BackOffer backOffer, DeleteRange range) {
try (RegionStoreClient client = clientBuilder.build(range.getRegion())) {
try (RegionStoreClient client = clientBuilder.build(range.getRegion(), backOffer)) {
client.setTimeout(conf.getScanTimeout());
client.rawDeleteRange(backOffer, range.getStartKey(), range.getEndKey());
return new ArrayList<>();
Expand Down