Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
38 commits
Select commit Hold shift + click to select a range
1c97d49
api-v2: enable api-v2
iosmanthus Mar 24, 2022
0d485f2
Merge branch 'master' of github.com:tikv/client-java into api-v2
iosmanthus Mar 25, 2022
f45d583
attach api version to context and fix lock resolver api version
iosmanthus Mar 25, 2022
8ff27f2
./dev/javafmt
iosmanthus Mar 28, 2022
7bb666b
fix api v2 config test
iosmanthus Mar 28, 2022
790ffd2
add topb test
iosmanthus Mar 28, 2022
c5e502e
fix ingest interface for rawkv
iosmanthus Apr 6, 2022
8a11a88
fix ingest interface for rawkv
iosmanthus Apr 12, 2022
ec2d127
Merge branch 'master' of github.com:tikv/client-java into api-v2
iosmanthus Apr 15, 2022
9b0363c
Merge branch 'master' of github.com:tikv/client-java into api-v2
iosmanthus Apr 19, 2022
f9f4cfe
add api version test
iosmanthus Apr 19, 2022
e5ff7e0
add license header for new files
iosmanthus Apr 19, 2022
3e63dc4
fix ApiVersionTest for older version of TiKV
iosmanthus Apr 20, 2022
e44dee7
Merge branch 'api-v2' of github.com:iosmanthus/client-java; branch 'm…
iosmanthus May 12, 2022
04fa138
fix kvproto wrong commit
iosmanthus May 17, 2022
66d07c4
wip: add mock test for error handler for EpochNotMatch region error i…
iosmanthus May 23, 2022
153ec6f
Merge branch 'master' of github.com:tikv/client-java into api-v2
iosmanthus May 24, 2022
6771d13
add mock test for EpochNotMatch in API v2
iosmanthus May 24, 2022
b7c54f8
fix license header
iosmanthus May 24, 2022
72eb371
add some comments for epoch not match mock test
iosmanthus May 24, 2022
8443588
git fire!: add tests for scanRegions
iosmanthus May 25, 2022
62879c6
add tests for scanRegions
iosmanthus May 26, 2022
edbb9f5
add mock test for ApiV2 PD client
iosmanthus May 26, 2022
8b22d77
revert src/main/java/org/tikv/common/operation/iterator/RawScanIterat…
iosmanthus May 26, 2022
34a81a0
refactor: introduce RequestKeyCodec to reduce if
iosmanthus May 30, 2022
a72a6cf
./dev/javafmt
iosmanthus May 30, 2022
de87cbb
remove codec in TiConfiguration
iosmanthus May 30, 2022
6d43080
remove decodeRegion in PDClient
iosmanthus May 30, 2022
ef71788
remove extra ;
iosmanthus May 30, 2022
d252888
Merge branch 'api-v2' of github.com:iosmanthus/client-java into api-v2
iosmanthus May 30, 2022
fec799c
add batch encode method for RequestKeyCodec
iosmanthus May 31, 2022
0a64f84
print stack trace for unstable tests
iosmanthus May 31, 2022
cedc40d
fix wrong base class for PDClientV2MockTest
iosmanthus May 31, 2022
afd5858
wip: add tests for RequestKeyCodec
iosmanthus May 31, 2022
7c02154
./dev/javafmt
iosmanthus May 31, 2022
0e0bd46
complete RequestKeyCodecTest
iosmanthus Jun 1, 2022
1db88aa
remove if in scanRegions
iosmanthus Jun 1, 2022
629d4fa
change Pair type to org.tikv.common.util.Pair
iosmanthus Jun 2, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion dev/proto.sh
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ if [ -d $proto_dir ]; then
fi

repos=("https://github.com/pingcap/kvproto" "https://github.com/pingcap/raft-rs" "https://github.com/pingcap/tipb")
commits=(58f2ac94aa38f49676dd628fbcc1d669a77a62ac b9891b673573fad77ebcf9bbe0969cf945841926 c4d518eb1d60c21f05b028b36729e64610346dac)
commits=(3056ca36e6f2a71a9fc7ba7135e6b119fd977553 b9891b673573fad77ebcf9bbe0969cf945841926 c4d518eb1d60c21f05b028b36729e64610346dac)

