Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
65 changes: 57 additions & 8 deletions cloud/src/resource-manager/resource_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -157,28 +157,41 @@ bool ResourceManager::check_cluster_params_valid(const ClusterPB& cluster, std::
std::stringstream ss;
bool no_err = true;
int master_num = 0;
int follower_num = 0;
for (auto& n : cluster.nodes()) {
if (ClusterPB::SQL == cluster.type() && n.has_edit_log_port() && n.edit_log_port() &&
n.has_node_type() &&
(n.node_type() == NodeInfoPB_NodeType_FE_MASTER ||
n.node_type() == NodeInfoPB_NodeType_FE_OBSERVER)) {
n.node_type() == NodeInfoPB_NodeType_FE_OBSERVER ||
n.node_type() == NodeInfoPB_NodeType_FE_FOLLOWER)) {
master_num += n.node_type() == NodeInfoPB_NodeType_FE_MASTER ? 1 : 0;
follower_num += n.node_type() == NodeInfoPB_NodeType_FE_FOLLOWER ? 1 : 0;
continue;
}
if (ClusterPB::COMPUTE == cluster.type() && n.has_heartbeat_port() && n.heartbeat_port()) {
} else if (ClusterPB::COMPUTE == cluster.type() && n.has_heartbeat_port() &&
n.heartbeat_port()) {
continue;
}
ss << "check cluster params failed, node : " << proto_to_json(n);
*err = ss.str();
no_err = false;
break;
}
// ATTN: add_cluster check must have only a master node
// add_node doesn't check it
if (check_master_num && ClusterPB::SQL == cluster.type() && master_num != 1) {

if (check_master_num && ClusterPB::SQL == cluster.type()) {
no_err = false;
ss << "cluster is SQL type, must have only one master node, now master count: "
<< master_num;
if (master_num > 0 && follower_num > 0) {
ss << "cluster is SQL type, and use multi follower mode, cant set master node, master "
"count: "
<< master_num << " follower count: " << follower_num;
} else if (!follower_num && master_num != 1) {
ss << "cluster is SQL type, must have only one master node, now master count: "
<< master_num;
} else {
// followers mode
// 1. followers 2. observers + followers
no_err = true;
ss << "";
}
*err = ss.str();
}
return no_err;
Expand Down Expand Up @@ -618,6 +631,37 @@ std::pair<TxnErrorCode, std::string> ResourceManager::get_instance(std::shared_p
return ec;
}

// Checks that the FE topology stored in an instance PB is consistent.
//
// The first cluster whose type is ClusterPB::SQL decides the result; clusters
// of any other type are skipped, and an instance without a SQL cluster is
// reported as valid. Within that SQL cluster:
//   - master-observers mode (at least one FE_MASTER node): valid only when
//     there is exactly one master.
//   - multi-followers mode (no FE_MASTER node): valid only when there is at
//     least one FE_FOLLOWER node.
bool is_instance_valid(const InstanceInfoPB& instance) {
    for (const auto& cluster : instance.clusters()) {
        const bool is_sql_cluster = cluster.has_type() && cluster.type() == ClusterPB::SQL;
        if (!is_sql_cluster) {
            continue;
        }

        int master_count = 0;
        int follower_count = 0;
        for (const auto& node : cluster.nodes()) {
            const auto type = node.node_type();
            if (type == NodeInfoPB::FE_MASTER) {
                ++master_count;
            } else if (type == NodeInfoPB::FE_FOLLOWER) {
                ++follower_count;
            }
        }

        // Seeing any master selects master-observers mode, which tolerates
        // exactly one master and ignores the follower count.
        if (master_count > 0) {
            return master_count == 1;
        }
        // No master at all means multi-followers mode, which needs a follower.
        return follower_count > 0;
    }
    // check others ...
    return true;
}

std::string ResourceManager::modify_nodes(const std::string& instance_id,
const std::vector<NodeInfo>& to_add,
const std::vector<NodeInfo>& to_del) {
Expand Down Expand Up @@ -910,6 +954,11 @@ std::string ResourceManager::modify_nodes(const std::string& instance_id,
}

LOG(INFO) << "instance " << instance_id << " info: " << instance.DebugString();
if (!to_del.empty() && !is_instance_valid(instance)) {
msg = "instance invalid, cant modify, plz check";
LOG(WARNING) << msg;
return msg;
}

InstanceKeyInfo key_info {instance_id};
std::string key;
Expand Down
29 changes: 22 additions & 7 deletions docker/runtime/doris-compose/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -380,6 +380,10 @@ def compose(self):

class FE(Node):

def init(self):
    """Run the parent class's ``init()`` and then ``init_is_follower()``."""
    super().init()
    self.init_is_follower()

def get_add_init_config(self):
cfg = []
if self.cluster.fe_config:
Expand All @@ -397,10 +401,20 @@ def get_add_init_config(self):

return cfg

def init_is_follower(self):
    """Write the follower marker file when the cluster asks for follower FEs.

    The file at ``_is_follower_path()`` is created with the content ``true``
    only when the cluster is in cloud mode and ``fe_follower`` is set;
    otherwise nothing is written.
    """
    cluster = self.cluster
    if not (cluster.is_cloud and cluster.fe_follower):
        return
    with open(self._is_follower_path(), "w") as marker:
        marker.write("true")

def _is_follower_path(self):
return "{}/conf/is_follower".format(self.get_path())

# Build the docker environment mapping for this FE, starting from the mapping
# returned by the parent class's docker_env().
def docker_env(self):
envs = super().docker_env()
# Cloud clusters additionally expose this FE's cloud unique id.
if self.cluster.is_cloud:
envs["CLOUD_UNIQUE_ID"] = self.cloud_unique_id()
# The marker file written by init_is_follower() flags this FE as a follower;
# when it exists, IS_FE_FOLLOWER is set to 1 in the returned mapping.
# NOTE(review): indentation is not visible in this view, so whether this check
# is nested under the is_cloud branch above could not be confirmed.
if os.path.exists(self._is_follower_path()):
envs["IS_FE_FOLLOWER"] = 1
return envs

def cloud_unique_id(self):
Expand Down Expand Up @@ -613,8 +627,8 @@ def expose_sub_dirs(self):
class Cluster(object):

def __init__(self, name, subnet, image, is_cloud, fe_config, be_config,
ms_config, recycle_config, be_disks, be_cluster, reg_be,
coverage_dir, cloud_store_config):
ms_config, recycle_config, fe_follower, be_disks, be_cluster,
reg_be, coverage_dir, cloud_store_config):
self.name = name
self.subnet = subnet
self.image = image
Expand All @@ -623,6 +637,7 @@ def __init__(self, name, subnet, image, is_cloud, fe_config, be_config,
self.be_config = be_config
self.ms_config = ms_config
self.recycle_config = recycle_config
self.fe_follower = fe_follower
self.be_disks = be_disks
self.be_cluster = be_cluster
self.reg_be = reg_be
Expand All @@ -635,8 +650,8 @@ def __init__(self, name, subnet, image, is_cloud, fe_config, be_config,

@staticmethod
def new(name, image, is_cloud, fe_config, be_config, ms_config,
recycle_config, be_disks, be_cluster, reg_be, coverage_dir,
cloud_store_config):
recycle_config, fe_follower, be_disks, be_cluster, reg_be,
coverage_dir, cloud_store_config):
if not os.path.exists(LOCAL_DORIS_PATH):
os.makedirs(LOCAL_DORIS_PATH, exist_ok=True)
os.chmod(LOCAL_DORIS_PATH, 0o777)
Expand All @@ -646,9 +661,9 @@ def new(name, image, is_cloud, fe_config, be_config, ms_config,
os.chmod(lock_file, 0o666)
subnet = gen_subnet_prefix16()
cluster = Cluster(name, subnet, image, is_cloud, fe_config,
be_config, ms_config, recycle_config, be_disks,
be_cluster, reg_be, coverage_dir,
cloud_store_config)
be_config, ms_config, recycle_config,
fe_follower, be_disks, be_cluster, reg_be,
coverage_dir, cloud_store_config)
os.makedirs(cluster.get_path(), exist_ok=True)
os.makedirs(get_status_path(name), exist_ok=True)
cluster._save_meta()
Expand Down
21 changes: 15 additions & 6 deletions docker/runtime/doris-compose/command.py
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,13 @@ def add_parser(self, args_parsers):
type=str,
help="Specify recycle configs for doris_cloud.conf. "\
"Example: --recycle-config \"log_level = warn\".")
group1.add_argument(
"--fe-follower",
default=False,
action=self._get_parser_bool_action(True),
help=
"The new added fe is follower but not observer. Only support in cloud mode."
)
group1.add_argument("--be-disks",
nargs="*",
default=["HDD=1"],
Expand Down Expand Up @@ -383,18 +390,20 @@ def run(self, args):
args.add_ms_num = 0
args.add_recycle_num = 0

cluster = CLUSTER.Cluster.new(args.NAME, args.IMAGE, args.cloud,
args.fe_config, args.be_config,
args.ms_config, args.recycle_config,
args.be_disks, args.be_cluster,
args.reg_be, args.coverage_dir,
cloud_store_config)
cluster = CLUSTER.Cluster.new(
args.NAME, args.IMAGE, args.cloud, args.fe_config,
args.be_config, args.ms_config, args.recycle_config,
args.fe_follower, args.be_disks, args.be_cluster, args.reg_be,
args.coverage_dir, cloud_store_config)
LOG.info("Create new cluster {} succ, cluster path is {}".format(
args.NAME, cluster.get_path()))

if args.be_cluster and cluster.is_cloud:
cluster.be_cluster = args.be_cluster

if cluster.is_cloud:
cluster.fe_follower = args.fe_follower

_, related_nodes, _ = get_ids_related_nodes(cluster, args.fe_id,
args.be_id, args.ms_id,
args.recycle_id,
Expand Down
17 changes: 11 additions & 6 deletions docker/runtime/doris-compose/resource/init_fe.sh
Original file line number Diff line number Diff line change
Expand Up @@ -81,8 +81,10 @@ fe_daemon() {
done
}

add_cloud_fe() {
start_cloud_fe() {
if [ -f "$REGISTER_FILE" ]; then
fe_daemon &
bash $DORIS_HOME/bin/start_fe.sh --daemon
return
fi

Expand All @@ -96,6 +98,10 @@ add_cloud_fe() {
node_type=FE_OBSERVER
fi

if [ "a$IS_FE_FOLLOWER" == "a1" ]; then
node_type=FE_FOLLOWER
fi

nodes='{
"cloud_unique_id": "'"${CLOUD_UNIQUE_ID}"'",
"ip": "'"${MY_IP}"'",
Expand Down Expand Up @@ -139,6 +145,10 @@ add_cloud_fe() {
fi

touch $REGISTER_FILE

fe_daemon &
bash $DORIS_HOME/bin/start_fe.sh --daemon

if [ "$MY_ID" == "1" ]; then
echo $MY_IP >$MASTER_FE_IP_FILE
fi
Expand Down Expand Up @@ -182,11 +192,6 @@ start_local_fe() {
fi
}

start_cloud_fe() {
add_cloud_fe
bash $DORIS_HOME/bin/start_fe.sh --daemon
}

main() {
trap stop_frontend SIGTERM

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -303,9 +303,9 @@ private void checkDiffNode(Map<String, ClusterPB> remoteClusterIdToPB,

@Override
protected void runAfterCatalogReady() {
getCloudBackends();
checkCloudBackends();
updateCloudMetrics();
getCloudObserverFes();
checkCloudFes();
}

private void checkFeNodesMapValid() {
Expand Down Expand Up @@ -348,7 +348,7 @@ private void checkFeNodesMapValid() {
}
}

private void getCloudObserverFes() {
private void checkCloudFes() {
Cloud.GetClusterResponse response = cloudSystemInfoService.getCloudCluster(
Config.cloud_sql_server_cluster_name, Config.cloud_sql_server_cluster_id, "");
if (!response.hasStatus() || !response.getStatus().hasCode()
Expand All @@ -371,22 +371,33 @@ private void getCloudObserverFes() {
LOG.debug("get cloud cluster, clusterId={} nodes={}",
Config.cloud_sql_server_cluster_id, cpb.getNodesList());
}
List<Frontend> currentFes = Env.getCurrentEnv().getFrontends(FrontendNodeType.OBSERVER);
List<Frontend> currentFollowers = Env.getCurrentEnv().getFrontends(FrontendNodeType.FOLLOWER);
List<Frontend> currentObservers = Env.getCurrentEnv().getFrontends(FrontendNodeType.OBSERVER);
currentFollowers.addAll(currentObservers);
List<Frontend> currentFes = new ArrayList<>(currentFollowers.stream().collect(Collectors.toMap(
fe -> fe.getHost() + ":" + fe.getEditLogPort(),
fe -> fe,
(existing, replacement) -> existing
)).values());
List<Frontend> toAdd = new ArrayList<>();
List<Frontend> toDel = new ArrayList<>();
List<Cloud.NodeInfoPB> expectedFes = cpb.getNodesList();
diffNodes(toAdd, toDel, () -> {
// memory
Map<String, Frontend> currentMap = new HashMap<>();
String selfNode = Env.getCurrentEnv().getSelfNode().getIdent();
for (Frontend fe : currentFes) {
String endpoint = fe.getHost() + "_" + fe.getEditLogPort();
if (selfNode.equals(endpoint)) {
continue;
}
// add type to map key, for diff
endpoint = endpoint + "_" + fe.getRole();
currentMap.put(endpoint, fe);
}
return currentMap;
}, () -> {
// meta service
Map<String, Frontend> nodeMap = new HashMap<>();
String selfNode = Env.getCurrentEnv().getSelfNode().getIdent();
for (Cloud.NodeInfoPB node : expectedFes) {
Expand All @@ -399,9 +410,18 @@ private void getCloudObserverFes() {
if (selfNode.equals(endpoint)) {
continue;
}
Frontend fe = new Frontend(FrontendNodeType.OBSERVER,
Cloud.NodeInfoPB.NodeType type = node.getNodeType();
// ATTN: just allow to add follower or observer
if (Cloud.NodeInfoPB.NodeType.FE_MASTER.equals(type)) {
LOG.warn("impossible !!!, get fe node {} type equel master from ms", node);
}
FrontendNodeType role = type == Cloud.NodeInfoPB.NodeType.FE_FOLLOWER
? FrontendNodeType.FOLLOWER : FrontendNodeType.OBSERVER;
Frontend fe = new Frontend(role,
CloudEnv.genFeNodeNameFromMeta(host, node.getEditLogPort(),
node.getCtime() * 1000), host, node.getEditLogPort());
// add type to map key, for diff
endpoint = endpoint + "_" + fe.getRole();
nodeMap.put(endpoint, fe);
}
return nodeMap;
Expand All @@ -421,7 +441,7 @@ private void getCloudObserverFes() {
}
}

private void getCloudBackends() {
private void checkCloudBackends() {
Map<String, List<Backend>> clusterIdToBackend = cloudSystemInfoService.getCloudClusterIdToBackend();
//rpc to ms, to get mysql user can use cluster_id
// NOTE: rpc args all empty, use cluster_unique_id to get a instance's all cluster info.
Expand Down Expand Up @@ -474,7 +494,7 @@ private void updateCloudMetrics() {
for (Map.Entry<String, String> entry : clusterNameToId.entrySet()) {
int aliveNum = 0;
List<Backend> bes = clusterIdToBackend.get(entry.getValue());
if (bes == null || bes.size() == 0) {
if (bes == null || bes.isEmpty()) {
LOG.info("cant get be nodes by cluster {}, bes {}", entry, bes);
continue;
}
Expand Down
Loading