Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2268,6 +2268,12 @@ public class Config extends ConfigBase {
@ConfField(mutable = true, masterOnly = true)
public static int publish_topic_info_interval_ms = 30000; // 30s

@ConfField(mutable = true)
public static int workload_sched_policy_interval_ms = 10000; // 10s

@ConfField(mutable = true)
public static int workload_action_interval_ms = 10000; // 10s

@ConfField(description = {"查询be wal_queue 的超时阈值(ms)",
"the timeout threshold of checking wal_queue on be(ms)"})
public static int check_wal_queue_timeout_threshold = 180000; // 3 min
Expand Down
164 changes: 163 additions & 1 deletion fe/fe-core/src/main/cup/sql_parser.cup
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@ import org.apache.doris.common.Version;
import org.apache.doris.mysql.MysqlPassword;
import org.apache.doris.load.loadv2.LoadTask;
import org.apache.doris.policy.PolicyTypeEnum;
import org.apache.doris.resource.workloadschedpolicy.WorkloadConditionMeta;
import org.apache.doris.resource.workloadschedpolicy.WorkloadActionMeta;

import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
Expand Down Expand Up @@ -530,6 +532,9 @@ terminal String
KW_PROCESSLIST,
KW_PROFILE,
KW_PROPERTIES,
KW_CONDITIONS,
KW_ACTIONS,
KW_SET_SESSION_VAR,
KW_PROPERTY,
KW_QUANTILE_STATE,
KW_QUANTILE_UNION,
Expand Down Expand Up @@ -841,6 +846,8 @@ nonterminal SetVar option_value, option_value_follow_option_type, option_value_n
// List of set variable
nonterminal List<SetVar> option_value_list, option_value_list_continued, start_option_value_list,
start_option_value_list_following_option_type, user_property_list;
nonterminal List<WorkloadConditionMeta> workload_policy_condition_list, conditions, opt_conditions;
nonterminal List<WorkloadActionMeta> workload_policy_action_list, opt_actions, actions;

nonterminal Map<String, String> key_value_map, opt_key_value_map, opt_key_value_map_in_paren, opt_properties,
opt_ext_properties, opt_enable_feature_properties, properties;
Expand Down Expand Up @@ -963,6 +970,9 @@ nonterminal StorageBackend storage_backend;
nonterminal ArrayList<LockTable> opt_lock_tables_list;
nonterminal LockTable lock_table;

// workload policy/group
nonterminal String policy_condition_op, policy_condition_value;

precedence nonassoc COMMA;
precedence nonassoc STRING_LITERAL;
precedence nonassoc KW_COLUMNS;
Expand All @@ -986,7 +996,7 @@ precedence left BITAND, BITOR, BITXOR;
precedence left KW_PIPE;
precedence left BITNOT;
precedence left KW_ORDER, KW_BY, KW_LIMIT;
precedence right KW_PROPERTIES;
precedence right KW_PROPERTIES, KW_CONDITIONS, KW_ACTIONS;
precedence left LPAREN, RPAREN;
precedence left KW_PREPARE, KW_EXECUTE;
// Support chaining of timestamp arithmetic exprs.
Expand Down Expand Up @@ -1362,6 +1372,10 @@ alter_stmt ::=
{:
RESULT = new AlterWorkloadGroupStmt(workloadGroupName, properties);
:}
| KW_ALTER KW_WORKLOAD KW_SCHEDULE KW_POLICY ident_or_text:policyName opt_properties:properties
{:
RESULT = new AlterWorkloadSchedPolicyStmt(policyName, properties);
:}
| KW_ALTER KW_ROUTINE KW_LOAD KW_FOR job_label:jobLabel opt_properties:jobProperties
opt_datasource_properties:datasourceProperties
{:
Expand Down Expand Up @@ -1884,6 +1898,11 @@ create_stmt ::=
{:
RESULT = new CreateWorkloadGroupStmt(ifNotExists, workloadGroupName, properties);
:}
/* workload schedule policy */
| KW_CREATE KW_WORKLOAD KW_SCHEDULE KW_POLICY opt_if_not_exists:ifNotExists ident_or_text:workloadPolicyName opt_conditions:conditions opt_actions:actions opt_properties:properties
{:
RESULT = new CreateWorkloadSchedPolicyStmt(ifNotExists, workloadPolicyName, conditions, actions, properties);
:}
/* encryptkey */
| KW_CREATE KW_ENCRYPTKEY opt_if_not_exists:ifNotExists encryptkey_name:keyName KW_AS STRING_LITERAL:keyString
{:
Expand Down Expand Up @@ -2987,6 +3006,10 @@ drop_stmt ::=
{:
RESULT = new DropWorkloadGroupStmt(ifExists, workloadGroupName);
:}
| KW_DROP KW_WORKLOAD KW_SCHEDULE KW_POLICY opt_if_exists:ifExists ident_or_text:policyName
{:
RESULT = new DropWorkloadSchedPolicyStmt(ifExists, policyName);
:}
| KW_DROP KW_ENCRYPTKEY opt_if_exists:ifExists encryptkey_name:keyName
{:
RESULT = new DropEncryptKeyStmt(ifExists, keyName);
Expand Down Expand Up @@ -3455,6 +3478,135 @@ opt_properties ::=
:}
;

policy_condition_op ::=
EQUAL
{:
RESULT = "=";
:}
| GREATERTHAN
{:
RESULT = ">";
:}
| LESSTHAN
{:
RESULT = "<";
:}
| GREATERTHAN EQUAL
{:
RESULT = ">=";
:}
| LESSTHAN EQUAL
{:
RESULT = "<=";
:};

policy_condition_value ::=
INTEGER_LITERAL:val
{:
RESULT = String.valueOf(val);
:}
| STRING_LITERAL:val
{:
RESULT = val;
:}
| FLOATINGPOINT_LITERAL:val
{:
RESULT = String.valueOf(val);
:}
| DECIMAL_LITERAL:val
{:
RESULT = val.toString();
:}
;

workload_policy_condition_list ::=
ident:metricName policy_condition_op:op policy_condition_value:value
{:
RESULT = new ArrayList<>();
WorkloadConditionMeta cm = new WorkloadConditionMeta(metricName, op, value);
RESULT.add(cm);
:}
| workload_policy_condition_list:list COMMA ident:metricName policy_condition_op:op policy_condition_value:value
{:
WorkloadConditionMeta cm = new WorkloadConditionMeta(metricName, op, value);
list.add(cm);
RESULT = list;
:};

conditions ::=
KW_CONDITIONS LPAREN workload_policy_condition_list:list RPAREN
{:
RESULT = list;
:}
;

opt_conditions ::=
{:
RESULT = null;
:}
| conditions:conditions
{:
RESULT = conditions;
:}
;

workload_policy_action_list ::=
KW_SET_SESSION_VAR STRING_LITERAL:args
{:
RESULT = Lists.newArrayList();
WorkloadActionMeta wam = new WorkloadActionMeta("set_session_variable", args);
RESULT.add(wam);
:}
| workload_policy_action_list:list COMMA KW_SET_SESSION_VAR STRING_LITERAL:args
{:
WorkloadActionMeta wam = new WorkloadActionMeta("set_session_variable", args);
list.add(wam);
RESULT = list;
:}
| ident:firstArgs
{:
RESULT = Lists.newArrayList();
WorkloadActionMeta wam = new WorkloadActionMeta(firstArgs, "");
RESULT.add(wam);
:}
| ident:firstArgs STRING_LITERAL:secondArgs
{:
RESULT = Lists.newArrayList();
WorkloadActionMeta wam = new WorkloadActionMeta(firstArgs, secondArgs);
RESULT.add(wam);
:}
| workload_policy_action_list:list COMMA ident:firstArgs
{:
WorkloadActionMeta wam = new WorkloadActionMeta(firstArgs, "");
list.add(wam);
RESULT = list;
:}
| workload_policy_action_list:list COMMA ident:firstArgs STRING_LITERAL:secondArgs
{:
WorkloadActionMeta wam = new WorkloadActionMeta(firstArgs, secondArgs);
list.add(wam);
RESULT = list;
:}
;


actions ::=
KW_ACTIONS LPAREN workload_policy_action_list:list RPAREN
{:
RESULT = list;
:}
;

opt_actions ::=
{:
RESULT = null;
:}
| actions:actions
{:
RESULT = actions;
:}
;

opt_ext_properties ::=
{:
RESULT = null;
Expand Down Expand Up @@ -4003,6 +4155,10 @@ show_param ::=
{:
RESULT = new ShowWorkloadGroupsStmt();
:}
| KW_WORKLOAD KW_SCHEDULE KW_POLICY
{:
RESULT = new ShowWorkloadSchedPolicyStmt();
:}
| KW_BACKENDS
{:
RESULT = new ShowBackendsStmt();
Expand Down Expand Up @@ -7640,6 +7796,12 @@ keyword ::=
{: RESULT = id; :}
| KW_PROPERTIES:id
{: RESULT = id; :}
| KW_CONDITIONS:id
{: RESULT = id; :}
| KW_ACTIONS:id
{: RESULT = id; :}
| KW_SET_SESSION_VAR:id
{: RESULT = id; :}
| KW_PROPERTY:id
{: RESULT = id; :}
| KW_QUERY:id
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.analysis;

import org.apache.doris.catalog.Env;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.ErrorCode;
import org.apache.doris.common.ErrorReport;
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.PrintableMap;
import org.apache.doris.mysql.privilege.PrivPredicate;
import org.apache.doris.qe.ConnectContext;

import java.util.Map;

public class AlterWorkloadSchedPolicyStmt extends DdlStmt {

private final String policyName;
private final Map<String, String> properties;

public AlterWorkloadSchedPolicyStmt(String policyName, Map<String, String> properties) {
this.policyName = policyName;
this.properties = properties;
}

public String getPolicyName() {
return policyName;
}

public Map<String, String> getProperties() {
return properties;
}

public void analyze(Analyzer analyzer) throws UserException {
super.analyze(analyzer);

// check auth
if (!Env.getCurrentEnv().getAccessManager().checkGlobalPriv(ConnectContext.get(), PrivPredicate.ADMIN)) {
ErrorReport.reportAnalysisException(ErrorCode.ERR_SPECIFIC_ACCESS_DENIED_ERROR, "ADMIN");
}

if (properties == null || properties.isEmpty()) {
throw new AnalysisException("properties can't be null when alter workload schedule policy");
}
}

@Override
public String toSql() {
StringBuilder sb = new StringBuilder();
sb.append("ALTER WORKLOAD SCHEDULE POLICY ");
sb.append(policyName);
sb.append("PROPERTIES(").append(new PrintableMap<>(properties, " = ", true, false)).append(")");
return sb.toString();
}

}
Loading