Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions be/src/common/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -585,6 +585,8 @@ DEFINE_mInt32(memtable_flush_running_count_limit, "2");

DEFINE_Int32(load_process_max_memory_limit_percent, "50"); // 50%

DEFINE_mInt32(thrift_client_open_num_tries, "1");

// If the memory consumption of load jobs exceeds load_process_max_memory_limit,
// all load jobs will hang there to wait for memtable flush. We should have a
// soft limit which can trigger the memtable flush for the load channel who
Expand Down
4 changes: 4 additions & 0 deletions be/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -635,6 +635,10 @@ DECLARE_mInt32(memtable_flush_running_count_limit);

DECLARE_Int32(load_process_max_memory_limit_percent); // 50%

// Number of attempts to open a thrift client connection; the default of 1 means open is tried only once.
// When greater than 1, a failed open is retried, waiting 100 milliseconds between attempts.
DECLARE_mInt32(thrift_client_open_num_tries);

// If the memory consumption of load jobs exceeds load_process_max_memory_limit,
// all load jobs will hang there to wait for memtable flush. We should have a
// soft limit which can trigger the memtable flush for the load channel who
Expand Down
2 changes: 1 addition & 1 deletion be/src/runtime/client_cache.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ Status ClientCacheHelper::_create_client(const TNetworkAddress& hostport,

client_impl->set_conn_timeout(config::thrift_connect_timeout_seconds * 1000);

Status status = client_impl->open();
Status status = client_impl->open_with_retry(config::thrift_client_open_num_tries, 100);

if (!status.ok()) {
*client_key = nullptr;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,9 +95,19 @@ public static String getHostnameByIp(String ip) {
return hostName;
}

public static String getIpByHost(String host) throws UnknownHostException {
InetAddress inetAddress = InetAddress.getByName(host);
return inetAddress.getHostAddress();
/**
 * Resolves a host name to its IP address string, retrying when the lookup fails.
 *
 * <p>The lookup is attempted once and then repeated up to {@code retryTimes} more times.
 * Every failed attempt is logged at WARN level together with the number of retries still
 * available. A {@code retryTimes} of zero or less means a single attempt.
 *
 * <p>NOTE(review): retries happen immediately, with no delay between attempts. The JVM may
 * cache failed lookups (see the {@code networkaddress.cache.negative.ttl} security property),
 * so confirm that back-to-back retries are actually effective in the deployed configuration.
 *
 * @param host the host name (or literal IP) to resolve
 * @param retryTimes how many additional attempts to make after the first failure
 * @return the textual IP address of the resolved host
 * @throws UnknownHostException if every attempt fails; the exception of the last attempt is rethrown
 */
public static String getIpByHost(String host, int retryTimes) throws UnknownHostException {
    // Work on a copy so the caller-supplied parameter is not mutated while counting down.
    int remainingRetries = retryTimes;
    while (true) {
        try {
            return InetAddress.getByName(host).getHostAddress();
        } catch (UnknownHostException e) {
            LOG.warn("NetUtils.getIpByHost failed, hostname: {}, remaining retryTimes: {}",
                    host, remainingRetries);
            // Post-decrement: the check uses the value that was just logged.
            if (remainingRetries-- <= 0) {
                throw e;
            }
        }
    }
}

// This implementation is inspired by the Apache Camel project:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,28 +112,23 @@ public void removeProxy(TNetworkAddress address) {
}

private BackendServiceClient getProxy(TNetworkAddress address) throws UnknownHostException {
String realIp = NetUtils.getIpByHost(address.getHostname());
BackendServiceClientExtIp serviceClientExtIp = serviceMap.get(address);
if (serviceClientExtIp != null && serviceClientExtIp.realIp.equals(realIp)
&& serviceClientExtIp.client.isNormalState()) {
if (serviceClientExtIp != null && serviceClientExtIp.client.isNormalState()) {
return serviceClientExtIp.client;
}

// not exist, create one and return.
// not exist or not normal state, create one and return.
BackendServiceClient removedClient = null;
String realIp = NetUtils.getIpByHost(address.getHostname(), 2);
lock.lock();
try {
serviceClientExtIp = serviceMap.get(address);
if (serviceClientExtIp != null && !serviceClientExtIp.realIp.equals(realIp)) {
LOG.warn("Cached ip changed ,before ip: {}, curIp: {}", serviceClientExtIp.realIp, realIp);
serviceMap.remove(address);
removedClient = serviceClientExtIp.client;
serviceClientExtIp = null;
}
if (serviceClientExtIp != null && !serviceClientExtIp.client.isNormalState()) {
// At this point we cannot judge the progress of reconnecting the underlying channel.
// In the worst case, it may take two minutes. But we can't stand the connection refused
// for two minutes, so rebuild the channel directly.
LOG.warn("BackendServiceClient is not normal state, hostname: {}, before ip: {}, curIp: {}",
address.getHostname(), serviceClientExtIp.realIp, realIp);
serviceMap.remove(address);
removedClient = serviceClientExtIp.client;
serviceClientExtIp = null;
Expand All @@ -151,6 +146,40 @@ private BackendServiceClient getProxy(TNetworkAddress address) throws UnknownHos
}
}

/**
 * Re-resolves the host name of {@code address} and, if the cached client for that address
 * was created for a different IP, replaces it with a freshly built client.
 *
 * <p>The lookup is done with no retries. The replaced client is shut down only after the
 * lock has been released.
 *
 * @param address the backend address whose host name should be re-resolved
 * @return the currently resolved IP, or {@code null} when the host name cannot be resolved
 */
public String checkProxyIP(TNetworkAddress address) {
    final String resolvedIp;
    try {
        resolvedIp = NetUtils.getIpByHost(address.getHostname(), 0);
    } catch (UnknownHostException ignored) {
        // Resolution failed: leave the cache untouched and let the caller skip this round.
        return null;
    }

    // Fast path without the lock: nothing cached, or the cached entry already matches.
    BackendServiceClientExtIp cached = serviceMap.get(address);
    if (cached == null || cached.realIp.equals(resolvedIp)) {
        return resolvedIp;
    }

    BackendServiceClient staleClient = null;
    lock.lock();
    try {
        // Re-read under the lock; another thread may have replaced or removed the entry.
        cached = serviceMap.get(address);
        boolean ipChanged = cached != null && !cached.realIp.equals(resolvedIp);
        if (ipChanged) {
            LOG.warn("Cached ip changed ,before ip: {}, curIp: {}", cached.realIp, resolvedIp);
            serviceMap.remove(address);
            staleClient = cached.client;
            BackendServiceClient freshClient = new BackendServiceClient(address, grpcThreadPool);
            serviceMap.put(address, new BackendServiceClientExtIp(resolvedIp, freshClient));
        }
    } finally {
        lock.unlock();
        if (staleClient != null) {
            staleClient.shutdown();
        }
    }

    return resolvedIp;
}

public Future<InternalService.PExecPlanFragmentResult> execPlanFragmentsAsync(TNetworkAddress address,
TExecPlanFragmentParamsList paramsList, boolean twoPhaseExecution) throws TException, RpcException {
InternalService.PExecPlanFragmentRequest.Builder builder =
Expand Down
18 changes: 15 additions & 3 deletions fe/fe-core/src/main/java/org/apache/doris/system/Backend.java
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,8 @@ public class Backend implements Writable {
private long id;
@SerializedName("host")
private volatile String host;
@SerializedName("ip")
private volatile String ip;
private String version;

@SerializedName("heartbeatPort")
Expand Down Expand Up @@ -145,6 +147,7 @@ public class Backend implements Writable {

public Backend() {
this.host = "";
this.ip = "";
this.version = "";
this.lastUpdateMs = 0;
this.lastStartTime = 0;
Expand All @@ -162,6 +165,7 @@ public Backend() {
public Backend(long id, String host, int heartbeatPort) {
this.id = id;
this.host = host;
this.ip = "";
this.version = "";
this.heartbeatPort = heartbeatPort;
this.bePort = -1;
Expand Down Expand Up @@ -221,6 +225,10 @@ public String getHost() {
return host;
}

/**
 * Returns the value currently stored in the {@code ip} field of this backend.
 *
 * @return the stored IP string
 */
public String getIP() {
    return this.ip;
}

/**
 * Returns the value currently stored in the {@code version} field of this backend.
 *
 * @return the stored version string
 */
public String getVersion() {
    return this.version;
}
Expand Down Expand Up @@ -326,6 +334,10 @@ public void setHost(String host) {
this.host = host;
}

/**
 * Stores the given IP string in the {@code ip} field of this backend.
 *
 * @param ip the IP string to store
 */
public void setIP(String ip) {
    final String newIp = ip;
    this.ip = newIp;
}

/**
 * Updates the alive flag of this backend by delegating to {@code isAlive.set(...)}.
 * NOTE(review): the declared type of the {@code isAlive} field is not visible here;
 * presumably an atomic boolean holder - confirm against the field declaration.
 *
 * @param isAlive the new alive state
 */
public void setAlive(boolean isAlive) {
this.isAlive.set(isAlive);
}
Expand Down Expand Up @@ -715,9 +727,9 @@ public boolean equals(Object obj) {

@Override
public String toString() {
return "Backend [id=" + id + ", host=" + host + ", heartbeatPort=" + heartbeatPort + ", alive=" + isAlive.get()
+ ", lastStartTime=" + TimeUtils.longToTimeString(lastStartTime) + ", process epoch=" + lastStartTime
+ ", tags: " + tagMap + "]";
return "Backend [id=" + id + ", host=" + host + ", ip=" + ip + ", heartbeatPort=" + heartbeatPort + ", alive="
+ isAlive.get() + ", lastStartTime=" + TimeUtils.longToTimeString(lastStartTime) + ", process epoch="
+ lastStartTime + ", tags: " + tagMap + "]";
}

public String getHealthyStatus() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.doris.common.util.MasterDaemon;
import org.apache.doris.persist.HbPackage;
import org.apache.doris.resource.Tag;
import org.apache.doris.rpc.BackendServiceProxy;
import org.apache.doris.service.ExecuteEnv;
import org.apache.doris.service.FeDiskInfo;
import org.apache.doris.service.FrontendOptions;
Expand Down Expand Up @@ -106,7 +107,12 @@ protected void runAfterCatalogReady() {
List<TFrontendInfo> feInfos = Env.getCurrentEnv().getFrontendInfos();
List<Future<HeartbeatResponse>> hbResponses = Lists.newArrayList();
// send backend heartbeat
BackendServiceProxy proxy = BackendServiceProxy.getInstance();
for (Backend backend : nodeMgr.getIdToBackend().values()) {
String ip = proxy.checkProxyIP(new TNetworkAddress(backend.getHost(), backend.getBrpcPort()));
if (ip != null) {
backend.setIP(ip);
}
BackendHeartbeatHandler handler = new BackendHeartbeatHandler(backend, feInfos);
hbResponses.add(executor.submit(handler));
}
Expand Down Expand Up @@ -219,7 +225,8 @@ public HeartbeatResponse call() {
long backendId = backend.getId();
HeartbeatService.Client client = null;

TNetworkAddress beAddr = new TNetworkAddress(backend.getHost(), backend.getHeartbeatPort());
TNetworkAddress beAddr = new TNetworkAddress(
backend.getIP().equals("") ? backend.getHost() : backend.getIP(), backend.getHeartbeatPort());
boolean ok = false;
try {
TMasterInfo copiedMasterInfo = new TMasterInfo(masterInfo.get());
Expand Down