Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
<version>3.9</version>
<version>3.10</version>
<scope>test</scope>
</dependency>
<dependency>
Expand Down
87 changes: 87 additions & 0 deletions src/main/java/org/tikv/common/DefaultHostMapping.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
/*
* Copyright 2021 PingCAP, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.tikv.common;

import static org.tikv.common.pd.PDUtils.addrToUri;

import com.google.common.annotations.Beta;
import io.etcd.jetcd.ByteSequence;
import io.etcd.jetcd.Client;
import io.etcd.jetcd.KeyValue;
import io.etcd.jetcd.kv.GetResponse;
import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DefaultHostMapping implements HostMapping {
private static final String NETWORK_MAPPING_PATH = "/client/url-mapping";
private final Client etcdClient;
private final String networkMappingName;
private final ConcurrentMap<String, String> hostMapping;
private final Logger logger = LoggerFactory.getLogger(DefaultHostMapping.class);

public DefaultHostMapping(Client etcdClient, String networkMappingName) {
this.etcdClient = etcdClient;
this.networkMappingName = networkMappingName;
this.hostMapping = new ConcurrentHashMap<>();
}

private ByteSequence hostToNetworkMappingKey(String host) {
String path = NETWORK_MAPPING_PATH + "/" + networkMappingName + "/" + host;
return ByteSequence.from(path, StandardCharsets.UTF_8);
}

@Beta
private String getMappedHostFromPD(String host) {
ByteSequence hostKey = hostToNetworkMappingKey(host);
for (int i = 0; i < 5; i++) {
CompletableFuture<GetResponse> future = etcdClient.getKVClient().get(hostKey);
try {
GetResponse resp = future.get();
List<KeyValue> kvs = resp.getKvs();
if (kvs.size() != 1) {
break;
}
return kvs.get(0).getValue().toString(StandardCharsets.UTF_8);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} catch (ExecutionException e) {
logger.info("failed to get mapped Host from PD: " + host, e);
break;
} catch (Exception ignore) {
// ignore
break;
}
}
return host;
}

public URI getMappedURI(URI uri) {
if (networkMappingName.isEmpty()) {
return uri;
}
return addrToUri(
hostMapping.computeIfAbsent(uri.getHost(), this::getMappedHostFromPD)
+ ":"
+ uri.getPort());
}
}
70 changes: 3 additions & 67 deletions src/main/java/org/tikv/common/HostMapping.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,73 +15,9 @@

package org.tikv.common;

import static org.tikv.common.pd.PDUtils.addrToUri;

import com.google.common.annotations.Beta;
import io.etcd.jetcd.ByteSequence;
import io.etcd.jetcd.Client;
import io.etcd.jetcd.KeyValue;
import io.etcd.jetcd.kv.GetResponse;
import java.io.Serializable;
import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class HostMapping {
private static final String NETWORK_MAPPING_PATH = "/client/url-mapping";
private final Client etcdClient;
private final String networkMappingName;
private final ConcurrentMap<String, String> hostMapping;
private final Logger logger = LoggerFactory.getLogger(HostMapping.class);

public HostMapping(Client etcdClient, String networkMappingName) {
this.etcdClient = etcdClient;
this.networkMappingName = networkMappingName;
this.hostMapping = new ConcurrentHashMap<>();
}

private ByteSequence hostToNetworkMappingKey(String host) {
String path = NETWORK_MAPPING_PATH + "/" + networkMappingName + "/" + host;
return ByteSequence.from(path, StandardCharsets.UTF_8);
}

@Beta
private String getMappedHostFromPD(String host) {
ByteSequence hostKey = hostToNetworkMappingKey(host);
for (int i = 0; i < 5; i++) {
CompletableFuture<GetResponse> future = etcdClient.getKVClient().get(hostKey);
try {
GetResponse resp = future.get();
List<KeyValue> kvs = resp.getKvs();
if (kvs.size() != 1) {
break;
}
return kvs.get(0).getValue().toString(StandardCharsets.UTF_8);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} catch (ExecutionException e) {
logger.info("failed to get mapped Host from PD: " + host, e);
break;
} catch (Exception ignore) {
// ignore
break;
}
}
return host;
}

public URI getMappedURI(URI uri) {
if (networkMappingName.isEmpty()) {
return uri;
}
return addrToUri(
hostMapping.computeIfAbsent(uri.getHost(), this::getMappedHostFromPD)
+ ":"
+ uri.getPort());
}
public interface HostMapping extends Serializable {
URI getMappedURI(URI uri);
}
62 changes: 39 additions & 23 deletions src/main/java/org/tikv/common/PDClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
Expand All @@ -49,18 +50,20 @@
import org.slf4j.LoggerFactory;
import org.tikv.common.TiConfiguration.KVMode;
import org.tikv.common.codec.Codec.BytesCodec;
import org.tikv.common.codec.CodecDataInput;
import org.tikv.common.codec.CodecDataOutput;
import org.tikv.common.codec.KeyUtils;
import org.tikv.common.exception.GrpcException;
import org.tikv.common.exception.TiClientInternalException;
import org.tikv.common.meta.TiTimestamp;
import org.tikv.common.operation.NoopHandler;
import org.tikv.common.operation.PDErrorHandler;
import org.tikv.common.region.TiRegion;
import org.tikv.common.util.BackOffFunction.BackOffFuncType;
import org.tikv.common.util.BackOffer;
import org.tikv.common.util.ChannelFactory;
import org.tikv.common.util.ConcreteBackOffer;
import org.tikv.common.util.Pair;
import org.tikv.kvproto.Metapb;
import org.tikv.kvproto.Metapb.Store;
import org.tikv.kvproto.PDGrpc;
import org.tikv.kvproto.PDGrpc.PDBlockingStub;
Expand Down Expand Up @@ -145,7 +148,7 @@ public TiTimestamp getTimestamp(BackOffer backOffer) {
*
* @param region represents a region info
*/
void scatterRegion(TiRegion region, BackOffer backOffer) {
void scatterRegion(Metapb.Region region, BackOffer backOffer) {
Supplier<ScatterRegionRequest> request =
() ->
ScatterRegionRequest.newBuilder().setHeader(header).setRegionId(region.getId()).build();
Expand All @@ -169,7 +172,7 @@ void scatterRegion(TiRegion region, BackOffer backOffer) {
*
* @param region
*/
void waitScatterRegionFinish(TiRegion region, BackOffer backOffer) {
void waitScatterRegionFinish(Metapb.Region region, BackOffer backOffer) {
for (; ; ) {
GetOperatorResponse resp = getOperator(region.getId());
if (resp != null) {
Expand Down Expand Up @@ -222,7 +225,7 @@ private boolean isScatterRegionFinish(GetOperatorResponse resp) {
}

@Override
public TiRegion getRegionByKey(BackOffer backOffer, ByteString key) {
public Pair<Metapb.Region, Metapb.Peer> getRegionByKey(BackOffer backOffer, ByteString key) {
Histogram.Timer requestTimer = PD_GET_REGION_BY_KEY_REQUEST_LATENCY.startTimer();
try {
if (conf.getKvMode() == KVMode.TXN) {
Expand All @@ -240,37 +243,22 @@ public TiRegion getRegionByKey(BackOffer backOffer, ByteString key) {

GetRegionResponse resp =
callWithRetry(backOffer, PDGrpc.getGetRegionMethod(), request, handler);
return new TiRegion(
resp.getRegion(),
resp.getLeader(),
null,
conf.getIsolationLevel(),
conf.getCommandPriority(),
conf.getKvMode(),
conf.getReplicaSelector());
return new Pair<Metapb.Region, Metapb.Peer>(decodeRegion(resp.getRegion()), resp.getLeader());
} finally {
requestTimer.observeDuration();
}
}

@Override
public TiRegion getRegionByID(BackOffer backOffer, long id) {
public Pair<Metapb.Region, Metapb.Peer> getRegionByID(BackOffer backOffer, long id) {
Supplier<GetRegionByIDRequest> request =
() -> GetRegionByIDRequest.newBuilder().setHeader(header).setRegionId(id).build();
PDErrorHandler<GetRegionResponse> handler =
new PDErrorHandler<>(getRegionResponseErrorExtractor, this);

GetRegionResponse resp =
callWithRetry(backOffer, PDGrpc.getGetRegionByIDMethod(), request, handler);
// Instead of using default leader instance, explicitly set no leader to null
return new TiRegion(
resp.getRegion(),
resp.getLeader(),
null,
conf.getIsolationLevel(),
conf.getCommandPriority(),
conf.getKvMode(),
conf.getReplicaSelector());
return new Pair<Metapb.Region, Metapb.Peer>(decodeRegion(resp.getRegion()), resp.getLeader());
}

private Supplier<GetStoreRequest> buildGetStoreReq(long storeId) {
Expand Down Expand Up @@ -567,12 +555,15 @@ private void initCluster() {
.setDaemon(true)
.build()))
.build();
this.hostMapping = new HostMapping(this.etcdClient, conf.getNetworkMappingName());
this.hostMapping =
Optional.ofNullable(getConf().getHostMapping())
.orElseGet(() -> new DefaultHostMapping(this.etcdClient, conf.getNetworkMappingName()));
for (URI u : pdAddrs) {
resp = getMembers(u);
if (resp != null) {
break;
}
logger.info("Could not get leader member with pd: " + u);
}
checkNotNull(resp, "Failed to init client for PD cluster.");
long clusterId = resp.getHeader().getClusterId();
Expand Down Expand Up @@ -666,4 +657,29 @@ public String toString() {
return "[leaderInfo: " + leaderInfo + "]";
}
}

private Metapb.Region decodeRegion(Metapb.Region region) {
final boolean isRawRegion = conf.getKvMode() == KVMode.RAW;
Metapb.Region.Builder builder =
Metapb.Region.newBuilder()
.setId(region.getId())
.setRegionEpoch(region.getRegionEpoch())
.addAllPeers(region.getPeersList());

if (region.getStartKey().isEmpty() || isRawRegion) {
builder.setStartKey(region.getStartKey());
} else {
byte[] decodedStartKey = BytesCodec.readBytes(new CodecDataInput(region.getStartKey()));
builder.setStartKey(ByteString.copyFrom(decodedStartKey));
}

if (region.getEndKey().isEmpty() || isRawRegion) {
builder.setEndKey(region.getEndKey());
} else {
byte[] decodedEndKey = BytesCodec.readBytes(new CodecDataInput(region.getEndKey()));
builder.setEndKey(ByteString.copyFrom(decodedEndKey));
}

return builder.build();
}
}
7 changes: 4 additions & 3 deletions src/main/java/org/tikv/common/ReadOnlyPDClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,9 @@
import com.google.protobuf.ByteString;
import java.util.List;
import org.tikv.common.meta.TiTimestamp;
import org.tikv.common.region.TiRegion;
import org.tikv.common.util.BackOffer;
import org.tikv.common.util.Pair;
import org.tikv.kvproto.Metapb;
import org.tikv.kvproto.Metapb.Store;

/** Readonly PD client including only reading related interface Supposed for TiDB-like use cases */
Expand All @@ -37,15 +38,15 @@ public interface ReadOnlyPDClient {
* @param key key in bytes for locating a region
* @return the region whose startKey and endKey range covers the given key
*/
TiRegion getRegionByKey(BackOffer backOffer, ByteString key);
Pair<Metapb.Region, Metapb.Peer> getRegionByKey(BackOffer backOffer, ByteString key);

/**
* Get Region by Region Id
*
* @param id Region Id
* @return the region corresponding to the given Id
*/
TiRegion getRegionByID(BackOffer backOffer, long id);
Pair<Metapb.Region, Metapb.Peer> getRegionByID(BackOffer backOffer, long id);

HostMapping getHostMapping();

Expand Down
9 changes: 9 additions & 0 deletions src/main/java/org/tikv/common/TiConfiguration.java
Original file line number Diff line number Diff line change
Expand Up @@ -264,6 +264,7 @@ private static ReplicaRead getReplicaRead(String key) {
private int grpcHealthCheckTimeout = getInt(TIKV_GRPC_HEALTH_CHECK_TIMEOUT);

private final String networkMappingName = get(TIKV_NETWORK_MAPPING_NAME);
private HostMapping hostMapping = null;

public enum KVMode {
TXN,
Expand Down Expand Up @@ -542,6 +543,14 @@ public String getNetworkMappingName() {
return this.networkMappingName;
}

public HostMapping getHostMapping() {
return hostMapping;
}

public void setHostMapping(HostMapping mapping) {
this.hostMapping = mapping;
}

public boolean getEnableGrpcForward() {
return this.enableGrpcForward;
}
Expand Down
Loading