Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions be/src/runtime/runtime_query_statistics_mgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,8 @@ void RuntimeQueryStatiticsMgr::get_metric_map(
metric_map.emplace(WorkloadMetricType::QUERY_TIME, std::to_string(query_time_ms));
metric_map.emplace(WorkloadMetricType::SCAN_ROWS, std::to_string(ret_qs.get_scan_rows()));
metric_map.emplace(WorkloadMetricType::SCAN_BYTES, std::to_string(ret_qs.get_scan_bytes()));
metric_map.emplace(WorkloadMetricType::QUERY_MEMORY_BYTES,
std::to_string(ret_qs.get_current_used_memory_bytes()));
}

void RuntimeQueryStatiticsMgr::set_workload_group_id(std::string query_id, int64_t wg_id) {
Expand Down
10 changes: 7 additions & 3 deletions be/src/runtime/workload_management/workload_action.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,14 @@
namespace doris {

void WorkloadActionCancelQuery::exec(WorkloadQueryInfo* query_info) {
LOG(INFO) << "[workload_schedule]workload scheduler cancel query " << query_info->query_id;
std::stringstream msg;
msg << "query " << query_info->query_id
<< " cancelled by workload policy: " << query_info->policy_name
<< ", id:" << query_info->policy_id;
std::string msg_str = msg.str();
LOG(INFO) << "[workload_schedule]" << msg_str;
ExecEnv::GetInstance()->fragment_mgr()->cancel_query(
query_info->tquery_id, PPlanFragmentCancelReason::INTERNAL_ERROR,
std::string("query canceled by workload scheduler"));
query_info->tquery_id, PPlanFragmentCancelReason::INTERNAL_ERROR, msg_str);
}

void WorkloadActionMoveQuery::exec(WorkloadQueryInfo* query_info) {
Expand Down
13 changes: 13 additions & 0 deletions be/src/runtime/workload_management/workload_condition.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -56,4 +56,17 @@ bool WorkloadConditionScanBytes::eval(std::string str_val) {
return WorkloadCompareUtils::compare_signed_integer(_op, scan_bytes_args, _scan_bytes);
}

// query memory
WorkloadConditionQueryMemory::WorkloadConditionQueryMemory(WorkloadCompareOperator op,
std::string str_val) {
_op = op;
_query_memory_bytes = std::stol(str_val);
}

bool WorkloadConditionQueryMemory::eval(std::string str_val) {
int64_t query_memory_bytes = std::stol(str_val);
return WorkloadCompareUtils::compare_signed_integer(_op, query_memory_bytes,
_query_memory_bytes);
}

} // namespace doris
17 changes: 16 additions & 1 deletion be/src/runtime/workload_management/workload_condition.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@

namespace doris {

enum WorkloadMetricType { QUERY_TIME, SCAN_ROWS, SCAN_BYTES };
enum WorkloadMetricType { QUERY_TIME, SCAN_ROWS, SCAN_BYTES, QUERY_MEMORY_BYTES };

class WorkloadCondition {
public:
Expand Down Expand Up @@ -74,6 +74,19 @@ class WorkloadConditionScanBytes : public WorkloadCondition {
WorkloadCompareOperator _op;
};

class WorkloadConditionQueryMemory : public WorkloadCondition {
public:
WorkloadConditionQueryMemory(WorkloadCompareOperator op, std::string str_val);
bool eval(std::string str_val) override;
WorkloadMetricType get_workload_metric_type() override {
return WorkloadMetricType::QUERY_MEMORY_BYTES;
}

private:
int64_t _query_memory_bytes;
WorkloadCompareOperator _op;
};

class WorkloadConditionFactory {
public:
static std::unique_ptr<WorkloadCondition> create_workload_condition(
Expand All @@ -88,6 +101,8 @@ class WorkloadConditionFactory {
return std::make_unique<WorkloadConditionScanRows>(op, str_val);
} else if (TWorkloadMetricType::type::BE_SCAN_BYTES == metric_name) {
return std::make_unique<WorkloadConditionScanBytes>(op, str_val);
} else if (TWorkloadMetricType::type::QUERY_BE_MEMORY_BYTES == metric_name) {
return std::make_unique<WorkloadConditionQueryMemory>(op, str_val);
}
LOG(ERROR) << "not find a metric name " << metric_name;
return nullptr;
Expand Down
2 changes: 2 additions & 0 deletions be/src/runtime/workload_management/workload_query_info.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ class WorkloadQueryInfo {
TUniqueId tquery_id;
std::string query_id;
int64_t wg_id;
int64_t policy_id;
std::string policy_name;
};

} // namespace doris
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,8 @@ bool WorkloadSchedPolicy::is_match(WorkloadQueryInfo* query_info_ptr) {

void WorkloadSchedPolicy::exec_action(WorkloadQueryInfo* query_info) {
for (int i = 0; i < _action_list.size(); i++) {
query_info->policy_id = this->_id;
query_info->policy_name = this->_name;
_action_list[i]->exec(query_info);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,14 +37,12 @@ public WorkloadActionMeta(String workloadAction, String actionArgs) throws UserE
}

static WorkloadActionType getWorkloadActionType(String strType) throws UserException {
if (WorkloadActionType.CANCEL_QUERY.toString().equalsIgnoreCase(strType)) {
return WorkloadActionType.CANCEL_QUERY;
} else if (WorkloadActionType.MOVE_QUERY_TO_GROUP.toString().equalsIgnoreCase(strType)) {
return WorkloadActionType.MOVE_QUERY_TO_GROUP;
} else if (WorkloadActionType.SET_SESSION_VARIABLE.toString().equalsIgnoreCase(strType)) {
return WorkloadActionType.SET_SESSION_VARIABLE;
WorkloadActionType workloadActionType = WorkloadSchedPolicyMgr.STRING_ACTION_MAP.get(strType.toUpperCase());
if (workloadActionType == null) {
throw new UserException("invalid action type " + strType);
} else {
return workloadActionType;
}
throw new UserException("invalid action type " + strType);
}

public String toString() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ static WorkloadCondition createWorkloadCondition(WorkloadConditionMeta cm)
return WorkloadConditionBeScanRows.createWorkloadCondition(cm.op, cm.value);
} else if (WorkloadMetricType.BE_SCAN_BYTES.equals(cm.metricName)) {
return WorkloadConditionBeScanBytes.createWorkloadCondition(cm.op, cm.value);
} else if (WorkloadMetricType.QUERY_BE_MEMORY_BYTES.equals(cm.metricName)) {
return WorkloadConditionQueryBeMemory.createWorkloadCondition(cm.op, cm.value);
}
throw new UserException("invalid metric name:" + cm.metricName);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,14 @@ public boolean eval(String strValue) {

public static WorkloadConditionBeScanBytes createWorkloadCondition(WorkloadConditionOperator op, String value)
throws UserException {
long longValue = Long.parseLong(value);
if (longValue < 0) {
throw new UserException("invalid scan bytes value, " + longValue + ", it requires >= 0");
long longValue = -1;
try {
longValue = Long.parseLong(value);
if (longValue < 0) {
throw new NumberFormatException();
}
} catch (NumberFormatException e) {
throw new UserException("invalid scan bytes value: " + value + ", it requires >= 0");
}
return new WorkloadConditionBeScanBytes(op, longValue);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,14 @@ public boolean eval(String strValue) {

public static WorkloadConditionBeScanRows createWorkloadCondition(WorkloadConditionOperator op, String value)
throws UserException {
long longValue = Long.parseLong(value);
if (longValue < 0) {
throw new UserException("invalid scan rows value, " + longValue + ", it requires >= 0");
long longValue = -1;
try {
longValue = Long.parseLong(value);
if (longValue < 0) {
throw new NumberFormatException();
}
} catch (NumberFormatException e) {
throw new UserException("invalid scan rows value: " + value + ", it requires >= 0");
}
return new WorkloadConditionBeScanRows(op, longValue);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,16 +40,12 @@ public WorkloadConditionMeta(String metricName, String op, String value) throws
}

private static WorkloadMetricType getMetricType(String metricStr) throws UserException {
if (WorkloadMetricType.USERNAME.toString().equalsIgnoreCase(metricStr)) {
return WorkloadMetricType.USERNAME;
} else if (WorkloadMetricType.QUERY_TIME.toString().equalsIgnoreCase(metricStr)) {
return WorkloadMetricType.QUERY_TIME;
} else if (WorkloadMetricType.BE_SCAN_ROWS.toString().equalsIgnoreCase(metricStr)) {
return WorkloadMetricType.BE_SCAN_ROWS;
} else if (WorkloadMetricType.BE_SCAN_BYTES.toString().equalsIgnoreCase(metricStr)) {
return WorkloadMetricType.BE_SCAN_BYTES;
WorkloadMetricType metricType = WorkloadSchedPolicyMgr.STRING_METRIC_MAP.get(metricStr.toUpperCase());
if (metricType == null) {
throw new UserException("invalid metric name:" + metricStr);
} else {
return metricType;
}
throw new UserException("invalid metric name:" + metricStr);
}

public String toString() {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.resource.workloadschedpolicy;

import org.apache.doris.common.UserException;

public class WorkloadConditionQueryBeMemory implements WorkloadCondition {

private long value;

private WorkloadConditionOperator op;

public WorkloadConditionQueryBeMemory(WorkloadConditionOperator op, long value) {
this.value = value;
this.op = op;
}

@Override
public boolean eval(String strValue) {
return false;
}

@Override
public WorkloadMetricType getMetricType() {
return WorkloadMetricType.QUERY_BE_MEMORY_BYTES;
}

public static WorkloadConditionQueryBeMemory createWorkloadCondition(WorkloadConditionOperator op,
String value) throws UserException {
long longValue = -1;
try {
longValue = Long.parseLong(value);
if (longValue < 0) {
throw new NumberFormatException();
}
} catch (NumberFormatException e) {
throw new UserException("invalid query be memory value: " + value + ", it requires >= 0");
}
return new WorkloadConditionQueryBeMemory(op, longValue);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,14 @@ public boolean eval(String strValue) {

public static WorkloadConditionQueryTime createWorkloadCondition(WorkloadConditionOperator op, String value)
throws UserException {
long longValue = Long.parseLong(value);
if (longValue < 0) {
throw new UserException("invalid query time value, " + longValue + ", it requires >= 0");
long longValue = -1;
try {
longValue = Long.parseLong(value);
if (longValue < 0) {
throw new NumberFormatException();
}
} catch (NumberFormatException e) {
throw new UserException("invalid query time value: " + value + ", it requires >= 0");
}
return new WorkloadConditionQueryTime(op, longValue);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,5 +18,5 @@
package org.apache.doris.resource.workloadschedpolicy;

public enum WorkloadMetricType {
USERNAME, QUERY_TIME, BE_SCAN_ROWS, BE_SCAN_BYTES
USERNAME, QUERY_TIME, BE_SCAN_ROWS, BE_SCAN_BYTES, QUERY_BE_MEMORY_BYTES
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import org.apache.doris.common.io.Writable;
import org.apache.doris.persist.gson.GsonPostProcessable;
import org.apache.doris.persist.gson.GsonUtils;
import org.apache.doris.thrift.TCompareOperator;
import org.apache.doris.thrift.TWorkloadAction;
import org.apache.doris.thrift.TWorkloadActionType;
import org.apache.doris.thrift.TWorkloadCondition;
Expand All @@ -31,7 +30,6 @@
import org.apache.doris.thrift.TopicInfo;

import com.esotericsoftware.minlog.Log;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.gson.annotations.SerializedName;

Expand All @@ -51,25 +49,6 @@ public class WorkloadSchedPolicy implements Writable, GsonPostProcessable {
public static final ImmutableSet<String> POLICY_PROPERTIES = new ImmutableSet.Builder<String>()
.add(ENABLED).add(PRIORITY).add(WORKLOAD_GROUP).build();

// used for convert fe type to thrift type
private static ImmutableMap<WorkloadMetricType, TWorkloadMetricType> METRIC_MAP
= new ImmutableMap.Builder<WorkloadMetricType, TWorkloadMetricType>()
.put(WorkloadMetricType.QUERY_TIME, TWorkloadMetricType.QUERY_TIME)
.put(WorkloadMetricType.BE_SCAN_ROWS, TWorkloadMetricType.BE_SCAN_ROWS)
.put(WorkloadMetricType.BE_SCAN_BYTES, TWorkloadMetricType.BE_SCAN_BYTES).build();
private static ImmutableMap<WorkloadActionType, TWorkloadActionType> ACTION_MAP
= new ImmutableMap.Builder<WorkloadActionType, TWorkloadActionType>()
.put(WorkloadActionType.MOVE_QUERY_TO_GROUP, TWorkloadActionType.MOVE_QUERY_TO_GROUP)
.put(WorkloadActionType.CANCEL_QUERY, TWorkloadActionType.CANCEL_QUERY).build();

private static ImmutableMap<WorkloadConditionOperator, TCompareOperator> OP_MAP
= new ImmutableMap.Builder<WorkloadConditionOperator, TCompareOperator>()
.put(WorkloadConditionOperator.EQUAL, TCompareOperator.EQUAL)
.put(WorkloadConditionOperator.GREATER, TCompareOperator.GREATER)
.put(WorkloadConditionOperator.GREATER_EQUAL, TCompareOperator.GREATER_EQUAL)
.put(WorkloadConditionOperator.LESS, TCompareOperator.LESS)
.put(WorkloadConditionOperator.LESS_EQUAl, TCompareOperator.LESS_EQUAL).build();

@SerializedName(value = "id")
long id;
@SerializedName(value = "name")
Expand Down Expand Up @@ -255,20 +234,20 @@ public TopicInfo toTopicInfo() {
List<TWorkloadCondition> condList = new ArrayList();
for (WorkloadConditionMeta cond : conditionMetaList) {
TWorkloadCondition tCond = new TWorkloadCondition();
TWorkloadMetricType metricType = METRIC_MAP.get(cond.metricName);
TWorkloadMetricType metricType = WorkloadSchedPolicyMgr.METRIC_MAP.get(cond.metricName);
if (metricType == null) {
return null;
}
tCond.setMetricName(metricType);
tCond.setOp(OP_MAP.get(cond.op));
tCond.setOp(WorkloadSchedPolicyMgr.OP_MAP.get(cond.op));
tCond.setValue(cond.value);
condList.add(tCond);
}

List<TWorkloadAction> actionList = new ArrayList();
for (WorkloadActionMeta action : actionMetaList) {
TWorkloadAction tAction = new TWorkloadAction();
TWorkloadActionType tActionType = ACTION_MAP.get(action.action);
TWorkloadActionType tActionType = WorkloadSchedPolicyMgr.ACTION_MAP.get(action.action);
if (tActionType == null) {
return null;
}
Expand Down
Loading