diff --git a/be/src/agent/heartbeat_server.cpp b/be/src/agent/heartbeat_server.cpp index e981e9165b5e1d..562950b54d4621 100644 --- a/be/src/agent/heartbeat_server.cpp +++ b/be/src/agent/heartbeat_server.cpp @@ -246,36 +246,43 @@ Status HeartbeatServer::_heartbeat(const TMasterInfo& master_info) { } if (master_info.__isset.meta_service_endpoint != config::is_cloud_mode()) { - return Status::InvalidArgument( + LOG(WARNING) << "Detected mismatch in cloud mode configuration between FE and BE. " + << "FE cloud mode: " + << (master_info.__isset.meta_service_endpoint ? "true" : "false") + << ", BE cloud mode: " << (config::is_cloud_mode() ? "true" : "false"); + return Status::InvalidArgument( "fe and be do not work in same mode, fe cloud mode: {}," " be cloud mode: {}", master_info.__isset.meta_service_endpoint, config::is_cloud_mode()); } - if (master_info.__isset.meta_service_endpoint && config::meta_service_endpoint.empty() && - !master_info.meta_service_endpoint.empty()) { - auto st = config::set_config("meta_service_endpoint", master_info.meta_service_endpoint, - true); - LOG(INFO) << "set config meta_service_endpoing " << master_info.meta_service_endpoint << " " - << st; - } - - if (master_info.__isset.cloud_instance_id) { - if (!config::cloud_instance_id.empty() && - config::cloud_instance_id != master_info.cloud_instance_id) { - return Status::InvalidArgument( - "cloud_instance_id in fe.conf and be.conf are not same, fe: {}, be: {}", - master_info.cloud_instance_id, config::cloud_instance_id); + if (master_info.__isset.meta_service_endpoint) { + if (config::meta_service_endpoint.empty() && !master_info.meta_service_endpoint.empty()) { + auto st = config::set_config("meta_service_endpoint", master_info.meta_service_endpoint, + true); + LOG(INFO) << "set config meta_service_endpoint " << master_info.meta_service_endpoint + << " " << st; } - if (config::cloud_instance_id.empty() && !master_info.cloud_instance_id.empty()) { - auto st = config::set_config("cloud_instance_id", master_info.cloud_instance_id, true); - config::set_cloud_unique_id(master_info.cloud_instance_id); - LOG(INFO) << "set config cloud_instance_id " << master_info.cloud_instance_id << " " - << st; + if (master_info.meta_service_endpoint != config::meta_service_endpoint) { + LOG(WARNING) << "Detected mismatch in meta_service_endpoint configuration between FE " "and BE. 
" + << "FE meta_service_endpoint: " << master_info.meta_service_endpoint + << ", BE meta_service_endpoint: " << config::meta_service_endpoint; + return Status::InvalidArgument( + "fe and be do not work in same mode, fe meta_service_endpoint: {}," + " be meta_service_endpoint: {}", + master_info.meta_service_endpoint, config::meta_service_endpoint); } } + if (master_info.__isset.cloud_unique_id && + config::cloud_unique_id != master_info.cloud_unique_id && + config::enable_use_cloud_unique_id_from_fe) { + auto st = config::set_config("cloud_unique_id", master_info.cloud_unique_id, true); + LOG(INFO) << "set config cloud_unique_id " << master_info.cloud_unique_id << " " << st; + } + return Status::OK(); } diff --git a/be/src/cloud/cloud_storage_engine.h b/be/src/cloud/cloud_storage_engine.h index d3a55c3c377276..92d2917a916f6a 100644 --- a/be/src/cloud/cloud_storage_engine.h +++ b/be/src/cloud/cloud_storage_engine.h @@ -76,16 +76,12 @@ class CloudStorageEngine final : public BaseStorageEngine { std::optional get_storage_resource(const std::string& vault_id) { LOG(INFO) << "Getting storage resource for vault_id: " << vault_id; - if (vault_id.empty()) { - if (latest_fs() == nullptr) { - LOG(INFO) << "there is not latest fs"; - return std::nullopt; - } - return StorageResource {latest_fs()}; - } bool synced = false; do { + if (vault_id.empty() && latest_fs() != nullptr) { + return StorageResource {latest_fs()}; + } if (auto storage_resource = doris::get_storage_resource(vault_id); storage_resource) { return storage_resource->first; } diff --git a/be/src/cloud/config.cpp b/be/src/cloud/config.cpp index 44f9fa42cae0ba..3a93928fd0b8a9 100644 --- a/be/src/cloud/config.cpp +++ b/be/src/cloud/config.cpp @@ -22,7 +22,6 @@ namespace doris::config { DEFINE_String(deploy_mode, ""); -DEFINE_mString(cloud_instance_id, ""); DEFINE_mString(cloud_unique_id, ""); DEFINE_mString(meta_service_endpoint, ""); DEFINE_Bool(meta_service_use_load_balancer, "false"); @@ -72,10 +71,6 @@ DEFINE_mInt32(remove_expired_tablet_txn_info_interval_seconds, "300"); DEFINE_mInt32(tablet_txn_info_min_expired_seconds, "120"); -void set_cloud_unique_id(std::string instance_id) { - if (cloud_unique_id.empty() && !instance_id.empty()) { - static_cast(set_config("cloud_unique_id", "1:" + instance_id + ":compute", true)); - } -} +DEFINE_mBool(enable_use_cloud_unique_id_from_fe, "true"); } // namespace doris::config diff --git a/be/src/cloud/config.h b/be/src/cloud/config.h index ba20bccbcc7876..eb40cd09e3119e 100644 --- a/be/src/cloud/config.h +++ b/be/src/cloud/config.h @@ -23,15 +23,12 @@ namespace doris::config { DECLARE_String(deploy_mode); // deprecated do not configure directly -DECLARE_mString(cloud_instance_id); DECLARE_mString(cloud_unique_id); static inline bool is_cloud_mode() { return deploy_mode == "cloud" || !cloud_unique_id.empty(); } -void set_cloud_unique_id(std::string instance_id); - // Set the endpoint of meta service. // // If meta services are deployed behind a load balancer, set this config to "host:port" of the load balancer. 
@@ -107,4 +104,6 @@ DECLARE_mInt32(remove_expired_tablet_txn_info_interval_seconds); DECLARE_mInt32(tablet_txn_info_min_expired_seconds); +DECLARE_mBool(enable_use_cloud_unique_id_from_fe); + } // namespace doris::config diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp index 88f7289dfbf7d0..0837f14344ec75 100644 --- a/be/src/common/config.cpp +++ b/be/src/common/config.cpp @@ -1670,8 +1670,6 @@ bool init(const char* conf_file, bool fill_conf_map, bool must_exist, bool set_t SET_FIELD(it.second, std::vector, fill_conf_map, set_to_default); } - set_cloud_unique_id(cloud_instance_id); - return true; } diff --git a/docker/runtime/doris-compose/cluster.py b/docker/runtime/doris-compose/cluster.py index 0ce12f3c7d4d19..3a2d95ac986bfd 100644 --- a/docker/runtime/doris-compose/cluster.py +++ b/docker/runtime/doris-compose/cluster.py @@ -49,6 +49,8 @@ IP_PART4_SIZE = 200 +CLUSTER_ID = "12345678" + LOG = utils.get_logger() @@ -412,7 +414,7 @@ def get_add_init_config(self): if self.cluster.sql_mode_node_mgr: cfg += [ - "cloud_instance_id = " + self.cloud_instance_id(), + "cluster_id = " + CLUSTER_ID, ] else: cfg += [ @@ -439,9 +441,6 @@ def docker_env(self): def cloud_unique_id(self): return "sql_server_{}".format(self.id) - def cloud_instance_id(self): - return "reg_cloud_instance" - def entrypoint(self): return ["bash", os.path.join(DOCKER_RESOURCE_PATH, "init_fe.sh")] @@ -484,9 +483,9 @@ def get_add_init_config(self): "meta_service_endpoint = {}".format( self.cluster.get_meta_server_addr()), ] - if self.cluster.be_cloud_instanceid: + if self.cluster.be_cluster_id: cfg += [ - "cloud_instance_id = " + self.cloud_instance_id(), + "cluster_id = " + CLUSTER_ID, ] if not self.cluster.sql_mode_node_mgr: cfg += [ @@ -553,9 +552,6 @@ def docker_env(self): def cloud_unique_id(self): return "compute_node_{}".format(self.id) - def cloud_instance_id(self): - return "reg_cloud_instance" - def docker_home_dir(self): return os.path.join(DOCKER_DORIS_PATH, "be") @@ -666,7 +662,7 @@ class Cluster(object): def __init__(self, name, subnet, image, is_cloud, fe_config, be_config, ms_config, recycle_config, fe_follower, be_disks, be_cluster, reg_be, coverage_dir, cloud_store_config, sql_mode_node_mgr, - be_metaservice_endpoint, be_cloud_instanceid): + be_metaservice_endpoint, be_cluster_id): self.name = name self.subnet = subnet self.image = image @@ -687,13 +683,13 @@ def __init__(self, name, subnet, image, is_cloud, fe_config, be_config, } self.sql_mode_node_mgr = sql_mode_node_mgr self.be_metaservice_endpoint = be_metaservice_endpoint - self.be_cloud_instanceid = be_cloud_instanceid + self.be_cluster_id = be_cluster_id @staticmethod def new(name, image, is_cloud, fe_config, be_config, ms_config, recycle_config, fe_follower, be_disks, be_cluster, reg_be, coverage_dir, cloud_store_config, sql_mode_node_mgr, - be_metaservice_endpoint, be_cloud_instanceid): + be_metaservice_endpoint, be_cluster_id): if not os.path.exists(LOCAL_DORIS_PATH): os.makedirs(LOCAL_DORIS_PATH, exist_ok=True) os.chmod(LOCAL_DORIS_PATH, 0o777) @@ -707,7 +703,7 @@ def new(name, image, is_cloud, fe_config, be_config, ms_config, fe_follower, be_disks, be_cluster, reg_be, coverage_dir, cloud_store_config, sql_mode_node_mgr, be_metaservice_endpoint, - be_cloud_instanceid) + be_cluster_id) os.makedirs(cluster.get_path(), exist_ok=True) os.makedirs(get_status_path(name), exist_ok=True) cluster._save_meta() diff --git a/docker/runtime/doris-compose/command.py b/docker/runtime/doris-compose/command.py index e162368bf657e2..3a5afc714ddda3 
100644 --- a/docker/runtime/doris-compose/command.py +++ b/docker/runtime/doris-compose/command.py @@ -322,19 +322,19 @@ def add_parser(self, args_parsers): if self._support_boolean_action(): parser.add_argument( - "--be-cloud-instanceid", + "--be-cluster-id", default=True, action=self._get_parser_bool_action(False), help= - "Do not set BE cloud instance ID in conf. Default is False.") + "Do not set BE cluster ID in conf. Default is False.") else: parser.add_argument( - "--no-be-cloud-instanceid", - dest='be_cloud_instanceid', + "--no-be-cluster-id", + dest='be_cluster_id', default=True, action=self._get_parser_bool_action(False), help= - "Do not set BE cloud instance ID in conf. Default is False.") + "Do not set BE cluster ID in conf. Default is False.") parser.add_argument( "--fdb-version", @@ -434,7 +434,7 @@ def run(self, args): args.be_config, args.ms_config, args.recycle_config, args.fe_follower, args.be_disks, args.be_cluster, args.reg_be, args.coverage_dir, cloud_store_config, args.sql_mode_node_mgr, - args.be_metaservice_endpoint, args.be_cloud_instanceid) + args.be_metaservice_endpoint, args.be_cluster_id) LOG.info("Create new cluster {} succ, cluster path is {}".format( args.NAME, cluster.get_path())) diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java index 85a4ded7f67931..af4a4f96d27e80 100644 --- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java +++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java @@ -2809,16 +2809,12 @@ public class Config extends ConfigBase { public static String deploy_mode = ""; // compatibily with elder version. - // cloud_unique_id is introduced before cloud_instance_id, so it has higher priority. + // cloud_unique_id has higher priority than cluster_id. @ConfField public static String cloud_unique_id = ""; - // If cloud_unique_id is empty, cloud_instance_id works, otherwise cloud_unique_id works. - @ConfField - public static String cloud_instance_id = ""; - public static boolean isCloudMode() { - return deploy_mode.equals("cloud") || !cloud_unique_id.isEmpty() || !cloud_instance_id.isEmpty(); + return deploy_mode.equals("cloud") || !cloud_unique_id.isEmpty(); } public static boolean isNotCloudMode() { @@ -2893,6 +2889,8 @@ public static int metaServiceRpcRetryTimes() { @ConfField public static boolean enable_cloud_snapshot_version = true; + // Interval in seconds for checking the status of compute groups (cloud clusters). + // Compute groups and cloud clusters refer to the same concept. 
@ConfField public static int cloud_cluster_check_interval_second = 10; diff --git a/fe/fe-core/src/main/antlr4/org/apache/doris/nereids/DorisLexer.g4 b/fe/fe-core/src/main/antlr4/org/apache/doris/nereids/DorisLexer.g4 index 9695c34ebbdbbb..dfa0eb8b8c0c85 100644 --- a/fe/fe-core/src/main/antlr4/org/apache/doris/nereids/DorisLexer.g4 +++ b/fe/fe-core/src/main/antlr4/org/apache/doris/nereids/DorisLexer.g4 @@ -153,6 +153,7 @@ COMMITTED: 'COMMITTED'; COMPACT: 'COMPACT'; COMPLETE: 'COMPLETE'; COMPRESS_TYPE: 'COMPRESS_TYPE'; +COMPUTE: 'COMPUTE'; CONDITIONS: 'CONDITIONS'; CONFIG: 'CONFIG'; CONNECTION: 'CONNECTION'; @@ -553,6 +554,7 @@ VARIABLE: 'VARIABLE'; VARIABLES: 'VARIABLES'; VARIANT: 'VARIANT'; VAULT: 'VAULT'; +VAULTS: 'VAULTS'; VERBOSE: 'VERBOSE'; VERSION: 'VERSION'; VIEW: 'VIEW'; diff --git a/fe/fe-core/src/main/antlr4/org/apache/doris/nereids/DorisParser.g4 b/fe/fe-core/src/main/antlr4/org/apache/doris/nereids/DorisParser.g4 index e1157b1f432f8d..3c65e935c77388 100644 --- a/fe/fe-core/src/main/antlr4/org/apache/doris/nereids/DorisParser.g4 +++ b/fe/fe-core/src/main/antlr4/org/apache/doris/nereids/DorisParser.g4 @@ -183,8 +183,9 @@ unsupportedOtherStatement | UNINSTALL PLUGIN name=identifierOrText #uninstallPlugin | LOCK TABLES (lockTable (COMMA lockTable)*)? #lockTables | UNLOCK TABLES #unlockTables - | WARM UP CLUSTER destination=identifier WITH - (CLUSTER source=identifier | (warmUpItem (COMMA warmUpItem)*)) FORCE? #warmUpCluster + | WARM UP (CLUSTER | COMPUTE GROUP) destination=identifier WITH + ((CLUSTER | COMPUTE GROUP) source=identifier | + (warmUpItem (COMMA warmUpItem)*)) FORCE? #warmUpCluster | BACKUP SNAPSHOT label=multipartIdentifier TO repo=identifier ((ON | EXCLUDE) LEFT_PAREN baseTableRef (COMMA baseTableRef)* RIGHT_PAREN)? properties=propertyClause? #backup @@ -208,7 +209,7 @@ unsupportedShowStatement | SHOW ROW POLICY (FOR (userIdentify | (ROLE role=identifier)))? #showRowPolicy | SHOW STORAGE POLICY (USING (FOR policy=identifierOrText)?)? #showStoragePolicy | SHOW STAGES #showStages - | SHOW STORAGE VAULT #showStorageVault + | SHOW STORAGE (VAULT | VAULTS) #showStorageVault | SHOW CREATE REPOSITORY FOR identifier #showCreateRepository | SHOW WHITELIST #showWhitelist | SHOW (GLOBAL | SESSION | LOCAL)? VARIABLES wildWhere? #showVariables @@ -307,7 +308,7 @@ unsupportedShowStatement | (FROM tableName=multipartIdentifier (ALL VERBOSE?)?))? #showQueryStats | SHOW BUILD INDEX ((FROM | IN) database=multipartIdentifier)? wildWhere? sortClause? limitClause? #showBuildIndex - | SHOW CLUSTERS #showClusters + | SHOW (CLUSTERS | (COMPUTE GROUPS)) #showClusters | SHOW CONVERT_LSC ((FROM | IN) database=multipartIdentifier)? #showConvertLsc | SHOW REPLICA STATUS FROM baseTableRef wildWhere? 
#showReplicaStatus | SHOW REPLICA DISTRIBUTION FROM baseTableRef #showREplicaDistribution @@ -495,13 +496,13 @@ unsupportedGrantRevokeStatement : GRANT privilegeList ON multipartIdentifierOrAsterisk TO (userIdentify | ROLE STRING_LITERAL) #grantTablePrivilege | GRANT privilegeList ON - (RESOURCE | CLUSTER | STAGE | STORAGE VAULT | WORKLOAD GROUP) + (RESOURCE | CLUSTER | COMPUTE GROUP | STAGE | STORAGE VAULT | WORKLOAD GROUP) identifierOrTextOrAsterisk TO (userIdentify | ROLE STRING_LITERAL) #grantResourcePrivilege | GRANT roles+=STRING_LITERAL (COMMA roles+=STRING_LITERAL)* TO userIdentify #grantRole | REVOKE privilegeList ON multipartIdentifierOrAsterisk FROM (userIdentify | ROLE STRING_LITERAL) #grantTablePrivilege | REVOKE privilegeList ON - (RESOURCE | CLUSTER | STAGE | STORAGE VAULT | WORKLOAD GROUP) + (RESOURCE | CLUSTER | COMPUTE GROUP | STAGE | STORAGE VAULT | WORKLOAD GROUP) identifierOrTextOrAsterisk FROM (userIdentify | ROLE STRING_LITERAL) #grantResourcePrivilege | REVOKE roles+=STRING_LITERAL (COMMA roles+=STRING_LITERAL)* FROM userIdentify #grantRole ; @@ -1820,6 +1821,7 @@ nonReserved | COMPACT | COMPLETE | COMPRESS_TYPE + | COMPUTE | CONDITIONS | CONFIG | CONNECTION @@ -2082,6 +2084,7 @@ nonReserved | VARIABLES | VARIANT | VAULT + | VAULTS | VERBOSE | VERSION | VIEW diff --git a/fe/fe-core/src/main/cup/sql_parser.cup b/fe/fe-core/src/main/cup/sql_parser.cup index 29a05856ff3b8e..9422f480bbb6d9 100644 --- a/fe/fe-core/src/main/cup/sql_parser.cup +++ b/fe/fe-core/src/main/cup/sql_parser.cup @@ -319,6 +319,7 @@ terminal String KW_COMPACT, KW_COMPLETE, KW_COMPRESS_TYPE, + KW_COMPUTE, KW_CONFIG, KW_CONNECTION, KW_CONNECTION_ID, @@ -691,7 +692,8 @@ terminal String KW_LINES, KW_IGNORE, KW_CONVERT_LSC, - KW_VAULT; + KW_VAULT, + KW_VAULTS; terminal COMMA, COLON, DOT, DOTDOTDOT, AT, STAR, LPAREN, RPAREN, SEMICOLON, LBRACKET, RBRACKET, LBRACE, RBRACE, DIVIDE, MOD, ADD, SUBTRACT, PLACEHOLDER, ARROW; terminal BITAND, BITOR, BITXOR, BITNOT; @@ -1481,6 +1483,14 @@ warm_up_stmt ::= {: RESULT = new WarmUpClusterStmt(dstClusterName, list, force); :} + | KW_WARM KW_UP KW_COMPUTE KW_GROUP ident:dstClusterName KW_WITH KW_COMPUTE KW_GROUP ident:srcClusterName opt_force:force + {: + RESULT = new WarmUpClusterStmt(dstClusterName, srcClusterName, force); + :} + | KW_WARM KW_UP KW_COMPUTE KW_GROUP ident:dstClusterName KW_WITH warm_up_list:list opt_force:force + {: + RESULT = new WarmUpClusterStmt(dstClusterName, list, force); + :} ; warm_up_item ::= @@ -3050,6 +3060,14 @@ grant_stmt ::= {: RESULT = new GrantStmt(null, role, resourcePattern, privs, ResourceTypeEnum.CLUSTER); :} + | KW_GRANT privilege_list:privs KW_ON KW_COMPUTE KW_GROUP resource_pattern:resourcePattern KW_TO user_identity:userId + {: + RESULT = new GrantStmt(userId, null, resourcePattern, privs, ResourceTypeEnum.CLUSTER); + :} + | KW_GRANT privilege_list:privs KW_ON KW_COMPUTE KW_GROUP resource_pattern:resourcePattern KW_TO KW_ROLE STRING_LITERAL:role + {: + RESULT = new GrantStmt(null, role, resourcePattern, privs, ResourceTypeEnum.CLUSTER); + :} | KW_GRANT privilege_list:privs KW_ON KW_STAGE resource_pattern:resourcePattern KW_TO user_identity:userId {: RESULT = new GrantStmt(userId, null, resourcePattern, privs, ResourceTypeEnum.STAGE); @@ -3154,6 +3172,14 @@ revoke_stmt ::= {: RESULT = new RevokeStmt(null, role, resourcePattern, privs, ResourceTypeEnum.CLUSTER); :} + | KW_REVOKE privilege_list:privs KW_ON KW_COMPUTE KW_GROUP resource_pattern:resourcePattern KW_FROM user_identity:userId + {: + RESULT = new RevokeStmt(userId, null, 
resourcePattern, privs, ResourceTypeEnum.CLUSTER); + :} + | KW_REVOKE privilege_list:privs KW_ON KW_COMPUTE KW_GROUP resource_pattern:resourcePattern KW_FROM KW_ROLE STRING_LITERAL:role + {: + RESULT = new RevokeStmt(null, role, resourcePattern, privs, ResourceTypeEnum.CLUSTER); + :} | KW_REVOKE privilege_list:privs KW_ON KW_STAGE resource_pattern:resourcePattern KW_FROM user_identity:userId {: RESULT = new RevokeStmt(userId, null, resourcePattern, privs, ResourceTypeEnum.STAGE); @@ -4190,6 +4216,10 @@ show_stmt ::= {: RESULT = new ShowStorageVaultStmt(); :} + | KW_SHOW KW_STORAGE KW_VAULTS + {: + RESULT = new ShowStorageVaultStmt(); + :} ; show_param ::= @@ -4693,7 +4723,12 @@ show_param ::= /* Cloud Cluster */ | KW_CLUSTERS {: - RESULT = new ShowClusterStmt(); + RESULT = new ShowClusterStmt(false); + :} + /* Compute Group */ + | KW_COMPUTE KW_GROUPS + {: + RESULT = new ShowClusterStmt(true); :} | KW_CONVERT_LSC opt_db:db {: @@ -8422,6 +8457,8 @@ keyword ::= {: RESULT = id; :} | KW_CLUSTERS:id {: RESULT = id; :} + | KW_COMPUTE:id + {: RESULT = id; :} | KW_LINK:id {: RESULT = id; :} | KW_MIGRATE:id @@ -8518,6 +8555,8 @@ keyword ::= {: RESULT = id; :} | KW_VAULT:id {: RESULT = id; :} + | KW_VAULTS:id + {: RESULT = id; :} | KW_VARIANT:id {: RESULT = id; :} | KW_IPV4:id diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/SetUserPropertyVar.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/SetUserPropertyVar.java index a2f31818b8b689..ccc189343c6463 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/SetUserPropertyVar.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/SetUserPropertyVar.java @@ -92,6 +92,13 @@ private void checkAccess(Analyzer analyzer, boolean isSelf) throws AnalysisExcep .getCloudClusterNames().contains(value)) { ErrorReport.reportAnalysisException(ErrorCode.ERR_CLOUD_CLUSTER_ERROR, value); } + + if (key.equals(UserProperty.DEFAULT_COMPUTE_GROUP) + && !Strings.isNullOrEmpty(value) + && !((CloudSystemInfoService) Env.getCurrentSystemInfo()) + .getCloudClusterNames().contains(value)) { + ErrorReport.reportAnalysisException(ErrorCode.ERR_CLOUD_CLUSTER_ERROR, value); + } } return; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowCacheHotSpotStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowCacheHotSpotStmt.java index 847b015825dc1c..2c14a9f8cbbc62 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowCacheHotSpotStmt.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowCacheHotSpotStmt.java @@ -43,8 +43,8 @@ public class ShowCacheHotSpotStmt extends ShowStmt implements NotFallbackInParser { public static final ShowResultSetMetaData[] RESULT_SET_META_DATAS = { ShowResultSetMetaData.builder() - .addColumn(new Column("cluster_id", ScalarType.createType(PrimitiveType.BIGINT))) - .addColumn(new Column("cluster_name", ScalarType.createVarchar(128))) + .addColumn(new Column("compute_group_id", ScalarType.createType(PrimitiveType.BIGINT))) + .addColumn(new Column("compute_group_name", ScalarType.createVarchar(128))) .addColumn(new Column("table_id", ScalarType.createType(PrimitiveType.BIGINT))) .addColumn(new Column("table_name", ScalarType.createVarchar(128))) .build(), @@ -129,7 +129,8 @@ private String generateQueryString() { + "sum(query_per_week) as query_per_week_total " + "FROM " + TABLE_NAME.toString() + " group by cluster_id, cluster_name, table_id, table_name, insert_day) "); - StringBuilder q2 = new StringBuilder("select cluster_id, cluster_name, " + StringBuilder q2 = 
new StringBuilder("select cluster_id as compute_group_id, " + + "cluster_name as compute_group_name, " + "table_id, table_name as hot_table_name from (select row_number() " + "over (partition by cluster_id order by insert_day desc, " + "query_per_day_total desc, query_per_week_total desc) as dr2, " diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowCloudWarmUpStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowCloudWarmUpStmt.java index e91e9b7d6fe251..f823aeb9c15636 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowCloudWarmUpStmt.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowCloudWarmUpStmt.java @@ -34,9 +34,9 @@ public class ShowCloudWarmUpStmt extends ShowStmt implements NotFallbackInParser private boolean showAllJobs = false; private long jobId = -1; - private static final ImmutableList TITLE_NAMES = new ImmutableList.Builder() + private static final ImmutableList WARM_UP_JOB_TITLE_NAMES = new ImmutableList.Builder() .add("JobId") - .add("ClusterName") + .add("ComputeGroup") .add("Status") .add("Type") .add("CreateTime") @@ -116,7 +116,7 @@ public String toString() { @Override public ShowResultSetMetaData getMetaData() { ShowResultSetMetaData.Builder builder = ShowResultSetMetaData.builder(); - for (String title : ShowCloudWarmUpStmt.TITLE_NAMES) { + for (String title : ShowCloudWarmUpStmt.WARM_UP_JOB_TITLE_NAMES) { builder.addColumn(new Column(title, ScalarType.createVarchar(30))); } return builder.build(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowClusterStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowClusterStmt.java index acb6d789f45e59..c29978267a3bcd 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowClusterStmt.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowClusterStmt.java @@ -34,10 +34,16 @@ import com.google.common.collect.ImmutableList; public class ShowClusterStmt extends ShowStmt implements NotFallbackInParser { - public static final ImmutableList TITLE_NAMES = new ImmutableList.Builder() - .add("cluster").add("is_current").add("users").build(); + public static final ImmutableList CLUSTER_TITLE_NAMES = new ImmutableList.Builder() + .add("cluster").add("is_current").add("users").add("backend_num").build(); - public ShowClusterStmt() { + public static final ImmutableList COMPUTE_GROUP_TITLE_NAMES = new ImmutableList.Builder() + .add("Name").add("IsCurrent").add("Users").add("BackendNum").build(); + + boolean isComputeGroup = true; + + public ShowClusterStmt(boolean isComputeGroup) { + this.isComputeGroup = isComputeGroup; } @Override @@ -45,7 +51,11 @@ public ShowResultSetMetaData getMetaData() { ShowResultSetMetaData.Builder builder = ShowResultSetMetaData.builder(); ImmutableList titleNames = null; - titleNames = TITLE_NAMES; + if (isComputeGroup) { + titleNames = COMPUTE_GROUP_TITLE_NAMES; + } else { + titleNames = CLUSTER_TITLE_NAMES; + } for (String title : titleNames) { builder.addColumn(new Column(title, ScalarType.createVarchar(128))); diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowRolesStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowRolesStmt.java index 1f3f19a6d29202..3f78398aafb80f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowRolesStmt.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowRolesStmt.java @@ -45,6 +45,7 @@ public class ShowRolesStmt extends ShowStmt implements NotFallbackInParser { builder.addColumn(new 
Column("CloudStagePrivs", ScalarType.createVarchar(300))); builder.addColumn(new Column("StorageVaultPrivs", ScalarType.createVarchar(300))); builder.addColumn(new Column("WorkloadGroupPrivs", ScalarType.createVarchar(300))); + builder.addColumn(new Column("ComputeGroupPrivs", ScalarType.createVarchar(300))); META_DATA = builder.build(); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowStorageVaultStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowStorageVaultStmt.java index f6124c4d20184d..6a06a580cec21f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowStorageVaultStmt.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowStorageVaultStmt.java @@ -33,7 +33,7 @@ **/ public class ShowStorageVaultStmt extends ShowStmt implements NotFallbackInParser { - private final String stmt = "SHOW STORAGE VAULT"; + private final String stmt = "SHOW STORAGE VAULTS"; public ShowStorageVaultStmt() { } diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java index 45c13a087b3eaf..31b7daa79731db 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java @@ -1151,6 +1151,17 @@ protected boolean isStartFromEmpty() { return !roleFile.exists() && !versionFile.exists(); } + private void getClusterIdFromStorage(Storage storage) throws IOException { + clusterId = storage.getClusterID(); + if (Config.cluster_id != -1 && Config.cluster_id != this.clusterId) { + LOG.warn("Configured cluster_id {} does not match stored cluster_id {}. " + + "This may indicate a configuration error.", + Config.cluster_id, this.clusterId); + throw new IOException("Configured cluster_id does not match stored cluster_id. " + + "Please check your configuration."); + } + } + protected void getClusterIdAndRole() throws IOException { File roleFile = new File(this.imageDir, Storage.ROLE_FILE); File versionFile = new File(this.imageDir, Storage.VERSION_FILE); @@ -1232,7 +1243,7 @@ protected void getClusterIdAndRole() throws IOException { frontends.put(nodeName, self); LOG.info("add self frontend: {}", self); } else { - clusterId = storage.getClusterID(); + getClusterIdFromStorage(storage); if (storage.getToken() == null) { token = Strings.isNullOrEmpty(Config.auth_token) ? Storage.newToken() : Config.auth_token; LOG.info("refresh new token"); @@ -1287,7 +1298,7 @@ protected void getClusterIdAndRole() throws IOException { // NOTE: cluster_id will be init when Storage object is constructed, // so we new one. 
storage = new Storage(this.imageDir); - clusterId = storage.getClusterID(); + getClusterIdFromStorage(storage); token = storage.getToken(); if (Strings.isNullOrEmpty(token)) { token = Config.auth_token; @@ -1295,7 +1306,7 @@ } else { // If the version file exist, read the cluster id and check the // id with helper node to make sure they are identical - clusterId = storage.getClusterID(); + getClusterIdFromStorage(storage); token = storage.getToken(); try { String url = "http://" + NetUtils @@ -2061,7 +2072,7 @@ public boolean hasReplayer() { public void loadImage(String imageDir) throws IOException, DdlException { Storage storage = new Storage(imageDir); - clusterId = storage.getClusterID(); + getClusterIdFromStorage(storage); File curFile = storage.getCurrentImageFile(); if (!curFile.exists()) { // image.0 may not exist @@ -2996,6 +3007,11 @@ public void addFrontend(FrontendNodeType role, String host, int editLogPort) thr } public void addFrontend(FrontendNodeType role, String host, int editLogPort, String nodeName) throws DdlException { + addFrontend(role, host, editLogPort, nodeName, ""); + } + + public void addFrontend(FrontendNodeType role, String host, int editLogPort, String nodeName, String cloudUniqueId) + throws DdlException { if (!tryLock(false)) { throw new DdlException("Failed to acquire env lock. Try again"); } @@ -3026,6 +3042,7 @@ public void addFrontend(FrontendNodeType role, String host, int editLogPort, Str // Only add frontend after removing the conflict nodes, to ensure the exception safety. fe = new Frontend(role, nodeName, host, editLogPort); + fe.setCloudUniqueId(cloudUniqueId); frontends.put(nodeName, fe); LOG.info("add frontend: {}", fe); @@ -3076,7 +3093,6 @@ public void dropFrontend(FrontendNodeType role, String host, int port) throws Dd } public void dropFrontendFromBDBJE(FrontendNodeType role, String host, int port) throws DdlException { - if (port == selfNode.getPort() && feType == FrontendNodeType.MASTER && selfNode.getHost().equals(host)) { throw new DdlException("can not drop current master node."); diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/StorageVault.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/StorageVault.java index fe285b6919db1a..11867bcfb960a6 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/StorageVault.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/StorageVault.java @@ -189,8 +189,8 @@ protected void replaceIfEffectiveValue(Map<String, String> properties, String ke public static final ShowResultSetMetaData STORAGE_VAULT_META_DATA = ShowResultSetMetaData.builder() - .addColumn(new Column("StorageVaultName", ScalarType.createVarchar(100))) - .addColumn(new Column("StorageVaultId", ScalarType.createVarchar(20))) + .addColumn(new Column("Name", ScalarType.createVarchar(100))) + .addColumn(new Column("Id", ScalarType.createVarchar(20))) .addColumn(new Column("Propeties", ScalarType.createVarchar(65535))) .addColumn(new Column("IsDefault", ScalarType.createVarchar(5))) .build(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudClusterChecker.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudClusterChecker.java index 0dfcf322a0cbb9..e27339c2aacc14 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudClusterChecker.java +++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudClusterChecker.java @@ -52,6 +52,8 @@ public class CloudClusterChecker extends MasterDaemon { private 
CloudSystemInfoService cloudSystemInfoService; + boolean isUpdateCloudUniqueId = false; + public CloudClusterChecker(CloudSystemInfoService cloudSystemInfoService) { super("cloud cluster check", Config.cloud_cluster_check_interval_second * 1000L); this.cloudSystemInfoService = cloudSystemInfoService; @@ -394,6 +396,23 @@ private void checkCloudFes() { List<Frontend> toAdd = new ArrayList<>(); List<Frontend> toDel = new ArrayList<>(); List<Cloud.NodeInfoPB> expectedFes = cpb.getNodesList(); + + if (!isUpdateCloudUniqueId) { + // Just run once, and the number of fes is small, so iterating is ok. + // A newly added fe has cloudUniqueId. + for (Frontend fe : currentFes) { + for (Cloud.NodeInfoPB node : expectedFes) { + if (fe.getHost().equals(Config.enable_fqdn_mode ? node.getHost() : node.getIp()) + && fe.getEditLogPort() == node.getEditLogPort()) { + fe.setCloudUniqueId(node.getCloudUniqueId()); + LOG.info("update cloud unique id result {}", fe); + break; + } + } + } + isUpdateCloudUniqueId = true; + } + diffNodes(toAdd, toDel, () -> { // memory Map<String, Frontend> currentMap = new HashMap<>(); @@ -407,6 +426,7 @@ private void checkCloudFes() { endpoint = endpoint + "_" + fe.getRole(); currentMap.put(endpoint, fe); } + LOG.info("fes in memory {}", currentMap); return currentMap; }, () -> { // meta service @@ -425,17 +445,20 @@ private void checkCloudFes() { Cloud.NodeInfoPB.NodeType type = node.getNodeType(); // ATTN: just allow to add follower or observer if (Cloud.NodeInfoPB.NodeType.FE_MASTER.equals(type)) { - LOG.warn("impossible !!!, get fe node {} type equel master from ms", node); + LOG.warn("impossible !!!, get fe node {} type equal master from ms", node); } - FrontendNodeType role = type == Cloud.NodeInfoPB.NodeType.FE_FOLLOWER - ? FrontendNodeType.FOLLOWER : FrontendNodeType.OBSERVER; + FrontendNodeType role = type == Cloud.NodeInfoPB.NodeType.FE_OBSERVER + ? 
FrontendNodeType.OBSERVER : FrontendNodeType.FOLLOWER; Frontend fe = new Frontend(role, CloudEnv.genFeNodeNameFromMeta(host, node.getEditLogPort(), node.getCtime() * 1000), host, node.getEditLogPort()); + fe.setCloudUniqueId(node.getCloudUniqueId()); // add type to map key, for diff endpoint = endpoint + "_" + fe.getRole(); nodeMap.put(endpoint, fe); } + LOG.info("fes in ms {}", nodeMap); + return nodeMap; }); LOG.info("diffFrontends nodes: {}, current: {}, toAdd: {}, toDel: {}, enable auto start: {}", diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudEnv.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudEnv.java index e212a7f948ecba..3138bad382a0fb 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudEnv.java +++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudEnv.java @@ -38,9 +38,11 @@ import org.apache.doris.common.MetaNotFoundException; import org.apache.doris.common.UserException; import org.apache.doris.common.io.CountingDataOutputStream; +import org.apache.doris.common.util.NetUtils; import org.apache.doris.ha.FrontendNodeType; import org.apache.doris.mysql.privilege.PrivPredicate; import org.apache.doris.qe.ConnectContext; +import org.apache.doris.system.Frontend; import org.apache.doris.system.SystemInfoService.HostInfo; import com.google.common.base.Preconditions; @@ -50,7 +52,6 @@ import java.io.DataInputStream; import java.io.IOException; -import java.util.Comparator; import java.util.List; import java.util.Map; import java.util.Optional; @@ -71,6 +72,8 @@ public class CloudEnv extends Env { private CleanCopyJobScheduler cleanCopyJobScheduler; + private String cloudInstanceId; + public CloudEnv(boolean isCheckpointCatalog) { super(isCheckpointCatalog); this.cleanCopyJobScheduler = new CleanCopyJobScheduler(); @@ -91,17 +94,32 @@ public CloudUpgradeMgr getCloudUpgradeMgr() { return this.upgradeMgr; } + public String getCloudInstanceId() { + return cloudInstanceId; + } + + private void setCloudInstanceId(String cloudInstanceId) { + this.cloudInstanceId = cloudInstanceId; + } + @Override public void initialize(String[] args) throws Exception { - if (Strings.isNullOrEmpty(Config.cloud_unique_id)) { - if (Strings.isNullOrEmpty(Config.cloud_instance_id)) { - throw new UserException("cloud_instance_id must be specified if deployed in dissaggregated"); - } - LOG.info("cloud_unique_id is not set, setting it using instance_id"); - Config.cloud_unique_id = "1:" + Config.cloud_instance_id + ":sql_server00"; + if (Strings.isNullOrEmpty(Config.cloud_unique_id) && Config.cluster_id == -1) { + throw new UserException("cluster_id must be specified in fe.conf if deployed " + + "in cloud mode, because FE should know which instance it belongs to"); } - LOG.info("Initializing CloudEnv with cloud_unique_id: {}", Config.cloud_unique_id); + if (Config.cluster_id != -1) { + setCloudInstanceId(String.valueOf(Config.cluster_id)); + } + + if (Strings.isNullOrEmpty(Config.cloud_unique_id) && !Strings.isNullOrEmpty(cloudInstanceId)) { + Config.cloud_unique_id = "1:" + cloudInstanceId + ":fe"; + LOG.info("cloud_unique_id is empty, setting it to: {}", Config.cloud_unique_id); + } + + LOG.info("Initializing CloudEnv with cloud_unique_id: {}, cluster_id: {}, cloudInstanceId: {}", + Config.cloud_unique_id, Config.cluster_id, cloudInstanceId); super.initialize(args); } @@ -162,23 +180,12 @@ private Cloud.NodeInfoPB getLocalTypeFromMetaService() { .stream().filter(NodeInfoPB::hasNodeType).collect(Collectors.toList()); helperNodes.clear(); - if 
(allNodes.stream().anyMatch(n -> n.getNodeType() == NodeInfoPB.NodeType.FE_FOLLOWER)) { - // multi followers mode, select first - Optional<HostInfo> helperNode = allNodes.stream() - .filter(nodeInfoPB -> nodeInfoPB.getNodeType() == NodeInfoPB.NodeType.FE_FOLLOWER) - .map(nodeInfoPB -> new HostInfo( - Config.enable_fqdn_mode ? nodeInfoPB.getHost() : nodeInfoPB.getIp(), nodeInfoPB.getEditLogPort())) - .min(Comparator.comparing(HostInfo::getHost)); - helperNode.ifPresent(hostInfo -> helperNodes.add(hostInfo)); - } else { - // master observers mode - // helper node select follower's first, just one - helperNodes.addAll(allNodes.stream() - .filter(nodeInfoPB -> nodeInfoPB.getNodeType() == NodeInfoPB.NodeType.FE_MASTER) - .map(nodeInfoPB -> new HostInfo( - Config.enable_fqdn_mode ? nodeInfoPB.getHost() : nodeInfoPB.getIp(), nodeInfoPB.getEditLogPort())) - .collect(Collectors.toList())); - // check only have one master node. + Optional<Cloud.NodeInfoPB> firstNonObserverNode = allNodes.stream().findFirst(); + if (firstNonObserverNode.isPresent()) { + helperNodes.add(new HostInfo( + Config.enable_fqdn_mode ? firstNonObserverNode.get().getHost() : firstNonObserverNode.get().getIp(), + firstNonObserverNode.get().getEditLogPort())); } Preconditions.checkState(helperNodes.size() == 1); @@ -187,14 +194,11 @@ private Cloud.NodeInfoPB getLocalTypeFromMetaService() { return local.orElse(null); } - private void tryAddMyselToMS() { + private void tryAddMyselfToMS() { try { try { - if (Strings.isNullOrEmpty(Config.cloud_instance_id)) { - throw new DdlException("unable to create instance due to empty cloud_instance_id"); - } - getCloudSystemInfoService().tryCreateInstance(Config.cloud_instance_id, - Config.cloud_instance_id, false); + getCloudSystemInfoService().tryCreateInstance(getCloudInstanceId(), + getCloudInstanceId(), false); } catch (Exception e) { return; } @@ -219,7 +223,7 @@ protected void getClusterIdAndRole() throws IOException { LOG.warn("failed to get local fe's type, sleep {} s, try again.", Config.resource_not_ready_sleep_seconds); if (isStartFromEmpty()) { - tryAddMyselToMS(); + tryAddMyselfToMS(); } try { Thread.sleep(Config.resource_not_ready_sleep_seconds * 1000); @@ -228,10 +232,20 @@ } continue; } + type = nodeInfoPB.getNodeType(); break; } + try { + String instanceId; + instanceId = getCloudSystemInfoService().getInstanceId(Config.cloud_unique_id); + setCloudInstanceId(instanceId); + } catch (IOException e) { + LOG.error("Failed to get instance ID from cloud_unique_id: {}", Config.cloud_unique_id, e); + throw e; + } + LOG.info("current fe's role is {}", type == NodeInfoPB.NodeType.FE_MASTER ? "MASTER" : type == NodeInfoPB.NodeType.FE_FOLLOWER ? "FOLLOWER" : type == NodeInfoPB.NodeType.FE_OBSERVER ? 
"OBSERVER" : "UNKNOWN"); @@ -399,11 +413,25 @@ public void dropFrontend(FrontendNodeType role, String host, int port) throws Dd throw new DdlException("can not drop current master node."); } - getCloudSystemInfoService().dropFrontend(role, host, port); + Frontend frontend = checkFeExist(host, port); + if (frontend == null) { + throw new DdlException("Frontend does not exist."); + } + + if (frontend.getRole() != role) { + throw new DdlException(role.toString() + " does not exist[" + NetUtils + .getHostPortInAccessibleFormat(host, port) + "]"); + } + + if (Strings.isNullOrEmpty(frontend.getCloudUniqueId())) { + throw new DdlException("Frontend does not have a cloudUniqueId, wait for a minute."); + } + + getCloudSystemInfoService().dropFrontend(frontend); } @Override public void modifyFrontendHostName(String srcHost, int srcPort, String destHost) throws DdlException { - throw new DdlException("modify frontend host name is not supported in cloud mode"); + throw new DdlException("Modifying frontend hostname is not supported in cloud mode"); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/datasource/CloudInternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/datasource/CloudInternalCatalog.java index c1c58f7b898ca8..0b8c0c553d5248 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/cloud/datasource/CloudInternalCatalog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/datasource/CloudInternalCatalog.java @@ -184,7 +184,7 @@ protected Partition createPartitionWithIndices(long dbId, OlapTable tbl, long pa requestBuilder.setDbId(dbId); LOG.info("create tablets, dbId: {}, tableId: {}, tableName: {}, partitionId: {}, partitionName: {}, " - + "indexId: {}, vault name {}", + + "indexId: {}, vault name: {}", dbId, tbl.getId(), tbl.getName(), partitionId, partitionName, indexId, storageVaultName); Cloud.CreateTabletsResponse resp = sendCreateTabletsRpc(requestBuilder); // If the resp has no vault id set, it means the MS is running with enable_storage_vault false diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/system/CloudSystemInfoService.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/system/CloudSystemInfoService.java index a91892870d67a3..dbd582c21ea6fc 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/cloud/system/CloudSystemInfoService.java +++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/system/CloudSystemInfoService.java @@ -17,6 +17,8 @@ package org.apache.doris.cloud.system; +import org.apache.doris.analysis.ModifyBackendClause; +import org.apache.doris.analysis.ModifyBackendHostNameClause; import org.apache.doris.catalog.Env; import org.apache.doris.catalog.ReplicaAllocation; import org.apache.doris.cloud.catalog.CloudEnv; @@ -50,6 +52,7 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import java.io.IOException; import java.util.ArrayList; import java.util.Comparator; import java.util.HashSet; @@ -303,7 +306,7 @@ public synchronized void updateFrontends(List toAdd, List to } try { Env.getCurrentEnv().addFrontend(fe.getRole(), - fe.getHost(), fe.getEditLogPort(), fe.getNodeName()); + fe.getHost(), fe.getEditLogPort(), fe.getNodeName(), fe.getCloudUniqueId()); LOG.info("added cloud frontend={} ", fe); } catch (DdlException e) { LOG.warn("failed to add cloud frontend={} ", fe); @@ -311,19 +314,20 @@ public synchronized void updateFrontends(List toAdd, List to } } - private void alterBackendCluster(List hostInfos, String clusterId, + private void alterBackendCluster(List hostInfos, 
String computeGroupId, String cloudUniqueId, Cloud.AlterClusterRequest.Operation operation) throws DdlException { - if (Strings.isNullOrEmpty(Config.cloud_instance_id)) { + if (Strings.isNullOrEmpty(((CloudEnv) Env.getCurrentEnv()).getCloudInstanceId())) { throw new DdlException("unable to alter backends due to empty cloud_instance_id"); } // Issue rpc to meta to alter node, then fe master would add this node to its frontends Cloud.ClusterPB clusterPB = Cloud.ClusterPB.newBuilder() - .setClusterId(clusterId) + .setClusterId(computeGroupId) .setType(Cloud.ClusterPB.Type.COMPUTE) .build(); for (HostInfo hostInfo : hostInfos) { Cloud.NodeInfoPB nodeInfoPB = Cloud.NodeInfoPB.newBuilder() + .setCloudUniqueId(cloudUniqueId) .setIp(hostInfo.getHost()) .setHost(hostInfo.getHost()) .setHeartbeatPort(hostInfo.getPort()) @@ -333,7 +337,7 @@ private void alterBackendCluster(List<HostInfo> hostInfos, String clusterId, } Cloud.AlterClusterRequest request = Cloud.AlterClusterRequest.newBuilder() - .setInstanceId(Config.cloud_instance_id) + .setInstanceId(((CloudEnv) Env.getCurrentEnv()).getCloudInstanceId()) .setOp(operation) .setCluster(clusterPB) .build(); @@ -360,13 +364,18 @@ private void alterBackendCluster(List<HostInfo> hostInfos, String clusterId, public void addBackends(List<HostInfo> hostInfos, Map<String, String> tagMap) throws UserException { // issue rpc to meta to add this node, then fe master would add this node to its backends - String clusterName = tagMap.getOrDefault(Tag.CLOUD_CLUSTER_NAME, Tag.VALUE_DEFAULT_CLOUD_CLUSTER_NAME); + String clusterName = tagMap.getOrDefault(Tag.COMPUTE_GROUP_NAME, Tag.VALUE_DEFAULT_COMPUTE_GROUP_NAME); if (clusterName.isEmpty()) { - throw new UserException("clusterName empty"); + throw new UserException("Compute group name can not be empty"); } - String clusterId = tryCreateCluster(clusterName, RandomIdentifierGenerator.generateRandomIdentifier(8)); - alterBackendCluster(hostInfos, clusterId, Cloud.AlterClusterRequest.Operation.ADD_NODE); + String computeGroupId = tryCreateComputeGroup(clusterName, + RandomIdentifierGenerator.generateRandomIdentifier(8)); + String instanceId = Config.cluster_id == -1 ? 
((CloudEnv) Env.getCurrentEnv()).getCloudInstanceId() + : String.valueOf(Config.cluster_id); + + String cloudUniqueId = "1:" + instanceId + ":" + RandomIdentifierGenerator.generateRandomIdentifier(8); + alterBackendCluster(hostInfos, computeGroupId, cloudUniqueId, Cloud.AlterClusterRequest.Operation.ADD_NODE); } // final entry of dropping backend @@ -379,28 +388,32 @@ public void dropBackend(String host, int heartbeatPort) throws DdlException { throw new DdlException("backend does not exists[" + NetUtils .getHostPortInAccessibleFormat(host, heartbeatPort) + "]"); } - String clusterId = droppedBackend.getTagMap().get(Tag.CLOUD_CLUSTER_ID); - if (clusterId == null || clusterId.isEmpty()) { + String computeGroupId = droppedBackend.getTagMap().get(Tag.CLOUD_CLUSTER_ID); + if (computeGroupId == null || computeGroupId.isEmpty()) { throw new DdlException("Failed to get cluster ID for backend: " + droppedBackend.getId()); } List<HostInfo> hostInfos = new ArrayList<>(); hostInfos.add(new HostInfo(host, heartbeatPort)); - alterBackendCluster(hostInfos, clusterId, Cloud.AlterClusterRequest.Operation.DROP_NODE); + String cloudUniqueId = droppedBackend.getCloudUniqueId(); + alterBackendCluster(hostInfos, computeGroupId, cloudUniqueId, + Cloud.AlterClusterRequest.Operation.DROP_NODE); } @Override public void decommissionBackend(Backend backend) throws UserException { - String clusterId = backend.getTagMap().get(Tag.CLOUD_CLUSTER_ID); - if (clusterId == null || clusterId.isEmpty()) { + String computeGroupId = backend.getTagMap().get(Tag.CLOUD_CLUSTER_ID); + if (computeGroupId == null || computeGroupId.isEmpty()) { throw new UserException("Failed to get cluster ID for backend: " + backend.getId()); } List<HostInfo> hostInfos = new ArrayList<>(); hostInfos.add(new HostInfo(backend.getHost(), backend.getHeartbeatPort())); try { - alterBackendCluster(hostInfos, clusterId, Cloud.AlterClusterRequest.Operation.DECOMMISSION_NODE); + String cloudUniqueId = backend.getCloudUniqueId(); + alterBackendCluster(hostInfos, computeGroupId, cloudUniqueId, + Cloud.AlterClusterRequest.Operation.DECOMMISSION_NODE); } catch (DdlException e) { String errorMessage = e.getMessage(); LOG.warn("Failed to decommission backend: {}", errorMessage); @@ -408,6 +421,16 @@ } } + @Override + public void modifyBackends(ModifyBackendClause alterClause) throws UserException { + throw new UserException("Modifying backends is not supported in cloud mode"); + } + + @Override + public void modifyBackendHost(ModifyBackendHostNameClause clause) throws UserException { + throw new UserException("Modifying backend hostname is not supported in cloud mode"); + } + @Override public void replayAddBackend(Backend newBackend) { super.replayAddBackend(newBackend); @@ -702,6 +725,7 @@ public String addCloudCluster(final String clusterName, final String userName) t List<Backend> backends = new ArrayList<>(); for (Cloud.NodeInfoPB node : cpb.getNodesList()) { Map<String, String> newTagMap = Tag.DEFAULT_BACKEND_TAG.toMap(); + newTagMap.put(Tag.CLOUD_CLUSTER_NAME, clusterNameMeta); newTagMap.put(Tag.CLOUD_CLUSTER_ID, clusterId); newTagMap.put(Tag.CLOUD_CLUSTER_STATUS, String.valueOf(clusterStatus)); @@ -773,18 +797,29 @@ public Map<String, String> getCloudClusterNameToId() { // FrontendCluster = SqlServerCluster private void alterFrontendCluster(FrontendNodeType role, String host, int editLogPort, - Cloud.AlterClusterRequest.Operation op) throws DdlException { - if (Strings.isNullOrEmpty(Config.cloud_instance_id)) { + String cloudUniqueId, 
Cloud.AlterClusterRequest.Operation op) throws DdlException { + if (Strings.isNullOrEmpty(((CloudEnv) Env.getCurrentEnv()).getCloudInstanceId())) { throw new DdlException("unable to alter frontend due to empty cloud_instance_id"); } + Cloud.NodeInfoPB.NodeType nodeType; + if (role == FrontendNodeType.MASTER) { + nodeType = Cloud.NodeInfoPB.NodeType.FE_MASTER; + } else if (role == FrontendNodeType.FOLLOWER) { + nodeType = Cloud.NodeInfoPB.NodeType.FE_FOLLOWER; + } else if (role == FrontendNodeType.OBSERVER) { + nodeType = Cloud.NodeInfoPB.NodeType.FE_OBSERVER; + } else { + throw new DdlException("unable to alter frontend due to invalid role"); + } + // Issue rpc to meta to add this node, then fe master would add this node to its frontends Cloud.NodeInfoPB nodeInfoPB = Cloud.NodeInfoPB.newBuilder() + .setCloudUniqueId(cloudUniqueId) .setIp(host) .setHost(host) .setEditLogPort(editLogPort) - .setNodeType(role == FrontendNodeType.MASTER ? Cloud.NodeInfoPB.NodeType.FE_MASTER : Cloud.NodeInfoPB.NodeType.FE_OBSERVER) + .setNodeType(nodeType) .setCtime(System.currentTimeMillis() / 1000) .build(); @@ -796,7 +831,7 @@ private void alterFrontendCluster(FrontendNodeType role, String host, int editLo .build(); Cloud.AlterClusterRequest request = Cloud.AlterClusterRequest.newBuilder() - .setInstanceId(Config.cloud_instance_id) + .setInstanceId(((CloudEnv) Env.getCurrentEnv()).getCloudInstanceId()) .setOp(op) .setCluster(clusterPB) .build(); @@ -819,20 +854,21 @@ public void addFrontend(FrontendNodeType role, String host, int editLogPort) thr Cloud.AlterClusterRequest.Operation op; op = role == FrontendNodeType.MASTER ? Cloud.AlterClusterRequest.Operation.ADD_CLUSTER : Cloud.AlterClusterRequest.Operation.ADD_NODE; - alterFrontendCluster(role, host, editLogPort, op); + alterFrontendCluster(role, host, editLogPort, Config.cloud_unique_id, op); } - public void dropFrontend(FrontendNodeType role, String host, int editLogPort) throws DdlException { - alterFrontendCluster(role, host, editLogPort, Cloud.AlterClusterRequest.Operation.DROP_NODE); + public void dropFrontend(Frontend frontend) throws DdlException { + alterFrontendCluster(frontend.getRole(), frontend.getHost(), frontend.getEditLogPort(), + frontend.getCloudUniqueId(), Cloud.AlterClusterRequest.Operation.DROP_NODE); } - private String tryCreateCluster(String clusterName, String clusterId) throws UserException { - if (Strings.isNullOrEmpty(Config.cloud_instance_id)) { - throw new DdlException("unable to create cluster due to empty cloud_instance_id"); + private String tryCreateComputeGroup(String clusterName, String computeGroupId) throws UserException { + if (Strings.isNullOrEmpty(((CloudEnv) Env.getCurrentEnv()).getCloudInstanceId())) { + throw new DdlException("unable to create compute group due to empty cluster_id"); } Cloud.ClusterPB clusterPB = Cloud.ClusterPB.newBuilder() - .setClusterId(clusterId) + .setClusterId(computeGroupId) .setClusterName(clusterName) .setType(Cloud.ClusterPB.Type.COMPUTE) .build(); @@ -855,7 +891,7 @@ } if (response.getStatus().getCode() == Cloud.MetaServiceCode.OK) { - return clusterId; + return computeGroupId; } else if (response.getStatus().getCode() == Cloud.MetaServiceCode.ALREADY_EXISTED) { Cloud.GetClusterResponse clusterResponse = getCloudCluster(clusterName, "", ""); if (clusterResponse.getStatus().getCode() == Cloud.MetaServiceCode.OK) { @@ -1075,4 +1111,24 @@ public void tryCreateInstance(String instanceId, String name, boolean 
sseEnabled throw new DdlException("Failed to create instance"); } } + + public String getInstanceId(String cloudUniqueId) throws IOException { + Cloud.GetInstanceRequest.Builder builder = Cloud.GetInstanceRequest.newBuilder(); + builder.setCloudUniqueId(cloudUniqueId); + + Cloud.GetInstanceResponse response; + try { + Cloud.GetInstanceRequest request = builder.build(); + response = MetaServiceProxy.getInstance().getInstance(request); + LOG.info("get instance info, request: {}, response: {}", request, response); + if (response.getStatus().getCode() != Cloud.MetaServiceCode.OK) { + LOG.warn("Failed to get instance info, response: {}", response); + throw new IOException("Failed to get instance info"); + } + return response.getInstance().getInstanceId(); + } catch (RpcException e) { + LOG.warn("Failed to get instance info {}", cloudUniqueId, e); + throw new IOException("Failed to get instance info"); + } + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/ErrorCode.java b/fe/fe-core/src/main/java/org/apache/doris/common/ErrorCode.java index c65116dcc8310b..68e15e7ee46e81 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/ErrorCode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/ErrorCode.java @@ -1228,9 +1228,9 @@ public enum ErrorCode { "There can only be one stmt that returns the result and it is at the end."), ERR_CLOUD_CLUSTER_ERROR(5098, new byte[]{'4', '2', '0', '0', '0'}, - "Cluster %s not exist, use SQL 'SHOW CLUSTERS' to get a valid cluster"), + "Compute group (aka. cloud cluster) %s does not exist, use SQL 'SHOW COMPUTE GROUPS' to get a valid compute group"), - ERR_NO_CLUSTER_ERROR(5099, new byte[]{'4', '2', '0', '0', '0'}, "No cluster selected"), + ERR_NO_CLUSTER_ERROR(5099, new byte[]{'4', '2', '0', '0', '0'}, "No compute group (cloud cluster) selected"), ERR_NOT_CLOUD_MODE(6000, new byte[]{'4', '2', '0', '0', '0'}, "Command only support in cloud mode."); diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/proc/AuthProcDir.java b/fe/fe-core/src/main/java/org/apache/doris/common/proc/AuthProcDir.java index b05284d59a0538..0fa5b5d5b4179b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/proc/AuthProcDir.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/proc/AuthProcDir.java @@ -33,7 +33,7 @@ public class AuthProcDir implements ProcDirInterface { public static final ImmutableList<String> TITLE_NAMES = new ImmutableList.Builder<String>() .add("UserIdentity").add("Comment").add("Password").add("Roles").add("GlobalPrivs").add("CatalogPrivs") .add("DatabasePrivs").add("TablePrivs").add("ColPrivs").add("ResourcePrivs").add("CloudClusterPrivs") - .add("CloudStagePrivs").add("StorageVaultPrivs").add("WorkloadGroupPrivs") + .add("CloudStagePrivs").add("StorageVaultPrivs").add("WorkloadGroupPrivs").add("ComputeGroupPrivs") .build(); private Auth auth; diff --git a/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/manager/ClusterAction.java b/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/manager/ClusterAction.java index a5c915e0bbc48f..95934d31dee5c9 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/manager/ClusterAction.java +++ b/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/manager/ClusterAction.java @@ -42,7 +42,7 @@ * Used to return the cluster information for the manager. 
*/ @RestController -@RequestMapping("/rest/v2/manager/cluster") +@RequestMapping(path = {"/rest/v2/manager/cluster", "/rest/v2/manager/compute_group"}) public class ClusterAction extends RestBaseController { // Returns mysql and http connection information for the cluster. @@ -54,7 +54,7 @@ public class ClusterAction extends RestBaseController { // "" // ] // } - @RequestMapping(path = "/cluster_info/conn_info", method = RequestMethod.GET) + @RequestMapping(path = {"/cluster_info/conn_info", "/compute_group_info/conn_info"}, method = RequestMethod.GET) public Object clusterInfo(HttpServletRequest request, HttpServletResponse response) { executeCheckPassword(request, response); checkGlobalAuth(ConnectContext.get().getCurrentUserIdentity(), PrivPredicate.ADMIN); @@ -85,7 +85,8 @@ public static class BeClusterInfo { public volatile long lastFragmentUpdateTime = 0; } - @RequestMapping(path = "/cluster_info/cloud_cluster_status", method = RequestMethod.GET) + @RequestMapping(path = {"/cluster_info/cloud_cluster_status", "/compute_group_info/compute_group_status"}, + method = RequestMethod.GET) public Object cloudClusterInfo(HttpServletRequest request, HttpServletResponse response) { executeCheckPassword(request, response); checkGlobalAuth(ConnectContext.get().getCurrentUserIdentity(), PrivPredicate.ADMIN); diff --git a/fe/fe-core/src/main/java/org/apache/doris/mysql/privilege/Auth.java b/fe/fe-core/src/main/java/org/apache/doris/mysql/privilege/Auth.java index 36070d20173271..6642b28424362b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/mysql/privilege/Auth.java +++ b/fe/fe-core/src/main/java/org/apache/doris/mysql/privilege/Auth.java @@ -1436,6 +1436,13 @@ private void getUserAuthInfo(List<List<String>> userAuthInfos, UserIdentity user userAuthInfo.add(Joiner.on("; ").join(workloadGroupPrivs)); } + // compute groups + if (cloudClusterPrivs.isEmpty()) { + userAuthInfo.add(FeConstants.null_string); + } else { + userAuthInfo.add(Joiner.on("; ").join(cloudClusterPrivs)); + } + userAuthInfos.add(userAuthInfo); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/mysql/privilege/RoleManager.java b/fe/fe-core/src/main/java/org/apache/doris/mysql/privilege/RoleManager.java index 6b215982c7d926..f1db573eac5073 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/mysql/privilege/RoleManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/mysql/privilege/RoleManager.java @@ -199,8 +199,10 @@ public void getRoleInfo(List<List<String>> results) { }, (s1, s2) -> s1 + " " + s2 )); + // Matches META_DATA in ShowRolesStmt; the second PrivLevel.CLUSTER below fills the ComputeGroupPrivs column. 
Stream.of(PrivLevel.GLOBAL, PrivLevel.CATALOG, PrivLevel.DATABASE, PrivLevel.TABLE, PrivLevel.RESOURCE, - PrivLevel.CLUSTER, PrivLevel.STAGE, PrivLevel.STORAGE_VAULT, PrivLevel.WORKLOAD_GROUP) + PrivLevel.CLUSTER, PrivLevel.STAGE, PrivLevel.STORAGE_VAULT, PrivLevel.WORKLOAD_GROUP, + PrivLevel.CLUSTER) .forEach(level -> { String infoItem = infoMap.get(level); if (Strings.isNullOrEmpty(infoItem)) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/mysql/privilege/UserProperty.java b/fe/fe-core/src/main/java/org/apache/doris/mysql/privilege/UserProperty.java index ccc21b58660b35..176e0f25801043 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/mysql/privilege/UserProperty.java +++ b/fe/fe-core/src/main/java/org/apache/doris/mysql/privilege/UserProperty.java @@ -84,6 +84,7 @@ public class UserProperty implements Writable { public static final String PROP_WORKLOAD_GROUP = "default_workload_group"; public static final String DEFAULT_CLOUD_CLUSTER = "default_cloud_cluster"; + public static final String DEFAULT_COMPUTE_GROUP = "default_compute_group"; // for system user public static final Set ADVANCED_PROPERTIES = Sets.newHashSet(); @@ -142,6 +143,7 @@ public class UserProperty implements Writable { Pattern.CASE_INSENSITIVE)); COMMON_PROPERTIES.add(Pattern.compile("^" + PROP_WORKLOAD_GROUP + "$", Pattern.CASE_INSENSITIVE)); COMMON_PROPERTIES.add(Pattern.compile("^" + DEFAULT_CLOUD_CLUSTER + "$", Pattern.CASE_INSENSITIVE)); + COMMON_PROPERTIES.add(Pattern.compile("^" + DEFAULT_COMPUTE_GROUP + "$", Pattern.CASE_INSENSITIVE)); } public UserProperty() { @@ -263,6 +265,15 @@ public void update(List> properties, boolean isReplay) thro value = ""; } newDefaultCloudCluster = value; + } else if (keyArr[0].equalsIgnoreCase(DEFAULT_COMPUTE_GROUP)) { + // set property "default_compute_group" = "group1" + if (keyArr.length != 1) { + throw new DdlException(DEFAULT_COMPUTE_GROUP + " format error"); + } + if (value == null) { + value = ""; + } + newDefaultCloudCluster = value; } else if (keyArr[0].equalsIgnoreCase(PROP_MAX_QUERY_INSTANCES)) { // set property "max_query_instances" = "1000" if (keyArr.length != 1) { @@ -536,6 +547,13 @@ public List> fetchProperty() { result.add(Lists.newArrayList(DEFAULT_CLOUD_CLUSTER, "")); } + // default compute group + if (defaultCloudCluster != null) { + result.add(Lists.newArrayList(DEFAULT_COMPUTE_GROUP, defaultCloudCluster)); + } else { + result.add(Lists.newArrayList(DEFAULT_COMPUTE_GROUP, "")); + } + for (Map.Entry entry : clusterToDppConfig.entrySet()) { String cluster = entry.getKey(); DppConfig dppConfig = entry.getValue(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java index fa81825d370bc0..4100ed63c28873 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java @@ -1157,10 +1157,10 @@ public static String cloudNoBackendsReason() { StringBuilder sb = new StringBuilder(); if (ConnectContext.get() != null) { String clusterName = ConnectContext.get().getCloudCluster(); - String hits = "or you may not have permission to access the current cluster = "; + String hits = "or you may not have permission to access the current compute group = "; sb.append(" "); if (Strings.isNullOrEmpty(clusterName)) { - return sb.append(hits).append("cluster name empty").toString(); + return sb.append(hits).append("compute group name empty").toString(); } String clusterStatus = ((CloudSystemInfoService) Env.getCurrentSystemInfo()) .getCloudStatusByName(clusterName); @@ -1193,12 +1193,12 @@ public CloudClusterResult getCloudClusterByPolicy() { // valid r = new CloudClusterResult(defaultCloudCluster, CloudClusterResult.Comment.FOUND_BY_DEFAULT_CLUSTER); - LOG.info("use default cluster {}", defaultCloudCluster); + LOG.info("use default compute group {}", defaultCloudCluster); } else { // invalid r = new CloudClusterResult(defaultCloudCluster, CloudClusterResult.Comment.DEFAULT_CLUSTER_SET_BUT_NOT_EXIST); - LOG.warn("default cluster {} current invalid, please change it", r); + LOG.warn("default compute group {} is currently invalid, please change it", r); } return r; } @@ -1214,7 +1214,7 @@ public CloudClusterResult getCloudClusterByPolicy() { .getBackendsByClusterName(cloudClusterName); AtomicBoolean hasAliveBe = new AtomicBoolean(false); bes.stream().filter(Backend::isAlive).findAny().ifPresent(backend -> { - LOG.debug("get a clusterName {}, it's has more than one alive be {}", cloudCluster, backend); + LOG.debug("got compute group {}, it has at least one alive be {}", cloudCluster, backend); hasAliveBe.set(true); }); if (hasAliveBe.get()) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ShowExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/ShowExecutor.java index c2170f90d5716e..be90023bf27d75 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/ShowExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ShowExecutor.java @@ -823,8 +823,12 @@ private void handleShowCluster() throws AnalysisException { PrivPredicate.of(PrivBitSet.of(Privilege.ADMIN_PRIV), Operator.OR))) { users.removeIf(user -> !user.equals(ClusterNamespace.getNameFromFullName(ctx.getQualifiedUser()))); } + String result = Joiner.on(", ").join(users); row.add(result); + int backendNum = ((CloudSystemInfoService) Env.getCurrentEnv().getCurrentSystemInfo()) + .getBackendsByClusterName(clusterName).size(); + row.add(String.valueOf(backendNum)); rows.add(row); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/resource/Tag.java b/fe/fe-core/src/main/java/org/apache/doris/resource/Tag.java index b88cf84282f9e2..7d6a18829cf3a4 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/resource/Tag.java +++ b/fe/fe-core/src/main/java/org/apache/doris/resource/Tag.java @@ -73,7 +73,9 @@ public class Tag implements Writable { public static final String CLOUD_CLUSTER_PRIVATE_ENDPOINT = "cloud_cluster_private_endpoint"; public static final String CLOUD_CLUSTER_STATUS = "cloud_cluster_status"; - public static final String VALUE_DEFAULT_CLOUD_CLUSTER_NAME = "default_cluster"; + public static final String COMPUTE_GROUP_NAME = "compute_group_name"; + + public static final String VALUE_DEFAULT_COMPUTE_GROUP_NAME = "default_compute_group"; public static final String WORKLOAD_GROUP = "workload_group"; diff --git a/fe/fe-core/src/main/java/org/apache/doris/system/Backend.java b/fe/fe-core/src/main/java/org/apache/doris/system/Backend.java index 01bf800e97ebb8..93a8ecdec6a279 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/system/Backend.java +++ b/fe/fe-core/src/main/java/org/apache/doris/system/Backend.java @@ -925,8 +925,28 @@ public TNetworkAddress getArrowFlightAddress() { return new TNetworkAddress(getHost(), getArrowFlightSqlPort()); } + // Only used for display to users; we hide and rename some internal tags.
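+ // For example (values are illustrative): a raw tag map like {"cloud_cluster_name" : "c1", "cloud_cluster_status" : "NORMAL"} + // is shown to users as {"compute_group_name" : "c1", "compute_group_status" : "NORMAL"}.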
public String getTagMapString() { - return "{" + new PrintableMap<>(tagMap, ":", true, false).toString() + "}"; + Map displayTagMap = Maps.newHashMap(); + displayTagMap.putAll(tagMap); + + if (displayTagMap.containsKey("cloud_cluster_public_endpoint")) { + displayTagMap.put("public_endpoint", displayTagMap.remove("cloud_cluster_public_endpoint")); + } + if (displayTagMap.containsKey("cloud_cluster_private_endpoint")) { + displayTagMap.put("private_endpoint", displayTagMap.remove("cloud_cluster_private_endpoint")); + } + if (displayTagMap.containsKey("cloud_cluster_status")) { + displayTagMap.put("compute_group_status", displayTagMap.remove("cloud_cluster_status")); + } + if (displayTagMap.containsKey("cloud_cluster_id")) { + displayTagMap.put("compute_group_id", displayTagMap.remove("cloud_cluster_id")); + } + if (displayTagMap.containsKey("cloud_cluster_name")) { + displayTagMap.put("compute_group_name", displayTagMap.remove("cloud_cluster_name")); + } + + return "{" + new PrintableMap<>(displayTagMap, ":", true, false).toString() + "}"; } public Long getPublishTaskLastTimeAccumulated() { diff --git a/fe/fe-core/src/main/java/org/apache/doris/system/Frontend.java b/fe/fe-core/src/main/java/org/apache/doris/system/Frontend.java index 4f4bdbb7a43530..76fc67c5ac6c61 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/system/Frontend.java +++ b/fe/fe-core/src/main/java/org/apache/doris/system/Frontend.java @@ -47,6 +47,9 @@ public class Frontend implements Writable { // used for getIpByHostname @SerializedName("editLogPort") private int editLogPort; + @SerializedName("cloudUniqueId") + private String cloudUniqueId; + private String version; private int queryPort; @@ -141,6 +144,14 @@ public List getDiskInfos() { return diskInfos; } + public void setCloudUniqueId(String cloudUniqueId) { + this.cloudUniqueId = cloudUniqueId; + } + + public String getCloudUniqueId() { + return cloudUniqueId; + } + /** * handle Frontend's heartbeat response. Because the replayed journal id is very likely to be * changed at each heartbeat response, so we simple return true if the heartbeat status is OK. diff --git a/fe/fe-core/src/main/java/org/apache/doris/system/HeartbeatMgr.java b/fe/fe-core/src/main/java/org/apache/doris/system/HeartbeatMgr.java index ee85a2422399c7..3fc09b31f2d312 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/system/HeartbeatMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/system/HeartbeatMgr.java @@ -96,12 +96,6 @@ public void setMaster(int clusterId, String token, long epoch) { long flags = heartbeatFlags.getHeartbeatFlags(); tMasterInfo.setHeartbeatFlags(flags); if (Config.isCloudMode()) { - // Set cloud_instance_id and meta_service_endpoint even if there are empty - // Be can knowns that fe is working in cloud mode. 
- // Set the cloud instance ID for cloud deployment identification - if (!Strings.isNullOrEmpty(Config.cloud_instance_id)) { - tMasterInfo.setCloudInstanceId(Config.cloud_instance_id); - } // Set the endpoint for the metadata service in cloud mode tMasterInfo.setMetaServiceEndpoint(Config.meta_service_endpoint); } @@ -256,6 +250,10 @@ public HeartbeatResponse call() { copiedMasterInfo.setHeartbeatFlags(flags); copiedMasterInfo.setBackendId(backendId); copiedMasterInfo.setFrontendInfos(feInfos); + if (Config.isCloudMode()) { + String cloudUniqueId = backend.getTagMap().get(Tag.CLOUD_UNIQUE_ID); + copiedMasterInfo.setCloudUniqueId(cloudUniqueId); + } THeartbeatResult result; if (!FeConstants.runningUnitTest) { client = ClientPool.backendHeartbeatPool.borrowObject(beAddr); diff --git a/fe/fe-core/src/main/jflex/sql_scanner.flex b/fe/fe-core/src/main/jflex/sql_scanner.flex index be16c0a6a00440..1863fc877cbcd9 100644 --- a/fe/fe-core/src/main/jflex/sql_scanner.flex +++ b/fe/fe-core/src/main/jflex/sql_scanner.flex @@ -158,6 +158,7 @@ import org.apache.doris.qe.SqlModeHelper; keywordMap.put("committed", new Integer(SqlParserSymbols.KW_COMMITTED)); keywordMap.put("compact", new Integer(SqlParserSymbols.KW_COMPACT)); keywordMap.put("complete", new Integer(SqlParserSymbols.KW_COMPLETE)); + keywordMap.put("compute", new Integer(SqlParserSymbols.KW_COMPUTE)); keywordMap.put("config", new Integer(SqlParserSymbols.KW_CONFIG)); keywordMap.put("connection", new Integer(SqlParserSymbols.KW_CONNECTION)); keywordMap.put("connection_id", new Integer(SqlParserSymbols.KW_CONNECTION_ID)); @@ -456,6 +457,7 @@ import org.apache.doris.qe.SqlModeHelper; keywordMap.put("stop", new Integer(SqlParserSymbols.KW_STOP)); keywordMap.put("storage", new Integer(SqlParserSymbols.KW_STORAGE)); keywordMap.put("vault", new Integer(SqlParserSymbols.KW_VAULT)); + keywordMap.put("vaults", new Integer(SqlParserSymbols.KW_VAULTS)); keywordMap.put("stream", new Integer(SqlParserSymbols.KW_STREAM)); keywordMap.put("streaming", new Integer(SqlParserSymbols.KW_STREAMING)); keywordMap.put("string", new Integer(SqlParserSymbols.KW_STRING)); diff --git a/fe/fe-core/src/test/java/org/apache/doris/mysql/privilege/AuthTest.java b/fe/fe-core/src/test/java/org/apache/doris/mysql/privilege/AuthTest.java index 1c7c2a6c655bff..e7f81c31a64363 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/mysql/privilege/AuthTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/mysql/privilege/AuthTest.java @@ -2478,7 +2478,7 @@ public void testShowRoles() { String name = row.get(0); if (role.equals(name)) { findWgPriv = true; - String wgPriv = row.get(row.size() - 1); + String wgPriv = row.get(row.size() - 2); Assert.assertTrue("test_wg: Usage_priv".equals(wgPriv)); } } diff --git a/fe/fe-core/src/test/java/org/apache/doris/planner/ResourceTagQueryTest.java b/fe/fe-core/src/test/java/org/apache/doris/planner/ResourceTagQueryTest.java index 207bddae1b30a6..850d8b27b062af 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/planner/ResourceTagQueryTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/planner/ResourceTagQueryTest.java @@ -280,7 +280,7 @@ public void test() throws Exception { Assert.assertEquals(1000000, execMemLimit); List> userProps = Env.getCurrentEnv().getAuth().getUserProperties(Auth.ROOT_USER); - Assert.assertEquals(12, userProps.size()); + Assert.assertEquals(13, userProps.size()); // now : // be1 be2 be3 ==>tag1; diff --git a/gensrc/thrift/HeartbeatService.thrift b/gensrc/thrift/HeartbeatService.thrift index 
7c94bc9ce0c871..c03f04a6543f22 100644 --- a/gensrc/thrift/HeartbeatService.thrift +++ b/gensrc/thrift/HeartbeatService.thrift @@ -40,7 +40,7 @@ struct TMasterInfo { 8: optional i64 backend_id 9: optional list frontend_infos 10: optional string meta_service_endpoint; - 11: optional string cloud_instance_id; + 11: optional string cloud_unique_id; } struct TBackendInfo { diff --git a/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy b/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy index 5ad40f9df51260..f1d3dd06cb192c 100644 --- a/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy +++ b/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy @@ -66,6 +66,7 @@ import java.util.concurrent.ExecutorService import java.util.concurrent.Executors import java.util.concurrent.Future import java.util.concurrent.ThreadFactory +import java.util.concurrent.TimeUnit import java.util.concurrent.atomic.AtomicBoolean import java.util.stream.Collectors import java.util.stream.LongStream @@ -833,6 +834,11 @@ class Suite implements GroovyInterceptable { return s3Url } + String getJdbcPassword() { + String password = context.config.otherConfigs.get("jdbcPassword"); + return password + } + static void scpFiles(String username, String host, String files, String filePath, boolean fromDst=true) { String cmd = "scp -o StrictHostKeyChecking=no -r ${username}@${host}:${files} ${filePath}" if (!fromDst) { @@ -937,8 +943,13 @@ class Suite implements GroovyInterceptable { return sql_return_maparray("show frontends").collect { it.Host + ":" + it.HttpPort }; } + List getFrontendIpEditlogPort() { + return sql_return_maparray("show frontends").collect { it.Host + ":" + it.EditLogPort }; + } + void getBackendIpHttpPort(Map backendId_to_backendIP, Map backendId_to_backendHttpPort) { List> backends = sql("show backends"); + logger.info("Content of backends: ${backends}") for (List backend : backends) { backendId_to_backendIP.put(String.valueOf(backend[0]), String.valueOf(backend[1])); backendId_to_backendHttpPort.put(String.valueOf(backend[0]), String.valueOf(backend[4])); @@ -946,6 +957,18 @@ class Suite implements GroovyInterceptable { return; } + void getBackendIpHeartbeatPort(Map backendId_to_backendIP, + Map backendId_to_backendHeartbeatPort) { + List> backends = sql("show backends"); + logger.info("Content of backends: ${backends}") + for (List backend : backends) { + backendId_to_backendIP.put(String.valueOf(backend[0]), String.valueOf(backend[1])); + backendId_to_backendHeartbeatPort.put(String.valueOf(backend[0]), String.valueOf(backend[2])); + } + return; + } + + void getBackendIpHttpAndBrpcPort(Map backendId_to_backendIP, Map backendId_to_backendHttpPort, Map backendId_to_backendBrpcPort) { @@ -1470,6 +1493,71 @@ class Suite implements GroovyInterceptable { } } + void waitAddFeFinished(String host, int port) { + logger.info("waiting for fe ${host}:${port} to be added") + Awaitility.await().atMost(60, TimeUnit.SECONDS).with().pollDelay(100, TimeUnit.MILLISECONDS).and() + .pollInterval(100, TimeUnit.MILLISECONDS).await().until(() -> { + def frontends = getFrontendIpEditlogPort() + logger.info("frontends ${frontends}") + boolean matched = false + String expectedFE = "${host}:${port}" + for (frontend: frontends) { + logger.info("checking fe ${frontend}, expected fe ${expectedFE}") + if (frontend.equals(expectedFE)) { + matched = true; + } + } + return matched; + }); + } + + void
waitDropFeFinished(String host, int port) { + Awaitility.await().atMost(60, TimeUnit.SECONDS).with().pollDelay(100, TimeUnit.MILLISECONDS).and() + .pollInterval(100, TimeUnit.MILLISECONDS).await().until(() -> { + def frontends = getFrontendIpEditlogPort() + boolean matched = false + for (frontend: frontends) { + if (frontend == "$host:$port") { + matched = true + } + } + return !matched; + }); + } + + void waitAddBeFinished(String host, int port) { + logger.info("waiting ${host}:${port} added"); + Awaitility.await().atMost(60, TimeUnit.SECONDS).with().pollDelay(100, TimeUnit.MILLISECONDS).and() + .pollInterval(100, TimeUnit.MILLISECONDS).await().until(() -> { + def ipList = [:] + def portList = [:] + getBackendIpHeartbeatPort(ipList, portList) + boolean matched = false + ipList.each { beid, ip -> + if (ip.equals(host) && ((portList[beid] as int) == port)) { + matched = true; + } + } + return matched; + }); + } + + void waitDropBeFinished(String host, int port) { + Awaitility.await().atMost(60, TimeUnit.SECONDS).with().pollDelay(100, TimeUnit.MILLISECONDS).and() + .pollInterval(100, TimeUnit.MILLISECONDS).await().until(() -> { + def ipList = [:] + def portList = [:] + getBackendIpHeartbeatPort(ipList, portList) + boolean matched = false + ipList.each { beid, ip -> + if (ip == host && portList[beid] as int == port) { + matched = true; + } + } + return !matched; + }); + } + void waiteCreateTableFinished(String tableName) { Thread.sleep(2000); String showCreateTable = "SHOW CREATE TABLE ${tableName}" @@ -1769,7 +1857,7 @@ class Suite implements GroovyInterceptable { drop_cluster_api.call(js) { respCode, body -> - log.info("dorp cluster resp: ${body} ${respCode}".toString()) + log.info("drop cluster resp: ${body} ${respCode}".toString()) def json = parseJson(body) assertTrue(json.code.equalsIgnoreCase("OK") || json.code.equalsIgnoreCase("ALREADY_EXISTED")) } diff --git a/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/SuiteCluster.groovy b/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/SuiteCluster.groovy index 8e4c46a130ae9a..862f437840eccd 100644 --- a/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/SuiteCluster.groovy +++ b/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/SuiteCluster.groovy @@ -39,7 +39,7 @@ class ClusterOptions { Boolean sqlModeNodeMgr = false Boolean beMetaServiceEndpoint = true - Boolean beCloudInstanceId = false + Boolean beClusterId = false int waitTimeout = 180 @@ -322,8 +322,8 @@ class SuiteCluster { if (!options.beMetaServiceEndpoint) { cmd += ['--no-be-metaservice-endpoint'] } - if (!options.beCloudInstanceId) { - cmd += ['--no-be-cloud-instanceid'] + if (!options.beClusterId) { + cmd += ['--no-be-cluster-id'] } cmd += ['--wait-timeout', String.valueOf(options.waitTimeout)] diff --git a/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/SuiteContext.groovy b/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/SuiteContext.groovy index d9268643a444bb..fe875af8a47dc0 100644 --- a/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/SuiteContext.groovy +++ b/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/SuiteContext.groovy @@ -296,7 +296,7 @@ class SuiteContext implements Closeable { public T connect(String user, String password, String url, Closure actionSupplier) { def originConnection = threadLocalConn.get() try { - log.info("Create new connection for 
user '${user}'") + log.info("Create new connection for user '${user}' to '${url}'") return DriverManager.getConnection(url, user, password).withCloseable { newConn -> def newConnInfo = new ConnectionInfo() newConnInfo.conn = newConn @@ -306,7 +306,7 @@ class SuiteContext implements Closeable { return actionSupplier.call() } } finally { - log.info("Recover original connection") + log.info("Recover original connection to '${url}'") if (originConnection == null) { threadLocalConn.remove() } else { diff --git a/regression-test/framework/src/main/groovy/org/apache/doris/regression/util/DebugPoint.groovy b/regression-test/framework/src/main/groovy/org/apache/doris/regression/util/DebugPoint.groovy index 8c764eb453d017..7386d896ac3555 100644 --- a/regression-test/framework/src/main/groovy/org/apache/doris/regression/util/DebugPoint.groovy +++ b/regression-test/framework/src/main/groovy/org/apache/doris/regression/util/DebugPoint.groovy @@ -103,6 +103,10 @@ class DebugPoint { def enableDebugPointForAllBEs(String name, Map params = null) { operateDebugPointForAllBEs({ host, port -> logger.info("enable debug point ${name} with params ${params} for BE $host:$port") + if (port == -1) { + logger.info("skip for BE $host:$port") + return + } enableDebugPoint(host, port, NodeType.BE, name, params) }) } @@ -111,6 +115,10 @@ class DebugPoint { def disableDebugPointForAllBEs(String name) { operateDebugPointForAllBEs { host, port -> logger.info("disable debug point ${name} for BE $host:$port") + if (port == -1) { + logger.info("skip for BE $host:$port") + return + } disableDebugPoint(host, port, NodeType.BE, name) } } @@ -119,6 +127,10 @@ class DebugPoint { def clearDebugPointsForAllBEs() { operateDebugPointForAllBEs { host, port -> logger.info("clear debug point for BE $host:$port") + if (port == -1) { + logger.info("skip for BE $host:$port") + return + } clearDebugPoints(host, port, NodeType.BE) } } @@ -137,6 +149,10 @@ class DebugPoint { def enableDebugPointForAllFEs(String name, Map params = null) { operateDebugPointForAllFEs({ host, port -> logger.info("enable debug point ${name} with params ${params} for FE $host:$port") + if (port == -1) { + logger.info("skip for FE $host:$port") + return + } enableDebugPoint(host, port, NodeType.FE, name, params) }) } @@ -144,6 +160,10 @@ class DebugPoint { def disableDebugPointForAllFEs(String name) { operateDebugPointForAllFEs { host, port -> logger.info("disable debug point ${name} for FE $host:$port") + if (port == -1) { + logger.info("skip for FE $host:$port") + return + } disableDebugPoint(host, port, NodeType.FE, name) } } @@ -151,6 +171,10 @@ class DebugPoint { def clearDebugPointsForAllFEs() { operateDebugPointForAllFEs { host, port -> logger.info("clear debug point for FE $host:$port") + if (port == -1) { + logger.info("skip for FE $host:$port") + return + } clearDebugPoints(host, port, NodeType.FE) } } diff --git a/regression-test/suites/account_p0/test_nereids_row_policy.groovy b/regression-test/suites/account_p0/test_nereids_row_policy.groovy index 6ae858997b11a0..987a4861ae918c 100644 --- a/regression-test/suites/account_p0/test_nereids_row_policy.groovy +++ b/regression-test/suites/account_p0/test_nereids_row_policy.groovy @@ -21,11 +21,8 @@ suite("test_nereids_row_policy") { def user='row_policy_user' def tokens = context.config.jdbcUrl.split('/') def url=tokens[0] + "//" + tokens[2] + "/" + dbName + "?" 
- def isCloudMode = { - def ret = sql_return_maparray """show backends""" - ret.Tag[0].contains("cloud_cluster_name") - } - def cloudMode = isCloudMode.call() + + def cloudMode = isCloudMode() //cloud-mode if (cloudMode) { def clusters = sql " SHOW CLUSTERS; " diff --git a/regression-test/suites/cloud_p0/auth/test_grant_revoke_cluster_to_user.groovy b/regression-test/suites/cloud_p0/auth/test_grant_revoke_cluster_to_user.groovy index ab9660c5891337..9cd752bdff5547 100644 --- a/regression-test/suites/cloud_p0/auth/test_grant_revoke_cluster_to_user.groovy +++ b/regression-test/suites/cloud_p0/auth/test_grant_revoke_cluster_to_user.groovy @@ -107,7 +107,7 @@ suite("test_grant_revoke_cluster_to_user", "cloud_auth") { connect(user = "${user3}", password = 'Cloud12345', url = context.config.jdbcUrl) { test { sql """select * from ${db}.${tbl}""" - exception "or you may not have permission to access the current cluster" + exception "or you may not have permission to access the current compute group" } } @@ -135,7 +135,7 @@ suite("test_grant_revoke_cluster_to_user", "cloud_auth") { connect(user = "${user1}", password = 'Cloud12345', url = context.config.jdbcUrl) { test { sql """use @${cluster1}""" - exception "Cluster ${cluster1} not exist" + exception "${cluster1} not exist" } result = sql_return_maparray """show grants for '${user1}'""" commonAuth result, "'${user1}'@'%'", "Yes", "admin", "Admin_priv" diff --git a/regression-test/suites/cloud_p0/auth/test_grant_revoke_compute_group_to_user.groovy b/regression-test/suites/cloud_p0/auth/test_grant_revoke_compute_group_to_user.groovy new file mode 100644 index 00000000000000..a57b9e9a07f427 --- /dev/null +++ b/regression-test/suites/cloud_p0/auth/test_grant_revoke_compute_group_to_user.groovy @@ -0,0 +1,299 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
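+// Exercises GRANT/REVOKE USAGE_PRIV ON COMPUTE GROUP, the 'default_compute_group' user property, and the +// ComputeGroupPrivs column of SHOW GRANTS. The statement shapes under test (names are illustrative): +// GRANT USAGE_PRIV ON COMPUTE GROUP 'g0' TO 'u0'; +// REVOKE USAGE_PRIV ON COMPUTE GROUP 'g0' FROM 'u0'; +// SET PROPERTY FOR 'u0' 'default_compute_group' = 'g0';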
+ +suite("test_grant_revoke_compute_group_to_user", "cloud_auth") { + if (!isCloudMode()) { + log.info("not cloud mode just return") + return + } + def role = "admin" + def user1 = "regression_test_compute_group_user1" + def user2 = "regression_test_compute_group_user2" + def user3 = "regression_test_compute_group_user3" + def tbl = "test_auth_compute_group_tbl" + + def logAndExecuteSql = { sqlStatement -> + log.info("Executing SQL: ${sqlStatement}") + return sql(sqlStatement) + } + + logAndExecuteSql """drop user if exists ${user1}""" + logAndExecuteSql """drop user if exists ${user2}""" + logAndExecuteSql """drop user if exists ${user3}""" + logAndExecuteSql """drop table if exists ${tbl}""" + + def getCluster = { group -> + def result = sql " SHOW COMPUTE GROUPS; " + for (int i = 0; i < result.size(); i++) { + if (result[i][0] == group) { + return result[i] + } + } + return null + } + + def commonAuth = { result, UserIdentity, Password, Roles, GlobalPrivs -> + assertEquals(UserIdentity as String, result.UserIdentity[0] as String) + assertEquals(Password as String, result.Password[0] as String) + assertEquals(Roles as String, result.Roles[0] as String) + assertEquals(GlobalPrivs as String, result.GlobalPrivs[0] as String) + } + + def getProperty = { property, user -> + def result = null + if (user == "") { + result = sql_return_maparray """SHOW PROPERTY""" + } else { + result = sql_return_maparray """SHOW PROPERTY FOR '${user}'""" + } + result.find { + it.Key == property as String + } + } + + def groups = sql " SHOW COMPUTE GROUPS; " + logger.info("compute groups {}", groups); + assertTrue(!groups.isEmpty()) + def validCluster = groups[0][0] + + // 1. change user + // ${user1} admin role + logAndExecuteSql """create user ${user1} identified by 'Cloud12345' default role 'admin'""" + result = sql_return_maparray """show grants for '${user1}'""" + commonAuth result, "'${user1}'@'%'" as String, "Yes", "admin", "Admin_priv" + assertNull(result.ComputeGroupPrivs[0]) + + + // ${user2} not admin role + logAndExecuteSql """create user ${user2} identified by 'Cloud12345'""" + logAndExecuteSql """GRANT USAGE_PRIV ON COMPUTE GROUP '${validCluster}' TO '${user2}'""" + // for use default_group:regression_test + logAndExecuteSql """grant select_priv on *.*.* to ${user2}""" + + + logAndExecuteSql """ + CREATE TABLE ${tbl} ( + `k1` int(11) NULL, + `k2` char(5) NULL + ) + DUPLICATE KEY(`k1`, `k2`) + COMMENT 'OLAP' + DISTRIBUTED BY HASH(`k1`) BUCKETS 1 + PROPERTIES ( + "replication_num"="1" + ); + """ + + logAndExecuteSql """ + insert into ${tbl} (k1, k2) values (1, "10"); + """ + + logAndExecuteSql """create user ${user3} identified by 'Cloud12345'""" + logAndExecuteSql """GRANT SELECT_PRIV ON *.*.* TO '${user3}'@'%'""" + result = connect(user = "${user3}", password = 'Cloud12345', url = context.config.jdbcUrl) { + sql """SHOW COMPUTE GROUPS""" + } + // not grant any group to user3 + assertTrue(result.isEmpty()) + def db = context.dbName + + connect(user = "${user3}", password = 'Cloud12345', url = context.config.jdbcUrl) { + test { + sql """select * from ${db}.${tbl}""" + exception "or you may not have permission to access the current compute group" + } + } + + // 2. 
grant compute group + def group1 = "groupA" + def result + + logAndExecuteSql "sync" + + // a user with the admin role can grant a compute group to a user + result = connect(user = "${user1}", password = 'Cloud12345', url = context.config.jdbcUrl) { + sql """GRANT USAGE_PRIV ON COMPUTE GROUP '${group1}' TO '${user1}'""" + } + + // run as the case user (default root) and show grants again; the result should be the same + result = sql_return_maparray """show grants for '${user1}'""" + commonAuth result, "'${user1}'@'%'" as String, "Yes", "admin", "Admin_priv" + assertTrue((result.ComputeGroupPrivs as String).contains("${group1}: Cluster_usage_priv")) + + logAndExecuteSql """GRANT USAGE_PRIV ON COMPUTE GROUP '${group1}' TO '${user1}'""" + result = sql_return_maparray """show grants for '${user1}'""" + commonAuth result, "'${user1}'@'%'" as String, "Yes", "admin", "Admin_priv" + assertTrue((result.ComputeGroupPrivs as String).contains("${group1}: Cluster_usage_priv")) + + connect(user = "${user1}", password = 'Cloud12345', url = context.config.jdbcUrl) { + test { + sql """use @${group1}""" + exception "${group1} not exist" + } + result = sql_return_maparray """show grants for '${user1}'""" + commonAuth result, "'${user1}'@'%'", "Yes", "admin", "Admin_priv" + assertTrue((result.ComputeGroupPrivs as String).contains("${group1}: Cluster_usage_priv")) + } + + + logAndExecuteSql """GRANT USAGE_PRIV ON COMPUTE GROUP '${group1}' TO '${user2}'""" + try { + result = connect(user = "${user2}", password = 'Cloud12345', url = context.config.jdbcUrl) { + sql """GRANT USAGE_PRIV ON COMPUTE GROUP '${group1}' TO '${user1}'""" + } + } catch (Exception e) { + assertTrue(e.getMessage().contains("Access denied; you need all [Grant_priv, Cluster_usage_priv] privilege(s) for this operation"), e.getMessage()) + } + logAndExecuteSql """REVOKE USAGE_PRIV ON COMPUTE GROUP '${group1}' FROM '${user2}'""" + + // default compute group + logAndExecuteSql """SET PROPERTY FOR '${user1}' 'default_compute_group' = '${validCluster}'""" + logAndExecuteSql """SET PROPERTY FOR '${user2}' 'default_compute_group' = '${validCluster}'""" + def show_group_1 = getCluster(validCluster) + + assertTrue(show_group_1[2].contains(user2), "Expected user list to contain ${user2}") + + result = getProperty("default_compute_group", "${user1}") + assertEquals(result.Value as String, "${validCluster}" as String) + + connect(user = "${user2}", password = 'Cloud12345', url = context.config.jdbcUrl) { + result = sql """use @${validCluster}""" + assertEquals(result[0][0], 0) + result = getProperty("default_compute_group", "") + assertEquals(result.Value as String, "${validCluster}" as String) + } + // set default_compute_group to '' + logAndExecuteSql """SET PROPERTY FOR '${user2}' 'default_compute_group' = ''""" + connect(user = "${user2}", password = 'Cloud12345', url = context.config.jdbcUrl) { + result = getProperty("default_compute_group", "") + assertEquals(result.Value as String, "" as String) + } + + logAndExecuteSql """SET PROPERTY FOR '${user2}' 'default_compute_group' = '${validCluster}'""" + result = logAndExecuteSql """REVOKE USAGE_PRIV ON COMPUTE GROUP '${validCluster}' FROM '${user2}'""" + assertEquals(result[0][0], 0) + connect(user = "${user2}", password = 'Cloud12345', url = context.config.jdbcUrl) { + test { + sql """use @${group1}""" + exception "USAGE denied to user" + } + } + + connect(user = "${user2}", password = 'Cloud12345', url = context.config.jdbcUrl) { + test { + sql """use @${validCluster}""" + exception "USAGE denied to user" + } + } + + logAndExecuteSql """GRANT USAGE_PRIV ON COMPUTE GROUP '${group1}' TO '${user2}'""" + logAndExecuteSql """GRANT USAGE_PRIV ON COMPUTE GROUP '${validCluster}' TO '${user2}'""" + show_group_2 = connect(user = "${user2}", password = 'Cloud12345', url = context.config.jdbcUrl) { + getCluster(validCluster) + } + + assertTrue(show_group_2[2].equals(user2), "Expected only user ${user2}") + + result = connect(user = "${user2}", password = 'Cloud12345', url = context.config.jdbcUrl) { + sql """USE @${validCluster}""" + } + assertEquals(result[0][0], 0) + + logAndExecuteSql """REVOKE USAGE_PRIV ON COMPUTE GROUP '${validCluster}' FROM '${user2}'""" + + connect(user = "${user2}", password = 'Cloud12345', url = context.config.jdbcUrl) { + test { + sql """use @${validCluster}""" + exception "USAGE denied to user" + } + result = sql_return_maparray """show grants for '${user2}'""" + commonAuth result, "'${user2}'@'%'" as String, "Yes", "", "Select_priv" + assertTrue((result.ComputeGroupPrivs as String).contains("${group1}: Cluster_usage_priv")) + + test { + sql """REVOKE USAGE_PRIV ON COMPUTE GROUP 'NotExistCluster' FROM '${user2}'""" + exception "Access denied; you need all" + } + } + + logAndExecuteSql """REVOKE USAGE_PRIV ON COMPUTE GROUP '${validCluster}' FROM '${user2}'""" + result = sql_return_maparray """show grants for '${user2}'""" + commonAuth result, "'${user2}'@'%'" as String, "Yes", "", "Select_priv" + assertTrue((result.ComputeGroupPrivs as String).contains("${group1}: Cluster_usage_priv")) + + logAndExecuteSql "sync" + // 3. revoke compute group + // a user with the admin role can revoke a compute group + result = connect(user = "${user1}", password = 'Cloud12345', url = context.config.jdbcUrl) { + sql """REVOKE USAGE_PRIV ON COMPUTE GROUP '${group1}' FROM '${user1}'""" + } + + // revoke GRANT_PRIV from the general user; he then cannot revoke a compute group from other users.
+ logAndExecuteSql """revoke GRANT_PRIV on *.*.* from ${user2}""" + + logAndExecuteSql "sync" + + // general user can't revoke group + try { + result = connect(user = "${user2}", password = 'Cloud12345', url = context.config.jdbcUrl) { + sql """REVOKE USAGE_PRIV ON COMPUTE GROUP '${group1}' FROM '${user2}'""" + } + } catch (Exception e) { + assertTrue(e.getMessage().contains("Access denied; you need all"), e.getMessage()) + } + + result = sql_return_maparray """show grants for '${user1}'""" + commonAuth result, "'${user1}'@'%'" as String, "Yes", "admin", "Admin_priv" + assertNull(result.ComputeGroupPrivs[0]) + + result = sql_return_maparray """show grants for '${user2}'""" + commonAuth result, "'${user2}'@'%'" as String, "Yes", "", "Select_priv" + assertTrue((result.ComputeGroupPrivs as String).contains("${group1}: Cluster_usage_priv")) + + // revoke user1 admin role + logAndExecuteSql """REVOKE 'admin' FROM ${user1}""" + result = sql_return_maparray """show grants for '${user1}'""" + assertEquals("'${user1}'@'%'" as String, result.UserIdentity[0] as String) + assertEquals("", result.Roles[0]) + assertNull(result.GlobalPrivs[0]) + assertNull(result.ComputeGroupPrivs[0]) + + // user1 no admin auth, so failed to set other default compute group + try { + result = connect(user = "${user1}", password = 'Cloud12345', url = context.config.jdbcUrl) { + sql """SET PROPERTY FOR '${user2}' 'default_compute_group' = '${validCluster}'""" + } + } catch (Exception e) { + assertTrue(e.getMessage().contains("Access denied for user"), e.getMessage()) + } + + logAndExecuteSql """drop user if exists ${user1}""" + // grant not exists user + result = logAndExecuteSql """GRANT USAGE_PRIV ON COMPUTE GROUP '${group1}' TO 'NotExitUser'""" + assertEquals(result[0][0], 0) + + // drop user and grant he group priv + result = logAndExecuteSql """GRANT USAGE_PRIV ON COMPUTE GROUP '${group1}' TO '${user1}'""" + assertEquals(result[0][0], 0) + result = logAndExecuteSql """REVOKE USAGE_PRIV ON COMPUTE GROUP '${group1}' FROM '${user1}'""" + assertEquals(result[0][0], 0) + // general user can't grant group to use + logAndExecuteSql """drop user if exists ${user2}""" + logAndExecuteSql """drop user if exists ${user3}""" +} + + diff --git a/regression-test/suites/cloud_p0/cache/multi_cluster/warm_up/cluster/test_warm_up_compute_group.groovy b/regression-test/suites/cloud_p0/cache/multi_cluster/warm_up/cluster/test_warm_up_compute_group.groovy new file mode 100644 index 00000000000000..a086731efffce4 --- /dev/null +++ b/regression-test/suites/cloud_p0/cache/multi_cluster/warm_up/cluster/test_warm_up_compute_group.groovy @@ -0,0 +1,265 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +import org.codehaus.groovy.runtime.IOGroovyMethods + +suite("test_warm_up_compute_group") { + def ttlProperties = """ PROPERTIES("file_cache_ttl_seconds"="12000") """ + def getJobState = { jobId -> + def jobStateResult = sql """ SHOW WARM UP JOB WHERE ID = ${jobId} """ + return jobStateResult[0][2] + } + def table = "customer" + + List ipList = new ArrayList<>(); + List hbPortList = new ArrayList<>() + List httpPortList = new ArrayList<>() + List brpcPortList = new ArrayList<>() + List beUniqueIdList = new ArrayList<>() + + String[] bes = context.config.multiClusterBes.split(','); + println("the value is " + context.config.multiClusterBes); + int num = 0 + for(String values : bes) { + if (num++ == 2) break; + println("the value is " + values); + String[] beInfo = values.split(':'); + ipList.add(beInfo[0]); + hbPortList.add(beInfo[1]); + httpPortList.add(beInfo[2]); + beUniqueIdList.add(beInfo[3]); + brpcPortList.add(beInfo[4]); + } + + println("the ip is " + ipList); + println("the heartbeat port is " + hbPortList); + println("the http port is " + httpPortList); + println("the be unique id is " + beUniqueIdList); + println("the brpc port is " + brpcPortList); + + sql new File("""${context.file.parent}/../ddl/${table}_delete.sql""").text + sql new File("""${context.file.parent}/../ddl/supplier_delete.sql""").text + // create table if not exists + sql (new File("""${context.file.parent}/../ddl/${table}.sql""").text + ttlProperties) + sql (new File("""${context.file.parent}/../ddl/supplier.sql""").text + ttlProperties) + + sql """ TRUNCATE TABLE __internal_schema.cloud_cache_hotspot; """ + sleep(30000) + + def s3BucketName = getS3BucketName() + def s3WithProperties = """WITH S3 ( + |"AWS_ACCESS_KEY" = "${getS3AK()}", + |"AWS_SECRET_KEY" = "${getS3SK()}", + |"AWS_ENDPOINT" = "${getS3Endpoint()}", + |"AWS_REGION" = "${getS3Region()}", + |"provider" = "${getS3Provider()}") + |PROPERTIES( + |"exec_mem_limit" = "8589934592", + |"load_parallelism" = "3")""".stripMargin() + + + + def clearFileCache = { ip, port -> + httpTest { + endpoint "" + uri ip + ":" + port + """/api/file_cache?op=clear&sync=true""" + op "get" + body "" + } + } + + def getMetricsMethod = { ip, port, check_func -> + httpTest { + endpoint ip + ":" + port + uri "/brpc_metrics" + op "get" + check check_func + } + } + + clearFileCache.call(ipList[0], httpPortList[0]); + clearFileCache.call(ipList[1], httpPortList[1]); + + def load_customer_once = { + def uniqueID = Math.abs(UUID.randomUUID().hashCode()).toString() + def loadLabel = table + "_" + uniqueID + // load data from cos + def loadSql = new File("""${context.file.parent}/../ddl/${table}_load.sql""").text.replaceAll("\\\$\\{s3BucketName\\}", s3BucketName) + loadSql = loadSql.replaceAll("\\\$\\{loadLabel\\}", loadLabel) + s3WithProperties + sql loadSql + + // check load state + while (true) { + def stateResult = sql "show load where Label = '${loadLabel}'" + def loadState = stateResult[stateResult.size() - 1][2].toString() + if ("CANCELLED".equalsIgnoreCase(loadState)) { + throw new IllegalStateException("load ${loadLabel} failed.") + } else if ("FINISHED".equalsIgnoreCase(loadState)) { + break + } + sleep(5000) + } + } + + def load_supplier_once = { + def uniqueID = Math.abs(UUID.randomUUID().hashCode()).toString() + def loadLabel = "supplier_" + uniqueID + // load data from cos + def loadSql = new File("""${context.file.parent}/../ddl/supplier_load.sql""").text.replaceAll("\\\$\\{s3BucketName\\}", s3BucketName) + loadSql = loadSql.replaceAll("\\\$\\{loadLabel\\}", 
loadLabel) + s3WithProperties + sql loadSql + + // check load state + while (true) { + def stateResult = sql "show load where Label = '${loadLabel}'" + def loadState = stateResult[stateResult.size() - 1][2].toString() + if ("CANCELLED".equalsIgnoreCase(loadState)) { + throw new IllegalStateException("load ${loadLabel} failed.") + } else if ("FINISHED".equalsIgnoreCase(loadState)) { + break + } + sleep(5000) + } + } + + sql "use @regression_cluster_name0" + load_customer_once() + load_customer_once() + load_customer_once() + load_customer_once() + load_customer_once() + load_supplier_once() + load_supplier_once() + load_supplier_once() + + for (int i = 0; i < 1000; i++) { + sql "select count(*) from customer" + sql "select count(*) from supplier" + } + sleep(40000) + def jobId_ = sql "WARM UP COMPUTE GROUP regression_cluster_name1 WITH COMPUTE GROUP regression_cluster_name0" + def waitJobDone = { jobId -> + int retryTime = 120 + int i = 0 + for (; i < retryTime; i++) { + sleep(1000) + def status = getJobState(jobId[0][0]) + logger.info(status) + if (status.equals("CANCELLED")) { + assertTrue(false); + } + if (status.equals("FINISHED")) { + break; + } + } + if (i == retryTime) { + sql "cancel warm up job where id = ${jobId[0][0]}" + assertTrue(false); + } + } + waitJobDone(jobId_) + + sleep(30000) + long ttl_cache_size = 0 + getMetricsMethod.call(ipList[0], brpcPortList[0]) { + respCode, body -> + assertEquals("${respCode}".toString(), "200") + String out = "${body}".toString() + def strs = out.split('\n') + Boolean flag = false; + for (String line in strs) { + if (line.contains("ttl_cache_size")) { + if (line.startsWith("#")) { + continue + } + def i = line.indexOf(' ') + ttl_cache_size = line.substring(i).toLong() + flag = true + break + } + } + assertTrue(flag) + } + + getMetricsMethod.call(ipList[1], brpcPortList[1]) { + respCode, body -> + assertEquals("${respCode}".toString(), "200") + String out = "${body}".toString() + def strs = out.split('\n') + Boolean flag = false; + for (String line in strs) { + if (line.contains("ttl_cache_size")) { + if (line.startsWith("#")) { + continue + } + def i = line.indexOf(' ') + assertEquals(ttl_cache_size, line.substring(i).toLong()) + flag = true + break + } + } + assertTrue(flag) + } + + try { + sql "WARM UP COMPUTE GROUP regression_cluster_name1 WITH COMPUTE GROUP regression_cluster_name2" + assertTrue(false) + } catch (Exception e) { + assertTrue(true) + } + + try { + sql "WARM UP COMPUTE GROUP regression_cluster_name2 WITH COMPUTE GROUP regression_cluster_name0" + assertTrue(false) + } catch (Exception e) { + assertTrue(true) + } + + try { + sql "WARM UP COMPUTE GROUP regression_cluster_name0 WITH COMPUTE GROUP regression_cluster_name0" + assertTrue(false) + } catch (Exception e) { + assertTrue(true) + } + + sql new File("""${context.file.parent}/../ddl/${table}_delete.sql""").text + sql new File("""${context.file.parent}/../ddl/supplier_delete.sql""").text + + clearFileCache.call(ipList[1], httpPortList[1]); + jobId_ = sql "WARM UP COMPUTE GROUP regression_cluster_name1 WITH COMPUTE GROUP regression_cluster_name0" + waitJobDone(jobId_) + sleep(40000) + getMetricsMethod.call(ipList[1], brpcPortList[1]) { + respCode, body -> + assertEquals("${respCode}".toString(), "200") + String out = "${body}".toString() + def strs = out.split('\n') + Boolean flag = false; + for (String line in strs) { + if (line.contains("ttl_cache_size")) { + if (line.startsWith("#")) { + continue + } + def j = line.indexOf(' ') + assertEquals(0, line.substring(j).toLong()) 
+ flag = true + break + } + } + assertTrue(flag) + } +} diff --git a/regression-test/suites/cloud_p0/multi_cluster/test_auto_start.groovy b/regression-test/suites/cloud_p0/multi_cluster/test_auto_start.groovy index d6db6364d38af7..d9b105ec92d552 100644 --- a/regression-test/suites/cloud_p0/multi_cluster/test_auto_start.groovy +++ b/regression-test/suites/cloud_p0/multi_cluster/test_auto_start.groovy @@ -100,7 +100,7 @@ suite('test_auto_start_in_cloud', 'multi_cluster, docker') { def jsonSlurper = new JsonSlurper() def jsonObject = jsonSlurper.parseText(tag) - String cloudClusterId = jsonObject.cloud_cluster_id + String cloudClusterId = jsonObject.compute_group_id String uniqueId = jsonObject.cloud_unique_id sleep(5 * 1000) @@ -130,7 +130,7 @@ suite('test_auto_start_in_cloud', 'multi_cluster, docker') { tag = getCloudBeTagByName(clusterName) logger.info("tag = {}", tag) jsonObject = jsonSlurper.parseText(tag) - String cluster_status = jsonObject.cloud_cluster_status + String cluster_status = jsonObject.compute_group_status cluster_status == "SUSPENDED" } @@ -158,7 +158,7 @@ suite('test_auto_start_in_cloud', 'multi_cluster, docker') { tag = getCloudBeTagByName(clusterName) logger.info("tag = {}", tag) jsonObject = jsonSlurper.parseText(tag) - String cluster_status = jsonObject.cloud_cluster_status + String cluster_status = jsonObject.compute_group_status cluster_status == "TO_RESUME" } sleep(5 * 1000) @@ -172,7 +172,7 @@ suite('test_auto_start_in_cloud', 'multi_cluster, docker') { tag = getCloudBeTagByName(clusterName) logger.info("tag check = {}", tag) jsonObject = jsonSlurper.parseText(tag) - String cluster_status = jsonObject.cloud_cluster_status + String cluster_status = jsonObject.compute_group_status assertEquals("NORMAL", cluster_status) // add 1 nodes, check it status NORMAL @@ -189,7 +189,7 @@ suite('test_auto_start_in_cloud', 'multi_cluster, docker') { return } jsonObject = jsonSlurper.parseText(tag) - cluster_status = jsonObject.cloud_cluster_status + cluster_status = jsonObject.compute_group_status assertEquals("NORMAL", cluster_status) } } diff --git a/regression-test/suites/cloud_p0/multi_cluster/test_tvf.groovy b/regression-test/suites/cloud_p0/multi_cluster/test_tvf.groovy index 90fd6656b8ffbb..68b89b6f3d661c 100644 --- a/regression-test/suites/cloud_p0/multi_cluster/test_tvf.groovy +++ b/regression-test/suites/cloud_p0/multi_cluster/test_tvf.groovy @@ -52,7 +52,7 @@ suite('test_tvf_in_cloud', 'multi_cluster,docker') { def jsonSlurper = new JsonSlurper() def jsonObject = jsonSlurper.parseText(tag) - def cloudClusterId = jsonObject.cloud_cluster_id + def cloudClusterId = jsonObject.compute_group_id // multi cluster env // current cluster diff --git a/regression-test/suites/cloud_p0/node_mgr/test_not_allowed_op.groovy b/regression-test/suites/cloud_p0/node_mgr/test_not_allowed_op.groovy new file mode 100644 index 00000000000000..0ad6fd4ccdc0a5 --- /dev/null +++ b/regression-test/suites/cloud_p0/node_mgr/test_not_allowed_op.groovy @@ -0,0 +1,75 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +suite('test_not_allowed_op', 'p0') { + if (!isCloudMode()) { + return; + } + + // Test modifying frontend is not allowed + try { + // Get current frontend information + def frontendResult = sql_return_maparray """SHOW FRONTENDS""" + logger.info("Current frontends: ${frontendResult}") + + // Extract the first frontend's information + def firstFrontend = frontendResult[0] + def frontendHost = firstFrontend['Host'] + def frontendEditLogPort = firstFrontend['EditLogPort'] + + // Construct the frontend address + def frontendAddress = "${frontendHost}:${frontendEditLogPort}" + logger.info("Attempting to modify frontend: ${frontendAddress}") + def result = sql """ ALTER SYSTEM MODIFY FRONTEND "${frontendAddress}" HOSTNAME 'localhost' """ + logger.info("Modify frontend result: ${result}") + throw new IllegalStateException("Expected exception was not thrown") + } catch (Exception e) { + assertTrue(e.getMessage().contains("Modifying frontend hostname is not supported in cloud mode")) + } + + // Get current backend information + def backendResult = sql_return_maparray """SHOW BACKENDS""" + logger.info("Current backends: ${backendResult}") + + // Extract the first backend's information + def firstBackend = backendResult[0] + def backendHost = firstBackend['Host'] + def backendHeartbeatPort = firstBackend['HeartbeatPort'] + + // Construct the backend address + def backendAddress = "${backendHost}:${backendHeartbeatPort}" + // Test modifying backend is not allowed + try { + logger.info("Attempting to modify backend: ${backendAddress}") + def result = sql """ ALTER SYSTEM MODIFY BACKEND '${backendAddress}' SET("tag.location" = "tag1") """ + logger.info("Modify backend result: ${result}") + throw new IllegalStateException("Expected exception was not thrown") + } catch (Exception e) { + logger.info("Caught expected exception: ${e.getMessage()}") + assertTrue(e.getMessage().contains("Modifying backends is not supported in cloud mode")) + } + + // Test modifying backend hostname is not allowed + try { + sql """ ALTER SYSTEM MODIFY BACKEND "${backendAddress}" HOSTNAME 'localhost' """ + throw new IllegalStateException("Expected exception was not thrown") + } catch (Exception e) { + assertTrue(e.getMessage().contains("Modifying backend hostname is not supported in cloud mode")) + } + + logger.info("All tests for disallowed operations in cloud mode passed successfully") +} \ No newline at end of file diff --git a/regression-test/suites/cloud_p0/node_mgr/test_sql_mode_node_mgr.groovy b/regression-test/suites/cloud_p0/node_mgr/test_sql_mode_node_mgr.groovy index c79219aeac2bb7..77f7d05ff299c0 100644 --- a/regression-test/suites/cloud_p0/node_mgr/test_sql_mode_node_mgr.groovy +++ b/regression-test/suites/cloud_p0/node_mgr/test_sql_mode_node_mgr.groovy @@ -26,6 +26,8 @@ suite('test_sql_mode_node_mgr', 'docker,p1') { def clusterOptions = [ new ClusterOptions(), new ClusterOptions(), + new ClusterOptions(), + new ClusterOptions(), ] for (options in clusterOptions) { @@ -40,12 +42,22 @@ suite('test_sql_mode_node_mgr', 'docker,p1') { "heartbeat_interval_second=1",] } - 
clusterOptions[0].beCloudInstanceId = true; + clusterOptions[0].sqlModeNodeMgr = true; + clusterOptions[0].beClusterId = true; clusterOptions[0].beMetaServiceEndpoint = true; - clusterOptions[1].beCloudInstanceId = false; + clusterOptions[1].sqlModeNodeMgr = true; + clusterOptions[1].beClusterId = false; clusterOptions[1].beMetaServiceEndpoint = false; + clusterOptions[2].sqlModeNodeMgr = false; + clusterOptions[2].beClusterId = true; + clusterOptions[2].beMetaServiceEndpoint = true; + + clusterOptions[3].sqlModeNodeMgr = false; + clusterOptions[3].beClusterId = false; + clusterOptions[3].beMetaServiceEndpoint = false; + for (options in clusterOptions) { docker(options) { logger.info("docker started"); @@ -145,8 +157,24 @@ suite('test_sql_mode_node_mgr', 'docker,p1') { cluster.restartFrontends(); cluster.restartBackends(); - sleep(30000) - context.reconnectFe() + def reconnectFe = { + sleep(10000) + logger.info("Reconnecting to a new frontend...") + def newFe = cluster.getMasterFe() + if (newFe) { + logger.info("New frontend found: ${newFe.host}:${newFe.httpPort}") + def url = String.format( + "jdbc:mysql://%s:%s/?useLocalSessionState=true&allowLoadLocalInfile=false", + newFe.host, newFe.queryPort) + url = context.config.buildUrlWithDb(url, context.dbName) + context.connectTo(url, context.config.jdbcUser, context.config.jdbcPassword) + logger.info("Successfully reconnected to the new frontend") + } else { + logger.error("No new frontend found to reconnect") + } + } + + reconnectFe() checkClusterStatus(3, 3, 1) @@ -186,7 +214,7 @@ suite('test_sql_mode_node_mgr', 'docker,p1') { // CASE 3. Add the dropped backend back logger.info("Adding back the dropped backend: {}:{}", backendHost, backendHeartbeatPort) - sql """ ALTER SYSTEM ADD BACKEND "${backendHost}:${backendHeartbeatPort}"; """ + sql """ ALTER SYSTEM ADD BACKEND "${backendHost}:${backendHeartbeatPort}" PROPERTIES ("tag.compute_group_name" = "another_compute_group"); """ // Wait for the backend to be fully added back maxWaitSeconds = 300 @@ -207,6 +235,30 @@ suite('test_sql_mode_node_mgr', 'docker,p1') { checkClusterStatus(3, 3, 3) + // CASE 4. Check compute groups + logger.info("Checking compute groups") + + def computeGroups = sql_return_maparray("SHOW COMPUTE GROUPS") + logger.info("Compute groups: {}", computeGroups) + + // Verify that we have at least two compute groups + assert computeGroups.size() >= 2, "Expected at least 2 compute groups, but got ${computeGroups.size()}" + + // Verify that we have a 'default_compute_group' and 'another_compute_group' + def defaultGroup = computeGroups.find { it['IsCurrent'] == "TRUE" } + def anotherGroup = computeGroups.find { it['IsCurrent'] == "FALSE" } + + assert defaultGroup != null, "Expected to find 'default_compute_group'" + assert anotherGroup != null, "Expected to find 'another_compute_group'" + + // Verify that 'another_compute_group' has exactly one backend + assert anotherGroup['BackendNum'] == '1', "Expected 'another_compute_group' to have 1 backend, but it has ${anotherGroup['BackendNum']}" + + // Verify that 'default_compute_group' has the remaining backends + assert defaultGroup['BackendNum'] == '2', "Expected 'default_compute_group' to have 2 backends, but it has ${defaultGroup['BackendNum']}" + + logger.info("Compute groups verified successfully") + // CASE 4. If a fe is dropped, query and writing also work. 
// Get the list of frontends def frontends = sql_return_maparray("SHOW FRONTENDS") @@ -218,16 +270,18 @@ suite('test_sql_mode_node_mgr', 'docker,p1') { def feHost = feToDropMap['Host'] def feEditLogPort = feToDropMap['EditLogPort'] + def feRole = feToDropMap['Role'] logger.info("Dropping non-master frontend: {}:{}", feHost, feEditLogPort) // Drop the selected non-master frontend - sql """ ALTER SYSTEM DROP FOLLOWER "${feHost}:${feEditLogPort}"; """ + sql """ ALTER SYSTEM DROP ${feRole} "${feHost}:${feEditLogPort}"; """ // Wait for the frontend to be fully dropped maxWaitSeconds = 300 waited = 0 while (waited < maxWaitSeconds) { + reconnectFe() def currentFrontends = sql_return_maparray("SHOW FRONTENDS") if (currentFrontends.size() == frontends.size() - 1) { logger.info("Non-master frontend successfully dropped") @@ -286,6 +340,59 @@ suite('test_sql_mode_node_mgr', 'docker,p1') { logger.info("Frontend successfully added back and cluster status verified") + // CASE 6. Drop frontend and add back again + logger.info("Dropping frontend and adding back again") + + // Get the frontend to be dropped + def frontendToDrop = frontends.find { it['Host'] == feHost && it['EditLogPort'] == feEditLogPort } + assert frontendToDrop != null, "Could not find the frontend to drop" + + // Drop the frontend + sql """ ALTER SYSTEM DROP FOLLOWER "${feHost}:${feEditLogPort}"; """ + sleep(30000) + reconnectFe() + + // Wait for the frontend to be fully dropped + maxWaitSeconds = 300 + waited = 0 + while (waited < maxWaitSeconds) { + def updatedFrontends = sql_return_maparray("SHOW FRONTENDS") + if (!updatedFrontends.any { it['Host'] == feHost && it['EditLogPort'] == feEditLogPort }) { + logger.info("Frontend successfully dropped") + break + } + sleep(10000) + waited += 10 + } + + if (waited >= maxWaitSeconds) { + throw new Exception("Timeout waiting for frontend to be dropped") + } + + // Add the frontend back + sql """ ALTER SYSTEM ADD FOLLOWER "${feHost}:${feEditLogPort}"; """ + + // Wait for the frontend to be fully added back + maxWaitSeconds = 300 + waited = 0 + while (waited < maxWaitSeconds) { + def updatedFrontends = sql_return_maparray("SHOW FRONTENDS") + if (updatedFrontends.any { it['Host'] == feHost && it['EditLogPort'] == feEditLogPort }) { + logger.info("Frontend successfully added back") + break + } + sleep(10000) + waited += 10 + } + + if (waited >= maxWaitSeconds) { + throw new Exception("Timeout waiting for frontend to be added back") + } + + // Verify cluster status after adding the frontend back + checkClusterStatus(3, 3, 6) + + logger.info("Frontend successfully added back and cluster status verified") // CASE 6. If fe can not drop itself. // 6. Attempt to drop the master FE and expect an exception logger.info("Attempting to drop the master frontend") @@ -333,7 +440,7 @@ suite('test_sql_mode_node_mgr', 'docker,p1') { def originalBackendCount = 3 // As per the initial setup in this test assert currentBackends.size() == originalBackendCount, "Number of backends should remain unchanged after attempting to drop a non-existent backend" - checkClusterStatus(3, 3, 6) + checkClusterStatus(3, 3, 7) // CASE 8. 
             // CASE 8. Decommission a backend and verify the process
             logger.info("Attempting to decommission a backend")
@@ -381,7 +488,7 @@ suite('test_sql_mode_node_mgr', 'docker,p1') {
             logger.info("Successfully decommissioned backend and verified its status")
-            checkClusterStatus(3, 3, 7)
+            checkClusterStatus(3, 3, 8)
         }
     }
diff --git a/regression-test/suites/compaction/test_full_compaction.groovy b/regression-test/suites/compaction/test_full_compaction.groovy
index 217a4da707cefb..60f52f6f5a55a0 100644
--- a/regression-test/suites/compaction/test_full_compaction.groovy
+++ b/regression-test/suites/compaction/test_full_compaction.groovy
@@ -19,10 +19,6 @@ import org.codehaus.groovy.runtime.IOGroovyMethods
 suite("test_full_compaction") {
     def tableName = "test_full_compaction"
-    def isCloudMode = {
-        def ret = sql_return_maparray """show backends"""
-        ret.Tag[0].contains("cloud_cluster_name")
-    }
     try {
         String backend_id;
@@ -171,7 +167,7 @@ suite("test_full_compaction") {
             assert tabletJson.rowsets instanceof List
             rowsetCount +=((List) tabletJson.rowsets).size()
         }
-        def cloudMode = isCloudMode.call()
+        def cloudMode = isCloudMode()
         if (cloudMode) {
             assert (rowsetCount == 2)
         } else {
diff --git a/regression-test/suites/node_p0/test_backend.groovy b/regression-test/suites/node_p0/test_backend.groovy
index cce111b0a19076..5e68e5019acede 100644
--- a/regression-test/suites/node_p0/test_backend.groovy
+++ b/regression-test/suites/node_p0/test_backend.groovy
@@ -24,23 +24,28 @@ suite("test_backend", "nonConcurrent") {
         logger.info("result:${result}")
         sql """ALTER SYSTEM ADD BACKEND "${address}:${notExistPort}";"""
+        waitAddBeFinished(address, notExistPort)
         result = sql """SHOW BACKENDS;"""
         logger.info("result:${result}")
-        sql """ALTER SYSTEM MODIFY BACKEND "${address}:${notExistPort}" SET ("disable_query" = "true"); """
-        sql """ALTER SYSTEM MODIFY BACKEND "${address}:${notExistPort}" SET ("disable_load" = "true"); """
+        if (!isCloudMode()) {
+            sql """ALTER SYSTEM MODIFY BACKEND "${address}:${notExistPort}" SET ("disable_query" = "true"); """
+            sql """ALTER SYSTEM MODIFY BACKEND "${address}:${notExistPort}" SET ("disable_load" = "true"); """
+        }
         result = sql """SHOW BACKENDS;"""
         logger.info("result:${result}")
         sql """ALTER SYSTEM DROPP BACKEND "${address}:${notExistPort}";"""
+        waitDropBeFinished(address, notExistPort)
         result = sql """SHOW BACKENDS;"""
         logger.info("result:${result}")
     }
-    if (context.config.jdbcUser.equals("root")) {
+    // Cancelling a backend decommission is not supported in cloud mode.
+    if (context.config.jdbcUser.equals("root") && !isCloudMode()) {
         def beId1 = null
         try {
             GetDebugPoint().enableDebugPointForAllFEs("SystemHandler.decommission_no_check_replica_num");
diff --git a/regression-test/suites/node_p0/test_frontend.groovy b/regression-test/suites/node_p0/test_frontend.groovy
index c861b0e6b80b90..4478a1d3709c28 100644
--- a/regression-test/suites/node_p0/test_frontend.groovy
+++ b/regression-test/suites/node_p0/test_frontend.groovy
@@ -24,18 +24,22 @@ suite("test_frontend") {
         logger.debug("result:${result}")
         sql """ALTER SYSTEM ADD FOLLOWER "${address}:${notExistPort}";"""
+        waitAddFeFinished(address, notExistPort);
         result = sql """SHOW FRONTENDS;"""
         logger.debug("result:${result}")
         sql """ALTER SYSTEM DROP FOLLOWER "${address}:${notExistPort}";"""
+        waitDropFeFinished(address, notExistPort);
         result = sql """SHOW FRONTENDS;"""
         logger.debug("result:${result}")
         sql """ALTER SYSTEM ADD OBSERVER "${address}:${notExistPort}";"""
+        waitAddFeFinished(address, notExistPort);
         result = sql """SHOW FRONTENDS;"""
         logger.debug("result:${result}")
         sql """ALTER SYSTEM DROP OBSERVER "${address}:${notExistPort}";"""
+        waitDropFeFinished(address, notExistPort);
         result = sql """SHOW FRONTENDS;"""
         logger.debug("result:${result}")
     }
diff --git a/regression-test/suites/partition_p0/dynamic_partition/test_dynamic_partition.groovy b/regression-test/suites/partition_p0/dynamic_partition/test_dynamic_partition.groovy
index ec626eaa6915d2..a4b3eb661768dd 100644
--- a/regression-test/suites/partition_p0/dynamic_partition/test_dynamic_partition.groovy
+++ b/regression-test/suites/partition_p0/dynamic_partition/test_dynamic_partition.groovy
@@ -88,10 +88,7 @@ suite("test_dynamic_partition") {
         assertEquals(result.get(0).Buckets.toInteger(), 3)
         sql "drop table dy_par_bucket_set_by_distribution"
         sql "drop table if exists dy_par_bad"
-        def isCloudMode = {
-            def ret = sql_return_maparray """show backends"""
-            ret.Tag[0].contains("cloud_cluster_name")
-        }
+        def isCloudMode = isCloudMode()
         // not support tag in cloud mode
         if (!isCloudMode) {
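The test changes above follow two patterns: per-suite isCloudMode closures that inspected SHOW BACKENDS tags are replaced by the shared isCloudMode() helper, and fixed sleeps after ALTER SYSTEM statements are replaced by wait helpers (waitAddBeFinished, waitDropBeFinished, waitAddFeFinished, waitDropFeFinished). Those helpers are defined in the regression framework, not in this patch; as a rough, hypothetical sketch of the polling pattern they stand for, built only from sql_return_maparray and the retry loops already added in test_sql_mode_node_mgr above, one of them might look like:

```groovy
// Hypothetical sketch only -- not the framework's actual implementation.
// Poll SHOW FRONTENDS until the given frontend disappears, mirroring the
// retry loops used in test_sql_mode_node_mgr.
def waitDropFeFinished = { String host, int editLogPort ->
    int maxWaitSeconds = 300
    int waited = 0
    while (waited < maxWaitSeconds) {
        def fes = sql_return_maparray("SHOW FRONTENDS")
        // The drop is finished once no frontend matches this host:editLogPort.
        if (!fes.any { it['Host'] == host && it['EditLogPort'] == editLogPort.toString() }) {
            return
        }
        sleep(10000)
        waited += 10
    }
    throw new Exception("Timeout waiting for frontend ${host}:${editLogPort} to be dropped")
}
```

Polling until SHOW FRONTENDS (or SHOW BACKENDS) reflects the change makes these suites deterministic on slow environments, where a fixed sleep(30000) could either waste time or race the in-flight ADD/DROP.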