Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 8 additions & 6 deletions src/main/java/org/tikv/common/AbstractGRPCClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import io.grpc.health.v1.HealthCheckRequest;
import io.grpc.health.v1.HealthCheckResponse;
import io.grpc.health.v1.HealthGrpc;
import io.grpc.stub.AbstractFutureStub;
import io.grpc.stub.AbstractStub;
import io.grpc.stub.ClientCalls;
import io.grpc.stub.StreamObserver;
Expand All @@ -38,14 +39,15 @@
import org.tikv.common.util.ChannelFactory;

public abstract class AbstractGRPCClient<
BlockingStubT extends AbstractStub<BlockingStubT>, StubT extends AbstractStub<StubT>>
BlockingStubT extends AbstractStub<BlockingStubT>,
FutureStubT extends AbstractFutureStub<FutureStubT>>
implements AutoCloseable {
protected final Logger logger = LoggerFactory.getLogger(this.getClass());
protected final ChannelFactory channelFactory;
protected TiConfiguration conf;
protected long timeout;
protected BlockingStubT blockingStub;
protected StubT asyncStub;
protected FutureStubT asyncStub;

protected AbstractGRPCClient(TiConfiguration conf, ChannelFactory channelFactory) {
this.conf = conf;
Expand All @@ -57,7 +59,7 @@ protected AbstractGRPCClient(
TiConfiguration conf,
ChannelFactory channelFactory,
BlockingStubT blockingStub,
StubT asyncStub) {
FutureStubT asyncStub) {
this.conf = conf;
this.timeout = conf.getTimeout();
this.channelFactory = channelFactory;
Expand Down Expand Up @@ -109,7 +111,7 @@ protected <ReqT, RespT> void callAsyncWithRetry(
.create(handler)
.callWithRetry(
() -> {
StubT stub = getAsyncStub();
FutureStubT stub = getAsyncStub();
ClientCalls.asyncUnaryCall(
stub.getChannel().newCall(method, stub.getCallOptions()),
requestFactory.get(),
Expand All @@ -133,7 +135,7 @@ <ReqT, RespT> StreamObserver<ReqT> callBidiStreamingWithRetry(
.create(handler)
.callWithRetry(
() -> {
StubT stub = getAsyncStub();
FutureStubT stub = getAsyncStub();
return asyncBidiStreamingCall(
stub.getChannel().newCall(method, stub.getCallOptions()), responseObserver);
},
Expand Down Expand Up @@ -175,7 +177,7 @@ public long getTimeout() {

protected abstract BlockingStubT getBlockingStub();

protected abstract StubT getAsyncStub();
protected abstract FutureStubT getAsyncStub();

protected boolean checkHealth(String addressStr, HostMapping hostMapping) {
ManagedChannel channel = channelFactory.getChannel(addressStr, hostMapping);
Expand Down
14 changes: 7 additions & 7 deletions src/main/java/org/tikv/common/PDClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@
import org.tikv.kvproto.Metapb.Store;
import org.tikv.kvproto.PDGrpc;
import org.tikv.kvproto.PDGrpc.PDBlockingStub;
import org.tikv.kvproto.PDGrpc.PDStub;
import org.tikv.kvproto.PDGrpc.PDFutureStub;
import org.tikv.kvproto.Pdpb;
import org.tikv.kvproto.Pdpb.Error;
import org.tikv.kvproto.Pdpb.ErrorType;
Expand All @@ -92,7 +92,7 @@
import org.tikv.kvproto.Pdpb.TsoRequest;
import org.tikv.kvproto.Pdpb.TsoResponse;

public class PDClient extends AbstractGRPCClient<PDBlockingStub, PDStub>
public class PDClient extends AbstractGRPCClient<PDBlockingStub, PDFutureStub>
implements ReadOnlyPDClient {
private static final String TIFLASH_TABLE_SYNC_PROGRESS_PATH = "/tiflash/table/sync";
private static final long MIN_TRY_UPDATE_DURATION = 50;
Expand Down Expand Up @@ -550,7 +550,7 @@ protected PDBlockingStub getBlockingStub() {
}

@Override
protected PDStub getAsyncStub() {
protected PDFutureStub getAsyncStub() {
if (pdClientWrapper == null) {
throw new GrpcException("PDClient may not be initialized");
}
Expand Down Expand Up @@ -644,7 +644,7 @@ private void initCluster() {
static class PDClientWrapper {
private final String leaderInfo;
private final PDBlockingStub blockingStub;
private final PDStub asyncStub;
private final PDFutureStub asyncStub;
private final long createTime;
private final String storeAddress;

Expand All @@ -655,10 +655,10 @@ static class PDClientWrapper {
header.put(TiConfiguration.PD_FORWARD_META_DATA_KEY, addrToUri(leaderInfo).toString());
this.blockingStub =
MetadataUtils.attachHeaders(PDGrpc.newBlockingStub(clientChannel), header);
this.asyncStub = MetadataUtils.attachHeaders(PDGrpc.newStub(clientChannel), header);
this.asyncStub = MetadataUtils.attachHeaders(PDGrpc.newFutureStub(clientChannel), header);
} else {
this.blockingStub = PDGrpc.newBlockingStub(clientChannel);
this.asyncStub = PDGrpc.newStub(clientChannel);
this.asyncStub = PDGrpc.newFutureStub(clientChannel);
}
this.leaderInfo = leaderInfo;
this.storeAddress = storeAddress;
Expand All @@ -677,7 +677,7 @@ PDBlockingStub getBlockingStub() {
return blockingStub;
}

PDStub getAsyncStub() {
PDFutureStub getAsyncStub() {
return asyncStub;
}

Expand Down
4 changes: 2 additions & 2 deletions src/main/java/org/tikv/common/TiConfiguration.java
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,9 @@ public class TiConfiguration implements Serializable {

private static final Logger logger = LoggerFactory.getLogger(TiConfiguration.class);
private static final ConcurrentHashMap<String, String> settings = new ConcurrentHashMap<>();
public static final Metadata.Key FORWARD_META_DATA_KEY =
public static final Metadata.Key<String> FORWARD_META_DATA_KEY =
Metadata.Key.of("tikv-forwarded-host", Metadata.ASCII_STRING_MARSHALLER);
public static final Metadata.Key PD_FORWARD_META_DATA_KEY =
public static final Metadata.Key<String> PD_FORWARD_META_DATA_KEY =
Metadata.Key.of("pd-forwarded-host", Metadata.ASCII_STRING_MARSHALLER);

static {
Expand Down
2 changes: 0 additions & 2 deletions src/main/java/org/tikv/common/operation/KVErrorHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -94,8 +94,6 @@ public boolean handleResponseError(BackOffer backOffer, RespT resp) {
Errorpb.Error error = regionHandler.getRegionError(resp);
if (error != null) {
return regionHandler.handleRegionError(backOffer, error);
} else {
regionHandler.tryUpdateRegionStore();
}

// Key error handling logic
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,16 +46,10 @@ public boolean handleResponseError(BackOffer backOffer, RespT resp) {
Errorpb.Error error = getRegionError(resp);
if (error != null) {
return handleRegionError(backOffer, error);
} else {
tryUpdateRegionStore();
}
return false;
}

public void tryUpdateRegionStore() {
recv.tryUpdateRegionStore();
}

public boolean handleRegionError(BackOffer backOffer, Errorpb.Error error) {
if (error.hasNotLeader()) {
// this error is reported from raftstore:
Expand Down
Loading