diff --git a/dev/proto.sh b/dev/proto.sh index 71e8c125530..a988ce8cd51 100755 --- a/dev/proto.sh +++ b/dev/proto.sh @@ -21,7 +21,7 @@ if [ -d $proto_dir ]; then fi repos=("https://github.com/pingcap/kvproto" "https://github.com/pingcap/raft-rs" "https://github.com/pingcap/tipb") -commits=(58f2ac94aa38f49676dd628fbcc1d669a77a62ac b9891b673573fad77ebcf9bbe0969cf945841926 c4d518eb1d60c21f05b028b36729e64610346dac) +commits=(3056ca36e6f2a71a9fc7ba7135e6b119fd977553 b9891b673573fad77ebcf9bbe0969cf945841926 c4d518eb1d60c21f05b028b36729e64610346dac) for i in "${!repos[@]}"; do repo_name=$(basename ${repos[$i]}) diff --git a/src/main/java/org/tikv/common/ConfigUtils.java b/src/main/java/org/tikv/common/ConfigUtils.java index dd5d1d28bbb..4697db8140a 100644 --- a/src/main/java/org/tikv/common/ConfigUtils.java +++ b/src/main/java/org/tikv/common/ConfigUtils.java @@ -121,6 +121,9 @@ public class ConfigUtils { public static final String TIFLASH_ENABLE = "tiflash.enable"; public static final String TIKV_WARM_UP_ENABLE = "tikv.warm_up.enable"; + + public static final String TIKV_API_VERSION = "tikv.api_version"; + public static final String DEF_PD_ADDRESSES = "127.0.0.1:2379"; public static final String DEF_TIMEOUT = "200ms"; public static final String DEF_TIKV_GRPC_INGEST_TIMEOUT = "200s"; @@ -204,4 +207,6 @@ public class ConfigUtils { public static final int DEF_TiKV_CIRCUIT_BREAK_ATTEMPT_REQUEST_COUNT = 10; public static final int DEF_TIKV_SCAN_REGIONS_LIMIT = 1000; + + public static final int DEF_TIKV_API_VERSION = 1; } diff --git a/src/main/java/org/tikv/common/PDClient.java b/src/main/java/org/tikv/common/PDClient.java index c6263b2c9fa..cfafbff4550 100644 --- a/src/main/java/org/tikv/common/PDClient.java +++ b/src/main/java/org/tikv/common/PDClient.java @@ -62,9 +62,7 @@ import org.apache.http.impl.client.HttpClients; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.tikv.common.codec.Codec.BytesCodec; -import org.tikv.common.codec.CodecDataInput; -import org.tikv.common.codec.CodecDataOutput; +import org.tikv.common.apiversion.RequestKeyCodec; import org.tikv.common.codec.KeyUtils; import org.tikv.common.exception.GrpcException; import org.tikv.common.exception.TiClientInternalException; @@ -110,7 +108,9 @@ public class PDClient extends AbstractGRPCClient private static final long MIN_TRY_UPDATE_DURATION = 50; private static final int PAUSE_CHECKER_TIMEOUT = 300; // in seconds private static final int KEEP_CHECKER_PAUSE_PERIOD = PAUSE_CHECKER_TIMEOUT / 5; // in seconds - private final Logger logger = LoggerFactory.getLogger(PDClient.class); + private static final Logger logger = LoggerFactory.getLogger(PDClient.class); + + private final RequestKeyCodec codec; private RequestHeader header; private TsoRequest tsoReq; private volatile PDClientWrapper pdClientWrapper; @@ -130,19 +130,22 @@ public class PDClient extends AbstractGRPCClient .labelNames("cluster") .register(); - private PDClient(TiConfiguration conf, ChannelFactory channelFactory) { + private PDClient(TiConfiguration conf, RequestKeyCodec codec, ChannelFactory channelFactory) { super(conf, channelFactory); initCluster(); + this.codec = codec; this.blockingStub = getBlockingStub(); this.asyncStub = getAsyncStub(); } - public static ReadOnlyPDClient create(TiConfiguration conf, ChannelFactory channelFactory) { - return createRaw(conf, channelFactory); + public static ReadOnlyPDClient create( + TiConfiguration conf, RequestKeyCodec codec, ChannelFactory channelFactory) { + return createRaw(conf, codec, channelFactory); } - static PDClient createRaw(TiConfiguration conf, ChannelFactory channelFactory) { - return new PDClient(conf, channelFactory); + static PDClient createRaw( + TiConfiguration conf, RequestKeyCodec codec, ChannelFactory channelFactory) { + return new PDClient(conf, codec, channelFactory); } public HostMapping getHostMapping() { @@ -313,22 +316,19 @@ public Pair getRegionByKey(BackOffer backOffer, Byte Histogram.Timer requestTimer = PD_GET_REGION_BY_KEY_REQUEST_LATENCY.labels(getClusterId().toString()).startTimer(); try { - if (conf.isTxnKVMode()) { - CodecDataOutput cdo = new CodecDataOutput(); - BytesCodec.writeBytes(cdo, key.toByteArray()); - key = cdo.toByteString(); - } - ByteString queryKey = key; - Supplier request = - () -> GetRegionRequest.newBuilder().setHeader(header).setRegionKey(queryKey).build(); + () -> + GetRegionRequest.newBuilder() + .setHeader(header) + .setRegionKey(codec.encodePdQuery(key)) + .build(); PDErrorHandler handler = new PDErrorHandler<>(getRegionResponseErrorExtractor, this); GetRegionResponse resp = callWithRetry(backOffer, PDGrpc.getGetRegionMethod(), request, handler); - return new Pair(decodeRegion(resp.getRegion()), resp.getLeader()); + return new Pair<>(codec.decodeRegion(resp.getRegion()), resp.getLeader()); } finally { requestTimer.observeDuration(); } @@ -343,7 +343,8 @@ public Pair getRegionByID(BackOffer backOffer, long GetRegionResponse resp = callWithRetry(backOffer, PDGrpc.getGetRegionByIDMethod(), request, handler); - return new Pair(decodeRegion(resp.getRegion()), resp.getLeader()); + return new Pair( + codec.decodeRegion(resp.getRegion()), resp.getLeader()); } @Override @@ -353,18 +354,20 @@ public List scanRegions( // introduce a warm-up timeout for ScanRegions requests PDGrpc.PDBlockingStub stub = getBlockingStub().withDeadlineAfter(conf.getWarmUpTimeout(), TimeUnit.MILLISECONDS); + Pair range = codec.encodePdQueryRange(startKey, endKey); Pdpb.ScanRegionsRequest request = Pdpb.ScanRegionsRequest.newBuilder() .setHeader(header) - .setStartKey(startKey) - .setEndKey(endKey) + .setStartKey(range.first) + .setEndKey(range.second) .setLimit(limit) .build(); Pdpb.ScanRegionsResponse resp = stub.scanRegions(request); if (resp == null) { return null; } - return resp.getRegionsList(); + + return codec.decodePdRegions(resp.getRegionsList()); } private Supplier buildGetStoreReq(long storeId) { @@ -811,49 +814,6 @@ public String toString() { } } - private Metapb.Region decodeRegion(Metapb.Region region) { - final boolean isRawRegion = conf.isRawKVMode(); - Metapb.Region.Builder builder = - Metapb.Region.newBuilder() - .setId(region.getId()) - .setRegionEpoch(region.getRegionEpoch()) - .addAllPeers(region.getPeersList()); - - if (region.getStartKey().isEmpty() || isRawRegion) { - builder.setStartKey(region.getStartKey()); - } else { - if (!conf.isTest()) { - byte[] decodedStartKey = BytesCodec.readBytes(new CodecDataInput(region.getStartKey())); - builder.setStartKey(ByteString.copyFrom(decodedStartKey)); - } else { - try { - byte[] decodedStartKey = BytesCodec.readBytes(new CodecDataInput(region.getStartKey())); - builder.setStartKey(ByteString.copyFrom(decodedStartKey)); - } catch (Exception e) { - builder.setStartKey(region.getStartKey()); - } - } - } - - if (region.getEndKey().isEmpty() || isRawRegion) { - builder.setEndKey(region.getEndKey()); - } else { - if (!conf.isTest()) { - byte[] decodedEndKey = BytesCodec.readBytes(new CodecDataInput(region.getEndKey())); - builder.setEndKey(ByteString.copyFrom(decodedEndKey)); - } else { - try { - byte[] decodedEndKey = BytesCodec.readBytes(new CodecDataInput(region.getEndKey())); - builder.setEndKey(ByteString.copyFrom(decodedEndKey)); - } catch (Exception e) { - builder.setEndKey(region.getEndKey()); - } - } - } - - return builder.build(); - } - public Long getClusterId() { return header.getClusterId(); } @@ -861,4 +821,8 @@ public Long getClusterId() { public List getPdAddrs() { return pdAddrs; } + + public RequestKeyCodec getCodec() { + return codec; + } } diff --git a/src/main/java/org/tikv/common/ReadOnlyPDClient.java b/src/main/java/org/tikv/common/ReadOnlyPDClient.java index d5a7c3aea87..ddf1855e614 100644 --- a/src/main/java/org/tikv/common/ReadOnlyPDClient.java +++ b/src/main/java/org/tikv/common/ReadOnlyPDClient.java @@ -19,6 +19,7 @@ import com.google.protobuf.ByteString; import java.util.List; +import org.tikv.common.apiversion.RequestKeyCodec; import org.tikv.common.meta.TiTimestamp; import org.tikv.common.util.BackOffer; import org.tikv.common.util.Pair; @@ -69,4 +70,6 @@ List scanRegions( TiConfiguration.ReplicaRead getReplicaRead(); Long getClusterId(); + + RequestKeyCodec getCodec(); } diff --git a/src/main/java/org/tikv/common/StoreVersion.java b/src/main/java/org/tikv/common/StoreVersion.java index c15b5695398..a86e7b0f577 100644 --- a/src/main/java/org/tikv/common/StoreVersion.java +++ b/src/main/java/org/tikv/common/StoreVersion.java @@ -27,7 +27,6 @@ import org.tikv.kvproto.Metapb; public class StoreVersion { - private static final int SCALE = 10000; private final Logger logger = LoggerFactory.getLogger(this.getClass()); private int v0 = 9999; diff --git a/src/main/java/org/tikv/common/TiConfiguration.java b/src/main/java/org/tikv/common/TiConfiguration.java index cae0ed40c5e..9f6bb097d59 100644 --- a/src/main/java/org/tikv/common/TiConfiguration.java +++ b/src/main/java/org/tikv/common/TiConfiguration.java @@ -40,6 +40,7 @@ import static org.tikv.common.ConfigUtils.DEF_SHOW_ROWID; import static org.tikv.common.ConfigUtils.DEF_TABLE_SCAN_CONCURRENCY; import static org.tikv.common.ConfigUtils.DEF_TIFLASH_ENABLE; +import static org.tikv.common.ConfigUtils.DEF_TIKV_API_VERSION; import static org.tikv.common.ConfigUtils.DEF_TIKV_BO_REGION_MISS_BASE_IN_MS; import static org.tikv.common.ConfigUtils.DEF_TIKV_CONN_RECYCLE_TIME; import static org.tikv.common.ConfigUtils.DEF_TIKV_ENABLE_ATOMIC_FOR_CAS; @@ -82,6 +83,7 @@ import static org.tikv.common.ConfigUtils.READ_COMMITTED_ISOLATION_LEVEL; import static org.tikv.common.ConfigUtils.SNAPSHOT_ISOLATION_LEVEL; import static org.tikv.common.ConfigUtils.TIFLASH_ENABLE; +import static org.tikv.common.ConfigUtils.TIKV_API_VERSION; import static org.tikv.common.ConfigUtils.TIKV_BATCH_DELETE_CONCURRENCY; import static org.tikv.common.ConfigUtils.TIKV_BATCH_GET_CONCURRENCY; import static org.tikv.common.ConfigUtils.TIKV_BATCH_PUT_CONCURRENCY; @@ -154,6 +156,7 @@ import static org.tikv.common.ConfigUtils.TiKV_CIRCUIT_BREAK_ENABLE; import static org.tikv.common.ConfigUtils.TiKV_CIRCUIT_BREAK_SLEEP_WINDOW_IN_SECONDS; +import com.google.protobuf.ByteString; import io.grpc.Metadata; import java.io.IOException; import java.io.InputStream; @@ -173,17 +176,19 @@ import org.slf4j.LoggerFactory; import org.tikv.common.pd.PDUtils; import org.tikv.common.replica.ReplicaSelector; +import org.tikv.kvproto.Kvrpcpb; import org.tikv.kvproto.Kvrpcpb.CommandPri; import org.tikv.kvproto.Kvrpcpb.IsolationLevel; public class TiConfiguration implements Serializable { - private static final Logger logger = LoggerFactory.getLogger(TiConfiguration.class); private static final ConcurrentHashMap settings = new ConcurrentHashMap<>(); public static final Metadata.Key FORWARD_META_DATA_KEY = Metadata.Key.of("tikv-forwarded-host", Metadata.ASCII_STRING_MARSHALLER); public static final Metadata.Key PD_FORWARD_META_DATA_KEY = Metadata.Key.of("pd-forwarded-host", Metadata.ASCII_STRING_MARSHALLER); + public static final ByteString API_V2_RAW_PREFIX = ByteString.copyFromUtf8("r"); + public static final ByteString API_V2_TXN_PREFIX = ByteString.copyFromUtf8("x"); static { // priority: system environment > config file > default @@ -296,6 +301,8 @@ private static void loadFromDefaultProperties() { setIfMissing( TiKV_CIRCUIT_BREAK_ATTEMPT_REQUEST_COUNT, DEF_TiKV_CIRCUIT_BREAK_ATTEMPT_REQUEST_COUNT); setIfMissing(TIKV_SCAN_REGIONS_LIMIT, DEF_TIKV_SCAN_REGIONS_LIMIT); + + setIfMissing(TIKV_API_VERSION, DEF_TIKV_API_VERSION); } public static void listAll() { @@ -551,6 +558,8 @@ private static ReplicaRead getReplicaRead(String key) { private int scanRegionsLimit = getInt(TIKV_SCAN_REGIONS_LIMIT); + private ApiVersion apiVersion = ApiVersion.fromInt(getInt(TIKV_API_VERSION)); + public enum KVMode { TXN, RAW @@ -1241,4 +1250,48 @@ public int getScanRegionsLimit() { public void setScanRegionsLimit(int scanRegionsLimit) { this.scanRegionsLimit = scanRegionsLimit; } + + public ApiVersion getApiVersion() { + return apiVersion; + } + + public TiConfiguration setApiVersion(ApiVersion version) { + this.apiVersion = version; + return this; + } + + public enum ApiVersion { + V1, + V2; + + public static ApiVersion fromInt(int version) { + switch (version) { + case 1: + return V1; + case 2: + return V2; + default: + throw new IllegalArgumentException("unknown api version " + version); + } + } + + public boolean isV1() { + return this == V1; + } + + public boolean isV2() { + return this == V2; + } + + public Kvrpcpb.APIVersion toPb() { + switch (this) { + case V1: + return Kvrpcpb.APIVersion.V1; + case V2: + return Kvrpcpb.APIVersion.V2; + default: + throw new IllegalArgumentException("unknown api version " + this); + } + } + } } diff --git a/src/main/java/org/tikv/common/TiSession.java b/src/main/java/org/tikv/common/TiSession.java index 845e0ea44b6..1cc3ec684a1 100644 --- a/src/main/java/org/tikv/common/TiSession.java +++ b/src/main/java/org/tikv/common/TiSession.java @@ -33,6 +33,11 @@ import java.util.stream.Collectors; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.tikv.common.apiversion.RequestKeyCodec; +import org.tikv.common.apiversion.RequestKeyV1RawCodec; +import org.tikv.common.apiversion.RequestKeyV1TxnCodec; +import org.tikv.common.apiversion.RequestKeyV2RawCodec; +import org.tikv.common.apiversion.RequestKeyV2TxnCodec; import org.tikv.common.catalog.Catalog; import org.tikv.common.exception.TiKVException; import org.tikv.common.importer.ImporterStoreClient; @@ -67,10 +72,10 @@ * contention */ public class TiSession implements AutoCloseable { - private static final Logger logger = LoggerFactory.getLogger(TiSession.class); private static final Map sessionCachedMap = new HashMap<>(); private final TiConfiguration conf; + private final RequestKeyCodec keyCodec; private final ChannelFactory channelFactory; // below object creation is either heavy or making connection (pd), pending for lazy loading private volatile PDClient client; @@ -118,6 +123,21 @@ public TiSession(TiConfiguration conf) { this.metricsServer = MetricsServer.getInstance(conf); this.conf = conf; + + if (conf.getApiVersion().isV1()) { + if (conf.isRawKVMode()) { + keyCodec = new RequestKeyV1RawCodec(); + } else { + keyCodec = new RequestKeyV1TxnCodec(); + } + } else { + if (conf.isRawKVMode()) { + keyCodec = new RequestKeyV2RawCodec(); + } else { + keyCodec = new RequestKeyV2TxnCodec(); + } + } + if (conf.isTlsEnable()) { if (conf.isJksEnable()) { this.channelFactory = @@ -154,7 +174,7 @@ public TiSession(TiConfiguration conf) { conf.getIdleTimeout()); } - this.client = PDClient.createRaw(conf, channelFactory); + this.client = PDClient.createRaw(conf, keyCodec, channelFactory); this.enableGrpcForward = conf.getEnableGrpcForward(); if (this.enableGrpcForward) { logger.info("enable grpc forward for high available"); @@ -163,7 +183,11 @@ public TiSession(TiConfiguration conf) { warmUp(); } this.circuitBreaker = new CircuitBreakerImpl(conf, client.getClusterId()); - logger.info("TiSession initialized in " + conf.getKvMode() + " mode"); + logger.info( + "TiSession initialized in " + + conf.getKvMode() + + " mode in API version: " + + conf.getApiVersion()); } private static VersionInfo getVersionInfo() { @@ -181,16 +205,14 @@ private static VersionInfo getVersionInfo() { return info; } - private synchronized void warmUp() { + @VisibleForTesting + public synchronized void warmUp() { long warmUpStartTime = System.nanoTime(); BackOffer backOffer = ConcreteBackOffer.newRawKVBackOff(getPDClient().getClusterId()); try { // let JVM ClassLoader load gRPC error related classes // this operation may cost 100ms - Errorpb.Error.newBuilder() - .setNotLeader(Errorpb.NotLeader.newBuilder().build()) - .build() - .toString(); + Errorpb.Error.newBuilder().setNotLeader(Errorpb.NotLeader.newBuilder().build()).build(); this.client = getPDClient(); this.regionManager = getRegionManager(); @@ -356,7 +378,7 @@ public PDClient getPDClient() { if (res == null) { synchronized (this) { if (client == null) { - client = PDClient.createRaw(this.getConf(), channelFactory); + client = PDClient.createRaw(this.getConf(), keyCodec, channelFactory); } res = client; } diff --git a/src/main/java/org/tikv/common/apiversion/CodecUtils.java b/src/main/java/org/tikv/common/apiversion/CodecUtils.java new file mode 100644 index 00000000000..1c6cfea9fa1 --- /dev/null +++ b/src/main/java/org/tikv/common/apiversion/CodecUtils.java @@ -0,0 +1,36 @@ +/* + * Copyright 2022 TiKV Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.tikv.common.apiversion; + +import com.google.protobuf.ByteString; +import org.tikv.common.codec.Codec.BytesCodec; +import org.tikv.common.codec.CodecDataInput; +import org.tikv.common.codec.CodecDataOutput; + +// TODO(iosmanthus): use ByteString.wrap to avoid once more copying. +class CodecUtils { + public static ByteString encode(ByteString key) { + CodecDataOutput cdo = new CodecDataOutput(); + BytesCodec.writeBytes(cdo, key.toByteArray()); + return cdo.toByteString(); + } + + public static ByteString decode(ByteString key) { + return ByteString.copyFrom(BytesCodec.readBytes(new CodecDataInput(key))); + } +} diff --git a/src/main/java/org/tikv/common/apiversion/RequestKeyCodec.java b/src/main/java/org/tikv/common/apiversion/RequestKeyCodec.java new file mode 100644 index 00000000000..b70e660cd53 --- /dev/null +++ b/src/main/java/org/tikv/common/apiversion/RequestKeyCodec.java @@ -0,0 +1,72 @@ +/* + * Copyright 2022 TiKV Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.tikv.common.apiversion; + +import com.google.protobuf.ByteString; +import java.util.List; +import java.util.stream.Collectors; +import org.tikv.common.util.Pair; +import org.tikv.kvproto.Kvrpcpb.KvPair; +import org.tikv.kvproto.Kvrpcpb.Mutation; +import org.tikv.kvproto.Metapb; +import org.tikv.kvproto.Pdpb; + +public interface RequestKeyCodec { + ByteString encodeKey(ByteString key); + + default List encodeKeys(List keys) { + return keys.stream().map(this::encodeKey).collect(Collectors.toList()); + } + + default List encodeMutations(List mutations) { + return mutations + .stream() + .map(mut -> Mutation.newBuilder().mergeFrom(mut).setKey(encodeKey(mut.getKey())).build()) + .collect(Collectors.toList()); + } + + ByteString decodeKey(ByteString key); + + default KvPair decodeKvPair(KvPair pair) { + return KvPair.newBuilder().mergeFrom(pair).setKey(decodeKey(pair.getKey())).build(); + } + + default List decodeKvPairs(List pairs) { + return pairs.stream().map(this::decodeKvPair).collect(Collectors.toList()); + } + + Pair encodeRange(ByteString start, ByteString end); + + ByteString encodePdQuery(ByteString key); + + Pair encodePdQueryRange(ByteString start, ByteString end); + + Metapb.Region decodeRegion(Metapb.Region region); + + default List decodePdRegions(List regions) { + return regions + .stream() + .map( + r -> + Pdpb.Region.newBuilder() + .mergeFrom(r) + .setRegion(this.decodeRegion(r.getRegion())) + .build()) + .collect(Collectors.toList()); + } +} diff --git a/src/main/java/org/tikv/common/apiversion/RequestKeyV1Codec.java b/src/main/java/org/tikv/common/apiversion/RequestKeyV1Codec.java new file mode 100644 index 00000000000..9d9a92c6859 --- /dev/null +++ b/src/main/java/org/tikv/common/apiversion/RequestKeyV1Codec.java @@ -0,0 +1,83 @@ +/* + * Copyright 2022 TiKV Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.tikv.common.apiversion; + +import com.google.protobuf.ByteString; +import java.util.List; +import org.tikv.common.util.Pair; +import org.tikv.kvproto.Kvrpcpb.KvPair; +import org.tikv.kvproto.Kvrpcpb.Mutation; +import org.tikv.kvproto.Metapb.Region; +import org.tikv.kvproto.Pdpb; + +public class RequestKeyV1Codec implements RequestKeyCodec { + @Override + public ByteString encodeKey(ByteString key) { + return key; + } + + @Override + public List encodeKeys(List keys) { + return keys; + } + + @Override + public List encodeMutations(List mutations) { + return mutations; + } + + @Override + public ByteString decodeKey(ByteString key) { + return key; + } + + @Override + public KvPair decodeKvPair(KvPair pair) { + return pair; + } + + @Override + public List decodeKvPairs(List pairs) { + return pairs; + } + + @Override + public Pair encodeRange(ByteString start, ByteString end) { + return Pair.create(start, end); + } + + @Override + public ByteString encodePdQuery(ByteString key) { + return key; + } + + @Override + public Pair encodePdQueryRange(ByteString start, ByteString end) { + return Pair.create(start, end); + } + + @Override + public Region decodeRegion(Region region) { + return region; + } + + @Override + public List decodePdRegions(List regions) { + return regions; + } +} diff --git a/src/main/java/org/tikv/common/apiversion/RequestKeyV1RawCodec.java b/src/main/java/org/tikv/common/apiversion/RequestKeyV1RawCodec.java new file mode 100644 index 00000000000..a72457d0bbe --- /dev/null +++ b/src/main/java/org/tikv/common/apiversion/RequestKeyV1RawCodec.java @@ -0,0 +1,22 @@ +/* + * Copyright 2022 TiKV Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.tikv.common.apiversion; + +public class RequestKeyV1RawCodec extends RequestKeyV1Codec implements RequestKeyCodec { + public RequestKeyV1RawCodec() {} +} diff --git a/src/main/java/org/tikv/common/apiversion/RequestKeyV1TxnCodec.java b/src/main/java/org/tikv/common/apiversion/RequestKeyV1TxnCodec.java new file mode 100644 index 00000000000..ea3949ddb0c --- /dev/null +++ b/src/main/java/org/tikv/common/apiversion/RequestKeyV1TxnCodec.java @@ -0,0 +1,61 @@ +/* + * Copyright 2022 TiKV Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.tikv.common.apiversion; + +import com.google.protobuf.ByteString; +import org.tikv.common.util.Pair; +import org.tikv.kvproto.Metapb; + +public class RequestKeyV1TxnCodec extends RequestKeyV1Codec implements RequestKeyCodec { + public RequestKeyV1TxnCodec() {} + + @Override + public ByteString encodePdQuery(ByteString key) { + return CodecUtils.encode(key); + } + + @Override + public Pair encodePdQueryRange(ByteString start, ByteString end) { + if (!start.isEmpty()) { + start = CodecUtils.encode(start); + } + + if (!end.isEmpty()) { + end = CodecUtils.encode(end); + } + + return Pair.create(start, end); + } + + @Override + public Metapb.Region decodeRegion(Metapb.Region region) { + Metapb.Region.Builder builder = Metapb.Region.newBuilder().mergeFrom(region); + ByteString start = region.getStartKey(); + ByteString end = region.getEndKey(); + + if (!start.isEmpty()) { + start = CodecUtils.decode(start); + } + + if (!end.isEmpty()) { + end = CodecUtils.decode(end); + } + + return builder.setStartKey(start).setEndKey(end).build(); + } +} diff --git a/src/main/java/org/tikv/common/apiversion/RequestKeyV2Codec.java b/src/main/java/org/tikv/common/apiversion/RequestKeyV2Codec.java new file mode 100644 index 00000000000..2af25a1ab98 --- /dev/null +++ b/src/main/java/org/tikv/common/apiversion/RequestKeyV2Codec.java @@ -0,0 +1,102 @@ +/* + * Copyright 2022 TiKV Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.tikv.common.apiversion; + +import com.google.protobuf.ByteString; +import org.tikv.common.util.Pair; +import org.tikv.kvproto.Metapb; +import org.tikv.kvproto.Metapb.Region; + +public class RequestKeyV2Codec implements RequestKeyCodec { + protected static final ByteString RAW_KEY_PREFIX = ByteString.copyFromUtf8("r"); + protected static final ByteString RAW_END_KEY = + ByteString.copyFrom(new byte[] {(byte) (RAW_KEY_PREFIX.toByteArray()[0] + 1)}); + + protected static final ByteString TXN_KEY_PREFIX = ByteString.copyFromUtf8("x"); + protected static final ByteString TXN_END_KEY = + ByteString.copyFrom(new byte[] {(byte) (TXN_KEY_PREFIX.toByteArray()[0] + 1)}); + protected ByteString keyPrefix; + + protected ByteString infiniteEndKey; + + @Override + public ByteString encodeKey(ByteString key) { + return keyPrefix.concat(key); + } + + @Override + public ByteString decodeKey(ByteString key) { + if (key.isEmpty()) { + return key; + } + + if (!key.startsWith(keyPrefix)) { + throw new IllegalArgumentException("key corrupted, wrong prefix"); + } + + return key.substring(1); + } + + @Override + public Pair encodeRange(ByteString start, ByteString end) { + start = encodeKey(start); + + end = end.isEmpty() ? infiniteEndKey : encodeKey(end); + + return Pair.create(start, end); + } + + @Override + public ByteString encodePdQuery(ByteString key) { + return CodecUtils.encode(encodeKey(key)); + } + + @Override + public Pair encodePdQueryRange(ByteString start, ByteString end) { + Pair range = encodeRange(start, end); + return Pair.create(CodecUtils.encode(range.first), CodecUtils.encode(range.second)); + } + + @Override + public Region decodeRegion(Region region) { + Metapb.Region.Builder builder = Metapb.Region.newBuilder().mergeFrom(region); + + ByteString start = region.getStartKey(); + ByteString end = region.getEndKey(); + + if (!start.isEmpty()) { + start = CodecUtils.decode(start); + if (ByteString.unsignedLexicographicalComparator().compare(start, keyPrefix) < 0) { + start = ByteString.EMPTY; + } else { + start = decodeKey(start); + } + } + + if (!end.isEmpty()) { + end = CodecUtils.decode(end); + if (ByteString.unsignedLexicographicalComparator().compare(end, infiniteEndKey) >= 0) { + end = ByteString.EMPTY; + } else { + end = decodeKey(end); + } + } + + return builder.setStartKey(start).setEndKey(end).build(); + } +} diff --git a/src/main/java/org/tikv/common/apiversion/RequestKeyV2RawCodec.java b/src/main/java/org/tikv/common/apiversion/RequestKeyV2RawCodec.java new file mode 100644 index 00000000000..255815d8403 --- /dev/null +++ b/src/main/java/org/tikv/common/apiversion/RequestKeyV2RawCodec.java @@ -0,0 +1,27 @@ +/* + * Copyright 2022 TiKV Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.tikv.common.apiversion; + +public class RequestKeyV2RawCodec extends RequestKeyV2Codec { + public RequestKeyV2RawCodec() { + super(); + + this.keyPrefix = RAW_KEY_PREFIX; + this.infiniteEndKey = RAW_END_KEY; + } +} diff --git a/src/main/java/org/tikv/common/apiversion/RequestKeyV2TxnCodec.java b/src/main/java/org/tikv/common/apiversion/RequestKeyV2TxnCodec.java new file mode 100644 index 00000000000..9fe0effccd5 --- /dev/null +++ b/src/main/java/org/tikv/common/apiversion/RequestKeyV2TxnCodec.java @@ -0,0 +1,27 @@ +/* + * Copyright 2022 TiKV Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.tikv.common.apiversion; + +public class RequestKeyV2TxnCodec extends RequestKeyV2Codec { + public RequestKeyV2TxnCodec() { + super(); + + this.keyPrefix = TXN_KEY_PREFIX; + this.infiniteEndKey = TXN_END_KEY; + } +} diff --git a/src/main/java/org/tikv/common/importer/ImporterClient.java b/src/main/java/org/tikv/common/importer/ImporterClient.java index 882880ab00c..6f45a9a4d17 100644 --- a/src/main/java/org/tikv/common/importer/ImporterClient.java +++ b/src/main/java/org/tikv/common/importer/ImporterClient.java @@ -29,8 +29,7 @@ import org.slf4j.LoggerFactory; import org.tikv.common.TiConfiguration; import org.tikv.common.TiSession; -import org.tikv.common.codec.Codec; -import org.tikv.common.codec.CodecDataOutput; +import org.tikv.common.apiversion.RequestKeyCodec; import org.tikv.common.exception.GrpcException; import org.tikv.common.exception.RegionException; import org.tikv.common.exception.TiKVException; @@ -43,18 +42,19 @@ import org.tikv.common.util.Pair; import org.tikv.kvproto.Errorpb.Error; import org.tikv.kvproto.ImportSstpb; +import org.tikv.kvproto.ImportSstpb.RawWriteBatch; import org.tikv.kvproto.Metapb; public class ImporterClient { private static final Logger logger = LoggerFactory.getLogger(ImporterClient.class); - private TiConfiguration tiConf; - private TiSession tiSession; - private ByteString uuid; - private Key minKey; - private Key maxKey; + private final TiConfiguration tiConf; + private final TiSession tiSession; + private final ByteString uuid; + private final Key minKey; + private final Key maxKey; private TiRegion region; - private Long ttl; + private final Long ttl; private boolean deduplicate = false; @@ -63,6 +63,8 @@ public class ImporterClient { private List clientList; private ImporterStoreClient clientLeader; + private final RequestKeyCodec codec; + public ImporterClient( TiSession tiSession, ByteString uuid, Key minKey, Key maxKey, TiRegion region, Long ttl) { this.uuid = uuid; @@ -72,6 +74,7 @@ public ImporterClient( this.maxKey = maxKey; this.region = region; this.ttl = ttl; + this.codec = tiSession.getPDClient().getCodec(); } public boolean isDeduplicate() { @@ -108,9 +111,9 @@ public void write(Iterator> iterator) throws TiKVEx String.format("duplicate key found, key = %s", preKey.toStringUtf8())); } } else { - pairs.add( - ImportSstpb.Pair.newBuilder().setKey(pair.first).setValue(pair.second).build()); - totalBytes += (pair.first.size() + pair.second.size()); + ByteString key = codec.encodeKey(pair.first); + pairs.add(ImportSstpb.Pair.newBuilder().setKey(key).setValue(pair.second).build()); + totalBytes += (key.size() + pair.second.size()); preKey = pair.first; } } @@ -137,19 +140,15 @@ public void write(Iterator> iterator) throws TiKVEx private void init() { long regionId = region.getId(); Metapb.RegionEpoch regionEpoch = region.getRegionEpoch(); + Pair keyRange = + codec.encodePdQueryRange(minKey.toByteString(), maxKey.toByteString()); + ImportSstpb.Range range = - tiConf.isTxnKVMode() - ? ImportSstpb.Range.newBuilder() - .setStart(encode(minKey.toByteString())) - .setEnd(encode(maxKey.toByteString())) - .build() - : ImportSstpb.Range.newBuilder() - .setStart(minKey.toByteString()) - .setEnd(maxKey.toByteString()) - .build(); + ImportSstpb.Range.newBuilder().setStart(keyRange.first).setEnd(keyRange.second).build(); sstMeta = ImportSstpb.SSTMeta.newBuilder() + .setApiVersion(tiConf.getApiVersion().toPb()) .setUuid(uuid) .setRegionId(regionId) .setRegionEpoch(regionEpoch) @@ -170,12 +169,6 @@ private void init() { } } - private ByteString encode(ByteString key) { - CodecDataOutput cdo = new CodecDataOutput(); - Codec.BytesCodec.writeBytes(cdo, key.toByteArray()); - return cdo.toByteString(); - } - private void startWrite() { for (ImporterStoreClient client : clientList) { client.startWrite(); @@ -216,11 +209,14 @@ private void writeBatch(List pairs) { } else { ImportSstpb.RawWriteBatch batch; - if (ttl == null || ttl <= 0) { - batch = ImportSstpb.RawWriteBatch.newBuilder().addAllPairs(pairs).build(); - } else { - batch = ImportSstpb.RawWriteBatch.newBuilder().addAllPairs(pairs).setTtl(ttl).build(); + RawWriteBatch.Builder batchBuilder = RawWriteBatch.newBuilder().addAllPairs(pairs); + if (ttl != null && ttl > 0) { + batchBuilder.setTtl(ttl); + } + if (tiConf.getApiVersion().isV2()) { + batchBuilder.setTs(tiSession.getTimestamp().getVersion()); } + batch = batchBuilder.build(); ImportSstpb.RawWriteRequest request = ImportSstpb.RawWriteRequest.newBuilder().setBatch(batch).build(); diff --git a/src/main/java/org/tikv/common/operation/KVErrorHandler.java b/src/main/java/org/tikv/common/operation/KVErrorHandler.java index 135664530bf..22d05658425 100644 --- a/src/main/java/org/tikv/common/operation/KVErrorHandler.java +++ b/src/main/java/org/tikv/common/operation/KVErrorHandler.java @@ -23,6 +23,7 @@ import java.util.function.Function; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.tikv.common.apiversion.RequestKeyCodec; import org.tikv.common.exception.GrpcException; import org.tikv.common.exception.KeyException; import org.tikv.common.region.RegionErrorReceiver; @@ -44,6 +45,8 @@ public class KVErrorHandler implements ErrorHandler { private final boolean forWrite; private final RegionErrorHandler regionHandler; + private final RequestKeyCodec codec; + public KVErrorHandler( RegionManager regionManager, RegionErrorReceiver recv, @@ -59,6 +62,7 @@ public KVErrorHandler( this.resolveLockResultCallback = resolveLockResultCallback; this.callerStartTS = callerStartTS; this.forWrite = forWrite; + this.codec = regionManager.getPDClient().getCodec(); } private void resolveLock(BackOffer backOffer, Lock lock) { @@ -100,7 +104,7 @@ public boolean handleResponseError(BackOffer backOffer, RespT resp) { Kvrpcpb.KeyError keyError = getKeyError.apply(resp); if (keyError != null) { try { - Lock lock = AbstractLockResolverClient.extractLockFromKeyErr(keyError); + Lock lock = AbstractLockResolverClient.extractLockFromKeyErr(keyError, codec); resolveLock(backOffer, lock); return true; } catch (KeyException e) { diff --git a/src/main/java/org/tikv/common/operation/RegionErrorHandler.java b/src/main/java/org/tikv/common/operation/RegionErrorHandler.java index a809d304412..7e7eff2b9dc 100644 --- a/src/main/java/org/tikv/common/operation/RegionErrorHandler.java +++ b/src/main/java/org/tikv/common/operation/RegionErrorHandler.java @@ -212,6 +212,8 @@ private boolean onRegionEpochNotMatch(BackOffer backOffer, List c List newRegions = new ArrayList<>(currentRegions.size()); // If the region epoch is not ahead of TiKV's, replace region meta in region cache. for (Metapb.Region meta : currentRegions) { + // The region needs to be decoded to plain format. + meta = regionManager.getPDClient().getCodec().decodeRegion(meta); TiRegion region = regionManager.createRegion(meta, backOffer); newRegions.add(region); if (recv.getRegion().getVerID() == region.getVerID()) { diff --git a/src/main/java/org/tikv/common/operation/iterator/RawScanIterator.java b/src/main/java/org/tikv/common/operation/iterator/RawScanIterator.java index 09f17c99fb5..5b27132699d 100644 --- a/src/main/java/org/tikv/common/operation/iterator/RawScanIterator.java +++ b/src/main/java/org/tikv/common/operation/iterator/RawScanIterator.java @@ -30,7 +30,6 @@ import org.tikv.kvproto.Kvrpcpb; public class RawScanIterator extends ScanIterator { - private final BackOffer scanBackOffer; public RawScanIterator( diff --git a/src/main/java/org/tikv/common/region/AbstractRegionStoreClient.java b/src/main/java/org/tikv/common/region/AbstractRegionStoreClient.java index 8bbaf91f599..f848de8798d 100644 --- a/src/main/java/org/tikv/common/region/AbstractRegionStoreClient.java +++ b/src/main/java/org/tikv/common/region/AbstractRegionStoreClient.java @@ -34,6 +34,7 @@ import org.slf4j.LoggerFactory; import org.tikv.common.AbstractGRPCClient; import org.tikv.common.TiConfiguration; +import org.tikv.common.apiversion.RequestKeyCodec; import org.tikv.common.exception.GrpcException; import org.tikv.common.log.SlowLog; import org.tikv.common.log.SlowLogSpan; @@ -66,6 +67,7 @@ public abstract class AbstractRegionStoreClient .register(); protected final RegionManager regionManager; + protected final RequestKeyCodec codec; protected TiRegion region; protected TiStore store; @@ -84,6 +86,7 @@ protected AbstractRegionStoreClient( this.region = region; this.regionManager = regionManager; this.store = store; + this.codec = regionManager.getPDClient().getCodec(); if (this.store.getProxyStore() != null) { this.timeout = conf.getForwardTimeout(); } @@ -183,6 +186,14 @@ protected Kvrpcpb.Context makeContext( return addTraceId(context, slowLog); } + protected Kvrpcpb.Context makeContext() { + return region.getLeaderContext(); + } + + protected Kvrpcpb.Context makeContext(Metapb.Peer peer) { + return region.getReplicaContext(peer); + } + private void updateClientStub() { String addressStr = store.getStore().getAddress(); long deadline = timeout; @@ -294,8 +305,8 @@ private Metapb.Peer switchLeaderStore(BackOffer backOffer) { TikvGrpc.newFutureStub(channel).withDeadlineAfter(timeout, TimeUnit.MILLISECONDS); Kvrpcpb.RawGetRequest rawGetRequest = Kvrpcpb.RawGetRequest.newBuilder() - .setContext(region.getReplicaContext(peer)) - .setKey(key) + .setContext(makeContext(peer)) + .setKey(codec.encodeKey(key)) .build(); ListenableFuture task = stub.rawGet(rawGetRequest); responses.add(new SwitchLeaderTask(task, peer)); @@ -356,8 +367,8 @@ private TiStore switchProxyStore(BackOffer backOffer) { header.put(TiConfiguration.FORWARD_META_DATA_KEY, store.getStore().getAddress()); Kvrpcpb.RawGetRequest rawGetRequest = Kvrpcpb.RawGetRequest.newBuilder() - .setContext(region.getReplicaContext(region.getLeader())) - .setKey(key) + .setContext(makeContext()) + .setKey(codec.encodeKey(key)) .build(); ListenableFuture task = MetadataUtils.attachHeaders(stub, header).rawGet(rawGetRequest); diff --git a/src/main/java/org/tikv/common/region/RegionStoreClient.java b/src/main/java/org/tikv/common/region/RegionStoreClient.java index 66daeaa828c..42576fa7ea7 100644 --- a/src/main/java/org/tikv/common/region/RegionStoreClient.java +++ b/src/main/java/org/tikv/common/region/RegionStoreClient.java @@ -43,6 +43,8 @@ import java.util.Queue; import java.util.Set; import java.util.function.Supplier; +import java.util.stream.Collectors; +import java.util.stream.StreamSupport; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.tikv.common.PDClient; @@ -125,7 +127,6 @@ /** Note that RegionStoreClient itself is not thread-safe */ public class RegionStoreClient extends AbstractRegionStoreClient { - private static final Logger logger = LoggerFactory.getLogger(RegionStoreClient.class); @VisibleForTesting public final AbstractLockResolverClient lockResolverClient; private final TiStoreType storeType; @@ -236,7 +237,7 @@ public ByteString get(BackOffer backOffer, ByteString key, long version) GetRequest.newBuilder() .setContext( makeContext(getResolvedLocks(version), this.storeType, backOffer.getSlowLog())) - .setKey(key) + .setKey(codec.encodeKey(key)) .setVersion(version) .build(); @@ -275,14 +276,14 @@ private void handleGetResponse(GetResponse resp) throws TiClientInternalExceptio } } - public List batchGet(BackOffer backOffer, Iterable keys, long version) { + public List batchGet(BackOffer backOffer, List keys, long version) { boolean forWrite = false; Supplier request = () -> BatchGetRequest.newBuilder() .setContext( makeContext(getResolvedLocks(version), this.storeType, backOffer.getSlowLog())) - .addAllKeys(keys) + .addAllKeys(codec.encodeKeys(keys)) .setVersion(version) .build(); KVErrorHandler handler = @@ -315,7 +316,7 @@ private List handleBatchGetResponse( for (KvPair pair : resp.getPairsList()) { if (pair.hasError()) { if (pair.getError().hasLocked()) { - Lock lock = new Lock(pair.getError().getLocked()); + Lock lock = new Lock(pair.getError().getLocked(), codec); locks.add(lock); } else { throw new KeyException(pair.getError()); @@ -329,9 +330,9 @@ private List handleBatchGetResponse( addResolvedLocks(version, resolveLockResult.getResolvedLocks()); // resolveLocks already retried, just throw error to upper logic. throw new TiKVException("locks not resolved, retry"); - } else { - return resp.getPairsList(); } + + return codec.decodeKvPairs(resp.getPairsList()); } public List scan( @@ -347,7 +348,7 @@ public List scan( .setContext( makeContext( getResolvedLocks(version), this.storeType, backOffer.getSlowLog())) - .setStartKey(startKey) + .setStartKey(codec.encodeKey(startKey)) .setVersion(version) .setKeyOnly(keyOnly) .setLimit(getConf().getScanBatchSize()) @@ -389,7 +390,7 @@ private List doScan(ScanResponse resp) { List newKvPairs = new ArrayList<>(); for (KvPair kvPair : kvPairs) { if (kvPair.hasError()) { - Lock lock = AbstractLockResolverClient.extractLockFromKeyErr(kvPair.getError()); + Lock lock = AbstractLockResolverClient.extractLockFromKeyErr(kvPair.getError(), codec); newKvPairs.add( KvPair.newBuilder() .setError(kvPair.getError()) @@ -397,7 +398,7 @@ private List doScan(ScanResponse resp) { .setKey(lock.getKey()) .build()); } else { - newKvPairs.add(kvPair); + newKvPairs.add(codec.decodeKvPair(kvPair)); } } return Collections.unmodifiableList(newKvPairs); @@ -420,11 +421,7 @@ public List scan(BackOffer backOffer, ByteString startKey, long version) * @throws RegionException region error occurs */ public void prewrite( - BackOffer backOffer, - ByteString primary, - Iterable mutations, - long startTs, - long lockTTL) + BackOffer backOffer, ByteString primary, List mutations, long startTs, long lockTTL) throws TiClientInternalException, KeyException, RegionException { this.prewrite(backOffer, primary, mutations, startTs, lockTTL, false); } @@ -437,7 +434,7 @@ public void prewrite( public void prewrite( BackOffer bo, ByteString primaryLock, - Iterable mutations, + List mutations, long startTs, long ttl, boolean skipConstraintCheck) @@ -450,8 +447,8 @@ public void prewrite( ? PrewriteRequest.newBuilder() .setContext(makeContext(storeType, bo.getSlowLog())) .setStartVersion(startTs) - .setPrimaryLock(primaryLock) - .addAllMutations(mutations) + .setPrimaryLock(codec.encodeKey(primaryLock)) + .addAllMutations(codec.encodeMutations(mutations)) .setLockTtl(ttl) .setSkipConstraintCheck(skipConstraintCheck) .setMinCommitTs(startTs) @@ -510,7 +507,7 @@ private boolean isPrewriteSuccess(BackOffer backOffer, PrewriteResponse resp, lo for (KeyError err : resp.getErrorsList()) { if (err.hasLocked()) { isSuccess = false; - Lock lock = new Lock(err.getLocked()); + Lock lock = new Lock(err.getLocked(), codec); locks.add(lock); } else { throw new KeyException(err.toString()); @@ -595,7 +592,10 @@ public void commit(BackOffer backOffer, Iterable keys, long startTs, CommitRequest.newBuilder() .setStartVersion(startTs) .setCommitVersion(commitTs) - .addAllKeys(keys) + .addAllKeys( + StreamSupport.stream(keys.spliterator(), false) + .map(codec::encodeKey) + .collect(Collectors.toList())) .setContext(makeContext(storeType, backOffer.getSlowLog())) .build(); KVErrorHandler handler = @@ -714,7 +714,7 @@ private List handleCopResponse( } if (response.hasLocked()) { - Lock lock = new Lock(response.getLocked()); + Lock lock = new Lock(response.getLocked(), codec); logger.debug(String.format("coprocessor encounters locks: %s", lock)); ResolveLockResult resolveLockResult = lockResolverClient.resolveLocks( @@ -817,12 +817,12 @@ public Iterator coprocessStreaming( * @param splitKeys is the split points for a specific region. * @return a split region info. */ - public List splitRegion(Iterable splitKeys) { + public List splitRegion(List splitKeys) { Supplier request = () -> SplitRegionRequest.newBuilder() .setContext(makeContext(storeType, SlowLogEmptyImpl.INSTANCE)) - .addAllSplitKeys(splitKeys) + .addAllSplitKeys(codec.encodeKeys(splitKeys)) .setIsRawKv(conf.isRawKVMode()) .build(); @@ -852,11 +852,13 @@ public List splitRegion(Iterable splitKeys) { if (resp.hasRegionError()) { throw new TiClientInternalException( String.format( - "failed to split region %d because %s", - region.getId(), resp.getRegionError().toString())); + "failed to split region %d because %s", region.getId(), resp.getRegionError())); } - return resp.getRegionsList(); + if (conf.getApiVersion().isV1()) { + return resp.getRegionsList(); + } + return resp.getRegionsList().stream().map(codec::decodeRegion).collect(Collectors.toList()); } // APIs for Raw Scan/Put/Get/Delete @@ -870,7 +872,7 @@ public Optional rawGet(BackOffer backOffer, ByteString key) { () -> RawGetRequest.newBuilder() .setContext(makeContext(storeType, backOffer.getSlowLog())) - .setKey(key) + .setKey(codec.encodeKey(key)) .build(); RegionErrorHandler handler = new RegionErrorHandler( @@ -912,7 +914,7 @@ public Optional rawGetKeyTTL(BackOffer backOffer, ByteString key) { () -> RawGetKeyTTLRequest.newBuilder() .setContext(makeContext(storeType, backOffer.getSlowLog())) - .setKey(key) + .setKey(codec.encodeKey(key)) .build(); RegionErrorHandler handler = new RegionErrorHandler( @@ -954,7 +956,7 @@ public void rawDelete(BackOffer backOffer, ByteString key, boolean atomicForCAS) () -> RawDeleteRequest.newBuilder() .setContext(makeContext(storeType, backOffer.getSlowLog())) - .setKey(key) + .setKey(codec.encodeKey(key)) .setForCas(atomicForCAS) .build(); @@ -993,7 +995,7 @@ public void rawPut( () -> RawPutRequest.newBuilder() .setContext(makeContext(storeType, backOffer.getSlowLog())) - .setKey(key) + .setKey(codec.encodeKey(key)) .setValue(value) .setTtl(ttl) .setForCas(atomicForCAS) @@ -1040,7 +1042,7 @@ public void rawCompareAndSet( () -> RawCASRequest.newBuilder() .setContext(makeContext(storeType, backOffer.getSlowLog())) - .setKey(key) + .setKey(codec.encodeKey(key)) .setValue(value) .setPreviousValue(prevValue.orElse(ByteString.EMPTY)) .setPreviousNotExist(!prevValue.isPresent()) @@ -1096,7 +1098,7 @@ public List rawBatchGet(BackOffer backoffer, List keys) { () -> RawBatchGetRequest.newBuilder() .setContext(makeContext(storeType, backoffer.getSlowLog())) - .addAllKeys(keys) + .addAllKeys(codec.encodeKeys(keys)) .build(); RegionErrorHandler handler = new RegionErrorHandler( @@ -1117,7 +1119,8 @@ private List handleRawBatchGet(RawBatchGetResponse resp) { if (resp.hasRegionError()) { throw new RegionException(resp.getRegionError()); } - return resp.getPairsList(); + + return codec.decodeKvPairs(resp.getPairsList()); } public void rawBatchPut( @@ -1156,7 +1159,7 @@ public void rawBatchPut(BackOffer backOffer, Batch batch, long ttl, boolean atom for (int i = 0; i < batch.getKeys().size(); i++) { pairs.add( KvPair.newBuilder() - .setKey(batch.getKeys().get(i)) + .setKey(codec.encodeKey(batch.getKeys().get(i))) .setValue(batch.getValues().get(i)) .build()); } @@ -1191,7 +1194,7 @@ public void rawBatchDelete(BackOffer backoffer, List keys, boolean a () -> RawBatchDeleteRequest.newBuilder() .setContext(makeContext(storeType, backoffer.getSlowLog())) - .addAllKeys(keys) + .addAllKeys(codec.encodeKeys(keys)) .setForCas(atomicForCAS) .build(); RegionErrorHandler handler = @@ -1234,13 +1237,16 @@ public List rawScan(BackOffer backOffer, ByteString key, int limit, bool GRPC_RAW_REQUEST_LATENCY.labels("client_grpc_raw_scan", clusterId.toString()).startTimer(); try { Supplier factory = - () -> - RawScanRequest.newBuilder() - .setContext(makeContext(storeType, backOffer.getSlowLog())) - .setStartKey(key) - .setKeyOnly(keyOnly) - .setLimit(limit) - .build(); + () -> { + Pair range = codec.encodeRange(key, ByteString.EMPTY); + return RawScanRequest.newBuilder() + .setContext(makeContext(storeType, backOffer.getSlowLog())) + .setStartKey(range.first) + .setEndKey(range.second) + .setKeyOnly(keyOnly) + .setLimit(limit) + .build(); + }; RegionErrorHandler handler = new RegionErrorHandler( @@ -1265,7 +1271,7 @@ private List rawScanHelper(RawScanResponse resp) { if (resp.hasRegionError()) { throw new RegionException(resp.getRegionError()); } - return resp.getKvsList(); + return codec.decodeKvPairs(resp.getKvsList()); } /** @@ -1283,12 +1289,14 @@ public void rawDeleteRange(BackOffer backOffer, ByteString startKey, ByteString .startTimer(); try { Supplier factory = - () -> - RawDeleteRangeRequest.newBuilder() - .setContext(makeContext(storeType, backOffer.getSlowLog())) - .setStartKey(startKey) - .setEndKey(endKey) - .build(); + () -> { + Pair range = codec.encodeRange(startKey, endKey); + return RawDeleteRangeRequest.newBuilder() + .setContext(makeContext(storeType, backOffer.getSlowLog())) + .setStartKey(range.first) + .setEndKey(range.second) + .build(); + }; RegionErrorHandler handler = new RegionErrorHandler( diff --git a/src/main/java/org/tikv/common/region/TiRegion.java b/src/main/java/org/tikv/common/region/TiRegion.java index b24c5f66eb4..3c0ce8e48d4 100644 --- a/src/main/java/org/tikv/common/region/TiRegion.java +++ b/src/main/java/org/tikv/common/region/TiRegion.java @@ -169,6 +169,7 @@ private Kvrpcpb.Context getContext( Kvrpcpb.Context.Builder builder = Kvrpcpb.Context.newBuilder(); builder + .setApiVersion(conf.getApiVersion().toPb()) .setIsolationLevel(this.isolationLevel) .setPriority(this.commandPri) .setRegionId(meta.getId()) diff --git a/src/main/java/org/tikv/txn/AbstractLockResolverClient.java b/src/main/java/org/tikv/txn/AbstractLockResolverClient.java index 1914fc41cee..2adc87a78f4 100644 --- a/src/main/java/org/tikv/txn/AbstractLockResolverClient.java +++ b/src/main/java/org/tikv/txn/AbstractLockResolverClient.java @@ -22,6 +22,7 @@ import org.tikv.common.StoreVersion; import org.tikv.common.TiConfiguration; import org.tikv.common.Version; +import org.tikv.common.apiversion.RequestKeyCodec; import org.tikv.common.exception.KeyException; import org.tikv.common.region.RegionManager; import org.tikv.common.region.RegionStoreClient; @@ -39,9 +40,9 @@ public interface AbstractLockResolverClient { /** transaction involves keys exceed this threshold can be treated as `big transaction`. */ long BIG_TXN_THRESHOLD = 16; - static Lock extractLockFromKeyErr(Kvrpcpb.KeyError keyError) { + static Lock extractLockFromKeyErr(Kvrpcpb.KeyError keyError, RequestKeyCodec codec) { if (keyError.hasLocked()) { - return new Lock(keyError.getLocked()); + return new Lock(keyError.getLocked(), codec); } if (keyError.hasConflict()) { @@ -49,7 +50,7 @@ static Lock extractLockFromKeyErr(Kvrpcpb.KeyError keyError) { throw new KeyException( String.format( "scan meet key conflict on primary key %s at commit ts %s", - conflict.getPrimary(), conflict.getConflictTs())); + codec.decodeKey(conflict.getPrimary()), conflict.getConflictTs())); } if (!keyError.getRetryable().isEmpty()) { diff --git a/src/main/java/org/tikv/txn/Lock.java b/src/main/java/org/tikv/txn/Lock.java index 2562d70a005..6dbb9735267 100644 --- a/src/main/java/org/tikv/txn/Lock.java +++ b/src/main/java/org/tikv/txn/Lock.java @@ -18,6 +18,7 @@ package org.tikv.txn; import com.google.protobuf.ByteString; +import org.tikv.common.apiversion.RequestKeyCodec; import org.tikv.kvproto.Kvrpcpb; public class Lock { @@ -30,10 +31,10 @@ public class Lock { private final Kvrpcpb.Op lockType; private final long lockForUpdateTs; - public Lock(Kvrpcpb.LockInfo l) { + public Lock(Kvrpcpb.LockInfo l, RequestKeyCodec codec) { txnID = l.getLockVersion(); - key = l.getKey(); - primary = l.getPrimaryLock(); + key = codec.decodeKey(l.getKey()); + primary = codec.decodeKey(l.getPrimaryLock()); ttl = l.getLockTtl() == 0 ? DEFAULT_LOCK_TTL : l.getLockTtl(); txnSize = l.getTxnSize(); lockType = l.getLockType(); diff --git a/src/main/java/org/tikv/txn/LockResolverClientV4.java b/src/main/java/org/tikv/txn/LockResolverClientV4.java index 4a723537c51..e433df42011 100644 --- a/src/main/java/org/tikv/txn/LockResolverClientV4.java +++ b/src/main/java/org/tikv/txn/LockResolverClientV4.java @@ -39,8 +39,12 @@ import org.tikv.common.exception.RegionException; import org.tikv.common.exception.TiClientInternalException; import org.tikv.common.operation.KVErrorHandler; -import org.tikv.common.region.*; +import org.tikv.common.region.AbstractRegionStoreClient; +import org.tikv.common.region.RegionManager; +import org.tikv.common.region.RegionStoreClient; +import org.tikv.common.region.TiRegion; import org.tikv.common.region.TiRegion.RegionVerID; +import org.tikv.common.region.TiStore; import org.tikv.common.util.BackOffer; import org.tikv.common.util.ChannelFactory; import org.tikv.common.util.TsoUtils; @@ -167,10 +171,10 @@ private void resolvePessimisticLock(BackOffer bo, Lock lock, Set cl Supplier factory = () -> Kvrpcpb.PessimisticRollbackRequest.newBuilder() - .setContext(region.getLeaderContext()) + .setContext(makeContext()) + .addKeys(codec.encodeKey(lock.getKey())) .setStartVersion(lock.getTxnID()) .setForUpdateTs(forUpdateTS) - .addKeys(lock.getKey()) .build(); KVErrorHandler handler = @@ -286,7 +290,7 @@ private TxnStatus getTxnStatus( TiRegion primaryKeyRegion = regionManager.getRegionByKey(primary); return Kvrpcpb.CheckTxnStatusRequest.newBuilder() .setContext(primaryKeyRegion.getLeaderContext()) - .setPrimaryKey(primary) + .setPrimaryKey(codec.encodeKey(primary)) .setLockTs(txnID) .setCallerStartTs(callerStartTS) .setCurrentTs(currentTS) @@ -362,7 +366,7 @@ private void resolveLock( Kvrpcpb.ResolveLockRequest.Builder builder = Kvrpcpb.ResolveLockRequest.newBuilder() - .setContext(region.getLeaderContext()) + .setContext(makeContext()) .setStartVersion(lock.getTxnID()); if (txnStatus.isCommitted()) { @@ -373,7 +377,7 @@ private void resolveLock( if (lock.getTxnSize() < BIG_TXN_THRESHOLD) { // Only resolve specified keys when it is a small transaction, // prevent from scanning the whole region in this case. - builder.addKeys(lock.getKey()); + builder.addKeys(codec.encodeKey(lock.getKey())); } Supplier factory = builder::build; diff --git a/src/test/java/org/tikv/BaseRawKVTest.java b/src/test/java/org/tikv/BaseRawKVTest.java index 9b8a9042f3c..f2d12dc5cbd 100644 --- a/src/test/java/org/tikv/BaseRawKVTest.java +++ b/src/test/java/org/tikv/BaseRawKVTest.java @@ -18,13 +18,13 @@ package org.tikv; import org.tikv.common.PDClient; +import org.tikv.common.StoreConfig; import org.tikv.common.StoreVersion; import org.tikv.common.TiConfiguration; import org.tikv.common.TiSession; import org.tikv.util.TestUtils; public class BaseRawKVTest { - protected boolean tikvVersionNewerThan(String expectedVersion) { TiConfiguration conf = createTiConfiguration(); TiSession session = TiSession.create(conf); @@ -43,6 +43,15 @@ protected TiConfiguration createTiConfiguration() { conf.setEnableAtomicForCAS(true); conf.setEnableGrpcForward(false); conf.setEnableAtomicForCAS(true); + conf.setRawKVScanTimeoutInMS(1000000000); + + conf.setWarmUpEnable(false); + try (TiSession session = TiSession.create(conf)) { + PDClient pdClient = session.getPDClient(); + conf.setApiVersion(StoreConfig.acquireApiVersion(pdClient)); + } catch (Exception ignore) { + } + return conf; } } diff --git a/src/test/java/org/tikv/BaseTxnKVTest.java b/src/test/java/org/tikv/BaseTxnKVTest.java index 82fa45135f9..965cd61a1d5 100644 --- a/src/test/java/org/tikv/BaseTxnKVTest.java +++ b/src/test/java/org/tikv/BaseTxnKVTest.java @@ -17,7 +17,9 @@ package org.tikv; +import org.tikv.common.StoreConfig; import org.tikv.common.TiConfiguration; +import org.tikv.common.TiSession; import org.tikv.util.TestUtils; public class BaseTxnKVTest { @@ -30,6 +32,11 @@ protected TiConfiguration createTiConfiguration() { : TiConfiguration.createDefault(pdAddrsStr); conf.setTest(true); conf.setEnableGrpcForward(false); + + try (TiSession session = TiSession.create(conf)) { + conf.setApiVersion(StoreConfig.acquireApiVersion(session.getPDClient())); + } catch (Exception ignore) { + } return conf; } } diff --git a/src/test/java/org/tikv/common/ApiVersionTest.java b/src/test/java/org/tikv/common/ApiVersionTest.java new file mode 100644 index 00000000000..b6ab167f77f --- /dev/null +++ b/src/test/java/org/tikv/common/ApiVersionTest.java @@ -0,0 +1,153 @@ +/* + * Copyright 2022 TiKV Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.tikv.common; + +import com.google.protobuf.ByteString; +import java.util.Optional; +import org.junit.Assert; +import org.junit.Assume; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.tikv.common.TiConfiguration.ApiVersion; +import org.tikv.raw.RawKVClient; + +public class ApiVersionTest { + private static final Logger logger = LoggerFactory.getLogger(ApiVersionTest.class); + + private TiConfiguration createConfiguration() { + TiConfiguration conf = TiConfiguration.createRawDefault(); + + conf.setTest(true); + conf.setEnableAtomicForCAS(true); + conf.setWarmUpEnable(false); + + return conf; + } + + private RawKVClient createRawClient(ApiVersion apiVersion) { + TiConfiguration conf = createConfiguration(); + conf.setApiVersion(apiVersion); + return TiSession.create(conf).createRawClient(); + } + + private ApiVersion getClusterApiVersion() { + return StoreConfig.acquireApiVersion(TiSession.create(createConfiguration()).getPDClient()); + } + + private boolean getClusterEnabledTtl() { + return StoreConfig.ifTllEnable(TiSession.create(createConfiguration()).getPDClient()); + } + + private boolean minTiKVVersion(String version) { + return StoreVersion.minTiKVVersion( + version, TiSession.create(createConfiguration()).getPDClient()); + } + + @Test + public void testAccessV2Cluster() { + Assume.assumeTrue(getClusterApiVersion().isV2()); + + Assert.assertTrue(getClusterEnabledTtl()); + + // V1 client can't access V2 cluster + RawKVClient client = createRawClient(ApiVersion.V1); + try { + client.get(ByteString.EMPTY); + Assert.fail("Should not be able to access V2 cluster with V1 client"); + } catch (Exception e) { + Assert.assertNotNull(e); + } + + try { + client.put(ByteString.EMPTY, ByteString.EMPTY, 10); + Assert.fail("Should not be able to access V2 cluster with V1 client using TTL"); + } catch (Exception e) { + Assert.assertNotNull(e); + } + + // V2 client can access V2 cluster + client = createRawClient(ApiVersion.V2); + client.putIfAbsent(ByteString.EMPTY, ByteString.EMPTY); + client.put(ByteString.EMPTY, ByteString.EMPTY, 10); + Optional result = client.get(ByteString.EMPTY); + Assert.assertTrue(result.isPresent()); + result.ifPresent(value -> Assert.assertEquals(ByteString.EMPTY, value)); + + client.delete(ByteString.EMPTY); + } + + @Test + public void testAccessV1Cluster() { + Assume.assumeTrue(minTiKVVersion("6.0.0")); + Assume.assumeTrue(getClusterApiVersion().isV1()); + Assume.assumeFalse(getClusterEnabledTtl()); + + // V1 client can access V1 cluster's raw data, no ttl allowed + RawKVClient client = createRawClient(ApiVersion.V1); + client.put(ByteString.EMPTY, ByteString.EMPTY); + Optional result = client.get(ByteString.EMPTY); + Assert.assertTrue(result.isPresent()); + result.ifPresent(value -> Assert.assertEquals(ByteString.EMPTY, value)); + client.delete(ByteString.EMPTY); + + try { + client.put(ByteString.EMPTY, ByteString.EMPTY, 10); + Assert.fail("Should not be able to access V1 cluster without TTL"); + } catch (Exception e) { + Assert.assertNotNull(e); + } + + // V2 client can't access V1 cluster + client = createRawClient(ApiVersion.V2); + try { + client.put(ByteString.EMPTY, ByteString.EMPTY); + Assert.fail("Should not be able to access V1 cluster with V2 Client"); + } catch (Exception e) { + Assert.assertNotNull(e); + } + } + + @Test + public void testAccessV1ClusterWithTtl() throws InterruptedException { + Assume.assumeTrue(minTiKVVersion("6.0.0")); + Assume.assumeTrue(getClusterApiVersion().isV1()); + Assume.assumeTrue(getClusterEnabledTtl()); + + // V1 client can access V1 cluster's raw data, ttl allowed + RawKVClient client = createRawClient(ApiVersion.V1); + client.put(ByteString.EMPTY, ByteString.EMPTY, 5); + Optional result = client.get(ByteString.EMPTY); + Assert.assertTrue(result.isPresent()); + result.ifPresent(value -> Assert.assertEquals(ByteString.EMPTY, value)); + + logger.info("Waiting for ttl to expire"); + Thread.sleep(5000); + + Assert.assertFalse(client.get(ByteString.EMPTY).isPresent()); + + // V2 client can't access V1 cluster with TTL + client = createRawClient(ApiVersion.V2); + try { + client.put(ByteString.EMPTY, ByteString.EMPTY, 5); + Assert.fail("Should not be able to access V1 cluster with TTL with V2 Client"); + } catch (Exception e) { + Assert.assertNotNull(e); + } + } +} diff --git a/src/test/java/org/tikv/common/KVMockServer.java b/src/test/java/org/tikv/common/KVMockServer.java index 18de79819ef..69d8a55ee0e 100644 --- a/src/test/java/org/tikv/common/KVMockServer.java +++ b/src/test/java/org/tikv/common/KVMockServer.java @@ -34,12 +34,14 @@ import io.grpc.stub.StreamObserver; import java.io.IOException; import java.net.ServerSocket; +import java.nio.charset.StandardCharsets; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Objects; import java.util.SortedMap; import java.util.TreeMap; +import java.util.function.Supplier; import java.util.stream.Collectors; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -49,8 +51,6 @@ import org.tikv.kvproto.Errorpb; import org.tikv.kvproto.Errorpb.EpochNotMatch; import org.tikv.kvproto.Errorpb.Error; -import org.tikv.kvproto.Errorpb.NotLeader; -import org.tikv.kvproto.Errorpb.ServerIsBusy; import org.tikv.kvproto.Kvrpcpb; import org.tikv.kvproto.Kvrpcpb.Context; import org.tikv.kvproto.TikvGrpc; @@ -63,7 +63,9 @@ public class KVMockServer extends TikvGrpc.TikvImplBase { private TiRegion region; private State state = State.Normal; private final TreeMap dataMap = new TreeMap<>(); - private final Map errorMap = new HashMap<>(); + private final Map> regionErrMap = new HashMap<>(); + + private final Map> keyErrMap = new HashMap<>(); // for KV error public static final int ABORT = 1; @@ -111,21 +113,28 @@ public void put(String key, ByteString data) { put(ByteString.copyFromUtf8(key), data); } - public void putError(String key, int code) { - errorMap.put(ByteString.copyFromUtf8(key), code); + public void putError(String key, Supplier builder) { + regionErrMap.put(toRawKey(key.getBytes(StandardCharsets.UTF_8)), builder); } public void clearAllMap() { dataMap.clear(); - errorMap.clear(); + regionErrMap.clear(); } - private void verifyContext(Context context) throws Exception { - if (context.getRegionId() != region.getId() - || !context.getRegionEpoch().equals(region.getRegionEpoch()) - || !context.getPeer().equals(region.getLeader())) { + private Errorpb.Error verifyContext(Context context) throws Exception { + if (context.getRegionId() != region.getId() || !context.getPeer().equals(region.getLeader())) { throw new Exception("context doesn't match"); } + + Errorpb.Error.Builder errBuilder = Errorpb.Error.newBuilder(); + + if (!context.getRegionEpoch().equals(region.getRegionEpoch())) { + return errBuilder + .setEpochNotMatch(EpochNotMatch.newBuilder().addCurrentRegions(region.getMeta()).build()) + .build(); + } + return null; } @Override @@ -138,18 +147,24 @@ public void rawGet( throw new Exception(State.Fail.toString()); default: } - verifyContext(request.getContext()); - ByteString key = request.getKey(); - + Key key = toRawKey(request.getKey()); Kvrpcpb.RawGetResponse.Builder builder = Kvrpcpb.RawGetResponse.newBuilder(); - Integer errorCode = errorMap.remove(key); - Errorpb.Error.Builder errBuilder = Errorpb.Error.newBuilder(); - if (errorCode != null) { - setErrorInfo(errorCode, errBuilder); - builder.setRegionError(errBuilder.build()); + + Error e = verifyContext(request.getContext()); + if (e != null) { + responseObserver.onNext(builder.setRegionError(e).build()); + responseObserver.onCompleted(); + return; + } + + Supplier errProvider = regionErrMap.get(key); + if (errProvider != null) { + Error.Builder eb = errProvider.get(); + if (eb != null) { + builder.setRegionError(eb.build()); + } } else { - Key rawKey = toRawKey(key); - ByteString value = dataMap.get(rawKey); + ByteString value = dataMap.get(key); if (value == null) { value = ByteString.EMPTY; } @@ -158,25 +173,34 @@ public void rawGet( responseObserver.onNext(builder.build()); responseObserver.onCompleted(); } catch (Exception e) { + logger.error("internal error", e); responseObserver.onError(Status.INTERNAL.asRuntimeException()); } } - /** */ + @Override public void rawPut( org.tikv.kvproto.Kvrpcpb.RawPutRequest request, io.grpc.stub.StreamObserver responseObserver) { try { - verifyContext(request.getContext()); - ByteString key = request.getKey(); - + Key key = toRawKey(request.getKey()); Kvrpcpb.RawPutResponse.Builder builder = Kvrpcpb.RawPutResponse.newBuilder(); - Integer errorCode = errorMap.remove(key); - Errorpb.Error.Builder errBuilder = Errorpb.Error.newBuilder(); - if (errorCode != null) { - setErrorInfo(errorCode, errBuilder); - builder.setRegionError(errBuilder.build()); + + Error e = verifyContext(request.getContext()); + if (e != null) { + responseObserver.onNext(builder.setRegionError(e).build()); + responseObserver.onCompleted(); + return; + } + + Supplier errProvider = regionErrMap.get(key); + if (errProvider != null) { + Error.Builder eb = errProvider.get(); + if (eb != null) { + builder.setRegionError(eb.build()); + } } + responseObserver.onNext(builder.build()); responseObserver.onCompleted(); } catch (Exception e) { @@ -184,40 +208,27 @@ public void rawPut( } } - private void setErrorInfo(int errorCode, Errorpb.Error.Builder errBuilder) { - if (errorCode == NOT_LEADER) { - errBuilder.setNotLeader(Errorpb.NotLeader.getDefaultInstance()); - } else if (errorCode == REGION_NOT_FOUND) { - errBuilder.setRegionNotFound(Errorpb.RegionNotFound.getDefaultInstance()); - } else if (errorCode == KEY_NOT_IN_REGION) { - errBuilder.setKeyNotInRegion(Errorpb.KeyNotInRegion.getDefaultInstance()); - } else if (errorCode == STALE_EPOCH) { - errBuilder.setEpochNotMatch(Errorpb.EpochNotMatch.getDefaultInstance()); - } else if (errorCode == STALE_COMMAND) { - errBuilder.setStaleCommand(Errorpb.StaleCommand.getDefaultInstance()); - } else if (errorCode == SERVER_IS_BUSY) { - errBuilder.setServerIsBusy(Errorpb.ServerIsBusy.getDefaultInstance()); - } else if (errorCode == STORE_NOT_MATCH) { - errBuilder.setStoreNotMatch(Errorpb.StoreNotMatch.getDefaultInstance()); - } else if (errorCode == RAFT_ENTRY_TOO_LARGE) { - errBuilder.setRaftEntryTooLarge(Errorpb.RaftEntryTooLarge.getDefaultInstance()); - } - } - - /** */ + @Override public void rawDelete( org.tikv.kvproto.Kvrpcpb.RawDeleteRequest request, io.grpc.stub.StreamObserver responseObserver) { try { - verifyContext(request.getContext()); - ByteString key = request.getKey(); - + Key key = toRawKey(request.getKey()); Kvrpcpb.RawDeleteResponse.Builder builder = Kvrpcpb.RawDeleteResponse.newBuilder(); - Integer errorCode = errorMap.remove(key); - Errorpb.Error.Builder errBuilder = Errorpb.Error.newBuilder(); - if (errorCode != null) { - setErrorInfo(errorCode, errBuilder); - builder.setRegionError(errBuilder.build()); + + Error e = verifyContext(request.getContext()); + if (e != null) { + responseObserver.onNext(builder.setRegionError(e).build()); + responseObserver.onCompleted(); + return; + } + + Supplier errProvider = regionErrMap.get(key); + if (errProvider != null) { + Error.Builder eb = errProvider.get(); + if (eb != null) { + builder.setRegionError(eb.build()); + } } responseObserver.onNext(builder.build()); responseObserver.onCompleted(); @@ -231,24 +242,24 @@ public void kvGet( org.tikv.kvproto.Kvrpcpb.GetRequest request, io.grpc.stub.StreamObserver responseObserver) { try { - verifyContext(request.getContext()); if (request.getVersion() == 0) { throw new Exception(); } - ByteString key = request.getKey(); - + Key key = toRawKey(request.getKey()); Kvrpcpb.GetResponse.Builder builder = Kvrpcpb.GetResponse.newBuilder(); - Integer errorCode = errorMap.remove(key); - Kvrpcpb.KeyError.Builder errBuilder = Kvrpcpb.KeyError.newBuilder(); - if (errorCode != null) { - if (errorCode == ABORT) { - errBuilder.setAbort("ABORT"); - } else if (errorCode == RETRY) { - errBuilder.setRetryable("Retry"); - } - builder.setError(errBuilder); + + Error e = verifyContext(request.getContext()); + if (e != null) { + responseObserver.onNext(builder.setRegionError(e).build()); + responseObserver.onCompleted(); + return; + } + + Supplier errProvider = keyErrMap.remove(key); + if (errProvider != null) { + builder.setError(errProvider.get().build()); } else { - ByteString value = dataMap.get(toRawKey(key)); + ByteString value = dataMap.get(key); builder.setValue(value); } responseObserver.onNext(builder.build()); @@ -263,23 +274,27 @@ public void kvScan( org.tikv.kvproto.Kvrpcpb.ScanRequest request, io.grpc.stub.StreamObserver responseObserver) { try { - verifyContext(request.getContext()); if (request.getVersion() == 0) { throw new Exception(); } - ByteString key = request.getStartKey(); - + Key key = toRawKey(request.getStartKey()); Kvrpcpb.ScanResponse.Builder builder = Kvrpcpb.ScanResponse.newBuilder(); - Error.Builder errBuilder = Error.newBuilder(); - Integer errorCode = errorMap.remove(key); - if (errorCode != null) { - if (errorCode == ABORT) { - errBuilder.setServerIsBusy(Errorpb.ServerIsBusy.getDefaultInstance()); + + Error e = verifyContext(request.getContext()); + if (e != null) { + responseObserver.onNext(builder.setRegionError(e).build()); + responseObserver.onCompleted(); + return; + } + + Supplier errProvider = regionErrMap.get(key); + if (errProvider != null) { + Error.Builder eb = errProvider.get(); + if (eb != null) { + builder.setRegionError(eb.build()); } - builder.setRegionError(errBuilder.build()); } else { - ByteString startKey = request.getStartKey(); - SortedMap kvs = dataMap.tailMap(toRawKey(startKey)); + SortedMap kvs = dataMap.tailMap(key); builder.addAllPairs( kvs.entrySet() .stream() @@ -303,27 +318,33 @@ public void kvBatchGet( org.tikv.kvproto.Kvrpcpb.BatchGetRequest request, io.grpc.stub.StreamObserver responseObserver) { try { - verifyContext(request.getContext()); if (request.getVersion() == 0) { throw new Exception(); } List keys = request.getKeysList(); Kvrpcpb.BatchGetResponse.Builder builder = Kvrpcpb.BatchGetResponse.newBuilder(); - Error.Builder errBuilder = Error.newBuilder(); + Error e = verifyContext(request.getContext()); + if (e != null) { + responseObserver.onNext(builder.setRegionError(e).build()); + responseObserver.onCompleted(); + return; + } + ImmutableList.Builder resultList = ImmutableList.builder(); for (ByteString key : keys) { - Integer errorCode = errorMap.remove(key); - if (errorCode != null) { - if (errorCode == ABORT) { - errBuilder.setServerIsBusy(Errorpb.ServerIsBusy.getDefaultInstance()); + Key rawKey = toRawKey(key); + Supplier errProvider = regionErrMap.get(rawKey); + if (errProvider != null) { + Error.Builder eb = errProvider.get(); + if (eb != null) { + builder.setRegionError(eb.build()); + break; } - builder.setRegionError(errBuilder.build()); - break; - } else { - ByteString value = dataMap.get(toRawKey(key)); - resultList.add(Kvrpcpb.KvPair.newBuilder().setKey(key).setValue(value).build()); } + + ByteString value = dataMap.get(rawKey); + resultList.add(Kvrpcpb.KvPair.newBuilder().setKey(key).setValue(value).build()); } builder.addAllPairs(resultList.build()); responseObserver.onNext(builder.build()); @@ -338,8 +359,6 @@ public void coprocessor( org.tikv.kvproto.Coprocessor.Request requestWrap, io.grpc.stub.StreamObserver responseObserver) { try { - verifyContext(requestWrap.getContext()); - DAGRequest request = DAGRequest.parseFrom(requestWrap.getData()); if (request.getStartTsFallback() == 0) { throw new Exception(); @@ -348,33 +367,33 @@ public void coprocessor( List keyRanges = requestWrap.getRangesList(); Coprocessor.Response.Builder builderWrap = Coprocessor.Response.newBuilder(); - SelectResponse.Builder builder = SelectResponse.newBuilder(); - org.tikv.kvproto.Errorpb.Error.Builder errBuilder = - org.tikv.kvproto.Errorpb.Error.newBuilder(); + Error e = verifyContext(requestWrap.getContext()); + if (e != null) { + responseObserver.onNext(builderWrap.setRegionError(e).build()); + responseObserver.onCompleted(); + return; + } + SelectResponse.Builder builder = SelectResponse.newBuilder(); for (Coprocessor.KeyRange keyRange : keyRanges) { - Integer errorCode = errorMap.remove(keyRange.getStart()); - if (errorCode != null) { - if (STALE_EPOCH == errorCode) { - errBuilder.setEpochNotMatch(EpochNotMatch.getDefaultInstance()); - } else if (NOT_LEADER == errorCode) { - errBuilder.setNotLeader(NotLeader.getDefaultInstance()); - } else { - errBuilder.setServerIsBusy(ServerIsBusy.getDefaultInstance()); + Key startKey = toRawKey(keyRange.getStart()); + Supplier errProvider = regionErrMap.get(startKey); + if (errProvider != null) { + Error.Builder eb = errProvider.get(); + if (eb != null) { + builderWrap.setRegionError(eb.build()); + break; } - builderWrap.setRegionError(errBuilder.build()); - break; - } else { - ByteString startKey = keyRange.getStart(); - SortedMap kvs = dataMap.tailMap(toRawKey(startKey)); - builder.addAllChunks( - kvs.entrySet() - .stream() - .filter(Objects::nonNull) - .filter(kv -> kv.getKey().compareTo(toRawKey(keyRange.getEnd())) <= 0) - .map(kv -> Chunk.newBuilder().setRowsData(kv.getValue()).build()) - .collect(Collectors.toList())); } + + SortedMap kvs = dataMap.tailMap(startKey); + builder.addAllChunks( + kvs.entrySet() + .stream() + .filter(Objects::nonNull) + .filter(kv -> kv.getKey().compareTo(toRawKey(keyRange.getEnd())) <= 0) + .map(kv -> Chunk.newBuilder().setRowsData(kv.getValue()).build()) + .collect(Collectors.toList())); } responseObserver.onNext(builderWrap.setData(builder.build().toByteString()).build()); diff --git a/src/test/java/org/tikv/common/PDClientV2MockTest.java b/src/test/java/org/tikv/common/PDClientV2MockTest.java new file mode 100644 index 00000000000..1a5278cb6ef --- /dev/null +++ b/src/test/java/org/tikv/common/PDClientV2MockTest.java @@ -0,0 +1,111 @@ +/* + * Copyright 2017 TiKV Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.tikv.common; + +import com.google.protobuf.ByteString; +import java.util.List; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.tikv.common.codec.Codec.BytesCodec; +import org.tikv.common.codec.CodecDataOutput; +import org.tikv.common.util.ConcreteBackOffer; +import org.tikv.common.util.Pair; +import org.tikv.kvproto.Metapb; +import org.tikv.kvproto.Pdpb; +import org.tikv.kvproto.Pdpb.GetRegionResponse; +import org.tikv.kvproto.Pdpb.Region; +import org.tikv.kvproto.Pdpb.ScanRegionsResponse; + +public class PDClientV2MockTest extends PDMockServerTest { + @Before + public void init() throws Exception { + upgradeToV2Cluster(); + } + + private PDClient createClient() { + return session.getPDClient(); + } + + public static ByteString encode(ByteString key) { + CodecDataOutput cdo = new CodecDataOutput(); + BytesCodec.writeBytes(cdo, key.toByteArray()); + return cdo.toByteString(); + } + + private GetRegionResponse makeGetRegionResponse(String start, String end) { + return GrpcUtils.makeGetRegionResponse(leader.getClusterId(), makeRegion(start, end)); + } + + private Metapb.Region makeRegion(String start, String end) { + Pair range = + session + .getPDClient() + .getCodec() + .encodePdQueryRange(ByteString.copyFromUtf8(start), ByteString.copyFromUtf8(end)); + return GrpcUtils.makeRegion( + 1, + range.first, + range.second, + GrpcUtils.makeRegionEpoch(2, 3), + GrpcUtils.makePeer(1, 10), + GrpcUtils.makePeer(2, 20)); + } + + @Test + public void testGetRegionById() throws Exception { + String start = "getRegionById"; + String end = "getRegionByIdEnd"; + leader.addGetRegionByIDListener(request -> makeGetRegionResponse(start, end)); + try (PDClient client = createClient()) { + Metapb.Region r = client.getRegionByID(ConcreteBackOffer.newRawKVBackOff(), 1).first; + Assert.assertEquals(start, r.getStartKey().toStringUtf8()); + Assert.assertEquals(end, r.getEndKey().toStringUtf8()); + } + + leader.addGetRegionByIDListener(request -> makeGetRegionResponse(start, "")); + try (PDClient client = createClient()) { + Metapb.Region r = client.getRegionByID(ConcreteBackOffer.newRawKVBackOff(), 1).first; + Assert.assertEquals(start, r.getStartKey().toStringUtf8()); + Assert.assertEquals("", r.getEndKey().toStringUtf8()); + } + } + + @Test + public void testScanRegions() throws Exception { + String start = "scanRegions"; + String end = "scanRegionsEnd"; + + leader.addScanRegionsListener( + request -> + ScanRegionsResponse.newBuilder() + .addRegions(Pdpb.Region.newBuilder().setRegion(makeRegion(start, end)).build()) + .build()); + + try (PDClient client = createClient()) { + List regions = + client.scanRegions( + ConcreteBackOffer.newRawKVBackOff(), ByteString.EMPTY, ByteString.EMPTY, 1); + + for (Region r : regions) { + Assert.assertEquals(start, r.getRegion().getStartKey().toStringUtf8()); + Assert.assertEquals(end, r.getRegion().getEndKey().toStringUtf8()); + } + } + } +} diff --git a/src/test/java/org/tikv/common/PDMockServer.java b/src/test/java/org/tikv/common/PDMockServer.java index 7980e0703f3..c081eb382c0 100644 --- a/src/test/java/org/tikv/common/PDMockServer.java +++ b/src/test/java/org/tikv/common/PDMockServer.java @@ -37,6 +37,8 @@ import org.tikv.kvproto.Pdpb.GetRegionResponse; import org.tikv.kvproto.Pdpb.GetStoreRequest; import org.tikv.kvproto.Pdpb.GetStoreResponse; +import org.tikv.kvproto.Pdpb.ScanRegionsRequest; +import org.tikv.kvproto.Pdpb.ScanRegionsResponse; import org.tikv.kvproto.Pdpb.TsoRequest; import org.tikv.kvproto.Pdpb.TsoResponse; @@ -50,6 +52,8 @@ public class PDMockServer extends PDGrpc.PDImplBase { private Function getRegionListener; private Function getRegionByIDListener; + private Function scanRegionsListener; + public void addGetMembersListener(Function func) { getMembersListener = func; } @@ -126,6 +130,20 @@ public void getStore(GetStoreRequest request, StreamObserver r } } + public void addScanRegionsListener(Function func) { + scanRegionsListener = func; + } + + @Override + public void scanRegions(ScanRegionsRequest request, StreamObserver resp) { + try { + resp.onNext(Optional.ofNullable(scanRegionsListener.apply(request)).get()); + resp.onCompleted(); + } catch (Exception e) { + resp.onError(Status.INTERNAL.asRuntimeException()); + } + } + public void start(long clusterId) throws IOException { int port; try (ServerSocket s = new ServerSocket(0)) { diff --git a/src/test/java/org/tikv/common/PDMockServerTest.java b/src/test/java/org/tikv/common/PDMockServerTest.java index 81821885b06..08018d0d495 100644 --- a/src/test/java/org/tikv/common/PDMockServerTest.java +++ b/src/test/java/org/tikv/common/PDMockServerTest.java @@ -22,6 +22,7 @@ import java.util.List; import org.junit.After; import org.junit.Before; +import org.tikv.common.TiConfiguration.ApiVersion; public abstract class PDMockServerTest { protected static final String LOCAL_ADDR = "127.0.0.1"; @@ -35,6 +36,21 @@ public void setup() throws IOException { setup(LOCAL_ADDR); } + void upgradeToV2Cluster() throws Exception { + if (session == null) { + throw new IllegalStateException("Cluster is not initialized"); + } + + if (session.getConf().getApiVersion().isV2()) { + return; + } + + TiConfiguration conf = session.getConf().setApiVersion(ApiVersion.V2); + session.close(); + + session = TiSession.create(conf); + } + void setup(String addr) throws IOException { int[] ports = new int[3]; for (int i = 0; i < ports.length; i++) { @@ -62,6 +78,7 @@ void setup(String addr) throws IOException { conf.setWarmUpEnable(false); conf.setTimeout(2000); conf.setEnableGrpcForward(true); + session = TiSession.create(conf); } diff --git a/src/test/java/org/tikv/common/RegionErrorTest.java b/src/test/java/org/tikv/common/RegionErrorTest.java new file mode 100644 index 00000000000..5341cbff608 --- /dev/null +++ b/src/test/java/org/tikv/common/RegionErrorTest.java @@ -0,0 +1,84 @@ +/* + * Copyright 2022 TiKV Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.tikv.common; + +import com.google.protobuf.ByteString; +import java.util.Optional; +import java.util.stream.Collectors; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.tikv.common.region.TiRegion; +import org.tikv.common.region.TiStore; +import org.tikv.kvproto.Metapb; +import org.tikv.raw.RawKVClient; + +public class RegionErrorTest extends MockThreeStoresTest { + @Before + public void init() throws Exception { + upgradeToV2Cluster(); + } + + private RawKVClient createClient() { + return session.createRawClient(); + } + + @Test + public void testOnEpochNotMatch() { + try (RawKVClient client = createClient()) { + // Construct a key that is less than the prefix of RAW API v2; + ByteString key = ByteString.copyFromUtf8("key-test-epoch-not-match"); + ByteString value = ByteString.copyFromUtf8("value"); + + ByteString requestKey = client.getSession().getPDClient().getCodec().encodeKey(key); + put(requestKey, value); + + Assert.assertEquals(Optional.of(value), client.get(key)); + + Metapb.Region newMeta = + Metapb.Region.newBuilder() + .mergeFrom(this.region.getMeta()) + .setRegionEpoch(Metapb.RegionEpoch.newBuilder().setConfVer(2).setVersion(3)) + .setStartKey(PDClientV2MockTest.encode(requestKey)) + .setEndKey(PDClientV2MockTest.encode(requestKey.concat(ByteString.copyFromUtf8("0")))) + .build(); + + // Increase the region epoch for the cluster, + // this will cause the cluster return an EpochNotMatch region error. + TiRegion newRegion = + new TiRegion( + this.region.getConf(), + newMeta, + this.region.getLeader(), + this.region.getPeersList(), + stores.stream().map(TiStore::new).collect(Collectors.toList())); + + // Update the region of each server + for (KVMockServer server : servers) { + server.setRegion(newRegion); + } + + // Forbid the client get region from PD leader. + leader.addGetRegionListener(request -> null); + + // The get should success since the region cache + // will be updated the currentRegions of `EpochNotMatch` error. + Assert.assertEquals(Optional.of(value), client.get(key)); + } + } +} diff --git a/src/test/java/org/tikv/common/RegionStoreClientTest.java b/src/test/java/org/tikv/common/RegionStoreClientTest.java index e3aff1243e1..1a03ad80e26 100644 --- a/src/test/java/org/tikv/common/RegionStoreClientTest.java +++ b/src/test/java/org/tikv/common/RegionStoreClientTest.java @@ -32,6 +32,10 @@ import org.tikv.common.region.TiStore; import org.tikv.common.util.BackOffer; import org.tikv.common.util.ConcreteBackOffer; +import org.tikv.kvproto.Errorpb; +import org.tikv.kvproto.Errorpb.EpochNotMatch; +import org.tikv.kvproto.Errorpb.NotLeader; +import org.tikv.kvproto.Errorpb.ServerIsBusy; import org.tikv.kvproto.Kvrpcpb; import org.tikv.kvproto.Metapb; @@ -75,12 +79,15 @@ public void doRawGetTest(RegionStoreClient client) { Optional value = client.rawGet(defaultBackOff(), ByteString.copyFromUtf8("key1")); assertEquals(ByteString.copyFromUtf8("value1"), value.get()); - server.putError("error1", KVMockServer.NOT_LEADER); + server.putError( + "error1", () -> Errorpb.Error.newBuilder().setNotLeader(NotLeader.getDefaultInstance())); // since not_leader is retryable, so the result should be correct. value = client.rawGet(defaultBackOff(), ByteString.copyFromUtf8("key1")); assertEquals(ByteString.copyFromUtf8("value1"), value.get()); - server.putError("failure", KVMockServer.STALE_EPOCH); + server.putError( + "failure", + () -> Errorpb.Error.newBuilder().setEpochNotMatch(EpochNotMatch.getDefaultInstance())); try { // since stale epoch is not retryable, so the test should fail. client.rawGet(defaultBackOff(), ByteString.copyFromUtf8("failure")); @@ -102,7 +109,9 @@ public void doGetTest(RegionStoreClient client) { ByteString value = client.get(defaultBackOff(), ByteString.copyFromUtf8("key1"), 1); assertEquals(ByteString.copyFromUtf8("value1"), value); - server.putError("error1", KVMockServer.ABORT); + server.putError( + "error1", + () -> Errorpb.Error.newBuilder().setServerIsBusy(ServerIsBusy.getDefaultInstance())); try { client.get(defaultBackOff(), ByteString.copyFromUtf8("error1"), 1); fail(); @@ -134,7 +143,9 @@ public void doBatchGetTest(RegionStoreClient client) { assertEquals( kv.getKey().toStringUtf8().replace("key", "value"), kv.getValue().toStringUtf8())); - server.putError("error1", KVMockServer.ABORT); + server.putError( + "error1", + () -> Errorpb.Error.newBuilder().setServerIsBusy(ServerIsBusy.getDefaultInstance())); try { client.batchGet( defaultBackOff(), @@ -165,7 +176,9 @@ public void doScanTest(RegionStoreClient client) { assertEquals( kv.getKey().toStringUtf8().replace("key", "value"), kv.getValue().toStringUtf8())); - server.putError("error1", KVMockServer.ABORT); + server.putError( + "error1", + () -> Errorpb.Error.newBuilder().setServerIsBusy(ServerIsBusy.getDefaultInstance())); try { client.scan(defaultBackOff(), ByteString.copyFromUtf8("error1"), 1); fail(); diff --git a/src/test/java/org/tikv/common/StoreConfig.java b/src/test/java/org/tikv/common/StoreConfig.java new file mode 100644 index 00000000000..48acb7880d9 --- /dev/null +++ b/src/test/java/org/tikv/common/StoreConfig.java @@ -0,0 +1,72 @@ +/* + * Copyright 2022 TiKV Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.tikv.common; + +import com.google.gson.Gson; +import com.google.gson.JsonElement; +import com.google.gson.JsonObject; +import java.util.List; +import org.apache.http.HttpEntity; +import org.apache.http.client.methods.CloseableHttpResponse; +import org.apache.http.client.methods.HttpGet; +import org.apache.http.impl.client.CloseableHttpClient; +import org.apache.http.impl.client.HttpClients; +import org.apache.http.util.EntityUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.tikv.common.TiConfiguration.ApiVersion; +import org.tikv.common.util.BackOffer; +import org.tikv.common.util.ConcreteBackOffer; +import org.tikv.kvproto.Metapb.Store; + +public class StoreConfig { + private static final Logger logger = LoggerFactory.getLogger(StoreConfig.class); + + private static JsonObject getConfig(PDClient client) { + BackOffer backOffer = ConcreteBackOffer.newCustomBackOff(BackOffer.PD_INFO_BACKOFF); + List stores = client.getAllStores(backOffer); + if (stores.isEmpty()) { + throw new IllegalStateException("No store found"); + } + + Store store = stores.get(0); + String statusAddr = store.getStatusAddress(); + String api = "http://" + statusAddr + "/config"; + try (CloseableHttpClient httpClient = HttpClients.createDefault()) { + HttpGet request = new HttpGet(api); + try (CloseableHttpResponse response = httpClient.execute(request)) { + HttpEntity entity = response.getEntity(); + String content = EntityUtils.toString(entity); + return new Gson().fromJson(content, JsonObject.class); + } + } catch (Exception e) { + logger.error("Failed to get store api version: ", e); + throw new IllegalStateException(e); + } + } + + public static ApiVersion acquireApiVersion(PDClient client) { + JsonElement version = getConfig(client).get("storage").getAsJsonObject().get("api-version"); + return version == null ? ApiVersion.V1 : ApiVersion.fromInt(version.getAsInt()); + } + + public static boolean ifTllEnable(PDClient client) { + JsonElement ttlEnabled = getConfig(client).get("storage").getAsJsonObject().get("enable-ttl"); + return ttlEnabled != null && ttlEnabled.getAsBoolean(); + } +} diff --git a/src/test/java/org/tikv/common/TiConfigurationTest.java b/src/test/java/org/tikv/common/TiConfigurationTest.java index a6d19c7a482..368aaea1e6c 100644 --- a/src/test/java/org/tikv/common/TiConfigurationTest.java +++ b/src/test/java/org/tikv/common/TiConfigurationTest.java @@ -19,6 +19,7 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; import static org.tikv.common.ConfigUtils.TIKV_GRPC_HEALTH_CHECK_TIMEOUT; import static org.tikv.common.ConfigUtils.TIKV_HEALTH_CHECK_PERIOD_DURATION; @@ -27,6 +28,8 @@ import java.io.ObjectOutputStream; import org.junit.Assert; import org.junit.Test; +import org.tikv.common.TiConfiguration.ApiVersion; +import org.tikv.kvproto.Kvrpcpb.APIVersion; public class TiConfigurationTest { @@ -112,4 +115,17 @@ public void serializeTest() throws IOException { oos.flush(); } } + + @Test + public void testApiVersion() { + TiConfiguration conf = TiConfiguration.createDefault(); + assertTrue(conf.getApiVersion().isV1()); + assertTrue(conf.setApiVersion(ApiVersion.V2).getApiVersion().isV2()); + } + + @Test + public void testApiVersionToPb() { + assertEquals(APIVersion.V1, ApiVersion.V1.toPb()); + assertEquals(APIVersion.V2, ApiVersion.V2.toPb()); + } } diff --git a/src/test/java/org/tikv/common/apiversion/RequestKeyCodecTest.java b/src/test/java/org/tikv/common/apiversion/RequestKeyCodecTest.java new file mode 100644 index 00000000000..24cbf3012a2 --- /dev/null +++ b/src/test/java/org/tikv/common/apiversion/RequestKeyCodecTest.java @@ -0,0 +1,180 @@ +/* + * Copyright 2022 TiKV Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.tikv.common.apiversion; + +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; + +import com.google.common.collect.ImmutableList; +import com.google.protobuf.ByteString; +import java.util.stream.Collectors; +import org.junit.Test; +import org.tikv.common.util.Pair; +import org.tikv.kvproto.Metapb.Region; +import org.tikv.kvproto.Pdpb; + +public class RequestKeyCodecTest { + @Test + public void testV1RawCodec() { + RequestKeyCodec v1 = new RequestKeyV1RawCodec(); + ByteString key = ByteString.copyFromUtf8("testV1RawCodec"); + + assertEquals(key, v1.encodeKey(key)); + assertEquals(key, v1.decodeKey(v1.encodeKey(key))); + + assertEquals(key, v1.encodePdQuery(key)); + + ByteString start = ByteString.copyFromUtf8("testV1RawCodec_start"); + ByteString end = ByteString.copyFromUtf8("testV1RawCodec_end"); + Pair range = v1.encodeRange(start, end); + assertEquals(start, range.first); + assertEquals(end, range.second); + + range = v1.encodePdQueryRange(start, end); + assertEquals(start, range.first); + assertEquals(end, range.second); + + Region region = Region.newBuilder().setStartKey(start).setEndKey(end).build(); + assertEquals(region, v1.decodeRegion(region)); + + assertEquals( + ImmutableList.of(region), + v1.decodePdRegions(ImmutableList.of(Pdpb.Region.newBuilder().setRegion(region).build())) + .stream() + .map(Pdpb.Region::getRegion) + .collect(Collectors.toList())); + } + + @Test + public void testV1TxnCodec() { + RequestKeyCodec v1 = new RequestKeyV1TxnCodec(); + + ByteString key = ByteString.copyFromUtf8("testV1TxnCodec"); + + assertEquals(CodecUtils.encode(key), v1.encodePdQuery(key)); + + ByteString start = ByteString.copyFromUtf8("testV1TxnCodec_start"); + ByteString end = ByteString.copyFromUtf8("testV1TxnCodec_end"); + + // Test start and end are both non-empty. + Pair range = v1.encodePdQueryRange(start, end); + assertEquals(CodecUtils.encode(start), range.first); + assertEquals(CodecUtils.encode(end), range.second); + + Region region = + Region.newBuilder() + .setStartKey(CodecUtils.encode(start)) + .setEndKey(CodecUtils.encode(end)) + .build(); + Region decoded = v1.decodeRegion(region); + assertEquals(start, decoded.getStartKey()); + assertEquals(end, decoded.getEndKey()); + + // Test start is empty. + start = ByteString.EMPTY; + region = + Region.newBuilder() + .setStartKey(CodecUtils.encode(start)) + .setEndKey(CodecUtils.encode(end)) + .build(); + decoded = v1.decodeRegion(region); + assertEquals(start, decoded.getStartKey()); + assertEquals(end, decoded.getEndKey()); + + range = v1.encodePdQueryRange(start, end); + assertEquals(ByteString.EMPTY, range.first); + assertEquals(CodecUtils.encode(end), range.second); + + // Test end is empty. + end = ByteString.EMPTY; + region = + Region.newBuilder() + .setStartKey(CodecUtils.encode(start)) + .setEndKey(CodecUtils.encode(end)) + .build(); + decoded = v1.decodeRegion(region); + assertEquals(start, decoded.getStartKey()); + assertEquals(ByteString.EMPTY, decoded.getEndKey()); + + range = v1.encodePdQueryRange(start, end); + assertEquals(start, range.first); + assertEquals(ByteString.EMPTY, range.second); + } + + @Test + public void testV2Codec() { + testV2Codec(new RequestKeyV2RawCodec()); + testV2Codec(new RequestKeyV2TxnCodec()); + } + + void testV2Codec(RequestKeyV2Codec v2) { + ByteString key = ByteString.copyFromUtf8("testV2RawCodec"); + + assertEquals(key, v2.decodeKey(v2.encodeKey(key))); + assertEquals(CodecUtils.encode(v2.encodeKey(key)), v2.encodePdQuery(key)); + + ByteString start = ByteString.copyFromUtf8("testV1TxnCodec_start"); + ByteString end = ByteString.copyFromUtf8("testV1TxnCodec_end"); + + // Test start and end are both non-empty. + Pair range = v2.encodePdQueryRange(start, end); + assertEquals(CodecUtils.encode(v2.encodeKey(start)), range.first); + assertEquals(CodecUtils.encode(v2.encodeKey(end)), range.second); + + Region region = + Region.newBuilder() + .setStartKey(CodecUtils.encode(v2.encodeKey(start))) + .setEndKey(CodecUtils.encode(v2.encodeKey(end))) + .build(); + Region decoded = v2.decodeRegion(region); + assertEquals(start, decoded.getStartKey()); + assertEquals(end, decoded.getEndKey()); + + // Test start is empty. + start = ByteString.EMPTY; + region = + Region.newBuilder() + .setStartKey(CodecUtils.encode(v2.encodeKey(start))) + .setEndKey(CodecUtils.encode(v2.encodeKey(end))) + .build(); + decoded = v2.decodeRegion(region); + assertEquals(start, decoded.getStartKey()); + assertEquals(end, decoded.getEndKey()); + + range = v2.encodePdQueryRange(start, end); + assertEquals(CodecUtils.encode(v2.encodeKey(start)), range.first); + assertEquals(CodecUtils.encode(v2.encodeKey(end)), range.second); + + // Test end is empty. + end = ByteString.EMPTY; + range = v2.encodeRange(start, end); + assertEquals(v2.encodeKey(start), range.first); + assertArrayEquals( + new byte[] {(byte) (v2.encodeKey(ByteString.EMPTY).byteAt(0) + 1)}, + range.second.toByteArray()); + + region = + Region.newBuilder() + .setStartKey(CodecUtils.encode(range.first)) + .setEndKey(CodecUtils.encode(range.second)) + .build(); + decoded = v2.decodeRegion(region); + assertEquals(start, decoded.getStartKey()); + assertEquals(ByteString.EMPTY, decoded.getEndKey()); + } +} diff --git a/src/test/java/org/tikv/common/importer/RawKVIngestTest.java b/src/test/java/org/tikv/common/importer/RawKVIngestTest.java index a8e5f94cf08..e25567dd32f 100644 --- a/src/test/java/org/tikv/common/importer/RawKVIngestTest.java +++ b/src/test/java/org/tikv/common/importer/RawKVIngestTest.java @@ -38,7 +38,6 @@ import org.tikv.util.TestUtils; public class RawKVIngestTest extends BaseRawKVTest { - private TiSession session; private static final int KEY_NUMBER = 16; diff --git a/src/test/java/org/tikv/raw/RawKVClientTest.java b/src/test/java/org/tikv/raw/RawKVClientTest.java index 98e4c0d620a..ee20df3ee55 100644 --- a/src/test/java/org/tikv/raw/RawKVClientTest.java +++ b/src/test/java/org/tikv/raw/RawKVClientTest.java @@ -50,6 +50,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.tikv.BaseRawKVTest; +import org.tikv.common.StoreConfig; import org.tikv.common.TiConfiguration; import org.tikv.common.TiSession; import org.tikv.common.codec.KeyUtils; @@ -326,7 +327,7 @@ public void batchPutTest() { @Test public void deleteRangeTest() { - client.deleteRange(ByteString.EMPTY, ByteString.EMPTY); + checkDeleteRange(ByteString.EMPTY, ByteString.EMPTY); } @Test @@ -535,7 +536,6 @@ private void baseTest( rawDeleteTest(deleteCases, benchmark); } - // TODO: check whether cluster supports ttl long ttl = 10; rawTTLTest(10, ttl, benchmark); @@ -900,7 +900,7 @@ private void rawDeleteRangeTest(boolean benchmark) { } public void rawTTLTest(int cases, long ttl, boolean benchmark) { - Assume.assumeTrue(tikvVersionNewerThan("v5.0.0")); + Assume.assumeTrue(StoreConfig.ifTllEnable(session.getPDClient())); logger.info("ttl testing"); if (benchmark) { for (int i = 0; i < cases; i++) {