From 289c1187bb58e42b71e192dea2bb977f2a46bd03 Mon Sep 17 00:00:00 2001 From: birdstorm Date: Sat, 20 Nov 2021 22:40:27 +0800 Subject: [PATCH 1/5] fix pd request throws invalid store id Signed-off-by: birdstorm --- src/main/java/org/tikv/common/PDClient.java | 4 +-- .../exception/InvalidStoreException.java | 23 +++++++++++++ .../tikv/common/operation/PDErrorHandler.java | 6 ++++ .../org/tikv/common/region/RegionManager.java | 28 +++++++++------- src/main/java/org/tikv/raw/RawKVClient.java | 32 ++++++++----------- 5 files changed, 60 insertions(+), 33 deletions(-) create mode 100644 src/main/java/org/tikv/common/exception/InvalidStoreException.java diff --git a/src/main/java/org/tikv/common/PDClient.java b/src/main/java/org/tikv/common/PDClient.java index 59e16a5c5a6..9d39348e97a 100644 --- a/src/main/java/org/tikv/common/PDClient.java +++ b/src/main/java/org/tikv/common/PDClient.java @@ -247,7 +247,7 @@ public Pair getRegionByKey(BackOffer backOffer, Byte GetRegionResponse resp = callWithRetry(backOffer, PDGrpc.getGetRegionMethod(), request, handler); - return new Pair(decodeRegion(resp.getRegion()), resp.getLeader()); + return Pair.create(decodeRegion(resp.getRegion()), resp.getLeader()); } finally { requestTimer.observeDuration(); } @@ -262,7 +262,7 @@ public Pair getRegionByID(BackOffer backOffer, long GetRegionResponse resp = callWithRetry(backOffer, PDGrpc.getGetRegionByIDMethod(), request, handler); - return new Pair(decodeRegion(resp.getRegion()), resp.getLeader()); + return Pair.create(decodeRegion(resp.getRegion()), resp.getLeader()); } private Supplier buildGetStoreReq(long storeId) { diff --git a/src/main/java/org/tikv/common/exception/InvalidStoreException.java b/src/main/java/org/tikv/common/exception/InvalidStoreException.java new file mode 100644 index 00000000000..4d854762dc1 --- /dev/null +++ b/src/main/java/org/tikv/common/exception/InvalidStoreException.java @@ -0,0 +1,23 @@ +/* + * Copyright 2021 PingCAP, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.tikv.common.exception; + +public class InvalidStoreException extends TiKVException { + + public InvalidStoreException(long storeId) { + super(String.format("Invalid storeId = %d", storeId)); + } +} diff --git a/src/main/java/org/tikv/common/operation/PDErrorHandler.java b/src/main/java/org/tikv/common/operation/PDErrorHandler.java index 0d280da5eef..20d49abfc87 100644 --- a/src/main/java/org/tikv/common/operation/PDErrorHandler.java +++ b/src/main/java/org/tikv/common/operation/PDErrorHandler.java @@ -20,6 +20,8 @@ import static org.tikv.common.pd.PDError.buildFromPdpbError; import java.util.function.Function; + +import io.grpc.StatusRuntimeException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.tikv.common.PDClient; @@ -75,6 +77,10 @@ public boolean handleResponseError(BackOffer backOffer, RespT resp) { @Override public boolean handleRequestError(BackOffer backOffer, Exception e) { backOffer.doBackOff(BackOffFunction.BackOffFuncType.BoPDRPC, e); + // store id is not found + if (e instanceof StatusRuntimeException && e.getMessage().contains("invalid store ID")) { + return false; + } client.updateLeaderOrforwardFollower(); return true; } diff --git a/src/main/java/org/tikv/common/region/RegionManager.java b/src/main/java/org/tikv/common/region/RegionManager.java index 46b0cab2cc4..c2a7d8d0060 100644 --- a/src/main/java/org/tikv/common/region/RegionManager.java +++ b/src/main/java/org/tikv/common/region/RegionManager.java @@ -31,7 +31,9 @@ import org.tikv.common.ReadOnlyPDClient; import org.tikv.common.TiConfiguration; import org.tikv.common.exception.GrpcException; +import org.tikv.common.exception.InvalidStoreException; import org.tikv.common.exception.TiClientInternalException; +import org.tikv.common.exception.TiKVException; import org.tikv.common.util.BackOffer; import org.tikv.common.util.ChannelFactory; import org.tikv.common.util.ConcreteBackOffer; @@ -145,16 +147,13 @@ public Pair getRegionStorePairByKey( ByteString key, TiStoreType storeType, BackOffer backOffer) { TiRegion region = getRegionByKey(key, backOffer); if (!region.isValid()) { - throw new TiClientInternalException("Region invalid: " + region.toString()); + throw new TiClientInternalException("Region invalid: " + region); } TiStore store = null; if (storeType == TiStoreType.TiKV) { Peer peer = region.getCurrentReplica(); store = getStoreById(peer.getStoreId(), backOffer); - if (store == null) { - cache.clearAll(); - } } else { outerLoop: for (Peer peer : region.getLearnerList()) { @@ -168,16 +167,11 @@ public Pair getRegionStorePairByKey( } } if (store == null) { - // clear the region cache so we may get the learner peer next time + // clear the region cache, so we may get the learner peer next time cache.invalidateRegion(region); } } - if (store == null) { - throw new TiClientInternalException( - "Cannot find valid store on " + storeType + " for region " + region.toString()); - } - return Pair.create(region, store); } @@ -200,13 +194,14 @@ private List getRegionStore(List peers, BackOffer backOffe .collect(Collectors.toList()); } - public TiStore getStoreById(long id, BackOffer backOffer) { + private TiStore getStoreByIdWithBackOff(long id, BackOffer backOffer) { try { TiStore store = cache.getStoreById(id); if (store == null) { store = new TiStore(pdClient.getStore(backOffer, id)); } - if (store.getStore().getState().equals(StoreState.Tombstone)) { + // if we did not get store info from pd or the store is already tombstone, remove store from cache + if (store.getStore() == null || store.getStore().getState().equals(StoreState.Tombstone)) { return null; } if (cache.putStore(id, store) && storeChecker != null) { @@ -222,6 +217,15 @@ public TiStore getStoreById(long id) { return getStoreById(id, defaultBackOff()); } + public TiStore getStoreById(long id, BackOffer backOffer) { + TiStore store = getStoreByIdWithBackOff(id, backOffer); + if (store == null || store.getStore() == null) { + cache.clearAll(); + throw new InvalidStoreException(id); + } + return store; + } + public void onRegionStale(TiRegion region) { cache.invalidateRegion(region); } diff --git a/src/main/java/org/tikv/raw/RawKVClient.java b/src/main/java/org/tikv/raw/RawKVClient.java index e488ab71886..d68e162912d 100644 --- a/src/main/java/org/tikv/raw/RawKVClient.java +++ b/src/main/java/org/tikv/raw/RawKVClient.java @@ -146,9 +146,8 @@ private void put(ByteString key, ByteString value, long ttl, boolean atomic) { ConcreteBackOffer.newDeadlineBackOff(conf.getRawKVWriteTimeoutInMS(), slowLog); try { while (true) { - RegionStoreClient client = clientBuilder.build(key, backOffer); - slowLog.addProperty("region", client.getRegion().toString()); - try { + try (RegionStoreClient client = clientBuilder.build(key, backOffer)) { + slowLog.addProperty("region", client.getRegion().toString()); client.rawPut(backOffer, key, value, ttl, atomic); RAW_REQUEST_SUCCESS.labels(label).inc(); return; @@ -203,9 +202,8 @@ public ByteString putIfAbsent(ByteString key, ByteString value, long ttl) { ConcreteBackOffer.newDeadlineBackOff(conf.getRawKVWriteTimeoutInMS(), slowLog); try { while (true) { - RegionStoreClient client = clientBuilder.build(key, backOffer); - slowLog.addProperty("region", client.getRegion().toString()); - try { + try (RegionStoreClient client = clientBuilder.build(key, backOffer)) { + slowLog.addProperty("region", client.getRegion().toString()); ByteString result = client.rawPutIfAbsent(backOffer, key, value, ttl); RAW_REQUEST_SUCCESS.labels(label).inc(); return result; @@ -311,9 +309,8 @@ public ByteString get(ByteString key) { ConcreteBackOffer.newDeadlineBackOff(conf.getRawKVReadTimeoutInMS(), slowLog); try { while (true) { - RegionStoreClient client = clientBuilder.build(key, backOffer); - slowLog.addProperty("region", client.getRegion().toString()); - try { + try (RegionStoreClient client = clientBuilder.build(key, backOffer)) { + slowLog.addProperty("region", client.getRegion().toString()); ByteString result = client.rawGet(backOffer, key); RAW_REQUEST_SUCCESS.labels(label).inc(); return result; @@ -434,9 +431,8 @@ public Long getKeyTTL(ByteString key) { ConcreteBackOffer.newDeadlineBackOff(conf.getRawKVReadTimeoutInMS(), slowLog); try { while (true) { - RegionStoreClient client = clientBuilder.build(key, backOffer); - slowLog.addProperty("region", client.getRegion().toString()); - try { + try (RegionStoreClient client = clientBuilder.build(key, backOffer)) { + slowLog.addProperty("region", client.getRegion().toString()); Long result = client.rawGetKeyTTL(backOffer, key); RAW_REQUEST_SUCCESS.labels(label).inc(); return result; @@ -709,9 +705,8 @@ private void delete(ByteString key, boolean atomic) { ConcreteBackOffer.newDeadlineBackOff(conf.getRawKVWriteTimeoutInMS(), slowLog); try { while (true) { - RegionStoreClient client = clientBuilder.build(key, backOffer); - slowLog.addProperty("region", client.getRegion().toString()); - try { + try (RegionStoreClient client = clientBuilder.build(key, backOffer)) { + slowLog.addProperty("region", client.getRegion().toString()); client.rawDelete(backOffer, key, atomic); RAW_REQUEST_SUCCESS.labels(label).inc(); return; @@ -887,8 +882,8 @@ private List doSendBatchGet(BackOffer backOffer, List keys, private Pair, List> doSendBatchGetInBatchesWithRetry( BackOffer backOffer, Batch batch) { - RegionStoreClient client = clientBuilder.build(batch.getRegion(), backOffer); - try { + + try (RegionStoreClient client = clientBuilder.build(batch.getRegion(), backOffer)) { List partialResult = client.rawBatchGet(backOffer, batch.getKeys()); return Pair.create(new ArrayList<>(), partialResult); } catch (final TiKVException e) { @@ -939,8 +934,7 @@ private void doSendBatchDelete( private List doSendBatchDeleteInBatchesWithRetry( BackOffer backOffer, Batch batch, boolean atomic) { - RegionStoreClient client = clientBuilder.build(batch.getRegion(), backOffer); - try { + try (RegionStoreClient client = clientBuilder.build(batch.getRegion(), backOffer)) { client.rawBatchDelete(backOffer, batch.getKeys(), atomic); return new ArrayList<>(); } catch (final TiKVException e) { From 251e6e96468e21bf9e3e90dafd85c6b330b4e7ea Mon Sep 17 00:00:00 2001 From: birdstorm Date: Sat, 20 Nov 2021 22:57:51 +0800 Subject: [PATCH 2/5] format Signed-off-by: birdstorm --- src/main/java/org/tikv/common/PDClient.java | 4 ++-- src/main/java/org/tikv/common/operation/PDErrorHandler.java | 3 +-- src/main/java/org/tikv/common/region/RegionManager.java | 6 +++--- 3 files changed, 6 insertions(+), 7 deletions(-) diff --git a/src/main/java/org/tikv/common/PDClient.java b/src/main/java/org/tikv/common/PDClient.java index 9d39348e97a..59e16a5c5a6 100644 --- a/src/main/java/org/tikv/common/PDClient.java +++ b/src/main/java/org/tikv/common/PDClient.java @@ -247,7 +247,7 @@ public Pair getRegionByKey(BackOffer backOffer, Byte GetRegionResponse resp = callWithRetry(backOffer, PDGrpc.getGetRegionMethod(), request, handler); - return Pair.create(decodeRegion(resp.getRegion()), resp.getLeader()); + return new Pair(decodeRegion(resp.getRegion()), resp.getLeader()); } finally { requestTimer.observeDuration(); } @@ -262,7 +262,7 @@ public Pair getRegionByID(BackOffer backOffer, long GetRegionResponse resp = callWithRetry(backOffer, PDGrpc.getGetRegionByIDMethod(), request, handler); - return Pair.create(decodeRegion(resp.getRegion()), resp.getLeader()); + return new Pair(decodeRegion(resp.getRegion()), resp.getLeader()); } private Supplier buildGetStoreReq(long storeId) { diff --git a/src/main/java/org/tikv/common/operation/PDErrorHandler.java b/src/main/java/org/tikv/common/operation/PDErrorHandler.java index 20d49abfc87..6381c43e9e1 100644 --- a/src/main/java/org/tikv/common/operation/PDErrorHandler.java +++ b/src/main/java/org/tikv/common/operation/PDErrorHandler.java @@ -19,9 +19,8 @@ import static org.tikv.common.pd.PDError.buildFromPdpbError; -import java.util.function.Function; - import io.grpc.StatusRuntimeException; +import java.util.function.Function; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.tikv.common.PDClient; diff --git a/src/main/java/org/tikv/common/region/RegionManager.java b/src/main/java/org/tikv/common/region/RegionManager.java index c2a7d8d0060..e8c33b17270 100644 --- a/src/main/java/org/tikv/common/region/RegionManager.java +++ b/src/main/java/org/tikv/common/region/RegionManager.java @@ -33,7 +33,6 @@ import org.tikv.common.exception.GrpcException; import org.tikv.common.exception.InvalidStoreException; import org.tikv.common.exception.TiClientInternalException; -import org.tikv.common.exception.TiKVException; import org.tikv.common.util.BackOffer; import org.tikv.common.util.ChannelFactory; import org.tikv.common.util.ConcreteBackOffer; @@ -200,7 +199,8 @@ private TiStore getStoreByIdWithBackOff(long id, BackOffer backOffer) { if (store == null) { store = new TiStore(pdClient.getStore(backOffer, id)); } - // if we did not get store info from pd or the store is already tombstone, remove store from cache + // if we did not get store info from pd or the store is already tombstone, remove store from + // cache if (store.getStore() == null || store.getStore().getState().equals(StoreState.Tombstone)) { return null; } @@ -219,7 +219,7 @@ public TiStore getStoreById(long id) { public TiStore getStoreById(long id, BackOffer backOffer) { TiStore store = getStoreByIdWithBackOff(id, backOffer); - if (store == null || store.getStore() == null) { + if (store == null) { cache.clearAll(); throw new InvalidStoreException(id); } From e54fa15e36fda76d144bc077fbbc067cf552a936 Mon Sep 17 00:00:00 2001 From: birdstorm Date: Sat, 20 Nov 2021 23:09:20 +0800 Subject: [PATCH 3/5] refactor Signed-off-by: birdstorm --- .../org/tikv/common/operation/PDErrorHandler.java | 2 +- .../java/org/tikv/common/region/RegionManager.java | 13 +++++++++---- 2 files changed, 10 insertions(+), 5 deletions(-) diff --git a/src/main/java/org/tikv/common/operation/PDErrorHandler.java b/src/main/java/org/tikv/common/operation/PDErrorHandler.java index 6381c43e9e1..421755164ee 100644 --- a/src/main/java/org/tikv/common/operation/PDErrorHandler.java +++ b/src/main/java/org/tikv/common/operation/PDErrorHandler.java @@ -75,11 +75,11 @@ public boolean handleResponseError(BackOffer backOffer, RespT resp) { @Override public boolean handleRequestError(BackOffer backOffer, Exception e) { - backOffer.doBackOff(BackOffFunction.BackOffFuncType.BoPDRPC, e); // store id is not found if (e instanceof StatusRuntimeException && e.getMessage().contains("invalid store ID")) { return false; } + backOffer.doBackOff(BackOffFunction.BackOffFuncType.BoPDRPC, e); client.updateLeaderOrforwardFollower(); return true; } diff --git a/src/main/java/org/tikv/common/region/RegionManager.java b/src/main/java/org/tikv/common/region/RegionManager.java index e8c33b17270..1fc8693ae88 100644 --- a/src/main/java/org/tikv/common/region/RegionManager.java +++ b/src/main/java/org/tikv/common/region/RegionManager.java @@ -146,7 +146,7 @@ public Pair getRegionStorePairByKey( ByteString key, TiStoreType storeType, BackOffer backOffer) { TiRegion region = getRegionByKey(key, backOffer); if (!region.isValid()) { - throw new TiClientInternalException("Region invalid: " + region); + throw new TiClientInternalException("Region invalid: " + region.toString()); } TiStore store = null; @@ -199,9 +199,14 @@ private TiStore getStoreByIdWithBackOff(long id, BackOffer backOffer) { if (store == null) { store = new TiStore(pdClient.getStore(backOffer, id)); } - // if we did not get store info from pd or the store is already tombstone, remove store from - // cache - if (store.getStore() == null || store.getStore().getState().equals(StoreState.Tombstone)) { + // if we did not get store info from pd, remove store from cache + if (store.getStore() == null) { + logger.warn(String.format("failed to get store %d from pd", id)); + return null; + } + // if the store is already tombstone, remove store from cache + if (store.getStore().getState().equals(StoreState.Tombstone)) { + logger.warn(String.format("store %d is tombstone", id)); return null; } if (cache.putStore(id, store) && storeChecker != null) { From 07fad513f6bbf405dd318d1aa6bf642294ed073f Mon Sep 17 00:00:00 2001 From: birdstorm Date: Sun, 21 Nov 2021 15:10:48 +0800 Subject: [PATCH 4/5] resolve comments Signed-off-by: birdstorm --- .../java/org/tikv/common/exception/InvalidStoreException.java | 2 +- src/main/java/org/tikv/common/region/RegionManager.java | 1 + 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/src/main/java/org/tikv/common/exception/InvalidStoreException.java b/src/main/java/org/tikv/common/exception/InvalidStoreException.java index 4d854762dc1..e97b4c677ee 100644 --- a/src/main/java/org/tikv/common/exception/InvalidStoreException.java +++ b/src/main/java/org/tikv/common/exception/InvalidStoreException.java @@ -18,6 +18,6 @@ public class InvalidStoreException extends TiKVException { public InvalidStoreException(long storeId) { - super(String.format("Invalid storeId = %d", storeId)); + super(String.format("Invalid storeId: %d", storeId)); } } diff --git a/src/main/java/org/tikv/common/region/RegionManager.java b/src/main/java/org/tikv/common/region/RegionManager.java index 1fc8693ae88..c0fb0aabfe8 100644 --- a/src/main/java/org/tikv/common/region/RegionManager.java +++ b/src/main/java/org/tikv/common/region/RegionManager.java @@ -225,6 +225,7 @@ public TiStore getStoreById(long id) { public TiStore getStoreById(long id, BackOffer backOffer) { TiStore store = getStoreByIdWithBackOff(id, backOffer); if (store == null) { + logger.warn(String.format("failed to fetch store %d, the store may be missing", id)); cache.clearAll(); throw new InvalidStoreException(id); } From fe9c73f5a357897af5afaf3955b41a20e86fe534 Mon Sep 17 00:00:00 2001 From: marsishandsome Date: Sun, 21 Nov 2021 23:56:29 +0800 Subject: [PATCH 5/5] fix test Signed-off-by: marsishandsome --- src/test/java/org/tikv/common/RegionManagerTest.java | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/src/test/java/org/tikv/common/RegionManagerTest.java b/src/test/java/org/tikv/common/RegionManagerTest.java index 003830c3eb6..483f8a9e6fb 100644 --- a/src/test/java/org/tikv/common/RegionManagerTest.java +++ b/src/test/java/org/tikv/common/RegionManagerTest.java @@ -161,7 +161,12 @@ public void getStoreById() throws Exception { StoreState.Tombstone, GrpcUtils.makeStoreLabel("k1", "v1"), GrpcUtils.makeStoreLabel("k2", "v2")))); - assertNull(mgr.getStoreById(storeId + 1)); + + try { + mgr.getStoreById(storeId + 1); + fail(); + } catch (Exception ignored) { + } mgr.invalidateStore(storeId); try {