Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -3042,6 +3042,15 @@ public static int metaServiceRpcRetryTimes() {
@ConfField(mutable = true, description = {"存算分离模式下,当tablet分布的be异常,是否立即映射tablet到新的be上,默认true"})
public static boolean enable_immediate_be_assign = true;

@ConfField(mutable = true, description = {"存算分离模式下是否启用自动启停功能,默认true",
"Whether to enable the automatic start-stop feature in cloud model, default is true."})
public static boolean enable_auto_start_for_cloud_cluster = true;

@ConfField(mutable = true, description = {"存算分离模式下自动启停等待cluster唤醒退避重试次数,默认300次大约5分钟",
"The automatic start-stop wait time for cluster wake-up backoff retry count in the cloud "
+ "model is set to 300 times, which is approximately 5 minutes by default."})
public static int auto_start_wait_to_resume_times = 300;

// ATTN: DONOT add any config not related to cloud mode here
// ATTN: DONOT add any config not related to cloud mode here
// ATTN: DONOT add any config not related to cloud mode here
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,19 @@ private void checkDiffNode(Map<String, ClusterPB> remoteClusterIdToPB,
if (LOG.isDebugEnabled()) {
LOG.debug("current cluster status {} {}", currentClusterStatus, newClusterStatus);
}
if (!currentClusterStatus.equals(newClusterStatus)) {
boolean needChange = false;
// ATTN: found bug, In the same cluster, the cluster status in the tags of BE nodes is inconsistent.
// Using a set to collect the cluster statuses from the BE nodes.
Set<String> clusterStatusInMem = new HashSet<>();
for (Backend backend : currentBes) {
String beClusterStatus = backend.getTagMap().get(Tag.CLOUD_CLUSTER_STATUS);
clusterStatusInMem.add(beClusterStatus == null ? "NOT_SET" : beClusterStatus);
}
if (clusterStatusInMem.size() != 1) {
LOG.warn("cluster {}, multi be nodes cluster status inconsistent, fix it {}", cid, clusterStatusInMem);
needChange = true;
}
if (!currentClusterStatus.equals(newClusterStatus) || needChange) {
// cluster's status changed
LOG.info("cluster_status corresponding to cluster_id has been changed,"
+ " cluster_id : {} , current_cluster_status : {}, new_cluster_status :{}",
Expand Down Expand Up @@ -426,8 +438,8 @@ private void checkCloudFes() {
}
return nodeMap;
});
LOG.info("diffFrontends nodes: {}, current: {}, toAdd: {}, toDel: {}",
expectedFes, currentFes, toAdd, toDel);
LOG.info("diffFrontends nodes: {}, current: {}, toAdd: {}, toDel: {}, enable auto start: {}",
expectedFes, currentFes, toAdd, toDel, Config.enable_auto_start_for_cloud_cluster);
if (toAdd.isEmpty() && toDel.isEmpty()) {
if (LOG.isDebugEnabled()) {
LOG.debug("runAfterCatalogReady getObserverFes nothing todo");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
Expand Down Expand Up @@ -567,10 +568,18 @@ public String getCloudStatusById(final String clusterId) {
}
}

public Set<String> getClusterStatus(List<Backend> backends) {
// ATTN: found bug, In the same cluster, the cluster status in the tags of BE nodes is inconsistent.
// Using a set to collect the cluster statuses from the BE nodes.
return backends.stream().map(Backend::getCloudClusterStatus).collect(Collectors.toSet());
}

public String getCloudStatusByIdNoLock(final String clusterId) {
return clusterIdToBackend.getOrDefault(clusterId, new ArrayList<>())
.stream().map(Backend::getCloudClusterStatus).findFirst()
.orElse(String.valueOf(Cloud.ClusterStatus.UNKNOWN));
List<Backend> bes = clusterIdToBackend.getOrDefault(clusterId, new ArrayList<>());
Optional<String> hasNormal = bes.stream().map(Backend::getCloudClusterStatus)
.filter(status -> status.equals(String.valueOf(Cloud.ClusterStatus.NORMAL))).findAny();
return hasNormal.orElseGet(() -> bes.stream().map(Backend::getCloudClusterStatus).findFirst()
.orElse(String.valueOf(Cloud.ClusterStatus.NORMAL)));
}

public void updateClusterNameToId(final String newName,
Expand Down Expand Up @@ -949,6 +958,9 @@ public String waitForAutoStart(String clusterName) throws DdlException {
if (Config.isNotCloudMode()) {
return null;
}
if (!Config.enable_auto_start_for_cloud_cluster) {
return null;
}
clusterName = getClusterNameAutoStart(clusterName);
if (Strings.isNullOrEmpty(clusterName)) {
LOG.warn("auto start in cloud mode, but clusterName empty {}", clusterName);
Expand Down Expand Up @@ -999,7 +1011,7 @@ public String waitForAutoStart(String clusterName) throws DdlException {
}
}
// wait 5 mins
int retryTimes = 5 * 60;
int retryTimes = Config.auto_start_wait_to_resume_times < 0 ? 300 : Config.auto_start_wait_to_resume_times;
int retryTime = 0;
StopWatch stopWatch = new StopWatch();
stopWatch.start();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@ public Backend(long id, String host, int heartbeatPort) {
}

public String getCloudClusterStatus() {
return tagMap.getOrDefault(Tag.CLOUD_CLUSTER_STATUS, String.valueOf(Cloud.ClusterStatus.UNKNOWN));
return tagMap.getOrDefault(Tag.CLOUD_CLUSTER_STATUS, String.valueOf(Cloud.ClusterStatus.NORMAL));
}

public void setCloudClusterStatus(final String clusterStatus) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,9 @@ public void setMaster(int clusterId, String token, long epoch) {
// Set cloud_instance_id and meta_service_endpoint even if there are empty
// Be can knowns that fe is working in cloud mode.
// Set the cloud instance ID for cloud deployment identification
tMasterInfo.setCloudInstanceId(Config.cloud_instance_id);
if (!Strings.isNullOrEmpty(Config.cloud_instance_id)) {
tMasterInfo.setCloudInstanceId(Config.cloud_instance_id);
}
// Set the endpoint for the metadata service in cloud mode
tMasterInfo.setMetaServiceEndpoint(Config.meta_service_endpoint);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import org.awaitility.Awaitility;
import org.apache.doris.regression.util.Http
import static java.util.concurrent.TimeUnit.SECONDS;

suite('test_auto_start_in_cloud', 'multi_cluster') {
suite('test_auto_start_in_cloud', 'multi_cluster, docker') {
if (!isCloudMode()) {
return;
}
Expand Down Expand Up @@ -168,5 +168,29 @@ suite('test_auto_start_in_cloud', 'multi_cluster') {

future1.get()
future2.get()

tag = getCloudBeTagByName(clusterName)
logger.info("tag check = {}", tag)
jsonObject = jsonSlurper.parseText(tag)
String cluster_status = jsonObject.cloud_cluster_status
assertEquals("NORMAL", cluster_status)

// add 1 nodes, check it status NORMAL
cluster.addBackend(1, null)
dockerAwaitUntil(5) {
result = sql """SHOW BACKENDS"""
result.size() == 4
}

def bes = sql_return_maparray "SHOW BACKENDS"
bes.each {
tag = it.Tag
if (!tag.contains(clusterName)) {
return
}
jsonObject = jsonSlurper.parseText(tag)
cluster_status = jsonObject.cloud_cluster_status
assertEquals("NORMAL", cluster_status)
}
}
}