Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions .ci/integration_test.groovy
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
def call(ghprbActualCommit, ghprbPullId, ghprbPullTitle, ghprbPullLink, ghprbPullDescription, credentialsId) {

def TIDB_BRANCH = "release-5.0"
def TIKV_BRANCH = "release-5.0"
def PD_BRANCH = "release-5.0"
def TIDB_BRANCH = "master"
def TIKV_BRANCH = "master"
def PD_BRANCH = "master"

// parse tidb branch
def m1 = ghprbCommentBody =~ /tidb\s*=\s*([^\s\\]+)(\s|\\|$)/
Expand Down
12 changes: 12 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,18 @@ The following includes JVM related parameters.
- timeout of scan/delete range grpc request
- default: 20s

#### tikv.importer.max_kv_batch_bytes
- Maximal package size transporting from clients to TiKV Server (ingest API)
- default: 1048576 (1M)

#### tikv.importer.max_kv_batch_size
- Maximal batch size transporting from clients to TiKV Server (ingest API)
- default: 32768 (32K)

#### tikv.scatter_wait_seconds
- time to wait for scattering regions
- default: 300 (5min)

### Metrics Parameter

#### tikv.metrics.enable
Expand Down
2 changes: 1 addition & 1 deletion scripts/proto.sh
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
# limitations under the License.
#

kvproto_hash=6ed99a08e262d8a32d6355dcba91cf99cb92074a
kvproto_hash=2ac2a7984b2d01b96ed56fd8474f4bf80fa33c51
raft_rs_hash=b9891b673573fad77ebcf9bbe0969cf945841926
tipb_hash=c4d518eb1d60c21f05b028b36729e64610346dac

Expand Down
9 changes: 9 additions & 0 deletions src/main/java/org/tikv/common/ConfigUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,11 @@ public class ConfigUtils {

public static final String TIKV_ENABLE_ATOMIC_FOR_CAS = "tikv.enable_atomic_for_cas";

public static final String TIKV_IMPORTER_MAX_KV_BATCH_BYTES = "tikv.importer.max_kv_batch_bytes";
public static final String TIKV_IMPORTER_MAX_KV_BATCH_SIZE = "tikv.importer.max_kv_batch_size";

public static final String TIKV_SCATTER_WAIT_SECONDS = "tikv.scatter_wait_seconds";

public static final String DEF_PD_ADDRESSES = "127.0.0.1:2379";
public static final String DEF_TIMEOUT = "200ms";
public static final String DEF_FORWARD_TIMEOUT = "300ms";
Expand Down Expand Up @@ -89,6 +94,10 @@ public class ConfigUtils {
public static final boolean DEF_GRPC_FORWARD_ENABLE = true;
public static final boolean DEF_TIKV_ENABLE_ATOMIC_FOR_CAS = false;

public static final int DEF_TIKV_IMPORTER_MAX_KV_BATCH_BYTES = 1024 * 1024;
public static final int DEF_TIKV_IMPORTER_MAX_KV_BATCH_SIZE = 1024 * 32;
public static final int DEF_TIKV_SCATTER_WAIT_SECONDS = 300;

public static final String NORMAL_COMMAND_PRIORITY = "NORMAL";
public static final String LOW_COMMAND_PRIORITY = "LOW";
public static final String HIGH_COMMAND_PRIORITY = "HIGH";
Expand Down
5 changes: 2 additions & 3 deletions src/main/java/org/tikv/common/PDClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.tikv.common.TiConfiguration.KVMode;
import org.tikv.common.codec.Codec.BytesCodec;
import org.tikv.common.codec.CodecDataInput;
import org.tikv.common.codec.CodecDataOutput;
Expand Down Expand Up @@ -230,7 +229,7 @@ private boolean isScatterRegionFinish(GetOperatorResponse resp) {
public Pair<Metapb.Region, Metapb.Peer> getRegionByKey(BackOffer backOffer, ByteString key) {
Histogram.Timer requestTimer = PD_GET_REGION_BY_KEY_REQUEST_LATENCY.startTimer();
try {
if (conf.getKvMode() == KVMode.TXN) {
if (conf.isTxnKVMode()) {
CodecDataOutput cdo = new CodecDataOutput();
BytesCodec.writeBytes(cdo, key.toByteArray());
key = cdo.toByteString();
Expand Down Expand Up @@ -679,7 +678,7 @@ public String toString() {
}

private Metapb.Region decodeRegion(Metapb.Region region) {
final boolean isRawRegion = conf.getKvMode() == KVMode.RAW;
final boolean isRawRegion = conf.isRawKVMode();
Metapb.Region.Builder builder =
Metapb.Region.newBuilder()
.setId(region.getId())
Expand Down
41 changes: 41 additions & 0 deletions src/main/java/org/tikv/common/TiConfiguration.java
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,9 @@ private static void loadFromDefaultProperties() {
setIfMissing(TIKV_GRPC_HEALTH_CHECK_TIMEOUT, DEF_CHECK_HEALTH_TIMEOUT);
setIfMissing(TIKV_HEALTH_CHECK_PERIOD_DURATION, DEF_HEALTH_CHECK_PERIOD_DURATION);
setIfMissing(TIKV_ENABLE_ATOMIC_FOR_CAS, DEF_TIKV_ENABLE_ATOMIC_FOR_CAS);
setIfMissing(TIKV_IMPORTER_MAX_KV_BATCH_BYTES, DEF_TIKV_IMPORTER_MAX_KV_BATCH_BYTES);
setIfMissing(TIKV_IMPORTER_MAX_KV_BATCH_SIZE, DEF_TIKV_IMPORTER_MAX_KV_BATCH_SIZE);
setIfMissing(TIKV_SCATTER_WAIT_SECONDS, DEF_TIKV_SCATTER_WAIT_SECONDS);
}

public static void listAll() {
Expand Down Expand Up @@ -273,6 +276,12 @@ private static ReplicaRead getReplicaRead(String key) {

private boolean enableAtomicForCAS = getBoolean(TIKV_ENABLE_ATOMIC_FOR_CAS);

private int importerMaxKVBatchBytes = getInt(TIKV_IMPORTER_MAX_KV_BATCH_BYTES);

private int importerMaxKVBatchSize = getInt(TIKV_IMPORTER_MAX_KV_BATCH_SIZE);

private int scatterWaitSeconds = getInt(TIKV_SCATTER_WAIT_SECONDS);

public enum KVMode {
TXN,
RAW
Expand Down Expand Up @@ -489,6 +498,14 @@ public KVMode getKvMode() {
return kvMode;
}

public boolean isRawKVMode() {
return getKvMode() == TiConfiguration.KVMode.RAW;
}

public boolean isTxnKVMode() {
return getKvMode() == KVMode.TXN;
}

public TiConfiguration setKvMode(String kvMode) {
this.kvMode = KVMode.valueOf(kvMode);
return this;
Expand Down Expand Up @@ -586,4 +603,28 @@ public boolean isEnableAtomicForCAS() {
public void setEnableAtomicForCAS(boolean enableAtomicForCAS) {
this.enableAtomicForCAS = enableAtomicForCAS;
}

public int getImporterMaxKVBatchBytes() {
return importerMaxKVBatchBytes;
}

public void setImporterMaxKVBatchBytes(int importerMaxKVBatchBytes) {
this.importerMaxKVBatchBytes = importerMaxKVBatchBytes;
}

public int getImporterMaxKVBatchSize() {
return importerMaxKVBatchSize;
}

public void setImporterMaxKVBatchSize(int importerMaxKVBatchSize) {
this.importerMaxKVBatchSize = importerMaxKVBatchSize;
}

public int getScatterWaitSeconds() {
return scatterWaitSeconds;
}

public void setScatterWaitSeconds(int scatterWaitSeconds) {
this.scatterWaitSeconds = scatterWaitSeconds;
}
}
66 changes: 62 additions & 4 deletions src/main/java/org/tikv/common/TiSession.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@
import org.slf4j.LoggerFactory;
import org.tikv.common.catalog.Catalog;
import org.tikv.common.exception.TiKVException;
import org.tikv.common.importer.ImporterStoreClient;
import org.tikv.common.importer.SwitchTiKVModeClient;
import org.tikv.common.key.Key;
import org.tikv.common.meta.TiTimestamp;
import org.tikv.common.region.RegionManager;
Expand Down Expand Up @@ -67,8 +69,10 @@ public class TiSession implements AutoCloseable {
private volatile RegionManager regionManager;
private volatile boolean enableGrpcForward;
private volatile RegionStoreClient.RegionStoreClientBuilder clientBuilder;
private volatile ImporterStoreClient.ImporterStoreClientBuilder importerClientBuilder;
private boolean isClosed = false;
private MetricsServer metricsServer;
private static final int MAX_SPLIT_REGION_STACK_DEPTH = 6;

public TiSession(TiConfiguration conf) {
this.conf = conf;
Expand Down Expand Up @@ -132,6 +136,21 @@ public RegionStoreClient.RegionStoreClientBuilder getRegionStoreClientBuilder()
return res;
}

public ImporterStoreClient.ImporterStoreClientBuilder getImporterRegionStoreClientBuilder() {
ImporterStoreClient.ImporterStoreClientBuilder res = importerClientBuilder;
if (res == null) {
synchronized (this) {
if (importerClientBuilder == null) {
importerClientBuilder =
new ImporterStoreClient.ImporterStoreClientBuilder(
conf, this.channelFactory, this.getRegionManager(), this.getPDClient());
}
res = importerClientBuilder;
}
}
return res;
}

public TiConfiguration getConf() {
return conf;
}
Expand Down Expand Up @@ -322,10 +341,22 @@ public ChannelFactory getChannelFactory() {
return channelFactory;
}

/**
* SwitchTiKVModeClient is used for SST Ingest.
*
* @return a SwitchTiKVModeClient
*/
public SwitchTiKVModeClient getSwitchTiKVModeClient() {
return new SwitchTiKVModeClient(getPDClient(), getImporterRegionStoreClientBuilder());
}

/**
* split region and scatter
*
* @param splitKeys
* @param splitRegionBackoffMS
* @param scatterRegionBackoffMS
* @param scatterWaitMS
*/
public void splitRegionAndScatter(
List<byte[]> splitKeys,
Expand All @@ -340,7 +371,7 @@ public void splitRegionAndScatter(
splitRegion(
splitKeys
.stream()
.map(k -> Key.toRawKey(k).next().toByteString())
.map(k -> Key.toRawKey(k).toByteString())
.collect(Collectors.toList()),
ConcreteBackOffer.newCustomBackOff(splitRegionBackoffMS));

Expand Down Expand Up @@ -375,11 +406,28 @@ public void splitRegionAndScatter(
logger.info("splitRegionAndScatter cost {} seconds", (endMS - startMS) / 1000);
}

/**
* split region and scatter
*
* @param splitKeys
*/
public void splitRegionAndScatter(List<byte[]> splitKeys) {
int splitRegionBackoffMS = BackOffer.SPLIT_REGION_BACKOFF;
int scatterRegionBackoffMS = BackOffer.SCATTER_REGION_BACKOFF;
int scatterWaitMS = conf.getScatterWaitSeconds() * 1000;
splitRegionAndScatter(splitKeys, splitRegionBackoffMS, scatterRegionBackoffMS, scatterWaitMS);
}

private List<Metapb.Region> splitRegion(List<ByteString> splitKeys, BackOffer backOffer) {
return splitRegion(splitKeys, backOffer, 1);
}

private List<Metapb.Region> splitRegion(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why use recursive function to retry rather than a loop?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's not a simple retry, e.g.

  • we need to split 6 keys
  • 2 keys are split successfully, 4 keys are failed
  • retry to split the 4 failed keys

It's easy to use a recursive function to implement this logic.

List<ByteString> splitKeys, BackOffer backOffer, int depth) {
List<Metapb.Region> regions = new ArrayList<>();

Map<TiRegion, List<ByteString>> groupKeys =
groupKeysByRegion(regionManager, splitKeys, backOffer);
groupKeysByRegion(getRegionManager(), splitKeys, backOffer);
for (Map.Entry<TiRegion, List<ByteString>> entry : groupKeys.entrySet()) {

Pair<TiRegion, TiStore> pair =
Expand All @@ -401,12 +449,22 @@ private List<Metapb.Region> splitRegion(List<ByteString> splitKeys, BackOffer ba
List<Metapb.Region> newRegions;
try {
newRegions = getRegionStoreClientBuilder().build(region, store).splitRegion(splits);
// invalidate old region
getRegionManager().invalidateRegion(region);
} catch (final TiKVException e) {
// retry
logger.warn("ReSplitting ranges for splitRegion", e);
clientBuilder.getRegionManager().invalidateRegion(region);
getRegionManager().invalidateRegion(region);
backOffer.doBackOff(BackOffFunction.BackOffFuncType.BoRegionMiss, e);
newRegions = splitRegion(splits, backOffer);
if (depth >= MAX_SPLIT_REGION_STACK_DEPTH) {
logger.warn(
String.format(
"Skip split region because MAX_SPLIT_REGION_STACK_DEPTH(%d) reached!",
MAX_SPLIT_REGION_STACK_DEPTH));
newRegions = new ArrayList<>();
} else {
newRegions = splitRegion(splits, backOffer, depth + 1);
}
}
logger.info("region id={}, new region size={}", region.getId(), newRegions.size());
regions.addAll(newRegions);
Expand Down
Loading