diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java index 9421364b482f11..4c627a99376aad 100644 --- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java +++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java @@ -2268,6 +2268,12 @@ public class Config extends ConfigBase { @ConfField(mutable = true, masterOnly = true) public static int publish_topic_info_interval_ms = 30000; // 30s + @ConfField(mutable = true) + public static int workload_sched_policy_interval_ms = 10000; // 10s + + @ConfField(mutable = true) + public static int workload_action_interval_ms = 10000; // 10s + @ConfField(description = {"查询be wal_queue 的超时阈值(ms)", "the timeout threshold of checking wal_queue on be(ms)"}) public static int check_wal_queue_timeout_threshold = 180000; // 3 min diff --git a/fe/fe-core/src/main/cup/sql_parser.cup b/fe/fe-core/src/main/cup/sql_parser.cup index a81f8cf93c4445..4128c4558e23de 100644 --- a/fe/fe-core/src/main/cup/sql_parser.cup +++ b/fe/fe-core/src/main/cup/sql_parser.cup @@ -52,6 +52,8 @@ import org.apache.doris.common.Version; import org.apache.doris.mysql.MysqlPassword; import org.apache.doris.load.loadv2.LoadTask; import org.apache.doris.policy.PolicyTypeEnum; +import org.apache.doris.resource.workloadschedpolicy.WorkloadConditionMeta; +import org.apache.doris.resource.workloadschedpolicy.WorkloadActionMeta; import com.google.common.collect.Lists; import com.google.common.collect.Maps; @@ -530,6 +532,9 @@ terminal String KW_PROCESSLIST, KW_PROFILE, KW_PROPERTIES, + KW_CONDITIONS, + KW_ACTIONS, + KW_SET_SESSION_VAR, KW_PROPERTY, KW_QUANTILE_STATE, KW_QUANTILE_UNION, @@ -841,6 +846,8 @@ nonterminal SetVar option_value, option_value_follow_option_type, option_value_n // List of set variable nonterminal List option_value_list, option_value_list_continued, start_option_value_list, start_option_value_list_following_option_type, user_property_list; +nonterminal List workload_policy_condition_list, conditions, opt_conditions; +nonterminal List workload_policy_action_list, opt_actions, actions; nonterminal Map key_value_map, opt_key_value_map, opt_key_value_map_in_paren, opt_properties, opt_ext_properties, opt_enable_feature_properties, properties; @@ -963,6 +970,9 @@ nonterminal StorageBackend storage_backend; nonterminal ArrayList opt_lock_tables_list; nonterminal LockTable lock_table; +// workload policy/group +nonterminal String policy_condition_op, policy_condition_value; + precedence nonassoc COMMA; precedence nonassoc STRING_LITERAL; precedence nonassoc KW_COLUMNS; @@ -986,7 +996,7 @@ precedence left BITAND, BITOR, BITXOR; precedence left KW_PIPE; precedence left BITNOT; precedence left KW_ORDER, KW_BY, KW_LIMIT; -precedence right KW_PROPERTIES; +precedence right KW_PROPERTIES, KW_CONDITIONS, KW_ACTIONS; precedence left LPAREN, RPAREN; precedence left KW_PREPARE, KW_EXECUTE; // Support chaining of timestamp arithmetic exprs. @@ -1362,6 +1372,10 @@ alter_stmt ::= {: RESULT = new AlterWorkloadGroupStmt(workloadGroupName, properties); :} + | KW_ALTER KW_WORKLOAD KW_SCHEDULE KW_POLICY ident_or_text:policyName opt_properties:properties + {: + RESULT = new AlterWorkloadSchedPolicyStmt(policyName, properties); + :} | KW_ALTER KW_ROUTINE KW_LOAD KW_FOR job_label:jobLabel opt_properties:jobProperties opt_datasource_properties:datasourceProperties {: @@ -1884,6 +1898,11 @@ create_stmt ::= {: RESULT = new CreateWorkloadGroupStmt(ifNotExists, workloadGroupName, properties); :} + /* workload schedule policy */ + | KW_CREATE KW_WORKLOAD KW_SCHEDULE KW_POLICY opt_if_not_exists:ifNotExists ident_or_text:workloadPolicyName opt_conditions:conditions opt_actions:actions opt_properties:properties + {: + RESULT = new CreateWorkloadSchedPolicyStmt(ifNotExists, workloadPolicyName, conditions, actions, properties); + :} /* encryptkey */ | KW_CREATE KW_ENCRYPTKEY opt_if_not_exists:ifNotExists encryptkey_name:keyName KW_AS STRING_LITERAL:keyString {: @@ -2987,6 +3006,10 @@ drop_stmt ::= {: RESULT = new DropWorkloadGroupStmt(ifExists, workloadGroupName); :} + | KW_DROP KW_WORKLOAD KW_SCHEDULE KW_POLICY opt_if_exists:ifExists ident_or_text:policyName + {: + RESULT = new DropWorkloadSchedPolicyStmt(ifExists, policyName); + :} | KW_DROP KW_ENCRYPTKEY opt_if_exists:ifExists encryptkey_name:keyName {: RESULT = new DropEncryptKeyStmt(ifExists, keyName); @@ -3455,6 +3478,135 @@ opt_properties ::= :} ; +policy_condition_op ::= + EQUAL + {: + RESULT = "="; + :} + | GREATERTHAN + {: + RESULT = ">"; + :} + | LESSTHAN + {: + RESULT = "<"; + :} + | GREATERTHAN EQUAL + {: + RESULT = ">="; + :} + | LESSTHAN EQUAL + {: + RESULT = "<="; + :}; + +policy_condition_value ::= + INTEGER_LITERAL:val + {: + RESULT = String.valueOf(val); + :} + | STRING_LITERAL:val + {: + RESULT = val; + :} + | FLOATINGPOINT_LITERAL:val + {: + RESULT = String.valueOf(val); + :} + | DECIMAL_LITERAL:val + {: + RESULT = val.toString(); + :} + ; + +workload_policy_condition_list ::= + ident:metricName policy_condition_op:op policy_condition_value:value + {: + RESULT = new ArrayList<>(); + WorkloadConditionMeta cm = new WorkloadConditionMeta(metricName, op, value); + RESULT.add(cm); + :} + | workload_policy_condition_list:list COMMA ident:metricName policy_condition_op:op policy_condition_value:value + {: + WorkloadConditionMeta cm = new WorkloadConditionMeta(metricName, op, value); + list.add(cm); + RESULT = list; + :}; + +conditions ::= + KW_CONDITIONS LPAREN workload_policy_condition_list:list RPAREN + {: + RESULT = list; + :} + ; + +opt_conditions ::= + {: + RESULT = null; + :} + | conditions:conditions + {: + RESULT = conditions; + :} + ; + +workload_policy_action_list ::= + KW_SET_SESSION_VAR STRING_LITERAL:args + {: + RESULT = Lists.newArrayList(); + WorkloadActionMeta wam = new WorkloadActionMeta("set_session_variable", args); + RESULT.add(wam); + :} + | workload_policy_action_list:list COMMA KW_SET_SESSION_VAR STRING_LITERAL:args + {: + WorkloadActionMeta wam = new WorkloadActionMeta("set_session_variable", args); + list.add(wam); + RESULT = list; + :} + | ident:firstArgs + {: + RESULT = Lists.newArrayList(); + WorkloadActionMeta wam = new WorkloadActionMeta(firstArgs, ""); + RESULT.add(wam); + :} + | ident:firstArgs STRING_LITERAL:secondArgs + {: + RESULT = Lists.newArrayList(); + WorkloadActionMeta wam = new WorkloadActionMeta(firstArgs, secondArgs); + RESULT.add(wam); + :} + | workload_policy_action_list:list COMMA ident:firstArgs + {: + WorkloadActionMeta wam = new WorkloadActionMeta(firstArgs, ""); + list.add(wam); + RESULT = list; + :} + | workload_policy_action_list:list COMMA ident:firstArgs STRING_LITERAL:secondArgs + {: + WorkloadActionMeta wam = new WorkloadActionMeta(firstArgs, secondArgs); + list.add(wam); + RESULT = list; + :} + ; + + +actions ::= + KW_ACTIONS LPAREN workload_policy_action_list:list RPAREN + {: + RESULT = list; + :} + ; + +opt_actions ::= + {: + RESULT = null; + :} + | actions:actions + {: + RESULT = actions; + :} + ; + opt_ext_properties ::= {: RESULT = null; @@ -4003,6 +4155,10 @@ show_param ::= {: RESULT = new ShowWorkloadGroupsStmt(); :} + | KW_WORKLOAD KW_SCHEDULE KW_POLICY + {: + RESULT = new ShowWorkloadSchedPolicyStmt(); + :} | KW_BACKENDS {: RESULT = new ShowBackendsStmt(); @@ -7640,6 +7796,12 @@ keyword ::= {: RESULT = id; :} | KW_PROPERTIES:id {: RESULT = id; :} + | KW_CONDITIONS:id + {: RESULT = id; :} + | KW_ACTIONS:id + {: RESULT = id; :} + | KW_SET_SESSION_VAR:id + {: RESULT = id; :} | KW_PROPERTY:id {: RESULT = id; :} | KW_QUERY:id diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/AlterWorkloadSchedPolicyStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/AlterWorkloadSchedPolicyStmt.java new file mode 100644 index 00000000000000..17ffdd26d6be27 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/AlterWorkloadSchedPolicyStmt.java @@ -0,0 +1,71 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.analysis; + +import org.apache.doris.catalog.Env; +import org.apache.doris.common.AnalysisException; +import org.apache.doris.common.ErrorCode; +import org.apache.doris.common.ErrorReport; +import org.apache.doris.common.UserException; +import org.apache.doris.common.util.PrintableMap; +import org.apache.doris.mysql.privilege.PrivPredicate; +import org.apache.doris.qe.ConnectContext; + +import java.util.Map; + +public class AlterWorkloadSchedPolicyStmt extends DdlStmt { + + private final String policyName; + private final Map properties; + + public AlterWorkloadSchedPolicyStmt(String policyName, Map properties) { + this.policyName = policyName; + this.properties = properties; + } + + public String getPolicyName() { + return policyName; + } + + public Map getProperties() { + return properties; + } + + public void analyze(Analyzer analyzer) throws UserException { + super.analyze(analyzer); + + // check auth + if (!Env.getCurrentEnv().getAccessManager().checkGlobalPriv(ConnectContext.get(), PrivPredicate.ADMIN)) { + ErrorReport.reportAnalysisException(ErrorCode.ERR_SPECIFIC_ACCESS_DENIED_ERROR, "ADMIN"); + } + + if (properties == null || properties.isEmpty()) { + throw new AnalysisException("properties can't be null when alter workload schedule policy"); + } + } + + @Override + public String toSql() { + StringBuilder sb = new StringBuilder(); + sb.append("ALTER WORKLOAD SCHEDULE POLICY "); + sb.append(policyName); + sb.append("PROPERTIES(").append(new PrintableMap<>(properties, " = ", true, false)).append(")"); + return sb.toString(); + } + +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateWorkloadSchedPolicyStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateWorkloadSchedPolicyStmt.java new file mode 100644 index 00000000000000..ee82b57822f9c0 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateWorkloadSchedPolicyStmt.java @@ -0,0 +1,91 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.analysis; + +import org.apache.doris.catalog.Env; +import org.apache.doris.common.DdlException; +import org.apache.doris.common.ErrorCode; +import org.apache.doris.common.ErrorReport; +import org.apache.doris.common.FeNameFormat; +import org.apache.doris.common.UserException; +import org.apache.doris.mysql.privilege.PrivPredicate; +import org.apache.doris.qe.ConnectContext; +import org.apache.doris.resource.workloadschedpolicy.WorkloadActionMeta; +import org.apache.doris.resource.workloadschedpolicy.WorkloadConditionMeta; + +import java.util.List; +import java.util.Map; + +public class CreateWorkloadSchedPolicyStmt extends DdlStmt { + + private final boolean ifNotExists; + private final String policyName; + private final List conditions; + private final List actions; + private final Map properties; + + public CreateWorkloadSchedPolicyStmt(boolean ifNotExists, String policyName, + List conditions, List actions, Map properties) { + this.ifNotExists = ifNotExists; + this.policyName = policyName; + this.conditions = conditions; + this.actions = actions; + this.properties = properties; + } + + public boolean isIfNotExists() { + return ifNotExists; + } + + @Override + public void analyze(Analyzer analyzer) throws UserException { + super.analyze(analyzer); + + // check auth + if (!Env.getCurrentEnv().getAccessManager().checkGlobalPriv(ConnectContext.get(), PrivPredicate.ADMIN)) { + ErrorReport.reportAnalysisException(ErrorCode.ERR_SPECIFIC_ACCESS_DENIED_ERROR, "ADMIN"); + } + + // check name + FeNameFormat.checkWorkloadSchedPolicyName(policyName); + + if (conditions == null || conditions.size() < 1) { + throw new DdlException("At least one condition needs to be specified"); + } + + if (actions == null || actions.size() < 1) { + throw new DdlException("At least one action needs to be specified"); + } + } + + public String getPolicyName() { + return policyName; + } + + public List getConditions() { + return conditions; + } + + public List getActions() { + return actions; + } + + public Map getProperties() { + return properties; + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/DropWorkloadSchedPolicyStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/DropWorkloadSchedPolicyStmt.java new file mode 100644 index 00000000000000..7eeff5f3214c94 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/DropWorkloadSchedPolicyStmt.java @@ -0,0 +1,66 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.analysis; + +import org.apache.doris.catalog.Env; +import org.apache.doris.common.ErrorCode; +import org.apache.doris.common.ErrorReport; +import org.apache.doris.common.FeNameFormat; +import org.apache.doris.common.UserException; +import org.apache.doris.mysql.privilege.PrivPredicate; +import org.apache.doris.qe.ConnectContext; + +public class DropWorkloadSchedPolicyStmt extends DdlStmt { + + private boolean ifExists; + private String policyName; + + public DropWorkloadSchedPolicyStmt(boolean ifExists, String policyName) { + this.ifExists = ifExists; + this.policyName = policyName; + } + + public boolean isIfExists() { + return ifExists; + } + + public String getPolicyName() { + return policyName; + } + + @Override + public void analyze(Analyzer analyzer) throws UserException { + super.analyze(analyzer); + + // check auth + if (!Env.getCurrentEnv().getAccessManager().checkGlobalPriv(ConnectContext.get(), PrivPredicate.ADMIN)) { + ErrorReport.reportAnalysisException(ErrorCode.ERR_SPECIFIC_ACCESS_DENIED_ERROR, "ADMIN"); + } + + FeNameFormat.checkWorkloadSchedPolicyName(policyName); + } + + @Override + public String toSql() { + StringBuilder sb = new StringBuilder(); + sb.append("DROP "); + sb.append("WORKLOAD SCHEDULE POLICY '").append(policyName).append("' "); + return sb.toString(); + } + +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowWorkloadSchedPolicyStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowWorkloadSchedPolicyStmt.java new file mode 100644 index 00000000000000..a128ee3e8f7d47 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowWorkloadSchedPolicyStmt.java @@ -0,0 +1,59 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.analysis; + +import org.apache.doris.catalog.Column; +import org.apache.doris.catalog.ScalarType; +import org.apache.doris.common.UserException; +import org.apache.doris.qe.ConnectContext; +import org.apache.doris.qe.ShowResultSetMetaData; +import org.apache.doris.resource.workloadschedpolicy.WorkloadSchedPolicyMgr; + +public class ShowWorkloadSchedPolicyStmt extends ShowStmt { + + public ShowWorkloadSchedPolicyStmt() { + } + + @Override + public void analyze(Analyzer analyzer) throws UserException { + super.analyze(analyzer); + } + + @Override + public String toSql() { + return "SHOW WORKLOAD SCHEDULE POLICY"; + } + + @Override + public ShowResultSetMetaData getMetaData() { + ShowResultSetMetaData.Builder builder = ShowResultSetMetaData.builder(); + for (String title : WorkloadSchedPolicyMgr.WORKLOAD_SCHED_POLICY_NODE_TITLE_NAMES) { + builder.addColumn(new Column(title, ScalarType.createVarchar(1000))); + } + return builder.build(); + } + + @Override + public RedirectStatus getRedirectStatus() { + if (ConnectContext.get().getSessionVariable().getForwardToMaster()) { + return RedirectStatus.FORWARD_NO_SYNC; + } else { + return RedirectStatus.NO_FORWARD; + } + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java index b498ccb6a6734e..481c3d375e0746 100755 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java @@ -112,6 +112,7 @@ import org.apache.doris.common.io.Text; import org.apache.doris.common.publish.TopicPublisher; import org.apache.doris.common.publish.TopicPublisherThread; +import org.apache.doris.common.publish.WorkloadActionPublishThread; import org.apache.doris.common.publish.WorkloadGroupPublisher; import org.apache.doris.common.util.Daemon; import org.apache.doris.common.util.DynamicPartitionUtil; @@ -228,6 +229,7 @@ import org.apache.doris.qe.VariableMgr; import org.apache.doris.resource.Tag; import org.apache.doris.resource.workloadgroup.WorkloadGroupMgr; +import org.apache.doris.resource.workloadschedpolicy.WorkloadSchedPolicyMgr; import org.apache.doris.scheduler.manager.TransientTaskManager; import org.apache.doris.scheduler.registry.ExportTaskRegister; import org.apache.doris.service.ExecuteEnv; @@ -485,6 +487,7 @@ public class Env { private WorkloadGroupMgr workloadGroupMgr; + private WorkloadSchedPolicyMgr workloadSchedPolicyMgr; private QueryStats queryStats; private StatisticsCleaner statisticsCleaner; @@ -506,6 +509,8 @@ public class Env { private TopicPublisherThread topicPublisherThread; + private WorkloadActionPublishThread workloadActionPublisherThread; + private MTMVService mtmvService; public List getFrontendInfos() { @@ -725,6 +730,7 @@ private Env(boolean isCheckpointCatalog) { this.statisticsAutoCollector = new StatisticsAutoCollector(); this.globalFunctionMgr = new GlobalFunctionMgr(); this.workloadGroupMgr = new WorkloadGroupMgr(); + this.workloadSchedPolicyMgr = new WorkloadSchedPolicyMgr(); this.queryStats = new QueryStats(); this.loadManagerAdapter = new LoadManagerAdapter(); this.hiveTransactionMgr = new HiveTransactionMgr(); @@ -734,6 +740,8 @@ private Env(boolean isCheckpointCatalog) { this.queryCancelWorker = new QueryCancelWorker(systemInfo); this.topicPublisherThread = new TopicPublisherThread( "TopicPublisher", Config.publish_topic_info_interval_ms, systemInfo); + this.workloadActionPublisherThread = new WorkloadActionPublishThread("WorkloadActionPublisher", + Config.workload_action_interval_ms, systemInfo); this.mtmvService = new MTMVService(); } @@ -810,6 +818,10 @@ public WorkloadGroupMgr getWorkloadGroupMgr() { return workloadGroupMgr; } + public WorkloadSchedPolicyMgr getWorkloadSchedPolicyMgr() { + return workloadSchedPolicyMgr; + } + // use this to get correct ClusterInfoService instance public static SystemInfoService getCurrentSystemInfo() { return getCurrentEnv().getClusterInfo(); @@ -985,6 +997,8 @@ public void initialize(String[] args) throws Exception { topicPublisherThread.start(); workloadGroupMgr.startUpdateThread(); + workloadSchedPolicyMgr.start(); + workloadActionPublisherThread.start(); } // wait until FE is ready. @@ -2066,6 +2080,12 @@ public long loadWorkloadGroups(DataInputStream in, long checksum) throws IOExcep return checksum; } + public long loadWorkloadSchedPolicy(DataInputStream in, long checksum) throws IOException { + workloadSchedPolicyMgr = WorkloadSchedPolicyMgr.read(in); + LOG.info("finished replay workload sched policy from image"); + return checksum; + } + public long loadSmallFiles(DataInputStream in, long checksum) throws IOException { smallFileMgr.readFields(in); LOG.info("finished replay smallFiles from image"); @@ -2333,6 +2353,11 @@ public long saveWorkloadGroups(CountingDataOutputStream dos, long checksum) thro return checksum; } + public long saveWorkloadSchedPolicy(CountingDataOutputStream dos, long checksum) throws IOException { + Env.getCurrentEnv().getWorkloadSchedPolicyMgr().write(dos); + return checksum; + } + public long saveSmallFiles(CountingDataOutputStream dos, long checksum) throws IOException { smallFileMgr.write(dos); return checksum; diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/FeNameFormat.java b/fe/fe-core/src/main/java/org/apache/doris/common/FeNameFormat.java index 3a77bde84b6d69..d73e78cedce75b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/FeNameFormat.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/FeNameFormat.java @@ -134,6 +134,10 @@ public static void checkWorkloadGroupName(String workloadGroupName) throws Analy checkCommonName("workload group", workloadGroupName); } + public static void checkWorkloadSchedPolicyName(String policyName) throws AnalysisException { + checkCommonName("workload schedule policy", policyName); + } + public static void checkCommonName(String type, String name) throws AnalysisException { if (Strings.isNullOrEmpty(name) || !name.matches(getCommonNameRegex())) { ErrorReport.reportAnalysisException(ErrorCode.ERR_WRONG_NAME_FORMAT, type, name); diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/publish/WorkloadActionPublishThread.java b/fe/fe-core/src/main/java/org/apache/doris/common/publish/WorkloadActionPublishThread.java new file mode 100644 index 00000000000000..cacd7a9da9f340 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/common/publish/WorkloadActionPublishThread.java @@ -0,0 +1,123 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.common.publish; + +import org.apache.doris.common.ClientPool; +import org.apache.doris.common.ThreadPoolManager; +import org.apache.doris.common.util.Daemon; +import org.apache.doris.system.Backend; +import org.apache.doris.system.SystemInfoService; +import org.apache.doris.thrift.BackendService; +import org.apache.doris.thrift.TNetworkAddress; +import org.apache.doris.thrift.TPublishTopicRequest; +import org.apache.doris.thrift.TTopicInfoType; +import org.apache.doris.thrift.TopicInfo; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ExecutorService; + +public class WorkloadActionPublishThread extends Daemon { + + private ExecutorService executor = ThreadPoolManager + .newDaemonFixedThreadPool(4, 256, "workload-action-publish-thread", true); + + private static final Logger LOG = LogManager.getLogger(WorkloadActionPublishThread.class); + + public static Map> workloadActionToplicInfoMap + = new HashMap>(); + + public static synchronized void putWorkloadAction(TTopicInfoType type, TopicInfo topicInfo) { + List list = workloadActionToplicInfoMap.get(type); + if (list == null) { + list = new ArrayList(); + workloadActionToplicInfoMap.put(type, list); + } + list.add(topicInfo); + } + + public static synchronized Map> getCurrentWorkloadActionMap() { + Map> retMap = workloadActionToplicInfoMap; + workloadActionToplicInfoMap = new HashMap>(); + return retMap; + } + + private SystemInfoService clusterInfoService; + + public WorkloadActionPublishThread(String name, long intervalMs, + SystemInfoService clusterInfoService) { + super(name, intervalMs); + this.clusterInfoService = clusterInfoService; + } + + @Override + protected final void runOneCycle() { + Map> actionMap + = WorkloadActionPublishThread.getCurrentWorkloadActionMap(); + if (actionMap.size() == 0) { + LOG.info("no workload action found, skip publish"); + return; + } + Collection currentBeToPublish = clusterInfoService.getIdToBackend().values(); + AckResponseHandler handler = new AckResponseHandler(currentBeToPublish); + TPublishTopicRequest request = new TPublishTopicRequest(); + request.setTopicMap(actionMap); + for (Backend be : currentBeToPublish) { + executor.submit(new WorkloadMoveActionTask(request, be, handler)); + } + } + + public class WorkloadMoveActionTask implements Runnable { + + private TPublishTopicRequest request; + + private Backend be; + + private ResponseHandler handler; + + public WorkloadMoveActionTask(TPublishTopicRequest request, Backend be, + ResponseHandler handler) { + this.request = request; + this.be = be; + this.handler = handler; + } + + @Override + public void run() { + long beginTime = System.currentTimeMillis(); + try { + TNetworkAddress addr = new TNetworkAddress(be.getHost(), be.getBePort()); + BackendService.Client client = ClientPool.backendPool.borrowObject(addr); + client.publishTopicInfo(request); + LOG.info("publish move action topic to be {} success, time cost={} ms", + be.getHost(), (System.currentTimeMillis() - beginTime)); + } catch (Exception e) { + LOG.warn("publish move action topic to be {} error happens: , time cost={} ms", + be.getHost(), (System.currentTimeMillis() - beginTime), e); + } finally { + handler.onResponse(be); + } + } + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/journal/JournalEntity.java b/fe/fe-core/src/main/java/org/apache/doris/journal/JournalEntity.java index b0749db1fe14ea..48636901f40369 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/journal/JournalEntity.java +++ b/fe/fe-core/src/main/java/org/apache/doris/journal/JournalEntity.java @@ -85,6 +85,7 @@ import org.apache.doris.persist.DropResourceOperationLog; import org.apache.doris.persist.DropSqlBlockRuleOperationLog; import org.apache.doris.persist.DropWorkloadGroupOperationLog; +import org.apache.doris.persist.DropWorkloadSchedPolicyOperatorLog; import org.apache.doris.persist.GlobalVarPersistInfo; import org.apache.doris.persist.HbPackage; import org.apache.doris.persist.LdapInfo; @@ -119,6 +120,7 @@ import org.apache.doris.policy.Policy; import org.apache.doris.policy.StoragePolicy; import org.apache.doris.resource.workloadgroup.WorkloadGroup; +import org.apache.doris.resource.workloadschedpolicy.WorkloadSchedPolicy; import org.apache.doris.statistics.AnalysisInfo; import org.apache.doris.statistics.TableStatsMeta; import org.apache.doris.system.Backend; @@ -807,6 +809,17 @@ public void readFields(DataInput in) throws IOException { isRead = true; break; } + case OperationType.OP_CREATE_WORKLOAD_SCHED_POLICY: + case OperationType.OP_ALTER_WORKLOAD_SCHED_POLICY: { + data = WorkloadSchedPolicy.read(in); + isRead = true; + break; + } + case OperationType.OP_DROP_WORKLOAD_SCHED_POLICY: { + data = DropWorkloadSchedPolicyOperatorLog.read(in); + isRead = true; + break; + } case OperationType.OP_ALTER_LIGHT_SCHEMA_CHANGE: { data = AlterLightSchemaChangeInfo.read(in); isRead = true; diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/DropWorkloadSchedPolicyOperatorLog.java b/fe/fe-core/src/main/java/org/apache/doris/persist/DropWorkloadSchedPolicyOperatorLog.java new file mode 100644 index 00000000000000..dea190623c3fa0 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/persist/DropWorkloadSchedPolicyOperatorLog.java @@ -0,0 +1,51 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.persist; + +import org.apache.doris.common.io.Text; +import org.apache.doris.common.io.Writable; +import org.apache.doris.persist.gson.GsonUtils; + +import com.google.gson.annotations.SerializedName; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; + +public class DropWorkloadSchedPolicyOperatorLog implements Writable { + + @SerializedName(value = "id") + private long id; + + public DropWorkloadSchedPolicyOperatorLog(long id) { + this.id = id; + } + + public long getId() { + return id; + } + + @Override + public void write(DataOutput out) throws IOException { + Text.writeString(out, GsonUtils.GSON.toJson(this)); + } + + public static DropWorkloadSchedPolicyOperatorLog read(DataInput in) throws IOException { + return GsonUtils.GSON.fromJson(Text.readString(in), DropWorkloadSchedPolicyOperatorLog.class); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java b/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java index e2c5630ce77969..dca2cd1d35bc9a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java @@ -79,6 +79,7 @@ import org.apache.doris.policy.Policy; import org.apache.doris.policy.StoragePolicy; import org.apache.doris.resource.workloadgroup.WorkloadGroup; +import org.apache.doris.resource.workloadschedpolicy.WorkloadSchedPolicy; import org.apache.doris.statistics.AnalysisInfo; import org.apache.doris.statistics.AnalysisManager; import org.apache.doris.statistics.TableStatsMeta; @@ -1055,6 +1056,22 @@ public static void loadJournal(Env env, Long logId, JournalEntity journal) { env.getWorkloadGroupMgr().replayAlterWorkloadGroup(resource); break; } + case OperationType.OP_CREATE_WORKLOAD_SCHED_POLICY: { + final WorkloadSchedPolicy policy = (WorkloadSchedPolicy) journal.getData(); + env.getWorkloadSchedPolicyMgr().replayCreateWorkloadSchedPolicy(policy); + break; + } + case OperationType.OP_ALTER_WORKLOAD_SCHED_POLICY: { + final WorkloadSchedPolicy policy = (WorkloadSchedPolicy) journal.getData(); + env.getWorkloadSchedPolicyMgr().replayAlterWorkloadSchedPolicy(policy); + break; + } + case OperationType.OP_DROP_WORKLOAD_SCHED_POLICY: { + final DropWorkloadSchedPolicyOperatorLog dropLog + = (DropWorkloadSchedPolicyOperatorLog) journal.getData(); + env.getWorkloadSchedPolicyMgr().replayDropWorkloadSchedPolicy(dropLog.getId()); + break; + } case OperationType.OP_INIT_EXTERNAL_TABLE: { // Do nothing. break; @@ -1685,6 +1702,18 @@ public void logDropWorkloadGroup(DropWorkloadGroupOperationLog operationLog) { logEdit(OperationType.OP_DROP_WORKLOAD_GROUP, operationLog); } + public void logCreateWorkloadSchedPolicy(WorkloadSchedPolicy workloadSchedPolicy) { + logEdit(OperationType.OP_CREATE_WORKLOAD_SCHED_POLICY, workloadSchedPolicy); + } + + public void logAlterWorkloadSchedPolicy(WorkloadSchedPolicy workloadSchedPolicy) { + logEdit(OperationType.OP_ALTER_WORKLOAD_SCHED_POLICY, workloadSchedPolicy); + } + + public void dropWorkloadSchedPolicy(long policyId) { + logEdit(OperationType.OP_DROP_WORKLOAD_SCHED_POLICY, new DropWorkloadSchedPolicyOperatorLog(policyId)); + } + public void logAlterStoragePolicy(StoragePolicy storagePolicy) { logEdit(OperationType.OP_ALTER_STORAGE_POLICY, storagePolicy); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/OperationType.java b/fe/fe-core/src/main/java/org/apache/doris/persist/OperationType.java index 0769cba0d81e11..ada6adf53d28fc 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/persist/OperationType.java +++ b/fe/fe-core/src/main/java/org/apache/doris/persist/OperationType.java @@ -304,6 +304,9 @@ public class OperationType { public static final short OP_CREATE_WORKLOAD_GROUP = 410; public static final short OP_DROP_WORKLOAD_GROUP = 411; public static final short OP_ALTER_WORKLOAD_GROUP = 412; + public static final short OP_CREATE_WORKLOAD_SCHED_POLICY = 413; + public static final short OP_ALTER_WORKLOAD_SCHED_POLICY = 414; + public static final short OP_DROP_WORKLOAD_SCHED_POLICY = 415; // query stats 440 ~ 424 public static final short OP_CLEAN_QUERY_STATS = 420; diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/meta/MetaPersistMethod.java b/fe/fe-core/src/main/java/org/apache/doris/persist/meta/MetaPersistMethod.java index 6ed00de22b8ac3..37741d84af66c9 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/persist/meta/MetaPersistMethod.java +++ b/fe/fe-core/src/main/java/org/apache/doris/persist/meta/MetaPersistMethod.java @@ -212,6 +212,13 @@ public static MetaPersistMethod create(String name) throws NoSuchMethodException metaPersistMethod.writeMethod = Env.class.getDeclaredMethod("saveWorkloadGroups", CountingDataOutputStream.class, long.class); break; + case "workloadSchedPolicy": + metaPersistMethod.readMethod = + Env.class.getDeclaredMethod("loadWorkloadSchedPolicy", DataInputStream.class, long.class); + metaPersistMethod.writeMethod = + Env.class.getDeclaredMethod("saveWorkloadSchedPolicy", CountingDataOutputStream.class, + long.class); + break; case "binlogs": metaPersistMethod.readMethod = Env.class.getDeclaredMethod("loadBinlogs", DataInputStream.class, long.class); diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/meta/PersistMetaModules.java b/fe/fe-core/src/main/java/org/apache/doris/persist/meta/PersistMetaModules.java index f6161629d99169..43255d7e36b31a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/persist/meta/PersistMetaModules.java +++ b/fe/fe-core/src/main/java/org/apache/doris/persist/meta/PersistMetaModules.java @@ -39,7 +39,7 @@ public class PersistMetaModules { "globalVariable", "cluster", "broker", "resources", "exportJob", "syncJob", "backupHandler", "paloAuth", "transactionState", "colocateTableIndex", "routineLoadJobs", "loadJobV2", "smallFiles", "plugins", "deleteHandler", "sqlBlockRule", "policy", "globalFunction", "workloadGroups", - "binlogs", "resourceGroups", "AnalysisMgrV2", "AsyncJobManager"); + "binlogs", "resourceGroups", "AnalysisMgrV2", "AsyncJobManager", "workloadSchedPolicy"); // Modules in this list is deprecated and will not be saved in meta file. (also should not be in MODULE_NAMES) public static final ImmutableList DEPRECATED_MODULE_NAMES = ImmutableList.of( diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java index d86ee57414643b..47fd5ee78443f0 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java @@ -104,7 +104,7 @@ public enum ConnectType { protected volatile long backendId; protected volatile LoadTaskInfo streamLoadInfo; - protected volatile TUniqueId queryId; + protected volatile TUniqueId queryId = null; protected volatile String traceId; // id for this connection protected volatile int connectionId; diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectScheduler.java b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectScheduler.java index 5be4c330e0aec2..b7dbcdfc815dd1 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectScheduler.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectScheduler.java @@ -170,4 +170,8 @@ public String getQueryIdByTraceId(String traceId) { TUniqueId queryId = traceId2QueryId.get(traceId); return queryId == null ? "" : DebugUtil.printId(queryId); } + + public Map getConnectionMap() { + return connectionMap; + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/DdlExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/DdlExecutor.java index 7b41715a425861..7a47dbcb541e8f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/DdlExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/DdlExecutor.java @@ -47,6 +47,7 @@ import org.apache.doris.analysis.AlterUserStmt; import org.apache.doris.analysis.AlterViewStmt; import org.apache.doris.analysis.AlterWorkloadGroupStmt; +import org.apache.doris.analysis.AlterWorkloadSchedPolicyStmt; import org.apache.doris.analysis.BackupStmt; import org.apache.doris.analysis.CancelAlterSystemStmt; import org.apache.doris.analysis.CancelAlterTableStmt; @@ -77,6 +78,7 @@ import org.apache.doris.analysis.CreateUserStmt; import org.apache.doris.analysis.CreateViewStmt; import org.apache.doris.analysis.CreateWorkloadGroupStmt; +import org.apache.doris.analysis.CreateWorkloadSchedPolicyStmt; import org.apache.doris.analysis.DdlStmt; import org.apache.doris.analysis.DropAnalyzeJobStmt; import org.apache.doris.analysis.DropCatalogStmt; @@ -94,6 +96,7 @@ import org.apache.doris.analysis.DropTableStmt; import org.apache.doris.analysis.DropUserStmt; import org.apache.doris.analysis.DropWorkloadGroupStmt; +import org.apache.doris.analysis.DropWorkloadSchedPolicyStmt; import org.apache.doris.analysis.GrantStmt; import org.apache.doris.analysis.InstallPluginStmt; import org.apache.doris.analysis.KillAnalysisJobStmt; @@ -285,6 +288,12 @@ public static void execute(Env env, DdlStmt ddlStmt) throws Exception { env.getWorkloadGroupMgr().createWorkloadGroup((CreateWorkloadGroupStmt) ddlStmt); } else if (ddlStmt instanceof DropWorkloadGroupStmt) { env.getWorkloadGroupMgr().dropWorkloadGroup((DropWorkloadGroupStmt) ddlStmt); + } else if (ddlStmt instanceof CreateWorkloadSchedPolicyStmt) { + env.getWorkloadSchedPolicyMgr().createWorkloadSchedPolicy((CreateWorkloadSchedPolicyStmt) ddlStmt); + } else if (ddlStmt instanceof AlterWorkloadSchedPolicyStmt) { + env.getWorkloadSchedPolicyMgr().alterWorkloadSchedPolicy((AlterWorkloadSchedPolicyStmt) ddlStmt); + } else if (ddlStmt instanceof DropWorkloadSchedPolicyStmt) { + env.getWorkloadSchedPolicyMgr().dropWorkloadSchedPolicy((DropWorkloadSchedPolicyStmt) ddlStmt); } else if (ddlStmt instanceof CreateDataSyncJobStmt) { CreateDataSyncJobStmt createSyncJobStmt = (CreateDataSyncJobStmt) ddlStmt; SyncJobManager syncJobMgr = env.getSyncJobManager(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ShowExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/ShowExecutor.java index df3ddc3151d31c..b7ee282d24880d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/ShowExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ShowExecutor.java @@ -105,6 +105,7 @@ import org.apache.doris.analysis.ShowVariablesStmt; import org.apache.doris.analysis.ShowViewStmt; import org.apache.doris.analysis.ShowWorkloadGroupsStmt; +import org.apache.doris.analysis.ShowWorkloadSchedPolicyStmt; import org.apache.doris.analysis.TableName; import org.apache.doris.backup.AbstractJob; import org.apache.doris.backup.BackupJob; @@ -348,6 +349,8 @@ public ShowResultSet execute() throws AnalysisException { handleShowResources(); } else if (stmt instanceof ShowWorkloadGroupsStmt) { handleShowWorkloadGroups(); + } else if (stmt instanceof ShowWorkloadSchedPolicyStmt) { + handleShowWorkloadSchedPolicy(); } else if (stmt instanceof ShowExportStmt) { handleShowExport(); } else if (stmt instanceof ShowBackendsStmt) { @@ -1951,6 +1954,12 @@ private void handleShowWorkloadGroups() { resultSet = new ShowResultSet(showStmt.getMetaData(), workloadGroupsInfos); } + private void handleShowWorkloadSchedPolicy() { + ShowWorkloadSchedPolicyStmt showStmt = (ShowWorkloadSchedPolicyStmt) stmt; + List> workloadSchedInfo = Env.getCurrentEnv().getWorkloadSchedPolicyMgr().getShowPolicyInfo(); + resultSet = new ShowResultSet(showStmt.getMetaData(), workloadSchedInfo); + } + private void handleShowExport() throws AnalysisException { ShowExportStmt showExportStmt = (ShowExportStmt) stmt; Env env = Env.getCurrentEnv(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroupMgr.java b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroupMgr.java index 4bb30b118df357..2285db603b074f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroupMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroupMgr.java @@ -57,7 +57,6 @@ import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.Map.Entry; import java.util.Objects; import java.util.concurrent.locks.ReentrantReadWriteLock; @@ -433,6 +432,19 @@ public List> getResourcesInfo(TUserIdentity tcurrentUserIdentity) { return procNode.fetchResult(currentUserIdentity).getRows(); } + public Long getWorkloadGroupIdByName(String name) { + readLock(); + try { + WorkloadGroup wg = nameToWorkloadGroup.get(name); + if (wg == null) { + return null; + } + return wg.getId(); + } finally { + readUnlock(); + } + } + // for ut public Map getNameToWorkloadGroup() { return nameToWorkloadGroup; diff --git a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadAction.java b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadAction.java new file mode 100644 index 00000000000000..d9298781476173 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadAction.java @@ -0,0 +1,41 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.resource.workloadschedpolicy; + +import org.apache.doris.common.UserException; + +public interface WorkloadAction { + + void exec(WorkloadQueryInfo queryInfo); + + WorkloadActionType getWorkloadActionType(); + + // NOTE(wb) currently createPolicyAction is also used when replay meta, it better not contains heavy check + static WorkloadAction createWorkloadAction(WorkloadActionMeta workloadActionMeta) + throws UserException { + if (WorkloadActionType.CANCEL_QUERY.equals(workloadActionMeta.action)) { + return WorkloadActionCancelQuery.createWorkloadAction(); + } else if (WorkloadActionType.MOVE_QUERY_TO_GROUP.equals(workloadActionMeta.action)) { + return WorkloadActionMoveQueryToGroup.createWorkloadAction(workloadActionMeta.actionArgs); + } else if (WorkloadActionType.SET_SESSION_VARIABLE.equals(workloadActionMeta.action)) { + return WorkloadActionSetSessionVar.createWorkloadAction(workloadActionMeta.actionArgs); + } + throw new UserException("invalid action type " + workloadActionMeta.action); + } + +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadActionCancelQuery.java b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadActionCancelQuery.java new file mode 100644 index 00000000000000..2dcff6075f4d74 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadActionCancelQuery.java @@ -0,0 +1,47 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.resource.workloadschedpolicy; + +import org.apache.doris.qe.QeProcessorImpl; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +public class WorkloadActionCancelQuery implements WorkloadAction { + + private static final Logger LOG = LogManager.getLogger(WorkloadActionCancelQuery.class); + + @Override + public void exec(WorkloadQueryInfo queryInfo) { + if (queryInfo.context != null && !queryInfo.context.isKilled() + && queryInfo.tUniqueId != null + && QeProcessorImpl.INSTANCE.getCoordinator(queryInfo.tUniqueId) != null) { + LOG.info("cancel query {} triggered by query schedule policy.", queryInfo.queryId); + queryInfo.context.cancelQuery(); + } + } + + public static WorkloadActionCancelQuery createWorkloadAction() { + return new WorkloadActionCancelQuery(); + } + + @Override + public WorkloadActionType getWorkloadActionType() { + return WorkloadActionType.CANCEL_QUERY; + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadActionMeta.java b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadActionMeta.java new file mode 100644 index 00000000000000..776c0bccfdc4ed --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadActionMeta.java @@ -0,0 +1,47 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.resource.workloadschedpolicy; + +import org.apache.doris.common.UserException; + +import com.google.gson.annotations.SerializedName; + +public class WorkloadActionMeta { + + @SerializedName(value = "action") + public WorkloadActionType action; + + @SerializedName(value = "actionArgs") + public String actionArgs; + + public WorkloadActionMeta(String workloadAction, String actionArgs) throws UserException { + this.action = getWorkloadActionType(workloadAction); + this.actionArgs = actionArgs; + } + + static WorkloadActionType getWorkloadActionType(String strType) throws UserException { + if (WorkloadActionType.CANCEL_QUERY.toString().equalsIgnoreCase(strType)) { + return WorkloadActionType.CANCEL_QUERY; + } else if (WorkloadActionType.MOVE_QUERY_TO_GROUP.toString().equalsIgnoreCase(strType)) { + return WorkloadActionType.MOVE_QUERY_TO_GROUP; + } else if (WorkloadActionType.SET_SESSION_VARIABLE.toString().equalsIgnoreCase(strType)) { + return WorkloadActionType.SET_SESSION_VARIABLE; + } + throw new UserException("invalid action type " + strType); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadActionMoveQueryToGroup.java b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadActionMoveQueryToGroup.java new file mode 100644 index 00000000000000..59e09345fa9849 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadActionMoveQueryToGroup.java @@ -0,0 +1,67 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.resource.workloadschedpolicy; + +import org.apache.doris.common.publish.WorkloadActionPublishThread; +import org.apache.doris.qe.QeProcessorImpl; +import org.apache.doris.thrift.TTopicInfoType; +import org.apache.doris.thrift.TWorkloadMoveQueryToGroupAction; +import org.apache.doris.thrift.TopicInfo; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +public class WorkloadActionMoveQueryToGroup implements WorkloadAction { + + private static final Logger LOG = LogManager.getLogger(WorkloadActionMoveQueryToGroup.class); + + private long dstWgId; + + public WorkloadActionMoveQueryToGroup(long dstWgId) { + this.dstWgId = dstWgId; + } + + @Override + public void exec(WorkloadQueryInfo queryInfo) { + if (queryInfo.context != null && !queryInfo.context.isKilled() + && queryInfo.tUniqueId != null + && QeProcessorImpl.INSTANCE.getCoordinator(queryInfo.tUniqueId) != null) { + LOG.info("try move query {} to group {}", queryInfo.queryId, dstWgId); + + TWorkloadMoveQueryToGroupAction moveQueryToGroupAction = new TWorkloadMoveQueryToGroupAction(); + moveQueryToGroupAction.setQueryId(queryInfo.tUniqueId); + moveQueryToGroupAction.setWorkloadGroupId(dstWgId); + + TopicInfo topicInfo = new TopicInfo(); + topicInfo.setMoveAction(moveQueryToGroupAction); + + WorkloadActionPublishThread.putWorkloadAction(TTopicInfoType.MOVE_QUERY_TO_GROUP, topicInfo); + } + } + + @Override + public WorkloadActionType getWorkloadActionType() { + return WorkloadActionType.MOVE_QUERY_TO_GROUP; + } + + public static WorkloadActionMoveQueryToGroup createWorkloadAction(String groupId) { + long wgId = Long.parseLong(groupId); + return new WorkloadActionMoveQueryToGroup(wgId); + } + +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadActionSetSessionVar.java b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadActionSetSessionVar.java new file mode 100644 index 00000000000000..18ba64b34963e2 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadActionSetSessionVar.java @@ -0,0 +1,75 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.resource.workloadschedpolicy; + +import org.apache.doris.analysis.SetStmt; +import org.apache.doris.analysis.SetVar; +import org.apache.doris.analysis.StringLiteral; +import org.apache.doris.common.UserException; +import org.apache.doris.qe.SetExecutor; + +import org.apache.commons.lang3.StringUtils; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.util.ArrayList; +import java.util.List; + +public class WorkloadActionSetSessionVar implements WorkloadAction { + + private static final Logger LOG = LogManager.getLogger(WorkloadActionSetSessionVar.class); + + private String varName; + private String varValue; + + public WorkloadActionSetSessionVar(String varName, String varValue) { + this.varName = varName; + this.varValue = varValue; + } + + @Override + public void exec(WorkloadQueryInfo queryInfo) { + try { + List list = new ArrayList<>(); + SetVar sv = new SetVar(varName, new StringLiteral(varValue)); + list.add(sv); + SetStmt setStmt = new SetStmt(list); + SetExecutor executor = new SetExecutor(queryInfo.context, setStmt); + executor.execute(); + } catch (Throwable t) { + LOG.error("error happens when exec {}", WorkloadActionType.SET_SESSION_VARIABLE, t); + } + } + + @Override + public WorkloadActionType getWorkloadActionType() { + return WorkloadActionType.SET_SESSION_VARIABLE; + } + + public String getVarName() { + return varName; + } + + public static WorkloadAction createWorkloadAction(String actionCmdArgs) throws UserException { + String[] strs = actionCmdArgs.split("="); + if (strs.length != 2 || StringUtils.isEmpty(strs[0].trim()) || StringUtils.isEmpty(strs[1].trim())) { + throw new UserException("illegal arguments, it should be like set_session_variable \"xxx=xxx\""); + } + return new WorkloadActionSetSessionVar(strs[0].trim(), strs[1].trim()); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadActionType.java b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadActionType.java new file mode 100644 index 00000000000000..e80c202a763f94 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadActionType.java @@ -0,0 +1,24 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.resource.workloadschedpolicy; + +public enum WorkloadActionType { + CANCEL_QUERY, // cancel query + MOVE_QUERY_TO_GROUP, // move query from one wg group to another + SET_SESSION_VARIABLE +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadCondition.java b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadCondition.java new file mode 100644 index 00000000000000..1f75d81794f4b5 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadCondition.java @@ -0,0 +1,40 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.resource.workloadschedpolicy; + +import org.apache.doris.common.UserException; + + +public interface WorkloadCondition { + + boolean eval(String strValue); + + WorkloadMetricType getMetricType(); + + // NOTE(wb) currently createPolicyCondition is also used when replay meta, it better not contains heavy check + static WorkloadCondition createWorkloadCondition(WorkloadConditionMeta cm) + throws UserException { + if (WorkloadMetricType.USERNAME.equals(cm.metricName)) { + return WorkloadConditionUsername.createWorkloadCondition(cm.op, cm.value); + } else if (WorkloadMetricType.QUERY_TIME.equals(cm.metricName)) { + return WorkloadConditionQueryTime.createWorkloadCondition(cm.op, cm.value); + } + throw new UserException("invalid metric name:" + cm.metricName); + } + +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadConditionCompareUtils.java b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadConditionCompareUtils.java new file mode 100644 index 00000000000000..8aa53a6f340abc --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadConditionCompareUtils.java @@ -0,0 +1,85 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.resource.workloadschedpolicy; + +import org.apache.doris.common.UserException; + +import org.apache.commons.lang3.StringUtils; + +public class WorkloadConditionCompareUtils { + + static WorkloadConditionOperator getOperator(String op) throws UserException { + if ("=".equals(op)) { + return WorkloadConditionOperator.EQUAL; + } else if (">".equals(op)) { + return WorkloadConditionOperator.GREATER; + } else if (">=".equals(op)) { + return WorkloadConditionOperator.GREATER_EQUAL; + } else if ("<".equals(op)) { + return WorkloadConditionOperator.LESS; + } else if ("<=".equals(op)) { + return WorkloadConditionOperator.LESS_EQUAl; + } else { + throw new UserException("unexpected compare operator " + op); + } + } + + static boolean compareInteger(WorkloadConditionOperator operator, long firstArgs, long secondArgs) { + switch (operator) { + case EQUAL: + return firstArgs == secondArgs; + case GREATER: + return firstArgs > secondArgs; + case GREATER_EQUAL: + return firstArgs >= secondArgs; + case LESS: + return firstArgs < secondArgs; + case LESS_EQUAl: + return firstArgs <= secondArgs; + default: + throw new RuntimeException("unexpected integer operator " + operator); + } + } + + static boolean compareDouble(WorkloadConditionOperator operator, double firstArgs, double secondArgs) { + switch (operator) { + case EQUAL: + return firstArgs == secondArgs; + case GREATER: + return firstArgs > secondArgs; + case GREATER_EQUAL: + return firstArgs >= secondArgs; + case LESS: + return firstArgs < secondArgs; + case LESS_EQUAl: + return firstArgs <= secondArgs; + default: + throw new RuntimeException("unexpected compare double operator " + operator); + } + } + + static boolean compareString(WorkloadConditionOperator operator, String firstArgs, String secondArgs) { + switch (operator) { + case EQUAL: + return StringUtils.equals(firstArgs, secondArgs); + default: + throw new RuntimeException("unexpected compare string operator " + operator); + } + } + +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadConditionMeta.java b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadConditionMeta.java new file mode 100644 index 00000000000000..c6bfb526b9b035 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadConditionMeta.java @@ -0,0 +1,54 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.resource.workloadschedpolicy; + +import org.apache.doris.common.UserException; + +import com.google.gson.annotations.SerializedName; + + +public class WorkloadConditionMeta { + + @SerializedName(value = "metricName") + public WorkloadMetricType metricName; + + @SerializedName(value = "op") + public WorkloadConditionOperator op; + + @SerializedName(value = "value") + public String value; + + public WorkloadConditionMeta(String metricName, String op, String value) throws UserException { + this.metricName = getMetricType(metricName); + this.op = WorkloadConditionCompareUtils.getOperator(op); + this.value = value; + } + + private static WorkloadMetricType getMetricType(String metricStr) throws UserException { + if (WorkloadMetricType.USERNAME.toString().equalsIgnoreCase(metricStr)) { + return WorkloadMetricType.USERNAME; + } else if (WorkloadMetricType.QUERY_TIME.toString().equalsIgnoreCase(metricStr)) { + return WorkloadMetricType.QUERY_TIME; + } + throw new UserException("invalid metric name:" + metricStr); + } + + public String toString() { + return metricName + " " + op + " " + value; + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadConditionOperator.java b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadConditionOperator.java new file mode 100644 index 00000000000000..c659218fd75e63 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadConditionOperator.java @@ -0,0 +1,22 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.resource.workloadschedpolicy; + +public enum WorkloadConditionOperator { + EQUAL, GREATER, GREATER_EQUAL, LESS, LESS_EQUAl +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadConditionQueryTime.java b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadConditionQueryTime.java new file mode 100644 index 00000000000000..e61484508df166 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadConditionQueryTime.java @@ -0,0 +1,52 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.resource.workloadschedpolicy; + +import org.apache.doris.common.UserException; + +public class WorkloadConditionQueryTime implements WorkloadCondition { + + private long value; + private WorkloadConditionOperator op; + + public WorkloadConditionQueryTime(WorkloadConditionOperator op, long value) { + this.op = op; + this.value = value; + } + + @Override + public boolean eval(String strValue) { + long inputLongValue = Long.parseLong(strValue); + return WorkloadConditionCompareUtils.compareInteger(op, inputLongValue, value); + } + + public static WorkloadConditionQueryTime createWorkloadCondition(WorkloadConditionOperator op, String value) + throws UserException { + long longValue = Long.parseLong(value); + if (longValue < 0) { + throw new UserException("invalid query time value, " + longValue + ", it requires >= 0"); + } + return new WorkloadConditionQueryTime(op, longValue); + } + + @Override + public WorkloadMetricType getMetricType() { + return WorkloadMetricType.QUERY_TIME; + } + +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadConditionUsername.java b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadConditionUsername.java new file mode 100644 index 00000000000000..8b7dfacdc7d984 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadConditionUsername.java @@ -0,0 +1,48 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.resource.workloadschedpolicy; + +import org.apache.hbase.thirdparty.com.google.gson.annotations.SerializedName; + +public class WorkloadConditionUsername implements WorkloadCondition { + + @SerializedName(value = "op") + private WorkloadConditionOperator op; + @SerializedName(value = "value") + private String value; + + public WorkloadConditionUsername(WorkloadConditionOperator op, String value) { + this.op = op; + this.value = value; + } + + @Override + public boolean eval(String inputStrValue) { + return WorkloadConditionCompareUtils.compareString(op, inputStrValue, value); + } + + @Override + public WorkloadMetricType getMetricType() { + return WorkloadMetricType.USERNAME; + } + + public static WorkloadConditionUsername createWorkloadCondition(WorkloadConditionOperator op, String value) { + // todo(wb) check whether input username is valid + return new WorkloadConditionUsername(op, value); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadMetricType.java b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadMetricType.java new file mode 100644 index 00000000000000..89dfde9eba38a5 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadMetricType.java @@ -0,0 +1,22 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.resource.workloadschedpolicy; + +public enum WorkloadMetricType { + USERNAME, QUERY_TIME +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadQueryInfo.java b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadQueryInfo.java new file mode 100644 index 00000000000000..27d821c32c0d11 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadQueryInfo.java @@ -0,0 +1,30 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.resource.workloadschedpolicy; + +import org.apache.doris.qe.ConnectContext; +import org.apache.doris.thrift.TUniqueId; + +import java.util.Map; + +public class WorkloadQueryInfo { + String queryId = null; + TUniqueId tUniqueId = null; + ConnectContext context = null; + Map metricMap; +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadSchedPolicy.java b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadSchedPolicy.java new file mode 100644 index 00000000000000..7186d4409a55cb --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadSchedPolicy.java @@ -0,0 +1,203 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.resource.workloadschedpolicy; + +import org.apache.doris.common.UserException; +import org.apache.doris.common.io.Text; +import org.apache.doris.common.io.Writable; +import org.apache.doris.persist.gson.GsonPostProcessable; +import org.apache.doris.persist.gson.GsonUtils; + +import com.esotericsoftware.minlog.Log; +import com.google.common.collect.ImmutableSet; +import com.google.gson.annotations.SerializedName; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +public class WorkloadSchedPolicy implements Writable, GsonPostProcessable { + + public static final String ENABLED = "enabled"; + public static final String PRIORITY = "priority"; + public static final ImmutableSet POLICY_PROPERTIES = new ImmutableSet.Builder() + .add(ENABLED).add(PRIORITY).build(); + + @SerializedName(value = "id") + long id; + @SerializedName(value = "name") + String name; + @SerializedName(value = "version") + int version; + + @SerializedName(value = "enabled") + private volatile boolean enabled; + @SerializedName(value = "priority") + private volatile int priority; + + @SerializedName(value = "conditionMetaList") + List conditionMetaList; + // we regard action as a command, map's key is command, map's value is args, so it's a command list + @SerializedName(value = "actionMetaList") + List actionMetaList; + + private List workloadConditionList; + private List workloadActionList; + + public WorkloadSchedPolicy(long id, String name, List workloadConditionList, + List workloadActionList, Map properties) throws UserException { + this.id = id; + this.name = name; + this.workloadConditionList = workloadConditionList; + this.workloadActionList = workloadActionList; + // set enable and priority + parseAndSetProperties(properties); + this.version = 0; + } + + // return true, this means all conditions in policy can match queryInfo's data + // return false, + // 1 metric not match + // 2 condition value not match query info's value + boolean isMatch(WorkloadQueryInfo queryInfo) { + for (WorkloadCondition condition : workloadConditionList) { + WorkloadMetricType metricType = condition.getMetricType(); + String value = queryInfo.metricMap.get(metricType); + if (value == null) { + return false; // query info's metric must match all condition's metric + } + if (!condition.eval(value)) { + return false; + } + } + return true; + } + + public boolean isEnabled() { + return enabled; + } + + public int getPriority() { + return priority; + } + + public void execAction(WorkloadQueryInfo queryInfo) { + for (WorkloadAction action : workloadActionList) { + action.exec(queryInfo); + } + } + + // move > log, cancel > log + // move and cancel can not exist at same time + public WorkloadActionType getFirstActionType() { + WorkloadActionType retType = null; + for (WorkloadAction action : workloadActionList) { + WorkloadActionType currentActionType = action.getWorkloadActionType(); + if (retType == null) { + retType = currentActionType; + continue; + } + + if (currentActionType == WorkloadActionType.MOVE_QUERY_TO_GROUP + || currentActionType == WorkloadActionType.CANCEL_QUERY) { + return currentActionType; + } + } + return retType; + } + + public void parseAndSetProperties(Map properties) throws UserException { + String enabledStr = properties.get(ENABLED); + this.enabled = enabledStr == null ? true : Boolean.parseBoolean(enabledStr); + + String priorityStr = properties.get(PRIORITY); + this.priority = priorityStr == null ? 0 : Integer.parseInt(priorityStr); + } + + void incrementVersion() { + this.version++; + } + + public void setConditionMeta(List conditionMeta) { + this.conditionMetaList = conditionMeta; + } + + public void setActionMeta(List actionMeta) { + this.actionMetaList = actionMeta; + } + + public long getId() { + return id; + } + + public String getName() { + return name; + } + + public long getVersion() { + return version; + } + + public List getConditionMetaList() { + return conditionMetaList; + } + + public List getActionMetaList() { + return actionMetaList; + } + + @Override + public void write(DataOutput out) throws IOException { + String json = GsonUtils.GSON.toJson(this); + Text.writeString(out, json); + } + + public static WorkloadSchedPolicy read(DataInput in) throws IOException { + String json = Text.readString(in); + return GsonUtils.GSON.fromJson(json, WorkloadSchedPolicy.class); + } + + @Override + public void gsonPostProcess() throws IOException { + List policyConditionList = new ArrayList<>(); + for (WorkloadConditionMeta cm : conditionMetaList) { + try { + WorkloadCondition cond = WorkloadCondition.createWorkloadCondition(cm); + policyConditionList.add(cond); + } catch (UserException ue) { + Log.error("unexpected condition data error when replay log ", ue); + } + } + this.workloadConditionList = policyConditionList; + + List actionList = new ArrayList<>(); + for (WorkloadActionMeta actionMeta : actionMetaList) { + try { + WorkloadAction ret = WorkloadAction.createWorkloadAction(actionMeta); + actionList.add(ret); + } catch (UserException ue) { + Log.error("unexpected action data error when replay log ", ue); + } + } + this.workloadActionList = actionList; + + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadSchedPolicyMgr.java b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadSchedPolicyMgr.java new file mode 100644 index 00000000000000..9e2e4cd91afae7 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadSchedPolicyMgr.java @@ -0,0 +1,535 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.resource.workloadschedpolicy; + +import org.apache.doris.analysis.AlterWorkloadSchedPolicyStmt; +import org.apache.doris.analysis.CreateWorkloadSchedPolicyStmt; +import org.apache.doris.analysis.DropWorkloadSchedPolicyStmt; +import org.apache.doris.analysis.UserIdentity; +import org.apache.doris.catalog.Env; +import org.apache.doris.common.Config; +import org.apache.doris.common.UserException; +import org.apache.doris.common.io.Text; +import org.apache.doris.common.io.Writable; +import org.apache.doris.common.proc.BaseProcResult; +import org.apache.doris.common.proc.ProcResult; +import org.apache.doris.common.util.DebugUtil; +import org.apache.doris.mysql.privilege.PrivPredicate; +import org.apache.doris.persist.gson.GsonPostProcessable; +import org.apache.doris.persist.gson.GsonUtils; +import org.apache.doris.qe.ConnectContext; +import org.apache.doris.service.ExecuteEnv; + +import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Maps; +import com.google.gson.annotations.SerializedName; +import org.apache.commons.lang3.StringUtils; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Comparator; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.PriorityQueue; +import java.util.Queue; +import java.util.Set; +import java.util.concurrent.locks.ReentrantReadWriteLock; + +public class WorkloadSchedPolicyMgr implements Writable, GsonPostProcessable { + + private static final Logger LOG = LogManager.getLogger(WorkloadSchedPolicyMgr.class); + + @SerializedName(value = "idToPolicy") + private Map idToPolicy = Maps.newConcurrentMap(); + private Map nameToPolicy = Maps.newHashMap(); + + private PolicyProcNode policyProcNode = new PolicyProcNode(); + + public static final ImmutableList WORKLOAD_SCHED_POLICY_NODE_TITLE_NAMES + = new ImmutableList.Builder() + .add("Id").add("Name").add("ItemName").add("ItemValue") + .build(); + + private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); + + public static Comparator policyComparator = new Comparator() { + @Override + public int compare(WorkloadSchedPolicy p1, WorkloadSchedPolicy p2) { + return p2.getPriority() - p1.getPriority(); + } + }; + + private Thread policyExecThread = new Thread() { + + @Override + public void run() { + while (true) { + try { + // todo(wb) add more query info source, not only comes from connectionmap + // 1 get query info map + Map connectMap = ExecuteEnv.getInstance().getScheduler() + .getConnectionMap(); + List queryInfoList = new ArrayList<>(); + + // a snapshot for connect context + Set keySet = new HashSet<>(); + keySet.addAll(connectMap.keySet()); + + for (Integer connectId : keySet) { + ConnectContext cctx = connectMap.get(connectId); + if (cctx == null || cctx.isKilled()) { + continue; + } + + String username = cctx.getQualifiedUser(); + long queryTime = System.currentTimeMillis() - cctx.getStartTime(); + + WorkloadQueryInfo policyQueryInfo = new WorkloadQueryInfo(); + policyQueryInfo.queryId = cctx.queryId() == null ? null : DebugUtil.printId(cctx.queryId()); + policyQueryInfo.tUniqueId = cctx.queryId(); + policyQueryInfo.context = cctx; + policyQueryInfo.metricMap = new HashMap<>(); + policyQueryInfo.metricMap.put(WorkloadMetricType.USERNAME, username); + policyQueryInfo.metricMap.put(WorkloadMetricType.QUERY_TIME, String.valueOf(queryTime)); + + queryInfoList.add(policyQueryInfo); + } + + // 2 exec policy + if (queryInfoList.size() > 0) { + execPolicy(queryInfoList); + } + } catch (Throwable t) { + LOG.error("[policy thread]error happens when exec policy"); + } + + // 3 sleep + try { + Thread.sleep(Config.workload_sched_policy_interval_ms); + } catch (InterruptedException e) { + LOG.error("error happends when policy exec thread sleep"); + } + } + } + }; + + public void start() { + policyExecThread.setName("workload-auto-scheduler-thread"); + policyExecThread.start(); + } + + public void createWorkloadSchedPolicy(CreateWorkloadSchedPolicyStmt createStmt) throws UserException { + String policyName = createStmt.getPolicyName(); + + // 1 create condition + List originConditions = createStmt.getConditions(); + List policyConditionList = new ArrayList<>(); + for (WorkloadConditionMeta cm : originConditions) { + WorkloadCondition cond = WorkloadCondition.createWorkloadCondition(cm); + policyConditionList.add(cond); + } + + // 2 create action + List originActions = createStmt.getActions(); + List policyActionList = new ArrayList<>(); + for (WorkloadActionMeta workloadActionMeta : originActions) { + WorkloadActionType actionName = workloadActionMeta.action; + String actionArgs = workloadActionMeta.actionArgs; + + // we need convert wgName to wgId, because wgName may change + if (WorkloadActionType.MOVE_QUERY_TO_GROUP.equals(actionName)) { + Long wgId = Env.getCurrentEnv().getWorkloadGroupMgr().getWorkloadGroupIdByName(actionArgs); + if (wgId == null) { + throw new UserException( + "can not find workload group " + actionArgs + " when set workload sched policy"); + } + workloadActionMeta.actionArgs = wgId.toString(); + } + + WorkloadAction ret = WorkloadAction.createWorkloadAction(workloadActionMeta); + policyActionList.add(ret); + } + checkPolicyActionConflicts(policyActionList); + + // 3 create policy + Map propMap = createStmt.getProperties(); + if (propMap == null) { + propMap = new HashMap<>(); + } + if (propMap.size() != 0) { + checkProperties(propMap); + } + writeLock(); + try { + if (nameToPolicy.containsKey(createStmt.getPolicyName())) { + if (createStmt.isIfNotExists()) { + return; + } else { + throw new UserException("workload schedule policy " + policyName + " already exists "); + } + } + long id = Env.getCurrentEnv().getNextId(); + WorkloadSchedPolicy policy = new WorkloadSchedPolicy(id, policyName, + policyConditionList, policyActionList, propMap); + policy.setConditionMeta(originConditions); + policy.setActionMeta(originActions); + Env.getCurrentEnv().getEditLog().logCreateWorkloadSchedPolicy(policy); + idToPolicy.put(id, policy); + nameToPolicy.put(policyName, policy); + } finally { + writeUnlock(); + } + } + + private void checkPolicyActionConflicts(List actionList) throws UserException { + Set actionTypeSet = new HashSet<>(); + Set setSessionVarSet = new HashSet<>(); + for (WorkloadAction action : actionList) { + // set session var cmd can be duplicate, but args can not be duplicate + if (action.getWorkloadActionType().equals(WorkloadActionType.SET_SESSION_VARIABLE)) { + WorkloadActionSetSessionVar setAction = (WorkloadActionSetSessionVar) action; + if (!setSessionVarSet.add(setAction.getVarName())) { + throw new UserException( + "duplicate set_session_variable action args one policy, " + setAction.getVarName()); + } + } else if (!actionTypeSet.add(action.getWorkloadActionType())) { + throw new UserException("duplicate action in one policy"); + } + } + + if (actionTypeSet.contains(WorkloadActionType.CANCEL_QUERY) && actionTypeSet.contains( + WorkloadActionType.MOVE_QUERY_TO_GROUP)) { + throw new UserException(String.format("%s and %s can not exist in one policy at same time", + WorkloadActionType.CANCEL_QUERY, WorkloadActionType.MOVE_QUERY_TO_GROUP)); + } + } + + public void execPolicy(List queryInfoList) { + // 1 get a snapshot of policy + Set policyIdSet = new HashSet<>(); + readLock(); + try { + policyIdSet.addAll(idToPolicy.keySet()); + } finally { + readUnlock(); + } + + for (WorkloadQueryInfo queryInfo : queryInfoList) { + try { + // 1 check policy is match + Map> matchedPolicyMap = Maps.newHashMap(); + for (Long policyId : policyIdSet) { + WorkloadSchedPolicy policy = idToPolicy.get(policyId); + if (policy == null) { + continue; + } + if (policy.isEnabled() && policy.isMatch(queryInfo)) { + WorkloadActionType actionType = policy.getFirstActionType(); + // add to priority queue + Queue queue = matchedPolicyMap.get(actionType); + if (queue == null) { + queue = new PriorityQueue<>(policyComparator); + matchedPolicyMap.put(actionType, queue); + } + queue.offer(policy); + } + } + + if (matchedPolicyMap.size() == 0) { + continue; + } + + // 2 pick higher priority policy when action conflicts + List pickedPolicyList = pickPolicy(matchedPolicyMap); + + // 3 exec action + for (WorkloadSchedPolicy policy : pickedPolicyList) { + policy.execAction(queryInfo); + } + } catch (Throwable e) { + LOG.warn("exec policy with query {} failed ", queryInfo.queryId, e); + } + } + } + + List pickPolicy(Map> policyMap) { + // NOTE(wb) currently all action share the same comparator which use priority. + // But later we may design every action type's own comparator, + // such as if two move group action has the same priority but move to different group, + // then we may pick group by resource usage and query statistics. + + // 1 only need one policy with move action which has the highest priority + WorkloadSchedPolicy policyWithMoveAction = null; + Queue moveActionQueue = policyMap.get(WorkloadActionType.MOVE_QUERY_TO_GROUP); + if (moveActionQueue != null) { + policyWithMoveAction = moveActionQueue.peek(); + } + + // 2 only need one policy with cancel action which has the highest priority + WorkloadSchedPolicy policyWithCancelQueryAction = null; + Queue canelQueryActionQueue = policyMap.get(WorkloadActionType.CANCEL_QUERY); + if (canelQueryActionQueue != null) { + policyWithCancelQueryAction = canelQueryActionQueue.peek(); + } + + // 3 compare policy with move action and cancel action + List ret = new ArrayList<>(); + if (policyWithMoveAction != null && policyWithCancelQueryAction != null) { + if (policyWithMoveAction.getPriority() > policyWithCancelQueryAction.getPriority()) { + ret.add(policyWithMoveAction); + } else { + ret.add(policyWithCancelQueryAction); + } + } else { + if (policyWithCancelQueryAction != null) { + ret.add(policyWithCancelQueryAction); + } else if (policyWithMoveAction != null) { + ret.add(policyWithMoveAction); + } + } + + // 4 add no-conflict policy + for (Map.Entry> entry : policyMap.entrySet()) { + WorkloadActionType type = entry.getKey(); + Queue policyQueue = entry.getValue(); + if (!WorkloadActionType.CANCEL_QUERY.equals(type) && !WorkloadActionType.MOVE_QUERY_TO_GROUP.equals(type) + && policyQueue != null && policyQueue.size() > 0) { + WorkloadSchedPolicy pickedPolicy = policyQueue.peek(); + ret.add(pickedPolicy); + } + } + + Preconditions.checkArgument(ret.size() > 0, "should pick at least one policy"); + return ret; + } + + private void checkProperties(Map properties) throws UserException { + Set allInputPropKeySet = new HashSet<>(); + allInputPropKeySet.addAll(properties.keySet()); + + allInputPropKeySet.removeAll(WorkloadSchedPolicy.POLICY_PROPERTIES); + if (allInputPropKeySet.size() > 0) { + throw new UserException("illegal policy properties " + String.join(",", allInputPropKeySet)); + } + + String enabledStr = properties.get(WorkloadSchedPolicy.ENABLED); + if (enabledStr != null) { + if (!"true".equals(enabledStr) && !"false".equals(enabledStr)) { + throw new UserException("invalid enabled property value, it can only true or false with lower case"); + } + } + + String priorityStr = properties.get(WorkloadSchedPolicy.PRIORITY); + if (priorityStr != null) { + try { + Long prioLongVal = Long.parseLong(priorityStr); + if (prioLongVal < 0 || prioLongVal > 100) { + throw new UserException("policy's priority can only between 0 ~ 100"); + } + } catch (NumberFormatException e) { + throw new UserException("policy's priority must be a number, input value=" + priorityStr); + } + } + } + + public void alterWorkloadSchedPolicy(AlterWorkloadSchedPolicyStmt alterStmt) throws UserException { + writeLock(); + try { + String policyName = alterStmt.getPolicyName(); + WorkloadSchedPolicy policy = nameToPolicy.get(policyName); + if (policy == null) { + throw new UserException("can not find workload schedule policy " + policyName); + } + + Map properties = alterStmt.getProperties(); + checkProperties(properties); + policy.parseAndSetProperties(properties); + policy.incrementVersion(); + Env.getCurrentEnv().getEditLog().logAlterWorkloadSchedPolicy(policy); + } finally { + writeUnlock(); + } + } + + public void dropWorkloadSchedPolicy(DropWorkloadSchedPolicyStmt dropStmt) throws UserException { + writeLock(); + try { + String policyName = dropStmt.getPolicyName(); + WorkloadSchedPolicy schedPolicy = nameToPolicy.get(policyName); + if (schedPolicy == null) { + if (dropStmt.isIfExists()) { + return; + } else { + throw new UserException("workload schedule policy " + policyName + " not exists"); + } + } + + long id = schedPolicy.getId(); + idToPolicy.remove(id); + nameToPolicy.remove(policyName); + Env.getCurrentEnv().getEditLog().dropWorkloadSchedPolicy(id); + } finally { + writeUnlock(); + } + } + + private void readLock() { + lock.readLock().lock(); + } + + private void readUnlock() { + lock.readLock().unlock(); + } + + private void writeLock() { + lock.writeLock().lock(); + } + + private void writeUnlock() { + lock.writeLock().unlock(); + } + + public void replayCreateWorkloadSchedPolicy(WorkloadSchedPolicy policy) { + insertWorkloadSchedPolicy(policy); + } + + public void replayAlterWorkloadSchedPolicy(WorkloadSchedPolicy policy) { + insertWorkloadSchedPolicy(policy); + } + + public void replayDropWorkloadSchedPolicy(long policyId) { + writeLock(); + try { + WorkloadSchedPolicy policy = idToPolicy.get(policyId); + if (policy == null) { + return; + } + idToPolicy.remove(policyId); + nameToPolicy.remove(policy.getName()); + } finally { + writeUnlock(); + } + } + + private void insertWorkloadSchedPolicy(WorkloadSchedPolicy policy) { + writeLock(); + try { + idToPolicy.put(policy.getId(), policy); + nameToPolicy.put(policy.getName(), policy); + } finally { + writeUnlock(); + } + } + + public List> getShowPolicyInfo() { + UserIdentity currentUserIdentity = ConnectContext.get().getCurrentUserIdentity(); + return policyProcNode.fetchResult(currentUserIdentity).getRows(); + } + + public class PolicyProcNode { + public ProcResult fetchResult(UserIdentity currentUserIdentity) { + BaseProcResult result = new BaseProcResult(); + result.setNames(WORKLOAD_SCHED_POLICY_NODE_TITLE_NAMES); + readLock(); + try { + for (WorkloadSchedPolicy policy : idToPolicy.values()) { + if (!Env.getCurrentEnv().getAccessManager().checkWorkloadGroupPriv(currentUserIdentity, + policy.getName(), PrivPredicate.SHOW_WORKLOAD_GROUP)) { + continue; + } + + String pId = String.valueOf(policy.getId()); + String pName = policy.getName(); + + List conditionList = policy.getConditionMetaList(); + for (WorkloadConditionMeta cm : conditionList) { + List condRow = new ArrayList<>(); + condRow.add(pId); + condRow.add(pName); + condRow.add("condition"); + condRow.add(cm.toString()); + result.addRow(condRow); + } + + List actionList = policy.getActionMetaList(); + for (WorkloadActionMeta workloadActionMeta : actionList) { + List actionRow = new ArrayList<>(); + actionRow.add(pId); + actionRow.add(pName); + actionRow.add("action"); + if (StringUtils.isEmpty(workloadActionMeta.actionArgs)) { + actionRow.add(workloadActionMeta.action.toString()); + } else { + actionRow.add(workloadActionMeta.action + " " + workloadActionMeta.actionArgs); + } + result.addRow(actionRow); + } + + List prioRow = new ArrayList<>(); + prioRow.add(pId); + prioRow.add(pName); + prioRow.add("priority"); + prioRow.add(String.valueOf(policy.getPriority())); + result.addRow(prioRow); + + List enabledRow = new ArrayList<>(); + enabledRow.add(pId); + enabledRow.add(pName); + enabledRow.add("enabled"); + enabledRow.add(String.valueOf(policy.isEnabled())); + result.addRow(enabledRow); + + + List versionRow = new ArrayList<>(); + versionRow.add(pId); + versionRow.add(pName); + versionRow.add("version"); + versionRow.add(String.valueOf(policy.getVersion())); + result.addRow(versionRow); + } + } finally { + readUnlock(); + } + return result; + } + } + + public static WorkloadSchedPolicyMgr read(DataInput in) throws IOException { + String json = Text.readString(in); + return GsonUtils.GSON.fromJson(json, WorkloadSchedPolicyMgr.class); + } + + @Override + public void write(DataOutput out) throws IOException { + String json = GsonUtils.GSON.toJson(this); + Text.writeString(out, json); + } + + @Override + public void gsonPostProcess() throws IOException { + idToPolicy.forEach( + (id, schedPolicy) -> nameToPolicy.put(schedPolicy.getName(), schedPolicy)); + } +} diff --git a/fe/fe-core/src/main/jflex/sql_scanner.flex b/fe/fe-core/src/main/jflex/sql_scanner.flex index d9bbd93859a2c0..955555c270972d 100644 --- a/fe/fe-core/src/main/jflex/sql_scanner.flex +++ b/fe/fe-core/src/main/jflex/sql_scanner.flex @@ -380,6 +380,9 @@ import org.apache.doris.qe.SqlModeHelper; keywordMap.put("processlist", new Integer(SqlParserSymbols.KW_PROCESSLIST)); keywordMap.put("profile", new Integer(SqlParserSymbols.KW_PROFILE)); keywordMap.put("properties", new Integer(SqlParserSymbols.KW_PROPERTIES)); + keywordMap.put("conditions", new Integer(SqlParserSymbols.KW_CONDITIONS)); + keywordMap.put("actions", new Integer(SqlParserSymbols.KW_ACTIONS)); + keywordMap.put("set_session_variable", new Integer(SqlParserSymbols.KW_SET_SESSION_VAR)); keywordMap.put("property", new Integer(SqlParserSymbols.KW_PROPERTY)); keywordMap.put("quantile_state", new Integer(SqlParserSymbols.KW_QUANTILE_STATE)); keywordMap.put("quantile_union", new Integer(SqlParserSymbols.KW_QUANTILE_UNION)); diff --git a/gensrc/thrift/BackendService.thrift b/gensrc/thrift/BackendService.thrift index 1f2e8185a660c7..855b74897114ef 100644 --- a/gensrc/thrift/BackendService.thrift +++ b/gensrc/thrift/BackendService.thrift @@ -162,6 +162,7 @@ struct TQueryIngestBinlogResult { enum TTopicInfoType { WORKLOAD_GROUP + MOVE_QUERY_TO_GROUP } struct TWorkloadGroupInfo { @@ -175,8 +176,14 @@ struct TWorkloadGroupInfo { 8: optional bool enable_cpu_hard_limit } +struct TWorkloadMoveQueryToGroupAction { + 1: optional Types.TUniqueId query_id + 2: optional i64 workload_group_id; +} + struct TopicInfo { 1: optional TWorkloadGroupInfo workload_group_info + 2: optional TWorkloadMoveQueryToGroupAction move_action } struct TPublishTopicRequest {