From b84a5ed474fc55b5d1d3dc7d34fbbb1acc13ecd3 Mon Sep 17 00:00:00 2001 From: wangbo <506340561@qq.com> Date: Thu, 30 May 2024 11:23:31 +0800 Subject: [PATCH] Add workload metric query_be_memory --- .../runtime/runtime_query_statistics_mgr.cpp | 2 + .../workload_management/workload_action.cpp | 10 +++- .../workload_condition.cpp | 13 +++++ .../workload_management/workload_condition.h | 17 +++++- .../workload_management/workload_query_info.h | 2 + .../workload_sched_policy.cpp | 2 + .../WorkloadActionMeta.java | 12 ++-- .../WorkloadCondition.java | 2 + .../WorkloadConditionBeScanBytes.java | 11 +++- .../WorkloadConditionBeScanRows.java | 11 +++- .../WorkloadConditionMeta.java | 14 ++--- .../WorkloadConditionQueryBeMemory.java | 56 +++++++++++++++++++ .../WorkloadConditionQueryTime.java | 11 +++- .../WorkloadMetricType.java | 2 +- .../WorkloadSchedPolicy.java | 27 +-------- .../WorkloadSchedPolicyMgr.java | 46 ++++++++++++++- gensrc/thrift/BackendService.thrift | 1 + .../test_workload_sched_policy.groovy | 30 ++++++++++ 18 files changed, 214 insertions(+), 55 deletions(-) create mode 100644 fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadConditionQueryBeMemory.java diff --git a/be/src/runtime/runtime_query_statistics_mgr.cpp b/be/src/runtime/runtime_query_statistics_mgr.cpp index 9764b0f0507510..955d1b9a7e8223 100644 --- a/be/src/runtime/runtime_query_statistics_mgr.cpp +++ b/be/src/runtime/runtime_query_statistics_mgr.cpp @@ -190,6 +190,8 @@ void RuntimeQueryStatiticsMgr::get_metric_map( metric_map.emplace(WorkloadMetricType::QUERY_TIME, std::to_string(query_time_ms)); metric_map.emplace(WorkloadMetricType::SCAN_ROWS, std::to_string(ret_qs.get_scan_rows())); metric_map.emplace(WorkloadMetricType::SCAN_BYTES, std::to_string(ret_qs.get_scan_bytes())); + metric_map.emplace(WorkloadMetricType::QUERY_MEMORY_BYTES, + std::to_string(ret_qs.get_current_used_memory_bytes())); } void RuntimeQueryStatiticsMgr::set_workload_group_id(std::string query_id, int64_t wg_id) { diff --git a/be/src/runtime/workload_management/workload_action.cpp b/be/src/runtime/workload_management/workload_action.cpp index 39916bc7cc19db..b36895594dcaad 100644 --- a/be/src/runtime/workload_management/workload_action.cpp +++ b/be/src/runtime/workload_management/workload_action.cpp @@ -22,10 +22,14 @@ namespace doris { void WorkloadActionCancelQuery::exec(WorkloadQueryInfo* query_info) { - LOG(INFO) << "[workload_schedule]workload scheduler cancel query " << query_info->query_id; + std::stringstream msg; + msg << "query " << query_info->query_id + << " cancelled by workload policy: " << query_info->policy_name + << ", id:" << query_info->policy_id; + std::string msg_str = msg.str(); + LOG(INFO) << "[workload_schedule]" << msg_str; ExecEnv::GetInstance()->fragment_mgr()->cancel_query( - query_info->tquery_id, PPlanFragmentCancelReason::INTERNAL_ERROR, - std::string("query canceled by workload scheduler")); + query_info->tquery_id, PPlanFragmentCancelReason::INTERNAL_ERROR, msg_str); } void WorkloadActionMoveQuery::exec(WorkloadQueryInfo* query_info) { diff --git a/be/src/runtime/workload_management/workload_condition.cpp b/be/src/runtime/workload_management/workload_condition.cpp index dff6f2adc24298..62c6072a60c183 100644 --- a/be/src/runtime/workload_management/workload_condition.cpp +++ b/be/src/runtime/workload_management/workload_condition.cpp @@ -56,4 +56,17 @@ bool WorkloadConditionScanBytes::eval(std::string str_val) { return WorkloadCompareUtils::compare_signed_integer(_op, scan_bytes_args, _scan_bytes); } +// query memory +WorkloadConditionQueryMemory::WorkloadConditionQueryMemory(WorkloadCompareOperator op, + std::string str_val) { + _op = op; + _query_memory_bytes = std::stol(str_val); +} + +bool WorkloadConditionQueryMemory::eval(std::string str_val) { + int64_t query_memory_bytes = std::stol(str_val); + return WorkloadCompareUtils::compare_signed_integer(_op, query_memory_bytes, + _query_memory_bytes); +} + } // namespace doris \ No newline at end of file diff --git a/be/src/runtime/workload_management/workload_condition.h b/be/src/runtime/workload_management/workload_condition.h index 96387a2af410b2..a85268a8dc3a6a 100644 --- a/be/src/runtime/workload_management/workload_condition.h +++ b/be/src/runtime/workload_management/workload_condition.h @@ -23,7 +23,7 @@ namespace doris { -enum WorkloadMetricType { QUERY_TIME, SCAN_ROWS, SCAN_BYTES }; +enum WorkloadMetricType { QUERY_TIME, SCAN_ROWS, SCAN_BYTES, QUERY_MEMORY_BYTES }; class WorkloadCondition { public: @@ -74,6 +74,19 @@ class WorkloadConditionScanBytes : public WorkloadCondition { WorkloadCompareOperator _op; }; +class WorkloadConditionQueryMemory : public WorkloadCondition { +public: + WorkloadConditionQueryMemory(WorkloadCompareOperator op, std::string str_val); + bool eval(std::string str_val) override; + WorkloadMetricType get_workload_metric_type() override { + return WorkloadMetricType::QUERY_MEMORY_BYTES; + } + +private: + int64_t _query_memory_bytes; + WorkloadCompareOperator _op; +}; + class WorkloadConditionFactory { public: static std::unique_ptr create_workload_condition( @@ -88,6 +101,8 @@ class WorkloadConditionFactory { return std::make_unique(op, str_val); } else if (TWorkloadMetricType::type::BE_SCAN_BYTES == metric_name) { return std::make_unique(op, str_val); + } else if (TWorkloadMetricType::type::QUERY_BE_MEMORY_BYTES == metric_name) { + return std::make_unique(op, str_val); } LOG(ERROR) << "not find a metric name " << metric_name; return nullptr; diff --git a/be/src/runtime/workload_management/workload_query_info.h b/be/src/runtime/workload_management/workload_query_info.h index f2da31b6196e33..e544668e1039ed 100644 --- a/be/src/runtime/workload_management/workload_query_info.h +++ b/be/src/runtime/workload_management/workload_query_info.h @@ -29,6 +29,8 @@ class WorkloadQueryInfo { TUniqueId tquery_id; std::string query_id; int64_t wg_id; + int64_t policy_id; + std::string policy_name; }; } // namespace doris \ No newline at end of file diff --git a/be/src/runtime/workload_management/workload_sched_policy.cpp b/be/src/runtime/workload_management/workload_sched_policy.cpp index b97eb85c06827c..efa8965dd77121 100644 --- a/be/src/runtime/workload_management/workload_sched_policy.cpp +++ b/be/src/runtime/workload_management/workload_sched_policy.cpp @@ -75,6 +75,8 @@ bool WorkloadSchedPolicy::is_match(WorkloadQueryInfo* query_info_ptr) { void WorkloadSchedPolicy::exec_action(WorkloadQueryInfo* query_info) { for (int i = 0; i < _action_list.size(); i++) { + query_info->policy_id = this->_id; + query_info->policy_name = this->_name; _action_list[i]->exec(query_info); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadActionMeta.java b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadActionMeta.java index 57f6ba379937e9..2ce05412844d38 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadActionMeta.java +++ b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadActionMeta.java @@ -37,14 +37,12 @@ public WorkloadActionMeta(String workloadAction, String actionArgs) throws UserE } static WorkloadActionType getWorkloadActionType(String strType) throws UserException { - if (WorkloadActionType.CANCEL_QUERY.toString().equalsIgnoreCase(strType)) { - return WorkloadActionType.CANCEL_QUERY; - } else if (WorkloadActionType.MOVE_QUERY_TO_GROUP.toString().equalsIgnoreCase(strType)) { - return WorkloadActionType.MOVE_QUERY_TO_GROUP; - } else if (WorkloadActionType.SET_SESSION_VARIABLE.toString().equalsIgnoreCase(strType)) { - return WorkloadActionType.SET_SESSION_VARIABLE; + WorkloadActionType workloadActionType = WorkloadSchedPolicyMgr.STRING_ACTION_MAP.get(strType.toUpperCase()); + if (workloadActionType == null) { + throw new UserException("invalid action type " + strType); + } else { + return workloadActionType; } - throw new UserException("invalid action type " + strType); } public String toString() { diff --git a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadCondition.java b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadCondition.java index 5d89d2afae9190..c790a4013080d5 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadCondition.java +++ b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadCondition.java @@ -37,6 +37,8 @@ static WorkloadCondition createWorkloadCondition(WorkloadConditionMeta cm) return WorkloadConditionBeScanRows.createWorkloadCondition(cm.op, cm.value); } else if (WorkloadMetricType.BE_SCAN_BYTES.equals(cm.metricName)) { return WorkloadConditionBeScanBytes.createWorkloadCondition(cm.op, cm.value); + } else if (WorkloadMetricType.QUERY_BE_MEMORY_BYTES.equals(cm.metricName)) { + return WorkloadConditionQueryBeMemory.createWorkloadCondition(cm.op, cm.value); } throw new UserException("invalid metric name:" + cm.metricName); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadConditionBeScanBytes.java b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadConditionBeScanBytes.java index 7431f2e0c4f1d3..bd914baf54e599 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadConditionBeScanBytes.java +++ b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadConditionBeScanBytes.java @@ -38,9 +38,14 @@ public boolean eval(String strValue) { public static WorkloadConditionBeScanBytes createWorkloadCondition(WorkloadConditionOperator op, String value) throws UserException { - long longValue = Long.parseLong(value); - if (longValue < 0) { - throw new UserException("invalid scan bytes value, " + longValue + ", it requires >= 0"); + long longValue = -1; + try { + longValue = Long.parseLong(value); + if (longValue < 0) { + throw new NumberFormatException(); + } + } catch (NumberFormatException e) { + throw new UserException("invalid scan bytes value: " + value + ", it requires >= 0"); } return new WorkloadConditionBeScanBytes(op, longValue); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadConditionBeScanRows.java b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadConditionBeScanRows.java index c2fb638e0820b2..8b99e40d04dfdf 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadConditionBeScanRows.java +++ b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadConditionBeScanRows.java @@ -38,9 +38,14 @@ public boolean eval(String strValue) { public static WorkloadConditionBeScanRows createWorkloadCondition(WorkloadConditionOperator op, String value) throws UserException { - long longValue = Long.parseLong(value); - if (longValue < 0) { - throw new UserException("invalid scan rows value, " + longValue + ", it requires >= 0"); + long longValue = -1; + try { + longValue = Long.parseLong(value); + if (longValue < 0) { + throw new NumberFormatException(); + } + } catch (NumberFormatException e) { + throw new UserException("invalid scan rows value: " + value + ", it requires >= 0"); } return new WorkloadConditionBeScanRows(op, longValue); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadConditionMeta.java b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadConditionMeta.java index 52f50f924fc2ac..81e0f6c2188005 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadConditionMeta.java +++ b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadConditionMeta.java @@ -40,16 +40,12 @@ public WorkloadConditionMeta(String metricName, String op, String value) throws } private static WorkloadMetricType getMetricType(String metricStr) throws UserException { - if (WorkloadMetricType.USERNAME.toString().equalsIgnoreCase(metricStr)) { - return WorkloadMetricType.USERNAME; - } else if (WorkloadMetricType.QUERY_TIME.toString().equalsIgnoreCase(metricStr)) { - return WorkloadMetricType.QUERY_TIME; - } else if (WorkloadMetricType.BE_SCAN_ROWS.toString().equalsIgnoreCase(metricStr)) { - return WorkloadMetricType.BE_SCAN_ROWS; - } else if (WorkloadMetricType.BE_SCAN_BYTES.toString().equalsIgnoreCase(metricStr)) { - return WorkloadMetricType.BE_SCAN_BYTES; + WorkloadMetricType metricType = WorkloadSchedPolicyMgr.STRING_METRIC_MAP.get(metricStr.toUpperCase()); + if (metricType == null) { + throw new UserException("invalid metric name:" + metricStr); + } else { + return metricType; } - throw new UserException("invalid metric name:" + metricStr); } public String toString() { diff --git a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadConditionQueryBeMemory.java b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadConditionQueryBeMemory.java new file mode 100644 index 00000000000000..2274b35ca51e46 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadConditionQueryBeMemory.java @@ -0,0 +1,56 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.resource.workloadschedpolicy; + +import org.apache.doris.common.UserException; + +public class WorkloadConditionQueryBeMemory implements WorkloadCondition { + + private long value; + + private WorkloadConditionOperator op; + + public WorkloadConditionQueryBeMemory(WorkloadConditionOperator op, long value) { + this.value = value; + this.op = op; + } + + @Override + public boolean eval(String strValue) { + return false; + } + + @Override + public WorkloadMetricType getMetricType() { + return WorkloadMetricType.QUERY_BE_MEMORY_BYTES; + } + + public static WorkloadConditionQueryBeMemory createWorkloadCondition(WorkloadConditionOperator op, + String value) throws UserException { + long longValue = -1; + try { + longValue = Long.parseLong(value); + if (longValue < 0) { + throw new NumberFormatException(); + } + } catch (NumberFormatException e) { + throw new UserException("invalid query be memory value: " + value + ", it requires >= 0"); + } + return new WorkloadConditionQueryBeMemory(op, longValue); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadConditionQueryTime.java b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadConditionQueryTime.java index e61484508df166..6c3a5c653aaf9b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadConditionQueryTime.java +++ b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadConditionQueryTime.java @@ -37,9 +37,14 @@ public boolean eval(String strValue) { public static WorkloadConditionQueryTime createWorkloadCondition(WorkloadConditionOperator op, String value) throws UserException { - long longValue = Long.parseLong(value); - if (longValue < 0) { - throw new UserException("invalid query time value, " + longValue + ", it requires >= 0"); + long longValue = -1; + try { + longValue = Long.parseLong(value); + if (longValue < 0) { + throw new NumberFormatException(); + } + } catch (NumberFormatException e) { + throw new UserException("invalid query time value: " + value + ", it requires >= 0"); } return new WorkloadConditionQueryTime(op, longValue); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadMetricType.java b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadMetricType.java index ed17414ec45b3f..93e612a85c2ddd 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadMetricType.java +++ b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadMetricType.java @@ -18,5 +18,5 @@ package org.apache.doris.resource.workloadschedpolicy; public enum WorkloadMetricType { - USERNAME, QUERY_TIME, BE_SCAN_ROWS, BE_SCAN_BYTES + USERNAME, QUERY_TIME, BE_SCAN_ROWS, BE_SCAN_BYTES, QUERY_BE_MEMORY_BYTES } diff --git a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadSchedPolicy.java b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadSchedPolicy.java index 55759e90972279..ff27a08706be8d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadSchedPolicy.java +++ b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadSchedPolicy.java @@ -22,7 +22,6 @@ import org.apache.doris.common.io.Writable; import org.apache.doris.persist.gson.GsonPostProcessable; import org.apache.doris.persist.gson.GsonUtils; -import org.apache.doris.thrift.TCompareOperator; import org.apache.doris.thrift.TWorkloadAction; import org.apache.doris.thrift.TWorkloadActionType; import org.apache.doris.thrift.TWorkloadCondition; @@ -31,7 +30,6 @@ import org.apache.doris.thrift.TopicInfo; import com.esotericsoftware.minlog.Log; -import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; import com.google.gson.annotations.SerializedName; @@ -51,25 +49,6 @@ public class WorkloadSchedPolicy implements Writable, GsonPostProcessable { public static final ImmutableSet POLICY_PROPERTIES = new ImmutableSet.Builder() .add(ENABLED).add(PRIORITY).add(WORKLOAD_GROUP).build(); - // used for convert fe type to thrift type - private static ImmutableMap METRIC_MAP - = new ImmutableMap.Builder() - .put(WorkloadMetricType.QUERY_TIME, TWorkloadMetricType.QUERY_TIME) - .put(WorkloadMetricType.BE_SCAN_ROWS, TWorkloadMetricType.BE_SCAN_ROWS) - .put(WorkloadMetricType.BE_SCAN_BYTES, TWorkloadMetricType.BE_SCAN_BYTES).build(); - private static ImmutableMap ACTION_MAP - = new ImmutableMap.Builder() - .put(WorkloadActionType.MOVE_QUERY_TO_GROUP, TWorkloadActionType.MOVE_QUERY_TO_GROUP) - .put(WorkloadActionType.CANCEL_QUERY, TWorkloadActionType.CANCEL_QUERY).build(); - - private static ImmutableMap OP_MAP - = new ImmutableMap.Builder() - .put(WorkloadConditionOperator.EQUAL, TCompareOperator.EQUAL) - .put(WorkloadConditionOperator.GREATER, TCompareOperator.GREATER) - .put(WorkloadConditionOperator.GREATER_EQUAL, TCompareOperator.GREATER_EQUAL) - .put(WorkloadConditionOperator.LESS, TCompareOperator.LESS) - .put(WorkloadConditionOperator.LESS_EQUAl, TCompareOperator.LESS_EQUAL).build(); - @SerializedName(value = "id") long id; @SerializedName(value = "name") @@ -255,12 +234,12 @@ public TopicInfo toTopicInfo() { List condList = new ArrayList(); for (WorkloadConditionMeta cond : conditionMetaList) { TWorkloadCondition tCond = new TWorkloadCondition(); - TWorkloadMetricType metricType = METRIC_MAP.get(cond.metricName); + TWorkloadMetricType metricType = WorkloadSchedPolicyMgr.METRIC_MAP.get(cond.metricName); if (metricType == null) { return null; } tCond.setMetricName(metricType); - tCond.setOp(OP_MAP.get(cond.op)); + tCond.setOp(WorkloadSchedPolicyMgr.OP_MAP.get(cond.op)); tCond.setValue(cond.value); condList.add(tCond); } @@ -268,7 +247,7 @@ public TopicInfo toTopicInfo() { List actionList = new ArrayList(); for (WorkloadActionMeta action : actionMetaList) { TWorkloadAction tAction = new TWorkloadAction(); - TWorkloadActionType tActionType = ACTION_MAP.get(action.action); + TWorkloadActionType tActionType = WorkloadSchedPolicyMgr.ACTION_MAP.get(action.action); if (tActionType == null) { return null; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadSchedPolicyMgr.java b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadSchedPolicyMgr.java index 4aa7563f8d7848..3879dd83b9adfb 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadSchedPolicyMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadSchedPolicyMgr.java @@ -35,11 +35,15 @@ import org.apache.doris.persist.gson.GsonUtils; import org.apache.doris.qe.ConnectContext; import org.apache.doris.service.ExecuteEnv; +import org.apache.doris.thrift.TCompareOperator; import org.apache.doris.thrift.TUserIdentity; +import org.apache.doris.thrift.TWorkloadActionType; +import org.apache.doris.thrift.TWorkloadMetricType; import org.apache.doris.thrift.TopicInfo; import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; import com.google.common.collect.Maps; import com.google.gson.annotations.SerializedName; @@ -80,6 +84,14 @@ public WorkloadSchedPolicyMgr() { .add("WorkloadGroup") .build(); + public static final ImmutableMap OP_MAP + = new ImmutableMap.Builder() + .put(WorkloadConditionOperator.EQUAL, TCompareOperator.EQUAL) + .put(WorkloadConditionOperator.GREATER, TCompareOperator.GREATER) + .put(WorkloadConditionOperator.GREATER_EQUAL, TCompareOperator.GREATER_EQUAL) + .put(WorkloadConditionOperator.LESS, TCompareOperator.LESS) + .put(WorkloadConditionOperator.LESS_EQUAl, TCompareOperator.LESS_EQUAL).build(); + public static final ImmutableSet FE_ACTION_SET = new ImmutableSet.Builder().add(WorkloadActionType.SET_SESSION_VARIABLE).build(); @@ -93,7 +105,39 @@ public WorkloadSchedPolicyMgr() { public static final ImmutableSet BE_METRIC_SET = new ImmutableSet.Builder().add(WorkloadMetricType.BE_SCAN_ROWS) - .add(WorkloadMetricType.BE_SCAN_BYTES).add(WorkloadMetricType.QUERY_TIME).build(); + .add(WorkloadMetricType.BE_SCAN_BYTES).add(WorkloadMetricType.QUERY_TIME) + .add(WorkloadMetricType.QUERY_BE_MEMORY_BYTES).build(); + + // used for convert fe type to thrift type + public static final ImmutableMap METRIC_MAP + = new ImmutableMap.Builder() + .put(WorkloadMetricType.QUERY_TIME, TWorkloadMetricType.QUERY_TIME) + .put(WorkloadMetricType.BE_SCAN_ROWS, TWorkloadMetricType.BE_SCAN_ROWS) + .put(WorkloadMetricType.BE_SCAN_BYTES, TWorkloadMetricType.BE_SCAN_BYTES) + .put(WorkloadMetricType.QUERY_BE_MEMORY_BYTES, TWorkloadMetricType.QUERY_BE_MEMORY_BYTES).build(); + public static final ImmutableMap ACTION_MAP + = new ImmutableMap.Builder() + .put(WorkloadActionType.MOVE_QUERY_TO_GROUP, TWorkloadActionType.MOVE_QUERY_TO_GROUP) + .put(WorkloadActionType.CANCEL_QUERY, TWorkloadActionType.CANCEL_QUERY).build(); + + public static final Map STRING_METRIC_MAP = new HashMap<>(); + public static final Map STRING_ACTION_MAP = new HashMap<>(); + + static { + for (WorkloadMetricType metricType : FE_METRIC_SET) { + STRING_METRIC_MAP.put(metricType.toString(), metricType); + } + for (WorkloadMetricType metricType : BE_METRIC_SET) { + STRING_METRIC_MAP.put(metricType.toString(), metricType); + } + + for (WorkloadActionType actionType : FE_ACTION_SET) { + STRING_ACTION_MAP.put(actionType.toString(), actionType); + } + for (WorkloadActionType actionType : BE_ACTION_SET) { + STRING_ACTION_MAP.put(actionType.toString(), actionType); + } + } private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); diff --git a/gensrc/thrift/BackendService.thrift b/gensrc/thrift/BackendService.thrift index d93520206c59e4..0a2edb8ccbf12d 100644 --- a/gensrc/thrift/BackendService.thrift +++ b/gensrc/thrift/BackendService.thrift @@ -187,6 +187,7 @@ enum TWorkloadMetricType { QUERY_TIME BE_SCAN_ROWS BE_SCAN_BYTES + QUERY_BE_MEMORY_BYTES } enum TCompareOperator { diff --git a/regression-test/suites/workload_manager_p0/test_workload_sched_policy.groovy b/regression-test/suites/workload_manager_p0/test_workload_sched_policy.groovy index d3f9b35426ad49..2536b06ce7a5e5 100644 --- a/regression-test/suites/workload_manager_p0/test_workload_sched_policy.groovy +++ b/regression-test/suites/workload_manager_p0/test_workload_sched_policy.groovy @@ -23,6 +23,9 @@ suite("test_workload_sched_policy") { sql "drop workload policy if exists set_action_policy;" sql "drop workload policy if exists fe_policy;" sql "drop workload policy if exists be_policy;" + sql "drop workload policy if exists be_scan_row_policy;" + sql "drop workload policy if exists be_scan_bytes_policy;" + sql "drop workload policy if exists query_be_memory_used;" // 1 create cancel policy sql "create workload policy test_cancel_policy " + @@ -106,11 +109,38 @@ suite("test_workload_sched_policy") { exception "duplicate set_session_variable action args one policy" } + test { + sql "create workload policy invalid_metric_value_policy conditions(query_be_memory_bytes > '-1') actions(cancel_query);" + exception "invalid" + } + + test { + sql "create workload policy invalid_metric_value_policy conditions(query_time > '-1') actions(cancel_query);" + exception "invalid" + } + + test { + sql "create workload policy invalid_metric_value_policy conditions(be_scan_rows > '-1') actions(cancel_query);" + exception "invalid" + } + + test { + sql "create workload policy invalid_metric_value_policy conditions(be_scan_bytes > '-1') actions(cancel_query);" + exception "invalid" + } + + sql "create workload policy be_scan_row_policy conditions(be_scan_rows > 1) actions(cancel_query) properties('enabled'='false');" + sql "create workload policy be_scan_bytes_policy conditions(be_scan_bytes > 1) actions(cancel_query) properties('enabled'='false');" + sql "create workload policy query_be_memory_used conditions(query_be_memory_bytes > 1) actions(cancel_query) properties('enabled'='false');" + // drop sql "drop workload policy test_cancel_policy;" sql "drop workload policy set_action_policy;" sql "drop workload policy fe_policy;" sql "drop workload policy be_policy;" + sql "drop workload policy be_scan_row_policy;" + sql "drop workload policy be_scan_bytes_policy;" + sql "drop workload policy query_be_memory_used;" qt_select_policy_tvf_after_drop "select name,condition,action,priority,enabled,version from information_schema.workload_policy where name in('be_policy','fe_policy','set_action_policy','test_cancel_policy') order by name;"