Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 23 additions & 0 deletions src/main/java/org/tikv/common/exception/InvalidStoreException.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
/*
* Copyright 2021 PingCAP, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.tikv.common.exception;

public class InvalidStoreException extends TiKVException {

public InvalidStoreException(long storeId) {
super(String.format("Invalid storeId: %d", storeId));
}
}
5 changes: 5 additions & 0 deletions src/main/java/org/tikv/common/operation/PDErrorHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import static org.tikv.common.pd.PDError.buildFromPdpbError;

import io.grpc.StatusRuntimeException;
import java.util.function.Function;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -74,6 +75,10 @@ public boolean handleResponseError(BackOffer backOffer, RespT resp) {

@Override
public boolean handleRequestError(BackOffer backOffer, Exception e) {
// store id is not found
if (e instanceof StatusRuntimeException && e.getMessage().contains("invalid store ID")) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we directly check whether the instance is of InvalidStoreException?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

BTW, should the error message be Invalid storeId?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

no, the exception tikv returns is a grpc exception, it is a StatusRuntimeException that only contains "invalid store id xx" information in its message.

return false;
}
backOffer.doBackOff(BackOffFunction.BackOffFuncType.BoPDRPC, e);
client.updateLeaderOrforwardFollower();
return true;
Expand Down
30 changes: 20 additions & 10 deletions src/main/java/org/tikv/common/region/RegionManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.tikv.common.ReadOnlyPDClient;
import org.tikv.common.TiConfiguration;
import org.tikv.common.exception.GrpcException;
import org.tikv.common.exception.InvalidStoreException;
import org.tikv.common.exception.TiClientInternalException;
import org.tikv.common.util.BackOffer;
import org.tikv.common.util.ChannelFactory;
Expand Down Expand Up @@ -152,9 +153,6 @@ public Pair<TiRegion, TiStore> getRegionStorePairByKey(
if (storeType == TiStoreType.TiKV) {
Peer peer = region.getCurrentReplica();
store = getStoreById(peer.getStoreId(), backOffer);
if (store == null) {
cache.clearAll();
}
} else {
outerLoop:
for (Peer peer : region.getLearnerList()) {
Expand All @@ -168,16 +166,11 @@ public Pair<TiRegion, TiStore> getRegionStorePairByKey(
}
}
if (store == null) {
// clear the region cache so we may get the learner peer next time
// clear the region cache, so we may get the learner peer next time
cache.invalidateRegion(region);
}
}

if (store == null) {
throw new TiClientInternalException(
"Cannot find valid store on " + storeType + " for region " + region.toString());
}

return Pair.create(region, store);
}

Expand All @@ -200,13 +193,20 @@ private List<TiStore> getRegionStore(List<Metapb.Peer> peers, BackOffer backOffe
.collect(Collectors.toList());
}

public TiStore getStoreById(long id, BackOffer backOffer) {
private TiStore getStoreByIdWithBackOff(long id, BackOffer backOffer) {
try {
TiStore store = cache.getStoreById(id);
if (store == null) {
store = new TiStore(pdClient.getStore(backOffer, id));
}
// if we did not get store info from pd, remove store from cache
if (store.getStore() == null) {
logger.warn(String.format("failed to get store %d from pd", id));
return null;
}
// if the store is already tombstone, remove store from cache
if (store.getStore().getState().equals(StoreState.Tombstone)) {
logger.warn(String.format("store %d is tombstone", id));
return null;
}
if (cache.putStore(id, store) && storeChecker != null) {
Expand All @@ -222,6 +222,16 @@ public TiStore getStoreById(long id) {
return getStoreById(id, defaultBackOff());
}

public TiStore getStoreById(long id, BackOffer backOffer) {
TiStore store = getStoreByIdWithBackOff(id, backOffer);
if (store == null) {
logger.warn(String.format("failed to fetch store %d, the store may be missing", id));
cache.clearAll();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we should log info or warn messages for this critical operation.

throw new InvalidStoreException(id);
}
return store;
}

public void onRegionStale(TiRegion region) {
cache.invalidateRegion(region);
}
Expand Down
32 changes: 13 additions & 19 deletions src/main/java/org/tikv/raw/RawKVClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -146,9 +146,8 @@ private void put(ByteString key, ByteString value, long ttl, boolean atomic) {
ConcreteBackOffer.newDeadlineBackOff(conf.getRawKVWriteTimeoutInMS(), slowLog);
try {
while (true) {
RegionStoreClient client = clientBuilder.build(key, backOffer);
slowLog.addProperty("region", client.getRegion().toString());
try {
try (RegionStoreClient client = clientBuilder.build(key, backOffer)) {
slowLog.addProperty("region", client.getRegion().toString());
client.rawPut(backOffer, key, value, ttl, atomic);
RAW_REQUEST_SUCCESS.labels(label).inc();
return;
Expand Down Expand Up @@ -203,9 +202,8 @@ public ByteString putIfAbsent(ByteString key, ByteString value, long ttl) {
ConcreteBackOffer.newDeadlineBackOff(conf.getRawKVWriteTimeoutInMS(), slowLog);
try {
while (true) {
RegionStoreClient client = clientBuilder.build(key, backOffer);
slowLog.addProperty("region", client.getRegion().toString());
try {
try (RegionStoreClient client = clientBuilder.build(key, backOffer)) {
slowLog.addProperty("region", client.getRegion().toString());
ByteString result = client.rawPutIfAbsent(backOffer, key, value, ttl);
RAW_REQUEST_SUCCESS.labels(label).inc();
return result;
Expand Down Expand Up @@ -311,9 +309,8 @@ public ByteString get(ByteString key) {
ConcreteBackOffer.newDeadlineBackOff(conf.getRawKVReadTimeoutInMS(), slowLog);
try {
while (true) {
RegionStoreClient client = clientBuilder.build(key, backOffer);
slowLog.addProperty("region", client.getRegion().toString());
try {
try (RegionStoreClient client = clientBuilder.build(key, backOffer)) {
slowLog.addProperty("region", client.getRegion().toString());
ByteString result = client.rawGet(backOffer, key);
RAW_REQUEST_SUCCESS.labels(label).inc();
return result;
Expand Down Expand Up @@ -434,9 +431,8 @@ public Long getKeyTTL(ByteString key) {
ConcreteBackOffer.newDeadlineBackOff(conf.getRawKVReadTimeoutInMS(), slowLog);
try {
while (true) {
RegionStoreClient client = clientBuilder.build(key, backOffer);
slowLog.addProperty("region", client.getRegion().toString());
try {
try (RegionStoreClient client = clientBuilder.build(key, backOffer)) {
slowLog.addProperty("region", client.getRegion().toString());
Long result = client.rawGetKeyTTL(backOffer, key);
RAW_REQUEST_SUCCESS.labels(label).inc();
return result;
Expand Down Expand Up @@ -709,9 +705,8 @@ private void delete(ByteString key, boolean atomic) {
ConcreteBackOffer.newDeadlineBackOff(conf.getRawKVWriteTimeoutInMS(), slowLog);
try {
while (true) {
RegionStoreClient client = clientBuilder.build(key, backOffer);
slowLog.addProperty("region", client.getRegion().toString());
try {
try (RegionStoreClient client = clientBuilder.build(key, backOffer)) {
slowLog.addProperty("region", client.getRegion().toString());
client.rawDelete(backOffer, key, atomic);
RAW_REQUEST_SUCCESS.labels(label).inc();
return;
Expand Down Expand Up @@ -887,8 +882,8 @@ private List<KvPair> doSendBatchGet(BackOffer backOffer, List<ByteString> keys,

private Pair<List<Batch>, List<KvPair>> doSendBatchGetInBatchesWithRetry(
BackOffer backOffer, Batch batch) {
RegionStoreClient client = clientBuilder.build(batch.getRegion(), backOffer);
try {

try (RegionStoreClient client = clientBuilder.build(batch.getRegion(), backOffer)) {
List<KvPair> partialResult = client.rawBatchGet(backOffer, batch.getKeys());
return Pair.create(new ArrayList<>(), partialResult);
} catch (final TiKVException e) {
Expand Down Expand Up @@ -939,8 +934,7 @@ private void doSendBatchDelete(

private List<Batch> doSendBatchDeleteInBatchesWithRetry(
BackOffer backOffer, Batch batch, boolean atomic) {
RegionStoreClient client = clientBuilder.build(batch.getRegion(), backOffer);
try {
try (RegionStoreClient client = clientBuilder.build(batch.getRegion(), backOffer)) {
client.rawBatchDelete(backOffer, batch.getKeys(), atomic);
return new ArrayList<>();
} catch (final TiKVException e) {
Expand Down
7 changes: 6 additions & 1 deletion src/test/java/org/tikv/common/RegionManagerTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,12 @@ public void getStoreById() throws Exception {
StoreState.Tombstone,
GrpcUtils.makeStoreLabel("k1", "v1"),
GrpcUtils.makeStoreLabel("k2", "v2"))));
assertNull(mgr.getStoreById(storeId + 1));

try {
mgr.getStoreById(storeId + 1);
fail();
} catch (Exception ignored) {
}

mgr.invalidateStore(storeId);
try {
Expand Down