Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 10 additions & 2 deletions docs/en/docs/admin-manual/config/fe-config.md
Original file line number Diff line number Diff line change
Expand Up @@ -2346,8 +2346,6 @@ Default: 3

Is it possible to dynamically configure: true

Is it a configuration item unique to the Master FE node: true

### `enable_storage_policy`

Whether to enable the Storage Policy feature. This feature allows users to separate hot and cold data. This feature is still under development. Recommended for test environments only.
Expand All @@ -2358,3 +2356,13 @@ Is it possible to dynamically configure: true

Is it a configuration item unique to the Master FE node: true

### `enable_fqdn_mode`

This configuration is mainly used in k8s cluster environments. When `enable_fqdn_mode` is true, the name of the pod where the BE is located remains unchanged after the pod is rebuilt, while its IP may change.

Default: false

Is it possible to dynamically configure: false

Is it a configuration item unique to the Master FE node: true

17 changes: 13 additions & 4 deletions docs/zh-CN/docs/admin-manual/config/fe-config.md
Original file line number Diff line number Diff line change
Expand Up @@ -2259,7 +2259,7 @@ load 标签清理器将每隔 `label_clean_interval_second` 运行一次以清

### backend_rpc_timeout_ms

FE向BE的BackendService发送rpc请求时的超时时间,单位:毫秒。
FE向BE的BackendService发送rpc请求时的超时时间,单位:毫秒。

默认值:60000

Expand All @@ -2278,10 +2278,11 @@ load 标签清理器将每隔 `label_clean_interval_second` 运行一次以清
是否为 Master FE 节点独有的配置项:false


### enable_fqdn_mode

FE向BE的BackendService发送rpc请求时的超时时间,单位:毫秒
此配置用于 k8s 部署环境。当 enable_fqdn_mode 为 true 时,将允许更改 be 或 broker 的重建 pod 的 ip。

默认值:60000
默认值: false

是否可以动态配置:false

Expand Down Expand Up @@ -2340,7 +2341,6 @@ load 标签清理器将每隔 `label_clean_interval_second` 运行一次以清

是否为 Master FE 节点独有的配置项:true


### `max_replica_count_when_schema_change`

OlapTable在做schema change时,允许的最大副本数,副本数过大会导致FE OOM。
Expand Down Expand Up @@ -2414,3 +2414,12 @@ hive partition 的最大缓存数量。

是否为 Master FE 节点独有的配置项:true

### `enable_fqdn_mode`

此配置用于 k8s 部署环境。当 enable_fqdn_mode 为 true 时,将允许更改 be 的重建 pod 的 ip。

默认值: false

是否可以动态配置:false

是否为 Master FE 节点独有的配置项:true
34 changes: 19 additions & 15 deletions fe/fe-core/src/main/java/org/apache/doris/alter/SystemHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,11 @@
import org.apache.doris.common.DdlException;
import org.apache.doris.common.ErrorCode;
import org.apache.doris.common.ErrorReport;
import org.apache.doris.common.Pair;
import org.apache.doris.common.UserException;
import org.apache.doris.ha.FrontendNodeType;
import org.apache.doris.system.Backend;
import org.apache.doris.system.SystemInfoService;
import org.apache.doris.system.SystemInfoService.HostInfo;

