From f74b525c6152e2a2e25e9e44a14a98d28f5ee5a2 Mon Sep 17 00:00:00 2001 From: liulijia Date: Mon, 5 Jul 2021 21:11:23 +0800 Subject: [PATCH 1/2] Fix #6158 add a property to limit user instance num. --- .../administrator-guide/config/fe_config.md | 7 +++ .../config/user_property.md | 4 ++ .../Account Management/SET PROPERTY.md | 6 +- .../administrator-guide/config/fe_config.md | 7 +++ .../config/user_property.md | 4 ++ .../Account Management/SET PROPERTY.md | 4 ++ .../java/org/apache/doris/common/Config.java | 8 ++- .../apache/doris/common/FeMetaVersion.java | 4 +- .../org/apache/doris/metric/MetricRepo.java | 13 ++++ .../doris/mysql/privilege/PaloAuth.java | 9 +++ .../doris/mysql/privilege/UserProperty.java | 33 +++++++++- .../mysql/privilege/UserPropertyMgr.java | 9 +++ .../java/org/apache/doris/qe/Coordinator.java | 2 + .../java/org/apache/doris/qe/QeProcessor.java | 2 + .../org/apache/doris/qe/QeProcessorImpl.java | 61 ++++++++++++++++++- .../doris/catalog/UserPropertyTest.java | 4 ++ 16 files changed, 169 insertions(+), 8 deletions(-) diff --git a/docs/en/administrator-guide/config/fe_config.md b/docs/en/administrator-guide/config/fe_config.md index a0fbb86c19b0ee..ce7fb0be5debb6 100644 --- a/docs/en/administrator-guide/config/fe_config.md +++ b/docs/en/administrator-guide/config/fe_config.md @@ -1997,3 +1997,10 @@ Load label cleaner will run every *label_clean_interval_second* to clean the out Default:30 the transaction will be cleaned after transaction_clean_interval_second seconds if the transaction is visible or aborted we should make this interval as short as possible and each clean cycle as soon as possible + + +### `default_max_query_instances` + +The default value when user property max_query_instances is equal or less than 0. This config is used to limit the max number of instances for a user. This parameter is less than or equal to 0 means unlimited. + +The default value is -1。 \ No newline at end of file diff --git a/docs/en/administrator-guide/config/user_property.md b/docs/en/administrator-guide/config/user_property.md index 872349faf304b8..27a2d3653f85f3 100644 --- a/docs/en/administrator-guide/config/user_property.md +++ b/docs/en/administrator-guide/config/user_property.md @@ -60,6 +60,10 @@ User-level configuration items will only take effect for the specified users, an The maximum number of user connections, the default value is 100 In general, this parameter does not need to be changed unless the number of concurrent queries exceeds the default value. +### max_query_instances + + The maximum number of instances that the user can use at a certain point in time, The default value is -1, negative number means use default_max_query_instances config. + ### resource ### quota diff --git a/docs/en/sql-reference/sql-statements/Account Management/SET PROPERTY.md b/docs/en/sql-reference/sql-statements/Account Management/SET PROPERTY.md index c7c72cd7b94309..d70c9c415ebfa6 100644 --- a/docs/en/sql-reference/sql-statements/Account Management/SET PROPERTY.md +++ b/docs/en/sql-reference/sql-statements/Account Management/SET PROPERTY.md @@ -38,7 +38,8 @@ Importing cluster is only applicable to Baidu internal users. key: Super user rights: -Max_user_connections: Maximum number of connections. +max_user_connections: Maximum number of connections. +max_query_instances: Maximum number of query instance user can use when query. resource.cpu_share: cpu resource assignment. Load_cluster. {cluster_name}. priority: assigns priority to a specified cluster, which can be HIGH or NORMAL @@ -77,6 +78,9 @@ SET PROPERTY FOR 'jack' 'default_load_cluster' = '{cluster_name}'; 7. Modify the cluster priority of user Jack to HIGH SET PROPERTY FOR 'jack' 'load_cluster.{cluster_name}.priority' = 'HIGH'; +8. Modify the maximum number of query instance for jack to 3000 +SET PROPERTY FOR 'jack' 'max_query_instances' = '3000'; + ## keyword SET, PROPERTY diff --git a/docs/zh-CN/administrator-guide/config/fe_config.md b/docs/zh-CN/administrator-guide/config/fe_config.md index 48df2ea9dc33a4..749fd7031c882c 100644 --- a/docs/zh-CN/administrator-guide/config/fe_config.md +++ b/docs/zh-CN/administrator-guide/config/fe_config.md @@ -2003,3 +2003,10 @@ load 标签清理器将每隔 `label_clean_interval_second` 运行一次以清 默认值:30 如果事务 visible 或者 aborted 状态,事务将在 `transaction_clean_interval_second` 秒后被清除 ,我们应该让这个间隔尽可能短,每个清洁周期都尽快 + + +### `default_max_query_instances` + +默认值:-1 + +用户属性max_query_instances小于等于0时,使用该配置,用来限制单个用户同一时刻可使用的查询instance个数。该参数小于等于0表示无限制。 \ No newline at end of file diff --git a/docs/zh-CN/administrator-guide/config/user_property.md b/docs/zh-CN/administrator-guide/config/user_property.md index 76ec9e58e8363c..ff572342967279 100644 --- a/docs/zh-CN/administrator-guide/config/user_property.md +++ b/docs/zh-CN/administrator-guide/config/user_property.md @@ -60,6 +60,10 @@ User 级别的配置项只会对指定用户生效,并不会影响其他用户 用户最大的连接数,默认值为100。一般情况不需要更改该参数,除非查询的并发数超过了默认值。 +### max_query_instances + + 用户同一时间点可使用的instance个数, 默认是-1,小于等于0将会使用配置default_max_query_instances. + ### resource ### quota diff --git a/docs/zh-CN/sql-reference/sql-statements/Account Management/SET PROPERTY.md b/docs/zh-CN/sql-reference/sql-statements/Account Management/SET PROPERTY.md index 41f294fa46651d..1f29a6cb6905ee 100644 --- a/docs/zh-CN/sql-reference/sql-statements/Account Management/SET PROPERTY.md +++ b/docs/zh-CN/sql-reference/sql-statements/Account Management/SET PROPERTY.md @@ -39,6 +39,7 @@ under the License. 超级用户权限: max_user_connections: 最大连接数。 + max_query_instances: 用户同一时间点执行查询可以使用的instance个数。 resource.cpu_share: cpu资源分配。 load_cluster.{cluster_name}.priority: 为指定的cluster分配优先级,可以为 HIGH 或 NORMAL @@ -77,6 +78,9 @@ under the License. 7. 修改用户 jack 的集群优先级为 HIGH SET PROPERTY FOR 'jack' 'load_cluster.{cluster_name}.priority' = 'HIGH'; + 8. 修改用户jack的查询可用instance个数为3000 + SET PROPERTY FOR 'jack' 'max_query_instances' = '3000'; + ## keyword SET, PROPERTY diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/Config.java b/fe/fe-core/src/main/java/org/apache/doris/common/Config.java index 9d351def23c47d..eda8a5612c1959 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/Config.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/Config.java @@ -1403,9 +1403,15 @@ public class Config extends ConfigBase { @ConfField(mutable = true, masterOnly = true) public static int max_dynamic_partition_num = 500; - /* + /** * Control the max num of backup/restore job per db */ @ConfField(mutable = true, masterOnly = true) public static int max_backup_restore_job_num_per_db = 10; + + /** + * Control the default max num of the instance for a user. + */ + @ConfField(mutable = true) + public static int default_max_query_instances = -1; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/FeMetaVersion.java b/fe/fe-core/src/main/java/org/apache/doris/common/FeMetaVersion.java index b26f372cfb8a59..02a7b3063dda62 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/FeMetaVersion.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/FeMetaVersion.java @@ -210,6 +210,8 @@ public final class FeMetaVersion { public static final int VERSION_98 = 98; // add audit steam load and change the serialization backend method to json public static final int VERSION_99 = 99; + // for max query instance + public static final int VERSION_100 = 100; // note: when increment meta version, should assign the latest version to VERSION_CURRENT - public static final int VERSION_CURRENT = VERSION_99; + public static final int VERSION_CURRENT = VERSION_100; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/metric/MetricRepo.java b/fe/fe-core/src/main/java/org/apache/doris/metric/MetricRepo.java index 9196eeaf599298..4d9b07274ca50e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/metric/MetricRepo.java +++ b/fe/fe-core/src/main/java/org/apache/doris/metric/MetricRepo.java @@ -32,10 +32,12 @@ import org.apache.doris.monitor.jvm.JvmService; import org.apache.doris.monitor.jvm.JvmStats; import org.apache.doris.persist.EditLog; +import org.apache.doris.qe.QeProcessorImpl; import org.apache.doris.service.ExecuteEnv; import org.apache.doris.system.Backend; import org.apache.doris.system.SystemInfoService; +import com.codahale.metrics.Gauge; import com.codahale.metrics.Histogram; import com.codahale.metrics.MetricRegistry; import com.google.common.collect.Sets; @@ -49,6 +51,7 @@ import java.util.SortedMap; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; +import java.util.function.BinaryOperator; public final class MetricRepo { private static final Logger LOG = LogManager.getLogger(MetricRepo.class); @@ -302,6 +305,16 @@ public Long getValue() { HISTO_QUERY_LATENCY = METRIC_REGISTER.histogram(MetricRegistry.name("query", "latency", "ms")); HISTO_EDIT_LOG_WRITE_LATENCY = METRIC_REGISTER.histogram(MetricRegistry.name("editlog", "write", "latency", "ms")); + METRIC_REGISTER.register(MetricRegistry.name("palo", "fe", "query", "max_instances_num_per_user"), (Gauge) () -> { + try{ + return ((QeProcessorImpl)QeProcessorImpl.INSTANCE).getInstancesNumPerUser().values().stream() + .reduce(-1, BinaryOperator.maxBy(Integer::compareTo)); + } catch (Throwable ex) { + LOG.warn("Get max_instances_num_per_user error", ex); + return -2; + } + }); + // init system metrics initSystemMetrics(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/mysql/privilege/PaloAuth.java b/fe/fe-core/src/main/java/org/apache/doris/mysql/privilege/PaloAuth.java index a49329e477176e..29374132ff7c5b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/mysql/privilege/PaloAuth.java +++ b/fe/fe-core/src/main/java/org/apache/doris/mysql/privilege/PaloAuth.java @@ -1048,6 +1048,15 @@ public long getMaxConn(String qualifiedUser) { } } + public long getMaxQueryInstances(String qualifiedUser) { + readLock(); + try { + return propertyMgr.getMaxQueryInstances(qualifiedUser); + } finally { + readUnlock(); + } + } + public void getAllDomains(Set allDomains) { readLock(); try { diff --git a/fe/fe-core/src/main/java/org/apache/doris/mysql/privilege/UserProperty.java b/fe/fe-core/src/main/java/org/apache/doris/mysql/privilege/UserProperty.java index 72c9b712f8b23c..a3b7e2b9bfe1ef 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/mysql/privilege/UserProperty.java +++ b/fe/fe-core/src/main/java/org/apache/doris/mysql/privilege/UserProperty.java @@ -39,8 +39,6 @@ import com.google.common.collect.Sets; import org.apache.commons.lang.StringUtils; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; import java.io.DataInput; import java.io.DataOutput; @@ -58,13 +56,13 @@ * This user is just qualified by cluster name, not host which it connected from. */ public class UserProperty implements Writable { - private static final Logger LOG = LogManager.getLogger(UserProperty.class); private static final String PROP_MAX_USER_CONNECTIONS = "max_user_connections"; private static final String PROP_RESOURCE = "resource"; private static final String PROP_QUOTA = "quota"; private static final String PROP_DEFAULT_LOAD_CLUSTER = "default_load_cluster"; private static final String PROP_LOAD_CLUSTER = "load_cluster"; + private static final String PROP_MAX_QUERY_INSTANCES = "max_query_instances"; // for system user public static final Set ADVANCED_PROPERTIES = Sets.newHashSet(); @@ -74,6 +72,9 @@ public class UserProperty implements Writable { private String qualifiedUser; private long maxConn = 100; + + private long maxQueryInstances = -1; + // Resource belong to this user. private UserResource resource = new UserResource(1000); // load cluster @@ -101,6 +102,7 @@ public class UserProperty implements Writable { ADVANCED_PROPERTIES.add(Pattern.compile("^" + PROP_RESOURCE + ".", Pattern.CASE_INSENSITIVE)); ADVANCED_PROPERTIES.add(Pattern.compile("^" + PROP_LOAD_CLUSTER + "." + DppConfig.CLUSTER_NAME_REGEX + "." + DppConfig.PRIORITY + "$", Pattern.CASE_INSENSITIVE)); + ADVANCED_PROPERTIES.add(Pattern.compile("^" + PROP_MAX_QUERY_INSTANCES + "$", Pattern.CASE_INSENSITIVE)); COMMON_PROPERTIES.add(Pattern.compile("^" + PROP_QUOTA + ".", Pattern.CASE_INSENSITIVE)); COMMON_PROPERTIES.add(Pattern.compile("^" + PROP_DEFAULT_LOAD_CLUSTER + "$", Pattern.CASE_INSENSITIVE)); @@ -123,6 +125,10 @@ public long getMaxConn() { return maxConn; } + public long getMaxQueryInstances() { + return maxQueryInstances; + } + public WhiteList getWhiteList() { return whiteList; } @@ -164,6 +170,7 @@ public void removeDomain(String domain) { public void update(List> properties) throws DdlException { // copy long newMaxConn = maxConn; + long newMaxQueryInstances = maxQueryInstances; UserResource newResource = resource.getCopiedUserResource(); String newDefaultLoadCluster = defaultLoadCluster; Map newDppConfigs = Maps.newHashMap(clusterToDppConfig); @@ -237,6 +244,17 @@ public void update(List> properties) throws DdlException { } newDefaultLoadCluster = value; + } else if (keyArr[0].equalsIgnoreCase(PROP_MAX_QUERY_INSTANCES)) { + // set property "max_query_instances" = "1000" + if (keyArr.length != 1) { + throw new DdlException(PROP_MAX_QUERY_INSTANCES + " format error"); + } + + try { + newMaxQueryInstances = Long.parseLong(value); + } catch (NumberFormatException e) { + throw new DdlException(PROP_MAX_QUERY_INSTANCES + " is not number"); + } } else { throw new DdlException("Unknown user property(" + key + ")"); } @@ -244,6 +262,7 @@ public void update(List> properties) throws DdlException { // set maxConn = newMaxConn; + maxQueryInstances = newMaxQueryInstances; resource = newResource; if (newDppConfigs.containsKey(newDefaultLoadCluster)) { defaultLoadCluster = newDefaultLoadCluster; @@ -328,6 +347,9 @@ public List> fetchProperty() { // max user connections result.add(Lists.newArrayList(PROP_MAX_USER_CONNECTIONS, String.valueOf(maxConn))); + // max query instance + result.add(Lists.newArrayList(PROP_MAX_QUERY_INSTANCES, String.valueOf(maxQueryInstances))); + // resource ResourceGroup group = resource.getResource(); for (Map.Entry entry : group.getQuotaMap().entrySet()) { @@ -427,6 +449,7 @@ public void write(DataOutput out) throws IOException { } whiteList.write(out); + out.writeLong(maxQueryInstances); } public void readFields(DataInput in) throws IOException { if (Catalog.getCurrentCatalogJournalVersion() < FeMetaVersion.VERSION_43) { @@ -502,5 +525,9 @@ public void readFields(DataInput in) throws IOException { } } } + + if (Catalog.getCurrentCatalogJournalVersion() >= FeMetaVersion.VERSION_100) { + maxQueryInstances = in.readLong(); + } } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/mysql/privilege/UserPropertyMgr.java b/fe/fe-core/src/main/java/org/apache/doris/mysql/privilege/UserPropertyMgr.java index ca054705ba54f7..5f9cde8050ee77 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/mysql/privilege/UserPropertyMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/mysql/privilege/UserPropertyMgr.java @@ -19,6 +19,7 @@ import org.apache.doris.analysis.UserIdentity; import org.apache.doris.common.AnalysisException; +import org.apache.doris.common.Config; import org.apache.doris.common.DdlException; import org.apache.doris.common.Pair; import org.apache.doris.common.io.Writable; @@ -122,6 +123,14 @@ public long getMaxConn(String qualifiedUser) { return existProperty.getMaxConn(); } + public long getMaxQueryInstances(String qualifiedUser) { + UserProperty existProperty = propertyMap.get(qualifiedUser); + if (existProperty == null) { + return Config.default_max_query_instances; + } + return existProperty.getMaxQueryInstances(); + } + public int getPropertyMapSize() { return propertyMap.size(); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java index e957dd26ed887f..10ae31f7aae079 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java @@ -412,6 +412,8 @@ public void exec() throws Exception { traceInstance(); + QeProcessorImpl.INSTANCE.registerInstances(queryId, instanceIds.size()); + // create result receiver PlanFragmentId topId = fragments.get(0).getFragmentId(); FragmentExecParams topParams = fragmentExecParamsMap.get(topId); diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/QeProcessor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/QeProcessor.java index e65af488c0ea2e..30ecbe6da72bb1 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/QeProcessor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/QeProcessor.java @@ -33,6 +33,8 @@ public interface QeProcessor { void registerQuery(TUniqueId queryId, QeProcessorImpl.QueryInfo info) throws UserException; + void registerInstances(TUniqueId queryId, Integer instancesNum) throws UserException; + void unregisterQuery(TUniqueId queryId); Map getQueryStatistics(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/QeProcessorImpl.java b/fe/fe-core/src/main/java/org/apache/doris/qe/QeProcessorImpl.java index 4105afd59e70db..c421b5676dd35b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/QeProcessorImpl.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/QeProcessorImpl.java @@ -17,6 +17,7 @@ package org.apache.doris.qe; +import org.apache.doris.common.Config; import org.apache.doris.common.ThreadPoolManager; import org.apache.doris.common.UserException; import org.apache.doris.common.util.DebugUtil; @@ -28,19 +29,25 @@ import org.apache.doris.thrift.TStatusCode; import org.apache.doris.thrift.TUniqueId; +import com.google.common.base.Strings; import com.google.common.collect.Maps; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; +import java.util.concurrent.atomic.AtomicInteger; public final class QeProcessorImpl implements QeProcessor { private static final Logger LOG = LogManager.getLogger(QeProcessorImpl.class); private Map coordinatorMap; + private Map queryToInstancesNum; + private Map userToInstancesCount; + public static final QeProcessor INSTANCE; static { @@ -50,10 +57,12 @@ public final class QeProcessorImpl implements QeProcessor { private ExecutorService writeProfileExecutor; private QeProcessorImpl() { - coordinatorMap = Maps.newConcurrentMap(); + coordinatorMap = new ConcurrentHashMap<>(); // write profile to ProfileManager when query is running. writeProfileExecutor = ThreadPoolManager.newDaemonProfileThreadPool(1, 100, "profile-write-pool", true); + queryToInstancesNum = new ConcurrentHashMap<>(); + userToInstancesCount = new ConcurrentHashMap<>(); } @Override @@ -79,10 +88,58 @@ public void registerQuery(TUniqueId queryId, QueryInfo info) throws UserExceptio } } + public void registerInstances(TUniqueId queryId, Integer instancesNum) throws UserException { + if (!coordinatorMap.containsKey(queryId)) { + throw new UserException("query not exists in coordinatorMap:" + DebugUtil.printId(queryId)); + } + QueryInfo queryInfo = coordinatorMap.get(queryId); + if (queryInfo.getConnectContext() != null && + !Strings.isNullOrEmpty(queryInfo.getConnectContext().getQualifiedUser()) + ) { + String user = queryInfo.getConnectContext().getQualifiedUser(); + long maxQueryInstances = queryInfo.getConnectContext().getCatalog().getAuth().getMaxQueryInstances(user); + if (maxQueryInstances <= 0) { + maxQueryInstances = Config.default_max_query_instances; + } + if (maxQueryInstances > 0) { + AtomicInteger currentCount = userToInstancesCount.computeIfAbsent(user, __ -> new AtomicInteger(0)); + // Many query can reach here. + if (instancesNum + currentCount.get() > maxQueryInstances) { + throw new UserException("reach max_query_instances " + maxQueryInstances); + } + } + queryToInstancesNum.put(queryId, instancesNum); + userToInstancesCount.computeIfAbsent(user, __ -> new AtomicInteger(0)).addAndGet(instancesNum); + } + } + + public Map getInstancesNumPerUser() { + return Maps.transformEntries(userToInstancesCount, (__, value) -> value != null ? value.get() : 0); + } + @Override public void unregisterQuery(TUniqueId queryId) { - if (coordinatorMap.remove(queryId) != null) { + QueryInfo queryInfo = coordinatorMap.remove(queryId); + if (queryInfo != null) { LOG.info("deregister query id {}", DebugUtil.printId(queryId)); + if (queryInfo.getConnectContext() != null && + !Strings.isNullOrEmpty(queryInfo.getConnectContext().getQualifiedUser()) + ) { + Integer num = queryToInstancesNum.remove(queryId); + if (num != null) { + String user = queryInfo.getConnectContext().getQualifiedUser(); + AtomicInteger instancesNum = userToInstancesCount.get(user); + if (instancesNum == null) { + LOG.warn("WTF?? query {} in queryToInstancesNum but not in userToInstancesCount", + DebugUtil.printId(queryId) + ); + } else { + instancesNum.addAndGet(-num); + } + } + } + } else { + LOG.warn("not found query {} when unregisterQuery", DebugUtil.printId(queryId)); } } diff --git a/fe/fe-core/src/test/java/org/apache/doris/catalog/UserPropertyTest.java b/fe/fe-core/src/test/java/org/apache/doris/catalog/UserPropertyTest.java index bb1eee6320caad..6fd1bf786bd8db 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/catalog/UserPropertyTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/catalog/UserPropertyTest.java @@ -64,6 +64,7 @@ public void testUpdate() throws DdlException { properties.add(Pair.create("quota.normal", "102")); properties.add(Pair.create("load_cluster.dpp-cluster.hadoop_palo_path", "/user/palo2")); properties.add(Pair.create("default_load_cluster", "dpp-cluster")); + properties.add(Pair.create("max_qUERY_instances", "3000")); UserProperty userProperty = new UserProperty(); userProperty.update(properties); @@ -72,6 +73,7 @@ public void testUpdate() throws DdlException { Assert.assertEquals(102, userProperty.getResource().getShareByGroup().get("normal").intValue()); Assert.assertEquals("/user/palo2", userProperty.getLoadClusterInfo("dpp-cluster").second.getPaloPath()); Assert.assertEquals("dpp-cluster", userProperty.getDefaultLoadCluster()); + Assert.assertEquals(3000, userProperty.getMaxQueryInstances()); // fetch property List> rows = userProperty.fetchProperty(); @@ -89,6 +91,8 @@ public void testUpdate() throws DdlException { Assert.assertEquals("/user/palo2", value); } else if (key.equalsIgnoreCase("default_load_cluster")) { Assert.assertEquals("dpp-cluster", value); + } else if (key.equalsIgnoreCase("max_query_instances")) { + Assert.assertEquals("3000", value); } } From 307c47fb24eaa6c639d01002aacaac593e80000f Mon Sep 17 00:00:00 2001 From: liulijia Date: Thu, 8 Jul 2021 00:14:19 +0800 Subject: [PATCH 2/2] Add CommonUserProperties to serialize common simple user property. --- .../mysql/privilege/CommonUserProperties.java | 65 +++++++++++++++ .../doris/mysql/privilege/UserProperty.java | 83 ++++++++----------- 2 files changed, 99 insertions(+), 49 deletions(-) create mode 100644 fe/fe-core/src/main/java/org/apache/doris/mysql/privilege/CommonUserProperties.java diff --git a/fe/fe-core/src/main/java/org/apache/doris/mysql/privilege/CommonUserProperties.java b/fe/fe-core/src/main/java/org/apache/doris/mysql/privilege/CommonUserProperties.java new file mode 100644 index 00000000000000..0695f8a0455a84 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/mysql/privilege/CommonUserProperties.java @@ -0,0 +1,65 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.mysql.privilege; + +import org.apache.doris.common.io.Text; +import org.apache.doris.common.io.Writable; +import org.apache.doris.persist.gson.GsonUtils; + +import com.google.gson.annotations.SerializedName; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; + +/** + * Used in + */ +public class CommonUserProperties implements Writable { + @SerializedName("maxConn") + private long maxConn = 100; + @SerializedName("maxQueryInstances") + private long maxQueryInstances = -1; + + long getMaxConn() { + return maxConn; + } + + long getMaxQueryInstances() { + return maxQueryInstances; + } + + void setMaxConn(long maxConn) { + this.maxConn = maxConn; + } + + void setMaxQueryInstances(long maxQueryInstances) { + this.maxQueryInstances = maxQueryInstances; + } + + public static CommonUserProperties read(DataInput in) throws IOException { + String json = Text.readString(in); + return GsonUtils.GSON.fromJson(json, CommonUserProperties.class); + } + + @Override + public void write(DataOutput out) throws IOException { + String json = GsonUtils.GSON.toJson(this); + Text.writeString(out, json); + } +} \ No newline at end of file diff --git a/fe/fe-core/src/main/java/org/apache/doris/mysql/privilege/UserProperty.java b/fe/fe-core/src/main/java/org/apache/doris/mysql/privilege/UserProperty.java index a3b7e2b9bfe1ef..57578aeb1e2646 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/mysql/privilege/UserProperty.java +++ b/fe/fe-core/src/main/java/org/apache/doris/mysql/privilege/UserProperty.java @@ -57,12 +57,15 @@ */ public class UserProperty implements Writable { + // common properties private static final String PROP_MAX_USER_CONNECTIONS = "max_user_connections"; + private static final String PROP_MAX_QUERY_INSTANCES = "max_query_instances"; + // common properties end + private static final String PROP_RESOURCE = "resource"; private static final String PROP_QUOTA = "quota"; private static final String PROP_DEFAULT_LOAD_CLUSTER = "default_load_cluster"; private static final String PROP_LOAD_CLUSTER = "load_cluster"; - private static final String PROP_MAX_QUERY_INSTANCES = "max_query_instances"; // for system user public static final Set ADVANCED_PROPERTIES = Sets.newHashSet(); @@ -71,9 +74,7 @@ public class UserProperty implements Writable { private String qualifiedUser; - private long maxConn = 100; - - private long maxQueryInstances = -1; + private CommonUserProperties commonProperties = new CommonUserProperties(); // Resource belong to this user. private UserResource resource = new UserResource(1000); @@ -88,15 +89,6 @@ public class UserProperty implements Writable { */ private WhiteList whiteList = new WhiteList(); - @Deprecated - private byte[] password; - @Deprecated - private boolean isAdmin = false; - @Deprecated - private boolean isSuperuser = false; - @Deprecated - private Map dbPrivMap = Maps.newHashMap(); - static { ADVANCED_PROPERTIES.add(Pattern.compile("^" + PROP_MAX_USER_CONNECTIONS + "$", Pattern.CASE_INSENSITIVE)); ADVANCED_PROPERTIES.add(Pattern.compile("^" + PROP_RESOURCE + ".", Pattern.CASE_INSENSITIVE)); @@ -122,37 +114,17 @@ public String getQualifiedUser() { } public long getMaxConn() { - return maxConn; + return this.commonProperties.getMaxConn(); } public long getMaxQueryInstances() { - return maxQueryInstances; + return commonProperties.getMaxQueryInstances();// maxQueryInstances; } public WhiteList getWhiteList() { return whiteList; } - @Deprecated - public byte[] getPassword() { - return password; - } - - @Deprecated - public boolean isAdmin() { - return isAdmin; - } - - @Deprecated - public boolean isSuperuser() { - return isSuperuser; - } - - @Deprecated - public Map getDbPrivMap() { - return dbPrivMap; - } - public void setPasswordForDomain(String domain, byte[] password, boolean errOnExist) throws DdlException { if (errOnExist && whiteList.containsDomain(domain)) { throw new DdlException("Domain " + domain + " of user " + qualifiedUser + " already exists"); @@ -169,8 +141,8 @@ public void removeDomain(String domain) { public void update(List> properties) throws DdlException { // copy - long newMaxConn = maxConn; - long newMaxQueryInstances = maxQueryInstances; + long newMaxConn = this.commonProperties.getMaxConn(); + long newMaxQueryInstances = this.commonProperties.getMaxQueryInstances(); UserResource newResource = resource.getCopiedUserResource(); String newDefaultLoadCluster = defaultLoadCluster; Map newDppConfigs = Maps.newHashMap(clusterToDppConfig); @@ -261,8 +233,8 @@ public void update(List> properties) throws DdlException { } // set - maxConn = newMaxConn; - maxQueryInstances = newMaxQueryInstances; + this.commonProperties.setMaxConn(newMaxConn); + this.commonProperties.setMaxQueryInstances(newMaxQueryInstances); resource = newResource; if (newDppConfigs.containsKey(newDefaultLoadCluster)) { defaultLoadCluster = newDefaultLoadCluster; @@ -345,10 +317,10 @@ public List> fetchProperty() { String dot = SetUserPropertyVar.DOT_SEPARATOR; // max user connections - result.add(Lists.newArrayList(PROP_MAX_USER_CONNECTIONS, String.valueOf(maxConn))); + result.add(Lists.newArrayList(PROP_MAX_USER_CONNECTIONS, String.valueOf(commonProperties.getMaxConn()))); // max query instance - result.add(Lists.newArrayList(PROP_MAX_QUERY_INSTANCES, String.valueOf(maxQueryInstances))); + result.add(Lists.newArrayList(PROP_MAX_QUERY_INSTANCES, String.valueOf(commonProperties.getMaxQueryInstances()))); // resource ResourceGroup group = resource.getResource(); @@ -426,10 +398,12 @@ public static UserProperty read(DataInput in) throws IOException { return userProperty; } + + @Override public void write(DataOutput out) throws IOException { + // user name Text.writeString(out, qualifiedUser); - out.writeLong(maxConn); // user resource resource.write(out); @@ -441,16 +415,19 @@ public void write(DataOutput out) throws IOException { out.writeBoolean(true); Text.writeString(out, defaultLoadCluster); } - out.writeInt(clusterToDppConfig.size()); for (Map.Entry entry : clusterToDppConfig.entrySet()) { Text.writeString(out, entry.getKey()); entry.getValue().write(out); } + // whiteList whiteList.write(out); - out.writeLong(maxQueryInstances); + + // common properties + commonProperties.write(out); } + public void readFields(DataInput in) throws IOException { if (Catalog.getCurrentCatalogJournalVersion() < FeMetaVersion.VERSION_43) { // consume the flag of empty user name @@ -466,19 +443,25 @@ public void readFields(DataInput in) throws IOException { if (Catalog.getCurrentCatalogJournalVersion() < FeMetaVersion.VERSION_43) { int passwordLen = in.readInt(); - password = new byte[passwordLen]; + byte[] password = new byte[passwordLen]; in.readFully(password); - isAdmin = in.readBoolean(); + // boolean isAdmin + in.readBoolean(); if (Catalog.getCurrentCatalogJournalVersion() >= 1) { - isSuperuser = in.readBoolean(); + // boolean isSuperuser + in.readBoolean(); } } - maxConn = in.readLong(); + if (Catalog.getCurrentCatalogJournalVersion() < FeMetaVersion.VERSION_100) { + long maxConn = in.readLong(); + this.commonProperties.setMaxConn(maxConn); + } if (Catalog.getCurrentCatalogJournalVersion() < FeMetaVersion.VERSION_43) { + Map dbPrivMap = Maps.newHashMap(); int numPriv = in.readInt(); for (int i = 0; i < numPriv; ++i) { String dbName = null; @@ -510,6 +493,7 @@ public void readFields(DataInput in) throws IOException { } } + // whiteList if (Catalog.getCurrentCatalogJournalVersion() >= FeMetaVersion.VERSION_21) { whiteList.readFields(in); if (Catalog.getCurrentCatalogJournalVersion() < FeMetaVersion.VERSION_69) { @@ -526,8 +510,9 @@ public void readFields(DataInput in) throws IOException { } } + // common properties if (Catalog.getCurrentCatalogJournalVersion() >= FeMetaVersion.VERSION_100) { - maxQueryInstances = in.readLong(); + this.commonProperties = CommonUserProperties.read(in); } } }