for i in "${!repos[@]}"; do
repo_name=$(basename ${repos[$i]})
Expand Down
5 changes: 5 additions & 0 deletions src/main/java/org/tikv/common/ConfigUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,9 @@ public class ConfigUtils {

public static final String TIFLASH_ENABLE = "tiflash.enable";
public static final String TIKV_WARM_UP_ENABLE = "tikv.warm_up.enable";

public static final String TIKV_API_VERSION = "tikv.api_version";

public static final String DEF_PD_ADDRESSES = "127.0.0.1:2379";
public static final String DEF_TIMEOUT = "200ms";
public static final String DEF_TIKV_GRPC_INGEST_TIMEOUT = "200s";
Expand Down Expand Up @@ -204,4 +207,6 @@ public class ConfigUtils {
public static final int DEF_TiKV_CIRCUIT_BREAK_ATTEMPT_REQUEST_COUNT = 10;

public static final int DEF_TIKV_SCAN_REGIONS_LIMIT = 1000;

public static final int DEF_TIKV_API_VERSION = 1;
}
94 changes: 29 additions & 65 deletions src/main/java/org/tikv/common/PDClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -62,9 +62,7 @@
import org.apache.http.impl.client.HttpClients;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.tikv.common.codec.Codec.BytesCodec;
import org.tikv.common.codec.CodecDataInput;
import org.tikv.common.codec.CodecDataOutput;
import org.tikv.common.apiversion.RequestKeyCodec;
import org.tikv.common.codec.KeyUtils;
import org.tikv.common.exception.GrpcException;
import org.tikv.common.exception.TiClientInternalException;
Expand Down Expand Up @@ -110,7 +108,9 @@ public class PDClient extends AbstractGRPCClient<PDBlockingStub, PDFutureStub>
private static final long MIN_TRY_UPDATE_DURATION = 50;
private static final int PAUSE_CHECKER_TIMEOUT = 300; // in seconds
private static final int KEEP_CHECKER_PAUSE_PERIOD = PAUSE_CHECKER_TIMEOUT / 5; // in seconds
private final Logger logger = LoggerFactory.getLogger(PDClient.class);
private static final Logger logger = LoggerFactory.getLogger(PDClient.class);

private final RequestKeyCodec codec;
private RequestHeader header;
private TsoRequest tsoReq;
private volatile PDClientWrapper pdClientWrapper;
Expand All @@ -130,19 +130,22 @@ public class PDClient extends AbstractGRPCClient<PDBlockingStub, PDFutureStub>
.labelNames("cluster")
.register();

private PDClient(TiConfiguration conf, ChannelFactory channelFactory) {
private PDClient(TiConfiguration conf, RequestKeyCodec codec, ChannelFactory channelFactory) {
super(conf, channelFactory);
initCluster();
this.codec = codec;
this.blockingStub = getBlockingStub();
this.asyncStub = getAsyncStub();
}

public static ReadOnlyPDClient create(TiConfiguration conf, ChannelFactory channelFactory) {
return createRaw(conf, channelFactory);
public static ReadOnlyPDClient create(
TiConfiguration conf, RequestKeyCodec codec, ChannelFactory channelFactory) {
return createRaw(conf, codec, channelFactory);
}

static PDClient createRaw(TiConfiguration conf, ChannelFactory channelFactory) {
return new PDClient(conf, channelFactory);
static PDClient createRaw(
TiConfiguration conf, RequestKeyCodec codec, ChannelFactory channelFactory) {
return new PDClient(conf, codec, channelFactory);
}

public HostMapping getHostMapping() {
Expand Down Expand Up @@ -313,22 +316,19 @@ public Pair<Metapb.Region, Metapb.Peer> getRegionByKey(BackOffer backOffer, Byte
Histogram.Timer requestTimer =
PD_GET_REGION_BY_KEY_REQUEST_LATENCY.labels(getClusterId().toString()).startTimer();
try {
if (conf.isTxnKVMode()) {
CodecDataOutput cdo = new CodecDataOutput();
BytesCodec.writeBytes(cdo, key.toByteArray());
key = cdo.toByteString();
}
ByteString queryKey = key;

Supplier<GetRegionRequest> request =
() -> GetRegionRequest.newBuilder().setHeader(header).setRegionKey(queryKey).build();
() ->
GetRegionRequest.newBuilder()
.setHeader(header)
.setRegionKey(codec.encodePdQuery(key))
.build();

PDErrorHandler<GetRegionResponse> handler =
new PDErrorHandler<>(getRegionResponseErrorExtractor, this);

GetRegionResponse resp =
callWithRetry(backOffer, PDGrpc.getGetRegionMethod(), request, handler);
return new Pair<Metapb.Region, Metapb.Peer>(decodeRegion(resp.getRegion()), resp.getLeader());
return new Pair<>(codec.decodeRegion(resp.getRegion()), resp.getLeader());
} finally {
requestTimer.observeDuration();
}
Expand All @@ -343,7 +343,8 @@ public Pair<Metapb.Region, Metapb.Peer> getRegionByID(BackOffer backOffer, long

GetRegionResponse resp =
callWithRetry(backOffer, PDGrpc.getGetRegionByIDMethod(), request, handler);
return new Pair<Metapb.Region, Metapb.Peer>(decodeRegion(resp.getRegion()), resp.getLeader());
return new Pair<Metapb.Region, Metapb.Peer>(
codec.decodeRegion(resp.getRegion()), resp.getLeader());
}

@Override
Expand All @@ -353,18 +354,20 @@ public List<Pdpb.Region> scanRegions(
// introduce a warm-up timeout for ScanRegions requests
PDGrpc.PDBlockingStub stub =
getBlockingStub().withDeadlineAfter(conf.getWarmUpTimeout(), TimeUnit.MILLISECONDS);
Pair<ByteString, ByteString> range = codec.encodePdQueryRange(startKey, endKey);
Pdpb.ScanRegionsRequest request =
Pdpb.ScanRegionsRequest.newBuilder()
.setHeader(header)
.setStartKey(startKey)
.setEndKey(endKey)
.setStartKey(range.first)
.setEndKey(range.second)
.setLimit(limit)
.build();
Pdpb.ScanRegionsResponse resp = stub.scanRegions(request);
if (resp == null) {
return null;
}
return resp.getRegionsList();

return codec.decodePdRegions(resp.getRegionsList());
}

private Supplier<GetStoreRequest> buildGetStoreReq(long storeId) {
Expand Down Expand Up @@ -811,54 +814,15 @@ public String toString() {
}
}

private Metapb.Region decodeRegion(Metapb.Region region) {
final boolean isRawRegion = conf.isRawKVMode();
Metapb.Region.Builder builder =
Metapb.Region.newBuilder()
.setId(region.getId())
.setRegionEpoch(region.getRegionEpoch())
.addAllPeers(region.getPeersList());

if (region.getStartKey().isEmpty() || isRawRegion) {
builder.setStartKey(region.getStartKey());
} else {
if (!conf.isTest()) {
byte[] decodedStartKey = BytesCodec.readBytes(new CodecDataInput(region.getStartKey()));
builder.setStartKey(ByteString.copyFrom(decodedStartKey));
} else {
try {
byte[] decodedStartKey = BytesCodec.readBytes(new CodecDataInput(region.getStartKey()));
builder.setStartKey(ByteString.copyFrom(decodedStartKey));
} catch (Exception e) {
builder.setStartKey(region.getStartKey());
}
}
}

if (region.getEndKey().isEmpty() || isRawRegion) {
builder.setEndKey(region.getEndKey());
} else {
if (!conf.isTest()) {
byte[] decodedEndKey = BytesCodec.readBytes(new CodecDataInput(region.getEndKey()));
builder.setEndKey(ByteString.copyFrom(decodedEndKey));
} else {
try {
byte[] decodedEndKey = BytesCodec.readBytes(new CodecDataInput(region.getEndKey()));
builder.setEndKey(ByteString.copyFrom(decodedEndKey));
} catch (Exception e) {
builder.setEndKey(region.getEndKey());
}
}
}

return builder.build();
}

public Long getClusterId() {
return header.getClusterId();
}

public List<URI> getPdAddrs() {
return pdAddrs;
}

public RequestKeyCodec getCodec() {
return codec;
}
}
3 changes: 3 additions & 0 deletions src/main/java/org/tikv/common/ReadOnlyPDClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import com.google.protobuf.ByteString;
import java.util.List;
import org.tikv.common.apiversion.RequestKeyCodec;
import org.tikv.common.meta.TiTimestamp;
import org.tikv.common.util.BackOffer;
import org.tikv.common.util.Pair;
Expand Down Expand Up @@ -69,4 +70,6 @@ List<Pdpb.Region> scanRegions(
TiConfiguration.ReplicaRead getReplicaRead();

Long getClusterId();

RequestKeyCodec getCodec();
}
1 change: 0 additions & 1 deletion src/main/java/org/tikv/common/StoreVersion.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
import org.tikv.kvproto.Metapb;

public class StoreVersion {

private static final int SCALE = 10000;
private final Logger logger = LoggerFactory.getLogger(this.getClass());
private int v0 = 9999;
Expand Down
55 changes: 54 additions & 1 deletion src/main/java/org/tikv/common/TiConfiguration.java
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import static org.tikv.common.ConfigUtils.DEF_SHOW_ROWID;
import static org.tikv.common.ConfigUtils.DEF_TABLE_SCAN_CONCURRENCY;
import static org.tikv.common.ConfigUtils.DEF_TIFLASH_ENABLE;
import static org.tikv.common.ConfigUtils.DEF_TIKV_API_VERSION;
import static org.tikv.common.ConfigUtils.DEF_TIKV_BO_REGION_MISS_BASE_IN_MS;
import static org.tikv.common.ConfigUtils.DEF_TIKV_CONN_RECYCLE_TIME;
import static org.tikv.common.ConfigUtils.DEF_TIKV_ENABLE_ATOMIC_FOR_CAS;
Expand Down Expand Up @@ -82,6 +83,7 @@
import static org.tikv.common.ConfigUtils.READ_COMMITTED_ISOLATION_LEVEL;
import static org.tikv.common.ConfigUtils.SNAPSHOT_ISOLATION_LEVEL;
import static org.tikv.common.ConfigUtils.TIFLASH_ENABLE;
import static org.tikv.common.ConfigUtils.TIKV_API_VERSION;
import static org.tikv.common.ConfigUtils.TIKV_BATCH_DELETE_CONCURRENCY;
import static org.tikv.common.ConfigUtils.TIKV_BATCH_GET_CONCURRENCY;
import static org.tikv.common.ConfigUtils.TIKV_BATCH_PUT_CONCURRENCY;
Expand Down Expand Up @@ -154,6 +156,7 @@
import static org.tikv.common.ConfigUtils.TiKV_CIRCUIT_BREAK_ENABLE;
import static org.tikv.common.ConfigUtils.TiKV_CIRCUIT_BREAK_SLEEP_WINDOW_IN_SECONDS;

import com.google.protobuf.ByteString;
import io.grpc.Metadata;
import java.io.IOException;
import java.io.InputStream;
Expand All @@ -173,17 +176,19 @@
import org.slf4j.LoggerFactory;
import org.tikv.common.pd.PDUtils;
import org.tikv.common.replica.ReplicaSelector;
import org.tikv.kvproto.Kvrpcpb;
import org.tikv.kvproto.Kvrpcpb.CommandPri;
import org.tikv.kvproto.Kvrpcpb.IsolationLevel;

public class TiConfiguration implements Serializable {

private static final Logger logger = LoggerFactory.getLogger(TiConfiguration.class);
private static final ConcurrentHashMap<String, String> settings = new ConcurrentHashMap<>();
public static final Metadata.Key<String> FORWARD_META_DATA_KEY =
Metadata.Key.of("tikv-forwarded-host", Metadata.ASCII_STRING_MARSHALLER);
public static final Metadata.Key<String> PD_FORWARD_META_DATA_KEY =
Metadata.Key.of("pd-forwarded-host", Metadata.ASCII_STRING_MARSHALLER);
public static final ByteString API_V2_RAW_PREFIX = ByteString.copyFromUtf8("r");
public static final ByteString API_V2_TXN_PREFIX = ByteString.copyFromUtf8("x");

static {
// priority: system environment > config file > default
Expand Down Expand Up @@ -296,6 +301,8 @@ private static void loadFromDefaultProperties() {
setIfMissing(
TiKV_CIRCUIT_BREAK_ATTEMPT_REQUEST_COUNT, DEF_TiKV_CIRCUIT_BREAK_ATTEMPT_REQUEST_COUNT);
setIfMissing(TIKV_SCAN_REGIONS_LIMIT, DEF_TIKV_SCAN_REGIONS_LIMIT);

setIfMissing(TIKV_API_VERSION, DEF_TIKV_API_VERSION);
}

public static void listAll() {
Expand Down Expand Up @@ -551,6 +558,8 @@ private static ReplicaRead getReplicaRead(String key) {

private int scanRegionsLimit = getInt(TIKV_SCAN_REGIONS_LIMIT);

private ApiVersion apiVersion = ApiVersion.fromInt(getInt(TIKV_API_VERSION));

public enum KVMode {
TXN,
RAW
Expand Down Expand Up @@ -1241,4 +1250,48 @@ public int getScanRegionsLimit() {
public void setScanRegionsLimit(int scanRegionsLimit) {
this.scanRegionsLimit = scanRegionsLimit;
}

public ApiVersion getApiVersion() {
return apiVersion;
}

public TiConfiguration setApiVersion(ApiVersion version) {
this.apiVersion = version;
return this;
}

public enum ApiVersion {
V1,
V2;

public static ApiVersion fromInt(int version) {
switch (version) {
case 1:
return V1;
case 2:
return V2;
default:
throw new IllegalArgumentException("unknown api version " + version);
}
}

public boolean isV1() {
return this == V1;
}

public boolean isV2() {
return this == V2;
}

public Kvrpcpb.APIVersion toPb() {
switch (this) {
case V1:
return Kvrpcpb.APIVersion.V1;
case V2:
return Kvrpcpb.APIVersion.V2;
default:
throw new IllegalArgumentException("unknown api version " + this);
}
}
}
}
Loading