Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion be/src/agent/workload_sched_policy_listener.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -63,9 +63,15 @@ void WorkloadschedPolicyListener::handle_topic_info(const std::vector<TopicInfo>
continue;
}

std::set<int64_t> wg_id_set;
for (int64_t wg_id : tpolicy.wg_id_list) {
wg_id_set.insert(wg_id);
}

std::shared_ptr<WorkloadSchedPolicy> policy_ptr = std::make_shared<WorkloadSchedPolicy>();
policy_ptr->init(tpolicy.id, tpolicy.name, tpolicy.version, tpolicy.enabled,
tpolicy.priority, std::move(cond_ptr_list), std::move(action_ptr_list));
tpolicy.priority, wg_id_set, std::move(cond_ptr_list),
std::move(action_ptr_list));
policy_map.emplace(tpolicy.id, std::move(policy_ptr));
}
size_t new_policy_size = policy_map.size();
Expand Down
2 changes: 2 additions & 0 deletions be/src/runtime/fragment_mgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1607,6 +1607,8 @@ void FragmentMgr::get_runtime_query_info(std::vector<WorkloadQueryInfo>* query_i
WorkloadQueryInfo workload_query_info;
workload_query_info.query_id = print_id(q.first);
workload_query_info.tquery_id = q.first;
workload_query_info.wg_id =
q.second->workload_group() == nullptr ? -1 : q.second->workload_group()->id();
query_info_list->push_back(workload_query_info);
}
}
Expand Down
1 change: 1 addition & 0 deletions be/src/runtime/workload_management/workload_query_info.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ class WorkloadQueryInfo {
std::map<WorkloadMetricType, std::string> metric_map;
TUniqueId tquery_id;
std::string query_id;
int64_t wg_id;
};

} // namespace doris
11 changes: 10 additions & 1 deletion be/src/runtime/workload_management/workload_sched_policy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
namespace doris {

void WorkloadSchedPolicy::init(int64_t id, std::string name, int version, bool enabled,
int priority,
int priority, std::set<int64_t> wg_id_set,
std::vector<std::unique_ptr<WorkloadCondition>> condition_list,
std::vector<std::unique_ptr<WorkloadAction>> action_list) {
_id = id;
Expand All @@ -30,6 +30,7 @@ void WorkloadSchedPolicy::init(int64_t id, std::string name, int version, bool e
_priority = priority;
_condition_list = std::move(condition_list);
_action_list = std::move(action_list);
_wg_id_set = wg_id_set;

_first_action_type = _action_list[0]->get_action_type();
if (_first_action_type != WorkloadActionType::MOVE_QUERY_TO_GROUP &&
Expand All @@ -50,6 +51,14 @@ bool WorkloadSchedPolicy::is_match(WorkloadQueryInfo* query_info_ptr) {
if (!_enabled) {
return false;
}

// 1 when policy has no group(_wg_id_set.size() < 0), it should match all query
// 2 when policy has group, it can only match the query which has the same group
if (_wg_id_set.size() > 0 && (query_info_ptr->wg_id <= 0 ||
_wg_id_set.find(query_info_ptr->wg_id) == _wg_id_set.end())) {
return false;
}

auto& metric_val_map = query_info_ptr->metric_map;
for (auto& cond : _condition_list) {
if (metric_val_map.find(cond->get_workload_metric_type()) == metric_val_map.end()) {
Expand Down
2 changes: 2 additions & 0 deletions be/src/runtime/workload_management/workload_sched_policy.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ class WorkloadSchedPolicy {
~WorkloadSchedPolicy() = default;

void init(int64_t id, std::string name, int version, bool enabled, int priority,
std::set<int64_t> wg_id_set,
std::vector<std::unique_ptr<WorkloadCondition>> condition_list,
std::vector<std::unique_ptr<WorkloadAction>> action_list);

Expand All @@ -50,6 +51,7 @@ class WorkloadSchedPolicy {
int _version;
bool _enabled;
int _priority;
std::set<int64_t> _wg_id_set;

std::vector<std::unique_ptr<WorkloadCondition>> _condition_list;
std::vector<std::unique_ptr<WorkloadAction>> _action_list;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,6 @@ void WorkloadSchedPolicyMgr::_schedule_workload() {
while (!_stop_latch.wait_for(std::chrono::milliseconds(500))) {
// 1 get query info
std::vector<WorkloadQueryInfo> list;
//todo(wb) maybe we can get runtime queryinfo from RuntimeQueryStatiticsMgr directly
_exec_env->fragment_mgr()->get_runtime_query_info(&list);
// todo: add timer
if (list.size() == 0) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -392,6 +392,17 @@ public void dropWorkloadGroup(DropWorkloadGroupStmt stmt) throws DdlException {
throw new DdlException("workload group " + workloadGroupName + " is set for user " + ret.second);
}

// A group with related policies should not be deleted.
Long wgId = getWorkloadGroupIdByName(workloadGroupName);
if (wgId != null) {
boolean groupHasPolicy = Env.getCurrentEnv().getWorkloadSchedPolicyMgr()
.checkWhetherGroupHasPolicy(wgId.longValue());
if (groupHasPolicy) {
throw new DdlException(
"workload group " + workloadGroupName + " can't be dropped, because it has related policy");
}
}

writeLock();
try {
if (!nameToWorkloadGroup.containsKey(workloadGroupName)) {
Expand Down Expand Up @@ -484,6 +495,19 @@ public Long getWorkloadGroupIdByName(String name) {
}
}

public Map<Long, String> getIdToNameMap() {
Map<Long, String> ret = Maps.newHashMap();
readLock();
try {
for (Map.Entry<Long, WorkloadGroup> entry : idToWorkloadGroup.entrySet()) {
ret.put(entry.getKey(), entry.getValue().getName());
}
return ret;
} finally {
readUnlock();
}
}

public String getWorkloadGroupNameById(Long id) {
readLock();
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,10 @@ public class WorkloadSchedPolicy implements Writable, GsonPostProcessable {

public static final String ENABLED = "enabled";
public static final String PRIORITY = "priority";
public static final String WORKLOAD_GROUP = "workload_group";

public static final ImmutableSet<String> POLICY_PROPERTIES = new ImmutableSet.Builder<String>()
.add(ENABLED).add(PRIORITY).build();
.add(ENABLED).add(PRIORITY).add(WORKLOAD_GROUP).build();

// used for convert fe type to thrift type
private static ImmutableMap<WorkloadMetricType, TWorkloadMetricType> METRIC_MAP
Expand Down Expand Up @@ -80,6 +82,9 @@ public class WorkloadSchedPolicy implements Writable, GsonPostProcessable {
@SerializedName(value = "priority")
private volatile int priority;

@SerializedName(value = "wgIdList")
private List<Long> workloadGroupIdList = new ArrayList<>();

@SerializedName(value = "conditionMetaList")
List<WorkloadConditionMeta> conditionMetaList;
// we regard action as a command, map's key is command, map's value is args, so it's a command list
Expand All @@ -101,13 +106,18 @@ public void setWorkloadConditionList(List<WorkloadCondition> workloadConditionLi
}

public WorkloadSchedPolicy(long id, String name, List<WorkloadCondition> workloadConditionList,
List<WorkloadAction> workloadActionList, Map<String, String> properties) throws UserException {
List<WorkloadAction> workloadActionList, Map<String, String> properties, List<Long> wgIdList) {
this.id = id;
this.name = name;
this.workloadConditionList = workloadConditionList;
this.workloadActionList = workloadActionList;
// set enable and priority
parseAndSetProperties(properties);

String enabledStr = properties.get(ENABLED);
this.enabled = enabledStr == null ? true : Boolean.parseBoolean(enabledStr);

String priorityStr = properties.get(PRIORITY);
this.priority = priorityStr == null ? 0 : Integer.parseInt(priorityStr);
this.workloadGroupIdList = wgIdList;
this.version = 0;
}

Expand Down Expand Up @@ -162,12 +172,20 @@ public WorkloadActionType getFirstActionType() {
return retType;
}

public void parseAndSetProperties(Map<String, String> properties) throws UserException {
String enabledStr = properties.get(ENABLED);
this.enabled = enabledStr == null ? true : Boolean.parseBoolean(enabledStr);
public void updateProperty(Map<String, String> property, List<Long> wgIdList) {
String enabledStr = property.get(ENABLED);
if (enabledStr != null) {
this.enabled = Boolean.parseBoolean(enabledStr);
}

String priorityStr = properties.get(PRIORITY);
this.priority = priorityStr == null ? 0 : Integer.parseInt(priorityStr);
String priorityStr = property.get(PRIORITY);
if (priorityStr != null) {
this.priority = Integer.parseInt(priorityStr);
}

if (wgIdList.size() > 0) {
this.workloadGroupIdList = wgIdList;
}
}

void incrementVersion() {
Expand All @@ -194,6 +212,10 @@ public long getVersion() {
return version;
}

public List<Long> getWorkloadGroupIdList() {
return this.workloadGroupIdList;
}

public List<WorkloadConditionMeta> getConditionMetaList() {
return conditionMetaList;
}
Expand Down Expand Up @@ -224,6 +246,7 @@ public TopicInfo toTopicInfo() {
tPolicy.setVersion(version);
tPolicy.setPriority(priority);
tPolicy.setEnabled(enabled);
tPolicy.setWgIdList(workloadGroupIdList);

List<TWorkloadCondition> condList = new ArrayList();
for (WorkloadConditionMeta cond : conditionMetaList) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ public class WorkloadSchedPolicyMgr implements Writable, GsonPostProcessable {
public static final ImmutableList<String> WORKLOAD_SCHED_POLICY_NODE_TITLE_NAMES
= new ImmutableList.Builder<String>()
.add("Id").add("Name").add("Condition").add("Action").add("Priority").add("Enabled").add("Version")
.add("WorkloadGroup")
.build();

public static final ImmutableSet<WorkloadActionType> FE_ACTION_SET
Expand Down Expand Up @@ -185,8 +186,9 @@ public void createWorkloadSchedPolicy(CreateWorkloadSchedPolicyStmt createStmt)
if (propMap == null) {
propMap = new HashMap<>();
}
List<Long> wgIdList = new ArrayList<>();
if (propMap.size() != 0) {
checkProperties(propMap);
checkProperties(propMap, wgIdList);
}
writeLock();
try {
Expand All @@ -203,7 +205,7 @@ public void createWorkloadSchedPolicy(CreateWorkloadSchedPolicyStmt createStmt)
}
long id = Env.getCurrentEnv().getNextId();
WorkloadSchedPolicy policy = new WorkloadSchedPolicy(id, policyName,
policyConditionList, policyActionList, propMap);
policyConditionList, policyActionList, propMap, wgIdList);
policy.setConditionMeta(originConditions);
policy.setActionMeta(originActions);
Env.getCurrentEnv().getEditLog().logCreateWorkloadSchedPolicy(policy);
Expand Down Expand Up @@ -382,7 +384,7 @@ List<WorkloadSchedPolicy> pickPolicy(Map<WorkloadActionType, Queue<WorkloadSched
return ret;
}

private void checkProperties(Map<String, String> properties) throws UserException {
private void checkProperties(Map<String, String> properties, List<Long> wgIdList) throws UserException {
Set<String> allInputPropKeySet = new HashSet<>();
allInputPropKeySet.addAll(properties.keySet());

Expand Down Expand Up @@ -410,6 +412,15 @@ private void checkProperties(Map<String, String> properties) throws UserExceptio
"invalid priority property value, it must be a number, input value=" + priorityStr);
}
}

String workloadGroupNameStr = properties.get(WorkloadSchedPolicy.WORKLOAD_GROUP);
if (workloadGroupNameStr != null && !workloadGroupNameStr.isEmpty()) {
Long wgId = Env.getCurrentEnv().getWorkloadGroupMgr().getWorkloadGroupIdByName(workloadGroupNameStr);
if (wgId == null) {
throw new UserException("unknown workload group:" + workloadGroupNameStr);
}
wgIdList.add(wgId);
}
}

public void alterWorkloadSchedPolicy(AlterWorkloadSchedPolicyStmt alterStmt) throws UserException {
Expand All @@ -422,8 +433,9 @@ public void alterWorkloadSchedPolicy(AlterWorkloadSchedPolicyStmt alterStmt) thr
}

Map<String, String> properties = alterStmt.getProperties();
checkProperties(properties);
policy.parseAndSetProperties(properties);
List<Long> wgIdList = new ArrayList<>();
checkProperties(properties, wgIdList);
policy.updateProperty(properties, wgIdList);
policy.incrementVersion();
Env.getCurrentEnv().getEditLog().logAlterWorkloadSchedPolicy(policy);
} finally {
Expand Down Expand Up @@ -530,10 +542,25 @@ public List<List<String>> getWorkloadSchedPolicyTvfInfo(TUserIdentity tcurrentUs
return policyProcNode.fetchResult(currentUserIdentity).getRows();
}

public boolean checkWhetherGroupHasPolicy(long wgId) {
readLock();
try {
for (Map.Entry<Long, WorkloadSchedPolicy> entry : idToPolicy.entrySet()) {
if (entry.getValue().getWorkloadGroupIdList().contains(wgId)) {
return true;
}
}
} finally {
readUnlock();
}
return false;
}

public class PolicyProcNode {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这个proc 功能,我们如果有了系统表,感觉可以不要了,直接删了吧。我们不维护了。

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

等加上policy的schema table时就删

public ProcResult fetchResult(UserIdentity currentUserIdentity) {
BaseProcResult result = new BaseProcResult();
result.setNames(WORKLOAD_SCHED_POLICY_NODE_TITLE_NAMES);
Map<Long, String> idToNameMap = Env.getCurrentEnv().getWorkloadGroupMgr().getIdToNameMap();
readLock();
try {
for (WorkloadSchedPolicy policy : idToPolicy.values()) {
Expand Down Expand Up @@ -566,6 +593,19 @@ public ProcResult fetchResult(UserIdentity currentUserIdentity) {
row.add(String.valueOf(policy.getPriority()));
row.add(String.valueOf(policy.isEnabled()));
row.add(String.valueOf(policy.getVersion()));

List<Long> wgIdList = policy.getWorkloadGroupIdList();
if (wgIdList.size() == 0) {
row.add("");
} else {
Long wgId = wgIdList.get(0);
String wgName = idToNameMap.get(wgId);
if (wgName == null) {
row.add("null");
} else {
row.add(wgName);
}
}
result.addRow(row);
}
} finally {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -479,6 +479,7 @@ private static TFetchSchemaTableDataResult workloadSchedPolicyMetadataResult(TMe
trow.addToColumnValue(new TCell().setIntVal(Integer.valueOf(policyRow.get(4)))); // priority
trow.addToColumnValue(new TCell().setBoolVal(Boolean.valueOf(policyRow.get(5)))); // enabled
trow.addToColumnValue(new TCell().setIntVal(Integer.valueOf(policyRow.get(6)))); // version
trow.addToColumnValue(new TCell().setStringVal(policyRow.get(7))); // workload group id
dataBatch.add(trow);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,8 @@ public class WorkloadSchedPolicyTableValuedFunction extends MetadataTableValuedF
new Column("Action", ScalarType.createType(PrimitiveType.STRING)),
new Column("Priority", ScalarType.createType(PrimitiveType.INT)),
new Column("Enabled", ScalarType.createType(PrimitiveType.BOOLEAN)),
new Column("Version", ScalarType.createType(PrimitiveType.INT)));
new Column("Version", ScalarType.createType(PrimitiveType.INT)),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

需要修改对应的regression test。

  1. 新建policy的时候指定group name, group 不存在异常的处理。
  2. 修改已经存在的policy,指定group name。
  3. drop 已经存在的group,关联policy和不关联policy。

new Column("WorkloadGroup", ScalarType.createType(PrimitiveType.STRING)));

private static final ImmutableMap<String, Integer> COLUMN_TO_INDEX;

Expand Down
1 change: 1 addition & 0 deletions gensrc/thrift/BackendService.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -304,6 +304,7 @@ struct TWorkloadSchedPolicy {
5: optional bool enabled
6: optional list<TWorkloadCondition> condition_list
7: optional list<TWorkloadAction> action_list
8: optional list<i64> wg_id_list
}

struct TopicInfo {
Expand Down
Loading