Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,6 @@ dependencies {
compile group: 'joda-time', name: 'joda-time', version:'2.9.9'
compile group: 'org.joda', name: 'joda-convert', version:'1.9.2'
testCompile group: 'io.grpc', name: 'grpc-testing', version:'1.7.0'
//remove unused hadoop dependencies
/*compile group: 'org.apache.logging.log4j', name: 'log4j-api', version:'2.8.1'
compile group: 'org.apache.logging.log4j', name: 'log4j-core', version:'2.8.1'
compile group: 'org.apache.spark', name: 'spark-core_2.11', version:'2.3.2'
Expand Down
3 changes: 2 additions & 1 deletion gradle/wrapper/gradle-wrapper.properties
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#Wed Feb 13 17:14:42 CST 2019
distributionBase=GRADLE_USER_HOME
distributionPath=wrapper/dists
distributionUrl=https\://services.gradle.org/distributions/gradle-4.10.2-bin.zip
zipStoreBase=GRADLE_USER_HOME
zipStorePath=wrapper/dists
distributionUrl=https\://services.gradle.org/distributions/gradle-4.10.2-all.zip
17 changes: 10 additions & 7 deletions src/main/java/org/tikv/common/AbstractGRPCClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,21 +29,24 @@
import org.tikv.common.policy.RetryPolicy;
import org.tikv.common.streaming.StreamingResponse;
import org.tikv.common.util.BackOffer;
import org.tikv.common.util.ChannelFactory;

public abstract class AbstractGRPCClient<
BlockingStubT extends AbstractStub<BlockingStubT>, StubT extends AbstractStub<StubT>>
implements AutoCloseable {
protected final Logger logger = Logger.getLogger(this.getClass());
protected final TiConfiguration conf;
protected final ChannelFactory channelFactory;
protected TiSession session;
protected TiConfiguration conf;

protected AbstractGRPCClient(TiConfiguration conf, ChannelFactory channelFactory) {
this.conf = conf;
this.channelFactory = channelFactory;
protected AbstractGRPCClient(TiSession session) {
this.session = session;
this.conf = session.getConf();
}

protected TiConfiguration getConf() {
public TiSession getSession() {
return session;
}

public TiConfiguration getConf() {
return conf;
}

Expand Down
23 changes: 10 additions & 13 deletions src/main/java/org/tikv/common/PDClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import org.tikv.common.TiConfiguration.KVMode;
import org.tikv.common.codec.Codec.BytesCodec;
import org.tikv.common.codec.CodecDataOutput;
import org.tikv.common.exception.GrpcException;
Expand All @@ -42,15 +41,13 @@
import org.tikv.common.operation.PDErrorHandler;
import org.tikv.common.region.TiRegion;
import org.tikv.common.util.BackOffer;
import org.tikv.common.util.ChannelFactory;
import org.tikv.common.util.FutureObserver;
import org.tikv.kvproto.Metapb.Store;
import org.tikv.kvproto.PDGrpc;
import org.tikv.kvproto.PDGrpc.PDBlockingStub;
import org.tikv.kvproto.PDGrpc.PDStub;
import org.tikv.kvproto.Pdpb.*;

/** PDClient is thread-safe and suggested to be shared threads */
public class PDClient extends AbstractGRPCClient<PDBlockingStub, PDStub>
implements ReadOnlyPDClient {
private RequestHeader header;
Expand All @@ -76,7 +73,7 @@ public TiTimestamp getTimestamp(BackOffer backOffer) {
@Override
public TiRegion getRegionByKey(BackOffer backOffer, ByteString key) {
Supplier<GetRegionRequest> request;
if (conf.getKvMode() == KVMode.RAW) {
if (conf.getKvMode().equalsIgnoreCase("RAW")) {
request = () -> GetRegionRequest.newBuilder().setHeader(header).setRegionKey(key).build();
} else {
CodecDataOutput cdo = new CodecDataOutput();
Expand Down Expand Up @@ -198,8 +195,8 @@ public void close() {
}
}

public static ReadOnlyPDClient create(TiConfiguration conf, ChannelFactory channelFactory) {
return createRaw(conf, channelFactory);
public static ReadOnlyPDClient create(TiSession session) {
return createRaw(session);
}

@VisibleForTesting
Expand Down Expand Up @@ -250,7 +247,7 @@ void close() {}

public GetMembersResponse getMembers(HostAndPort url) {
try {
ManagedChannel probChan = channelFactory.getChannel(url.getHostText() + ":" + url.getPort());
ManagedChannel probChan = session.getChannel(url.getHostText() + ":" + url.getPort());
PDGrpc.PDBlockingStub stub = PDGrpc.newBlockingStub(probChan);
GetMembersRequest request =
GetMembersRequest.newBuilder().setHeader(RequestHeader.getDefaultInstance()).build();
Expand Down Expand Up @@ -282,7 +279,7 @@ private boolean createLeaderWrapper(String leaderUrlStr) {
}

// create new Leader
ManagedChannel clientChannel = channelFactory.getChannel(leaderUrlStr);
ManagedChannel clientChannel = session.getChannel(leaderUrlStr);
leaderWrapper =
new LeaderWrapper(
leaderUrlStr,
Expand Down Expand Up @@ -333,13 +330,13 @@ protected PDStub getAsyncStub() {
.withDeadlineAfter(getConf().getTimeout(), getConf().getTimeoutUnit());
}

private PDClient(TiConfiguration conf, ChannelFactory channelFactory) {
super(conf, channelFactory);
private PDClient(TiSession session) {
super(session);
}

private void initCluster() {
GetMembersResponse resp = null;
List<HostAndPort> pdAddrs = getConf().getPdAddrs();
List<HostAndPort> pdAddrs = getSession().getConf().getPdAddrs();
for (HostAndPort u : pdAddrs) {
resp = getMembers(u);
if (resp != null) {
Expand Down Expand Up @@ -369,10 +366,10 @@ private void initCluster() {
TimeUnit.MINUTES);
}

static PDClient createRaw(TiConfiguration conf, ChannelFactory channelFactory) {
static PDClient createRaw(TiSession session) {
PDClient client = null;
try {
client = new PDClient(conf, channelFactory);
client = new PDClient(session);
client.initCluster();
} catch (Exception e) {
if (client != null) {
Expand Down
3 changes: 3 additions & 0 deletions src/main/java/org/tikv/common/ReadOnlyPDClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -60,4 +60,7 @@ public interface ReadOnlyPDClient {
Store getStore(BackOffer backOffer, long storeId);

Future<Store> getStoreAsync(BackOffer backOffer, long storeId);

/** Get associated session * @return the session associated to client */
TiSession getSession();
}
108 changes: 108 additions & 0 deletions src/main/java/org/tikv/common/Snapshot.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
/*
* Copyright 2017 PingCAP, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.tikv.common;

import static org.tikv.common.util.KeyRangeUtils.makeRange;

import com.google.common.collect.Range;
import com.google.protobuf.ByteString;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import org.tikv.common.exception.TiClientInternalException;
import org.tikv.common.key.Key;
import org.tikv.common.meta.TiTimestamp;
import org.tikv.common.operation.iterator.ConcreteScanIterator;
import org.tikv.common.region.RegionStoreClient;
import org.tikv.common.region.TiRegion;
import org.tikv.common.util.BackOffer;
import org.tikv.common.util.ConcreteBackOffer;
import org.tikv.common.util.Pair;
import org.tikv.kvproto.Kvrpcpb.KvPair;
import org.tikv.kvproto.Metapb.Store;

public class Snapshot {
private final TiTimestamp timestamp;
private final TiSession session;
private final TiConfiguration conf;

public Snapshot(TiTimestamp timestamp, TiSession session) {
this.timestamp = timestamp;
this.session = session;
this.conf = session.getConf();
}

public TiSession getSession() {
return session;
}

public long getVersion() {
return timestamp.getVersion();
}

public TiTimestamp getTimestamp() {
return timestamp;
}

public byte[] get(byte[] key) {
ByteString keyString = ByteString.copyFrom(key);
ByteString value = get(keyString);
return value.toByteArray();
}

public ByteString get(ByteString key) {
Pair<TiRegion, Store> pair = session.getRegionManager().getRegionStorePairByKey(key);
RegionStoreClient client = RegionStoreClient.create(pair.first, pair.second, getSession());
// TODO: Need to deal with lock error after grpc stable
return client.get(ConcreteBackOffer.newGetBackOff(), key, timestamp.getVersion());
}

public Iterator<KvPair> scan(ByteString startKey) {
return new ConcreteScanIterator(startKey, session, timestamp.getVersion());
}

// TODO: Need faster implementation, say concurrent version
// Assume keys sorted
public List<KvPair> batchGet(List<ByteString> keys) {
TiRegion curRegion = null;
Range<Key> curKeyRange = null;
Pair<TiRegion, Store> lastPair;
List<ByteString> keyBuffer = new ArrayList<>();
List<KvPair> result = new ArrayList<>(keys.size());
BackOffer backOffer = ConcreteBackOffer.newBatchGetMaxBackOff();
for (ByteString key : keys) {
if (curRegion == null || !curKeyRange.contains(Key.toRawKey(key))) {
Pair<TiRegion, Store> pair = session.getRegionManager().getRegionStorePairByKey(key);
lastPair = pair;
curRegion = pair.first;
curKeyRange = makeRange(curRegion.getStartKey(), curRegion.getEndKey());

try (RegionStoreClient client =
RegionStoreClient.create(lastPair.first, lastPair.second, getSession())) {
List<KvPair> partialResult =
client.batchGet(backOffer, keyBuffer, timestamp.getVersion());
// TODO: Add lock check
result.addAll(partialResult);
} catch (Exception e) {
throw new TiClientInternalException("Error Closing Store client.", e);
}
keyBuffer = new ArrayList<>();
keyBuffer.add(key);
}
}
return result;
}
}
15 changes: 5 additions & 10 deletions src/main/java/org/tikv/common/TiConfiguration.java
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ public class TiConfiguration implements Serializable {
private static final IsolationLevel DEF_ISOLATION_LEVEL = IsolationLevel.RC;
private static final boolean DEF_SHOW_ROWID = false;
private static final String DEF_DB_PREFIX = "";
private static final KVMode DEF_KV_MODE = KVMode.TXN;
private static final String DEF_KV_MODE = "KV";
private static final int DEF_RAW_CLIENT_CONCURRENCY = 200;

private int timeout = DEF_TIMEOUT;
Expand All @@ -63,14 +63,9 @@ public class TiConfiguration implements Serializable {
private int maxRequestKeyRangeSize = MAX_REQUEST_KEY_RANGE_SIZE;
private boolean showRowId = DEF_SHOW_ROWID;
private String dbPrefix = DEF_DB_PREFIX;
private KVMode kvMode = DEF_KV_MODE;
private String kvMode = DEF_KV_MODE;
private int rawClientConcurrency = DEF_RAW_CLIENT_CONCURRENCY;

public enum KVMode {
TXN,
RAW
}

public static TiConfiguration createDefault(String pdAddrsStr) {
Objects.requireNonNull(pdAddrsStr, "pdAddrsStr is null");
TiConfiguration conf = new TiConfiguration();
Expand All @@ -82,7 +77,7 @@ public static TiConfiguration createRawDefault(String pdAddrsStr) {
Objects.requireNonNull(pdAddrsStr, "pdAddrsStr is null");
TiConfiguration conf = new TiConfiguration();
conf.pdAddrs = strToHostAndPort(pdAddrsStr);
conf.kvMode = KVMode.RAW;
conf.kvMode = "RAW";
return conf;
}

Expand Down Expand Up @@ -234,12 +229,12 @@ public void setDBPrefix(String dbPrefix) {
this.dbPrefix = dbPrefix;
}

public KVMode getKvMode() {
public String getKvMode() {
return kvMode;
}

public void setKvMode(String kvMode) {
this.kvMode = KVMode.valueOf(kvMode);
this.kvMode = kvMode;
}

public int getRawClientConcurrency() {
Expand Down
Loading