import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
Expand Down Expand Up @@ -125,7 +125,7 @@ public synchronized void process(List<AlterClause> alterClauses, String clusterN
if (!Strings.isNullOrEmpty(destClusterName) && Env.getCurrentEnv().getCluster(destClusterName) == null) {
throw new DdlException("Cluster: " + destClusterName + " does not exist.");
}
Env.getCurrentSystemInfo().addBackends(addBackendClause.getHostPortPairs(), addBackendClause.isFree(),
Env.getCurrentSystemInfo().addBackends(addBackendClause.getHostInfos(), addBackendClause.isFree(),
addBackendClause.getDestCluster(), addBackendClause.getTagMap());
} else if (alterClause instanceof DropBackendClause) {
// drop backend
Expand All @@ -136,7 +136,7 @@ public synchronized void process(List<AlterClause> alterClauses, String clusterN
+ "All data on this backend will be discarded permanently. "
+ "If you insist, use DROPP instead of DROP");
}
Env.getCurrentSystemInfo().dropBackends(dropBackendClause.getHostPortPairs());
Env.getCurrentSystemInfo().dropBackends(dropBackendClause.getHostInfos());
} else if (alterClause instanceof DecommissionBackendClause) {
// decommission
DecommissionBackendClause decommissionBackendClause = (DecommissionBackendClause) alterClause;
Expand Down Expand Up @@ -197,7 +197,7 @@ private boolean checkTablets(Long beId, List<Long> backendTabletIds) {

private List<Backend> checkDecommission(DecommissionBackendClause decommissionBackendClause)
throws DdlException {
return checkDecommission(decommissionBackendClause.getHostPortPairs());
return checkDecommission(decommissionBackendClause.getHostInfos());
}

/*
Expand All @@ -206,15 +206,18 @@ private List<Backend> checkDecommission(DecommissionBackendClause decommissionBa
* 2. after decommission, the remaining backend num should meet the replication num.
* 3. after decommission, The remaining space capacity can store data on decommissioned backends.
*/
public static List<Backend> checkDecommission(List<Pair<String, Integer>> hostPortPairs)
public static List<Backend> checkDecommission(List<HostInfo> hostInfos)
throws DdlException {
SystemInfoService infoService = Env.getCurrentSystemInfo();
List<Backend> decommissionBackends = Lists.newArrayList();
// check if exist
for (Pair<String, Integer> pair : hostPortPairs) {
Backend backend = infoService.getBackendWithHeartbeatPort(pair.first, pair.second);
for (HostInfo hostInfo : hostInfos) {
Backend backend = infoService.getBackendWithHeartbeatPort(hostInfo.getIp(), hostInfo.getHostName(),
hostInfo.getPort());
if (backend == null) {
throw new DdlException("Backend does not exist[" + pair.first + ":" + pair.second + "]");
throw new DdlException("Backend does not exist["
+ (Config.enable_fqdn_mode ? hostInfo.getHostName() : hostInfo.getIp())
+ ":" + hostInfo.getPort() + "]");
}
if (backend.isDecommissioned()) {
// already under decommission, ignore it
Expand All @@ -232,22 +235,23 @@ public static List<Backend> checkDecommission(List<Pair<String, Integer>> hostPo
@Override
public synchronized void cancel(CancelStmt stmt) throws DdlException {
CancelAlterSystemStmt cancelAlterSystemStmt = (CancelAlterSystemStmt) stmt;
cancelAlterSystemStmt.getHostPortPairs();

SystemInfoService infoService = Env.getCurrentSystemInfo();
// check if backends is under decommission
List<Backend> backends = Lists.newArrayList();
List<Pair<String, Integer>> hostPortPairs = cancelAlterSystemStmt.getHostPortPairs();
for (Pair<String, Integer> pair : hostPortPairs) {
List<HostInfo> hostInfos = cancelAlterSystemStmt.getHostInfos();
for (HostInfo hostInfo : hostInfos) {
// check if exist
Backend backend = infoService.getBackendWithHeartbeatPort(pair.first, pair.second);
Backend backend = infoService.getBackendWithHeartbeatPort(hostInfo.getIp(), hostInfo.getHostName(),
hostInfo.getPort());
if (backend == null) {
throw new DdlException("Backend does not exists[" + pair.first + "]");
throw new DdlException("Backend does not exist["
+ (Config.enable_fqdn_mode && hostInfo.getHostName() != null ? hostInfo.getHostName() :
hostInfo.getIp()) + ":" + hostInfo.getPort() + "]");
}

if (!backend.isDecommissioned()) {
// it's ok. just log
LOG.info("backend is not decommissioned[{}]", pair.first);
LOG.info("backend is not decommissioned[{}]", backend.getId());
continue;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,19 +19,19 @@

import org.apache.doris.alter.AlterOpType;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.Pair;
import org.apache.doris.system.SystemInfoService;
import org.apache.doris.system.SystemInfoService.HostInfo;

import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import org.apache.commons.lang.NotImplementedException;

import java.util.LinkedList;
import java.util.List;
import java.util.Map;

public class BackendClause extends AlterClause {
protected List<String> hostPorts;
protected List<Pair<String, Integer>> hostPortPairs;
protected List<HostInfo> hostInfos;

public static final String MUTLI_TAG_DISABLED_MSG = "Not support multi tags for Backend now. "
+ "You can set 'enable_multi_tags=true' in fe.conf to enable this feature.";
Expand All @@ -41,21 +41,20 @@ public class BackendClause extends AlterClause {
protected BackendClause(List<String> hostPorts) {
super(AlterOpType.ALTER_OTHER);
this.hostPorts = hostPorts;
this.hostPortPairs = new LinkedList<Pair<String, Integer>>();
this.hostInfos = Lists.newArrayList();
}

public List<Pair<String, Integer>> getHostPortPairs() {
return hostPortPairs;
public List<HostInfo> getHostInfos() {
return hostInfos;
}

@Override
public void analyze(Analyzer analyzer) throws AnalysisException {
for (String hostPort : hostPorts) {
Pair<String, Integer> pair = SystemInfoService.validateHostAndPort(hostPort);
hostPortPairs.add(pair);
HostInfo hostInfo = SystemInfoService.getIpHostAndPort(hostPort, true);
hostInfos.add(hostInfo);
}

Preconditions.checkState(!hostPortPairs.isEmpty());
Preconditions.checkState(!hostInfos.isEmpty());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,36 +18,37 @@
package org.apache.doris.analysis;

import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.Pair;
import org.apache.doris.common.Config;
import org.apache.doris.system.SystemInfoService;
import org.apache.doris.system.SystemInfoService.HostInfo;

import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;

import java.util.LinkedList;
import java.util.List;

public class CancelAlterSystemStmt extends CancelStmt {

protected List<String> hostPorts;
private List<Pair<String, Integer>> hostPortPairs;
private List<HostInfo> hostInfos;

public CancelAlterSystemStmt(List<String> hostPorts) {
this.hostPorts = hostPorts;
this.hostPortPairs = new LinkedList<Pair<String, Integer>>();
this.hostInfos = Lists.newArrayList();
}

public List<Pair<String, Integer>> getHostPortPairs() {
return hostPortPairs;
public List<HostInfo> getHostInfos() {
return hostInfos;
}

@Override
public void analyze(Analyzer analyzer) throws AnalysisException {
for (String hostPort : hostPorts) {
Pair<String, Integer> pair = SystemInfoService.validateHostAndPort(hostPort);
this.hostPortPairs.add(pair);
HostInfo hostInfo = SystemInfoService.getIpHostAndPort(hostPort,
!Config.enable_fqdn_mode);
this.hostInfos.add(hostInfo);
}

Preconditions.checkState(!this.hostPortPairs.isEmpty());
Preconditions.checkState(!this.hostInfos.isEmpty());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,13 @@

package org.apache.doris.analysis;

import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.Config;
import org.apache.doris.system.SystemInfoService;
import org.apache.doris.system.SystemInfoService.HostInfo;

import com.google.common.base.Preconditions;

import java.util.List;

public class DropBackendClause extends BackendClause {
Expand All @@ -36,6 +43,20 @@ public boolean isForce() {
return force;
}

/**
 * Turns every entry of {@code hostPorts} into a {@link HostInfo} and stores it in
 * {@code hostInfos}.
 *
 * <p>When {@code Config.enable_fqdn_mode} is off, the work is delegated unchanged to the
 * parent implementation. When it is on, each entry is parsed here with
 * {@code !Config.enable_fqdn_mode} as the second argument of
 * {@code SystemInfoService.getIpHostAndPort}.
 * NOTE(review): that flag presumably controls strict IP resolution/validation — confirm
 * against {@code SystemInfoService}.
 *
 * @param analyzer passed through to the parent implementation; not used in FQDN mode
 * @throws AnalysisException propagated from the parent implementation or from host parsing
 */
@Override
public void analyze(Analyzer analyzer) throws AnalysisException {
    // Non-FQDN deployments keep the behaviour inherited from the parent clause.
    if (!Config.enable_fqdn_mode) {
        super.analyze(analyzer);
        return;
    }

    for (String target : hostPorts) {
        hostInfos.add(SystemInfoService.getIpHostAndPort(target, !Config.enable_fqdn_mode));
    }
    // At least one backend must have been named in the statement.
    Preconditions.checkState(!hostInfos.isEmpty());
}

@Override
public String toSql() {
StringBuilder sb = new StringBuilder();
Expand Down
7 changes: 7 additions & 0 deletions fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,7 @@
import org.apache.doris.statistics.StatisticsRepository;
import org.apache.doris.statistics.StatisticsTaskScheduler;
import org.apache.doris.system.Backend;
import org.apache.doris.system.FQDNManager;
import org.apache.doris.system.Frontend;
import org.apache.doris.system.HeartbeatMgr;
import org.apache.doris.system.SystemInfoService;
Expand Down Expand Up @@ -446,6 +447,8 @@ public class Env {

private ExternalMetaCacheMgr extMetaCacheMgr;

private FQDNManager fqdnManager;

public List<Frontend> getFrontends(FrontendNodeType nodeType) {
if (nodeType == null) {
// get all
Expand Down Expand Up @@ -647,6 +650,7 @@ private Env(boolean isCheckpointCatalog) {
this.analysisJobScheduler = new AnalysisJobScheduler();
this.statisticsCache = new StatisticsCache();
this.extMetaCacheMgr = new ExternalMetaCacheMgr();
this.fqdnManager = new FQDNManager(systemInfo);
}

public static void destroyCheckpoint() {
Expand Down Expand Up @@ -1431,6 +1435,9 @@ private void startMasterOnlyDaemonThreads() {
this.statisticsJobScheduler.start();
this.statisticsTaskScheduler.start();
new InternalSchemaInitializer().start();
if (Config.enable_fqdn_mode) {
fqdnManager.start();
}
}

// start threads that should running on all FE
Expand Down
8 changes: 8 additions & 0 deletions fe/fe-core/src/main/java/org/apache/doris/common/Config.java
Original file line number Diff line number Diff line change
Expand Up @@ -1925,5 +1925,13 @@ public class Config extends ConfigBase {
*/
@ConfField(mutable = true, masterOnly = true)
public static boolean enable_storage_policy = false;

/**
 * Enables FQDN mode. This config is mainly used in the k8s cluster environment.
 * When enable_fqdn_mode is true, the name of the pod where a BE is located remains unchanged
 * after the pod is rebuilt, while its ip can change.
 *
 * Default is false. The field is declared non-mutable (it cannot be changed dynamically)
 * and master-only.
 */
@ConfField(mutable = false, masterOnly = true)
public static boolean enable_fqdn_mode = false;
}

Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ public class FeConstants {

public static int heartbeat_interval_second = 5;
public static int checkpoint_interval_second = 60; // 1 minutes
public static int ip_check_interval_second = 5;

// dpp version
public static String dpp_version = "3_2_0";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,11 @@ public static List<List<String>> getClusterBackendInfos(String clusterName) {
backendInfo.add(backend.getOwnerClusterName());
backendInfo.add(backend.getHost());
if (Strings.isNullOrEmpty(clusterName)) {
backendInfo.add(NetUtils.getHostnameByIp(backend.getHost()));
if (backend.getHostName() != null) {
backendInfo.add(backend.getHostName());
} else {
backendInfo.add(NetUtils.getHostnameByIp(backend.getHost()));
}
backendInfo.add(String.valueOf(backend.getHeartbeatPort()));
backendInfo.add(String.valueOf(backend.getBePort()));
backendInfo.add(String.valueOf(backend.getHttpPort()));
Expand Down
Loading