diff --git a/src/main/java/org/tikv/common/TiSession.java b/src/main/java/org/tikv/common/TiSession.java index 1cc3ec684a1..0838932ade6 100644 --- a/src/main/java/org/tikv/common/TiSession.java +++ b/src/main/java/org/tikv/common/TiSession.java @@ -175,6 +175,11 @@ public TiSession(TiConfiguration conf) { } this.client = PDClient.createRaw(conf, keyCodec, channelFactory); + if (conf.getApiVersion().isV2() && !StoreVersion.minTiKVVersion(Version.API_V2, client)) { + throw new IllegalStateException( + "With API v2, store versions should not older than " + Version.API_V2); + } + this.enableGrpcForward = conf.getEnableGrpcForward(); if (this.enableGrpcForward) { logger.info("enable grpc forward for high available"); diff --git a/src/main/java/org/tikv/common/Version.java b/src/main/java/org/tikv/common/Version.java index 06760340943..1c39580eed7 100644 --- a/src/main/java/org/tikv/common/Version.java +++ b/src/main/java/org/tikv/common/Version.java @@ -25,4 +25,6 @@ public class Version { public static final String RESOLVE_LOCK_V4 = "4.0.0"; public static final String BATCH_WRITE = "3.0.14"; + + public static final String API_V2 = "6.1.0"; } diff --git a/src/test/java/org/tikv/common/ApiVersionTest.java b/src/test/java/org/tikv/common/ApiVersionTest.java index b6ab167f77f..830cc43ac79 100644 --- a/src/test/java/org/tikv/common/ApiVersionTest.java +++ b/src/test/java/org/tikv/common/ApiVersionTest.java @@ -150,4 +150,15 @@ public void testAccessV1ClusterWithTtl() throws InterruptedException { Assert.assertNotNull(e); } } + + @Test + public void testAccessOldVersionClusterWithV2() { + Assume.assumeFalse(minTiKVVersion("6.1.0")); + + try (RawKVClient client = createRawClient(ApiVersion.V2)) { + Assert.fail("Should not create V2 client while store version is less than 6.1.0"); + } catch (Exception e) { + Assert.assertNotNull(e); + } + } } diff --git a/src/test/java/org/tikv/common/MockThreeStoresTest.java b/src/test/java/org/tikv/common/MockThreeStoresTest.java index 43063be7d17..a88ecec8eb8 100644 --- a/src/test/java/org/tikv/common/MockThreeStoresTest.java +++ b/src/test/java/org/tikv/common/MockThreeStoresTest.java @@ -30,6 +30,7 @@ import org.tikv.common.region.TiStore; import org.tikv.kvproto.Metapb; import org.tikv.kvproto.Pdpb; +import org.tikv.kvproto.Pdpb.GetAllStoresResponse; public class MockThreeStoresTest extends PDMockServerTest { @@ -68,17 +69,17 @@ public void setup() throws IOException { ImmutableList.of( Metapb.Store.newBuilder() .setAddress("127.0.0.1:" + ports[0]) - .setVersion("5.0.0") + .setVersion(Version.API_V2) .setId(0x1) .build(), Metapb.Store.newBuilder() .setAddress("127.0.0.1:" + ports[1]) - .setVersion("5.0.0") + .setVersion(Version.API_V2) .setId(0x2) .build(), Metapb.Store.newBuilder() .setAddress("127.0.0.1:" + ports[2]) - .setVersion("5.0.0") + .setVersion(Version.API_V2) .setId(0x3) .build()); @@ -94,6 +95,10 @@ public void setup() throws IOException { int i = (int) request.getStoreId() - 1; return Pdpb.GetStoreResponse.newBuilder().setStore(stores.get(i)).build(); }); + server.addGetAllStoresListener( + request -> { + return GetAllStoresResponse.newBuilder().addAllStores(stores).build(); + }); } this.region = diff --git a/src/test/java/org/tikv/common/PDClientV2MockTest.java b/src/test/java/org/tikv/common/PDClientV2MockTest.java index 1a5278cb6ef..305cd101c38 100644 --- a/src/test/java/org/tikv/common/PDClientV2MockTest.java +++ b/src/test/java/org/tikv/common/PDClientV2MockTest.java @@ -17,6 +17,7 @@ package org.tikv.common; +import com.google.common.collect.ImmutableList; import com.google.protobuf.ByteString; import java.util.List; import org.junit.Assert; @@ -27,7 +28,10 @@ import org.tikv.common.util.ConcreteBackOffer; import org.tikv.common.util.Pair; import org.tikv.kvproto.Metapb; +import org.tikv.kvproto.Metapb.Store; +import org.tikv.kvproto.Metapb.StoreState; import org.tikv.kvproto.Pdpb; +import org.tikv.kvproto.Pdpb.GetAllStoresResponse; import org.tikv.kvproto.Pdpb.GetRegionResponse; import org.tikv.kvproto.Pdpb.Region; import org.tikv.kvproto.Pdpb.ScanRegionsResponse; @@ -35,6 +39,18 @@ public class PDClientV2MockTest extends PDMockServerTest { @Before public void init() throws Exception { + leader.addGetAllStoresListener( + request -> { + return GetAllStoresResponse.newBuilder() + .addAllStores( + ImmutableList.of( + Store.newBuilder() + .setId(0x1) + .setState(StoreState.Up) + .setVersion(Version.API_V2) + .build())) + .build(); + }); upgradeToV2Cluster(); } diff --git a/src/test/java/org/tikv/common/PDMockServer.java b/src/test/java/org/tikv/common/PDMockServer.java index c081eb382c0..723034f1e33 100644 --- a/src/test/java/org/tikv/common/PDMockServer.java +++ b/src/test/java/org/tikv/common/PDMockServer.java @@ -30,6 +30,8 @@ import java.util.Optional; import java.util.function.Function; import org.tikv.kvproto.PDGrpc; +import org.tikv.kvproto.Pdpb.GetAllStoresRequest; +import org.tikv.kvproto.Pdpb.GetAllStoresResponse; import org.tikv.kvproto.Pdpb.GetMembersRequest; import org.tikv.kvproto.Pdpb.GetMembersResponse; import org.tikv.kvproto.Pdpb.GetRegionByIDRequest; @@ -54,6 +56,8 @@ public class PDMockServer extends PDGrpc.PDImplBase { private Function scanRegionsListener; + private Function getAllStoresListener; + public void addGetMembersListener(Function func) { getMembersListener = func; } @@ -144,6 +148,20 @@ public void scanRegions(ScanRegionsRequest request, StreamObserver func) { + getAllStoresListener = func; + } + + @Override + public void getAllStores(GetAllStoresRequest request, StreamObserver resp) { + try { + resp.onNext(Optional.ofNullable(getAllStoresListener.apply(request)).get()); + resp.onCompleted(); + } catch (Exception e) { + resp.onError(Status.INTERNAL.asRuntimeException()); + } + } + public void start(long clusterId) throws IOException { int port; try (ServerSocket s = new ServerSocket(0)) {