Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions src/main/java/org/tikv/common/event/CacheInvalidateEvent.java
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,8 @@ public String toString() {

/**
 * Kind of cache entry a cache-invalidate event refers to.
 *
 * <p>NOTE(review): the per-constant notes below are derived from how RegionErrorHandler builds
 * events; confirm against other producers before relying on them.
 */
public enum CacheType implements Serializable {
// Region and store are both reported; the event carries a region id and a store id.
REGION_STORE,
// NOTE(review): presumably a store-only invalidation; no producer is visible here - confirm.
STORE,
// Region cache entry is stale; the event carries a region id and an invalid (0) store id.
REGION,
// Request failed for a reason not tied to a region or store; both ids are invalid (0).
REQ_FAILED,
// Leader information of a region is wrong; the event carries a region id and an invalid (0) store id.
LEADER
}
Expand Down
71 changes: 62 additions & 9 deletions src/main/java/org/tikv/common/operation/RegionErrorHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,13 @@
import io.grpc.StatusRuntimeException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.function.Function;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.tikv.common.codec.KeyUtils;
import org.tikv.common.event.CacheInvalidateEvent;
import org.tikv.common.event.CacheInvalidateEvent.CacheType;
import org.tikv.common.exception.GrpcException;
import org.tikv.common.exception.TiKVException;
import org.tikv.common.region.RegionErrorReceiver;
Expand All @@ -43,6 +46,11 @@ public class RegionErrorHandler<RespT> implements ErrorHandler<RespT> {
private final Function<RespT, Errorpb.Error> getRegionError;
private final RegionManager regionManager;
private final RegionErrorReceiver recv;
private final List<Function<CacheInvalidateEvent, Void>> cacheInvalidateCallBackList;

private final ExecutorService callBackThreadPool;
private final int INVALID_STORE_ID = 0;
private final int INVALID_REGION_ID = 0;

public RegionErrorHandler(
RegionManager regionManager,
Expand All @@ -51,6 +59,8 @@ public RegionErrorHandler(
this.recv = recv;
this.regionManager = regionManager;
this.getRegionError = getRegionError;
this.cacheInvalidateCallBackList = regionManager.getCacheInvalidateCallbackList();
this.callBackThreadPool = regionManager.getCallBackThreadPool();
}

@Override
Expand Down Expand Up @@ -107,6 +117,7 @@ public boolean handleRegionError(BackOffer backOffer, Errorpb.Error error) {

if (!retry) {
this.regionManager.invalidateRegion(recv.getRegion());
notifyRegionLeaderError(recv.getRegion());
}

backOffer.doBackOff(backOffFuncType, new GrpcException(error.toString()));
Expand All @@ -116,15 +127,14 @@ public boolean handleRegionError(BackOffer backOffer, Errorpb.Error error) {
// this error is reported from raftstore:
// the store id carried by the request is inconsistent with the store that was expected
// Solution: re-fetch from PD
long storeId = recv.getRegion().getLeader().getStoreId();
long storeId = error.getStoreNotMatch().getRequestStoreId();
long actualStoreId = error.getStoreNotMatch().getActualStoreId();
logger.warn(
String.format(
"Store Not Match happened with region id %d, store id %d, actual store id %d",
recv.getRegion().getId(), storeId, actualStoreId));

this.regionManager.invalidateRegion(recv.getRegion());
this.regionManager.invalidateStore(storeId);
// may request store which is not leader.
invalidateRegionStoreCache(recv.getRegion(), storeId);
// assume this is a low probability error, do not retry, just re-split the request by
// throwing it out.
return false;
Expand All @@ -143,8 +153,6 @@ public boolean handleRegionError(BackOffer backOffer, Errorpb.Error error) {
BackOffFunction.BackOffFuncType.BoServerBusy,
new StatusRuntimeException(
Status.fromCode(Status.Code.UNAVAILABLE).withDescription(error.toString())));
backOffer.doBackOff(
BackOffFunction.BackOffFuncType.BoRegionMiss, new GrpcException(error.getMessage()));
return true;
} else if (error.hasStaleCommand()) {
// this error is reported from raftstore:
Expand Down Expand Up @@ -179,7 +187,7 @@ else if (error.getMessage().contains("TsoBatchUsedUp")) {
logger.warn(String.format("Unknown error %s for region [%s]", error, recv.getRegion()));
// For other errors, we only drop cache here.
// Upper level may split this task.
invalidateRegionStoreCache(recv.getRegion());
invalidateRegionStoreCache(recv.getRegion(), recv.getRegion().getLeader().getStoreId());
// retry if raft proposal is dropped, it indicates the store is in the middle of transition
if (error.getMessage().contains("Raft ProposalDropped")) {
backOffer.doBackOff(
Expand All @@ -196,6 +204,7 @@ else if (error.getMessage().contains("TsoBatchUsedUp")) {
private boolean onRegionEpochNotMatch(BackOffer backOffer, List<Metapb.Region> currentRegions) {
if (currentRegions.size() == 0) {
this.regionManager.onRegionStale(recv.getRegion());
notifyRegionCacheInvalidate(recv.getRegion());
return false;
}

Expand Down Expand Up @@ -229,6 +238,7 @@ private boolean onRegionEpochNotMatch(BackOffer backOffer, List<Metapb.Region> c
}

if (needInvalidateOld) {
notifyRegionCacheInvalidate(recv.getRegion());
this.regionManager.onRegionStale(recv.getRegion());
}

Expand Down Expand Up @@ -271,8 +281,51 @@ public TiRegion getRegion() {
return recv.getRegion();
}

private void invalidateRegionStoreCache(TiRegion ctxRegion) {
/**
 * Builds a {@link CacheInvalidateEvent} describing a request error and hands it to every
 * registered cache-invalidate callback on the callback thread pool.
 *
 * <p>An invalid (0) region or store id in the event means the error was not caused by that
 * region or store.
 *
 * @param ctxRegion region the failing request was addressed to; only its id is read
 * @param storeId store id to report; only used when {@code type} is {@code REGION_STORE}
 * @param type kind of cache invalidation to announce
 * @throws IllegalArgumentException if {@code type} is not {@code REGION}, {@code LEADER}, {@code
 *     REGION_STORE} or {@code REQ_FAILED}
 */
private void notifyRegionRequestError(
    TiRegion ctxRegion, long storeId, CacheInvalidateEvent.CacheType type) {
  CacheInvalidateEvent event;
  switch (type) {
    case REGION:
    case LEADER:
      // Region-level problem only: the store id is deliberately reported as invalid.
      event = new CacheInvalidateEvent(ctxRegion.getId(), INVALID_STORE_ID, true, false, type);
      break;
    case REGION_STORE:
      event = new CacheInvalidateEvent(ctxRegion.getId(), storeId, true, true, type);
      break;
    case REQ_FAILED:
      // Neither a region nor a store is blamed for the failure.
      event = new CacheInvalidateEvent(INVALID_REGION_ID, INVALID_STORE_ID, false, false, type);
      break;
    default:
      throw new IllegalArgumentException("Unexpect invalid cache invalid type " + type);
  }

  if (cacheInvalidateCallBackList == null) {
    return;
  }

  for (Function<CacheInvalidateEvent, Void> cacheInvalidateCallBack :
      cacheInvalidateCallBackList) {
    // Callbacks run asynchronously so that a slow or failing listener cannot stall error handling.
    callBackThreadPool.submit(
        () -> {
          try {
            cacheInvalidateCallBack.apply(event);
          } catch (Exception e) {
            // Pass the throwable itself so the stack trace is logged, not just e.toString().
            logger.error("CacheInvalidCallBack failed", e);
          }
        });
  }
}

/**
 * Drops the cached region and the related store entries, then notifies the cache-invalidate
 * callbacks with a REGION_STORE event for the given store id.
 *
 * @param ctxRegion region whose cache entry is invalidated
 * @param storeId store to invalidate and to report in the event
 */
private void invalidateRegionStoreCache(TiRegion ctxRegion, long storeId) {
regionManager.invalidateRegion(ctxRegion);
// NOTE(review): the leader's store is invalidated in addition to storeId; when the caller
// passes the leader's store id the next call repeats this one - confirm both are intended.
regionManager.invalidateStore(ctxRegion.getLeader().getStoreId());
regionManager.invalidateStore(storeId);
notifyRegionRequestError(ctxRegion, storeId, CacheType.REGION_STORE);
}

/**
 * Announces to the cache-invalidate callbacks that the cached entry of the given region is stale.
 *
 * @param ctxRegion region whose id is reported in the event
 */
private void notifyRegionCacheInvalidate(TiRegion ctxRegion) {
  // No store is involved in a REGION event, so the invalid store id is passed.
  final long noStore = INVALID_STORE_ID;
  notifyRegionRequestError(ctxRegion, noStore, CacheType.REGION);
}

/**
 * Announces to the cache-invalidate callbacks that the leader information of the given region
 * led to a request error.
 *
 * @param ctxRegion region whose id is reported in the event
 */
private void notifyRegionLeaderError(TiRegion ctxRegion) {
  // No store is involved in a LEADER event, so the invalid store id is passed.
  final long noStore = INVALID_STORE_ID;
  notifyRegionRequestError(ctxRegion, noStore, CacheType.LEADER);
}
}
54 changes: 46 additions & 8 deletions src/main/java/org/tikv/common/region/RegionManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,18 @@
import io.prometheus.client.Histogram;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.tikv.common.ReadOnlyPDClient;
import org.tikv.common.TiConfiguration;
import org.tikv.common.event.CacheInvalidateEvent;
import org.tikv.common.exception.GrpcException;
import org.tikv.common.exception.InvalidStoreException;
import org.tikv.common.exception.TiClientInternalException;
Expand Down Expand Up @@ -69,10 +73,36 @@ public class RegionManager {
private final TiConfiguration conf;
private final ScheduledExecutorService executor;
private final StoreHealthyChecker storeChecker;
private final CopyOnWriteArrayList<Function<CacheInvalidateEvent, Void>>
cacheInvalidateCallbackList;
private final ExecutorService callBackThreadPool;
private AtomicInteger tiflashStoreIndex = new AtomicInteger(0);

/**
 * Creates a region manager by delegating to the four-argument constructor, passing 1 as the
 * number of threads used to run cache-invalidate callbacks.
 *
 * @param conf client configuration
 * @param pdClient PD client used by this manager
 * @param channelFactory channel factory forwarded to the four-argument constructor
 */
public RegionManager(
TiConfiguration conf, ReadOnlyPDClient pdClient, ChannelFactory channelFactory) {
this(conf, pdClient, channelFactory, 1);
}

/**
 * Creates a region manager by delegating to the three-argument constructor, passing 1 as the
 * number of threads used to run cache-invalidate callbacks.
 *
 * @param conf client configuration
 * @param pdClient PD client used by this manager
 */
public RegionManager(TiConfiguration conf, ReadOnlyPDClient pdClient) {
this(conf, pdClient, 1);
}

/**
 * Creates a region manager that has no store health checker and no scheduled executor.
 *
 * @param conf client configuration
 * @param pdClient PD client used by this manager
 * @param callBackExecutorThreadNum size of the fixed thread pool that runs cache-invalidate
 *     callbacks
 */
public RegionManager(
    TiConfiguration conf, ReadOnlyPDClient pdClient, int callBackExecutorThreadNum) {
  this.conf = conf;
  this.pdClient = pdClient;
  this.cache = new RegionCache();

  // This variant performs no background store checking.
  this.executor = null;
  this.storeChecker = null;

  // Callback registry and the pool on which registered callbacks are executed.
  this.cacheInvalidateCallbackList = new CopyOnWriteArrayList<>();
  this.callBackThreadPool = Executors.newFixedThreadPool(callBackExecutorThreadNum);
}

public RegionManager(
TiConfiguration conf,
ReadOnlyPDClient pdClient,
ChannelFactory channelFactory,
int callBackExecutorThreadNum) {
this.cache = new RegionCache();
this.pdClient = pdClient;
this.conf = conf;
Expand All @@ -83,26 +113,34 @@ public RegionManager(
this.storeChecker = storeChecker;
this.executor = Executors.newScheduledThreadPool(1);
this.executor.scheduleAtFixedRate(storeChecker, period, period, TimeUnit.MILLISECONDS);
}

public RegionManager(TiConfiguration conf, ReadOnlyPDClient pdClient) {
this.cache = new RegionCache();
this.pdClient = pdClient;
this.conf = conf;
this.storeChecker = null;
this.executor = null;
this.cacheInvalidateCallbackList = new CopyOnWriteArrayList<>();
this.callBackThreadPool = Executors.newFixedThreadPool(callBackExecutorThreadNum);
}

/**
 * Stops the background executors owned by this manager: the scheduled executor, when one was
 * created, and the callback thread pool.
 */
public synchronized void close() {
  final ScheduledExecutorService scheduled = this.executor;
  if (scheduled != null) {
    scheduled.shutdownNow();
  }
  callBackThreadPool.shutdownNow();
}

/**
 * Returns the PD client this manager was constructed with.
 *
 * @return the PD client
 */
public ReadOnlyPDClient getPDClient() {
  return pdClient;
}

/**
 * Returns the thread pool on which cache-invalidate callbacks are executed.
 *
 * @return the callback thread pool owned by this manager
 */
public ExecutorService getCallBackThreadPool() {
  return this.callBackThreadPool;
}

/**
 * Returns the registered cache-invalidate callbacks.
 *
 * <p>The manager's own list is returned, not a copy, so callbacks added later are visible
 * through the returned reference.
 *
 * @return the list of registered callbacks
 */
public List<Function<CacheInvalidateEvent, Void>> getCacheInvalidateCallbackList() {
  return this.cacheInvalidateCallbackList;
}

/**
 * Registers a callback to be invoked with each cache-invalidate event.
 *
 * @param cacheInvalidateCallback callback appended to the list of registered callbacks
 */
public void addCacheInvalidateCallback(
    Function<CacheInvalidateEvent, Void> cacheInvalidateCallback) {
  cacheInvalidateCallbackList.add(cacheInvalidateCallback);
}

/** Clears the region cache by delegating to its {@code invalidateAll()}. */
public void invalidateAll() {
  this.cache.invalidateAll();
}
Expand Down
130 changes: 130 additions & 0 deletions src/test/java/org/tikv/common/CacheInvalidCallBackTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
/*
* Copyright 2022 TiKV Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package org.tikv.common;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;

import com.google.protobuf.ByteString;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.function.Function;
import org.junit.Test;
import org.tikv.common.event.CacheInvalidateEvent;
import org.tikv.common.region.RegionManager;
import org.tikv.common.region.RegionStoreClient;
import org.tikv.common.region.RegionStoreClient.RegionStoreClientBuilder;
import org.tikv.common.region.TiStore;
import org.tikv.common.util.BackOffer;
import org.tikv.common.util.ConcreteBackOffer;
import org.tikv.kvproto.Errorpb;
import org.tikv.kvproto.Errorpb.EpochNotMatch;
import org.tikv.kvproto.Errorpb.NotLeader;
import org.tikv.kvproto.Errorpb.StoreNotMatch;
import org.tikv.kvproto.Metapb;

/**
 * Verifies that region errors returned by the mock store cause the cache-invalidate callbacks
 * registered on {@link RegionManager} to be invoked.
 */
public class CacheInvalidCallBackTest extends MockServerTest {

  /** Upper bound on how long to wait for an asynchronously delivered event. */
  private static final int EVENT_WAIT_TIMEOUT_MS = 1000;

  /** Pause between two checks of the recorded event count. */
  private static final int EVENT_POLL_INTERVAL_MS = 10;

  /**
   * Builds a client for the test region whose region manager has the given callback registered.
   *
   * @param version store version reported by the mock store metadata
   * @param cacheInvalidateCallBack callback registered on the region manager
   * @return a client bound to the test region and a single mock store
   */
  private RegionStoreClient createClient(
      String version, Function<CacheInvalidateEvent, Void> cacheInvalidateCallBack) {
    Metapb.Store meta =
        Metapb.Store.newBuilder()
            .setAddress(LOCAL_ADDR + ":" + port)
            .setId(1)
            .setState(Metapb.StoreState.Up)
            .setVersion(version)
            .build();
    TiStore store = new TiStore(meta);

    RegionManager manager = new RegionManager(session.getConf(), session.getPDClient());
    manager.addCacheInvalidateCallback(cacheInvalidateCallBack);
    RegionStoreClientBuilder builder =
        new RegionStoreClientBuilder(
            session.getConf(), session.getChannelFactory(), manager, session.getPDClient());

    return builder.build(region, store);
  }

  @Test
  public void testcacheInvalidCallBack() {
    String version = "3.0.12";
    CacheInvalidateCallBack cacheInvalidateCallBack = new CacheInvalidateCallBack();
    doRawGetTest(createClient(version, cacheInvalidateCallBack), cacheInvalidateCallBack);
  }

  /**
   * Issues one successful read followed by three reads that the mock server answers with a
   * region error, and checks that each error adds exactly one event to the callback.
   *
   * @param client client under test; closed before this method returns
   * @param cacheInvalidateCallBack callback that records the delivered events
   */
  public void doRawGetTest(
      RegionStoreClient client, CacheInvalidateCallBack cacheInvalidateCallBack) {
    try {
      server.put("key1", "value1");
      Optional<ByteString> value =
          client.rawGet(defaultBackOff(), ByteString.copyFromUtf8("key1"));
      assertEquals(ByteString.copyFromUtf8("value1"), value.get());

      server.putError(
          "error1", () -> Errorpb.Error.newBuilder().setNotLeader(NotLeader.getDefaultInstance()));
      assertFailureAddsEvent(client, "error1", cacheInvalidateCallBack, 1);

      server.putError(
          "failure",
          () -> Errorpb.Error.newBuilder().setEpochNotMatch(EpochNotMatch.getDefaultInstance()));
      assertFailureAddsEvent(client, "failure", cacheInvalidateCallBack, 2);

      server.putError(
          "store_not_match",
          () -> Errorpb.Error.newBuilder().setStoreNotMatch(StoreNotMatch.getDefaultInstance()));
      // Request the key registered just above so the StoreNotMatch path is really exercised.
      assertFailureAddsEvent(client, "store_not_match", cacheInvalidateCallBack, 3);
    } finally {
      // Clean up even when an assertion fails so later tests start from a clean mock server.
      server.clearAllMap();
      client.close();
    }
  }

  /**
   * Reads {@code key}, expects the read to fail, and checks the number of recorded events.
   *
   * @param client client under test
   * @param key key for which the mock server returns a region error
   * @param callBack callback that records the delivered events
   * @param expectedEvents total number of events expected after the failure
   */
  private void assertFailureAddsEvent(
      RegionStoreClient client, String key, CacheInvalidateCallBack callBack, int expectedEvents) {
    try {
      client.rawGet(defaultBackOff(), ByteString.copyFromUtf8(key));
    } catch (Exception ignored) {
      // The failure itself is expected; only the callback side effect is verified.
      awaitEventCount(callBack, expectedEvents);
      assertEquals(expectedEvents, callBack.cacheInvalidateEvents.size());
      return;
    }
    fail("rawGet for key '" + key + "' was expected to fail");
  }

  /**
   * Waits until at least {@code expectedEvents} events were recorded or the timeout elapsed.
   * Callbacks run on a separate thread pool, so the event may arrive after the request failed.
   */
  private void awaitEventCount(CacheInvalidateCallBack callBack, int expectedEvents) {
    long deadline = System.currentTimeMillis() + EVENT_WAIT_TIMEOUT_MS;
    while (callBack.cacheInvalidateEvents.size() < expectedEvents
        && System.currentTimeMillis() < deadline) {
      sleep(EVENT_POLL_INTERVAL_MS);
    }
  }

  private void sleep(int time) {
    try {
      Thread.sleep(time);
    } catch (InterruptedException e) {
      // Restore the interrupt flag before failing so the test runner can still observe it.
      Thread.currentThread().interrupt();
      fail();
    }
  }

  private BackOffer defaultBackOff() {
    return ConcreteBackOffer.newCustomBackOff(1000);
  }

  /** Callback that records every event it receives. */
  static class CacheInvalidateCallBack implements Function<CacheInvalidateEvent, Void> {

    // Written by the callback thread pool and read by the test thread, hence synchronized.
    public final List<CacheInvalidateEvent> cacheInvalidateEvents =
        java.util.Collections.synchronizedList(new ArrayList<>());

    @Override
    public Void apply(CacheInvalidateEvent cacheInvalidateEvent) {
      cacheInvalidateEvents.add(cacheInvalidateEvent);
      return null;
    }
  }
}