diff --git a/be/src/exec/olap_scan_node.cpp b/be/src/exec/olap_scan_node.cpp index 08bcc814bc060d..45341dfff56e1f 100644 --- a/be/src/exec/olap_scan_node.cpp +++ b/be/src/exec/olap_scan_node.cpp @@ -1338,6 +1338,8 @@ void OlapScanNode::transfer_thread(RuntimeState* state) { } } + ThreadPoolToken* thread_token = state->get_query_fragments_ctx()->get_token(); + /********************************* * 优先级调度基本策略: * 1. 通过查询拆分的Range个数来确定初始nice值 @@ -1348,7 +1350,7 @@ void OlapScanNode::transfer_thread(RuntimeState* state) { * nice值越大的,越优先获得的查询资源 * 4. 定期提高队列内残留任务的优先级,避免大查询完全饿死 *********************************/ - PriorityThreadPool* thread_pool = state->exec_env()->thread_pool(); + PriorityThreadPool* thread_pool = state->exec_env()->scan_thread_pool(); _total_assign_num = 0; _nice = 18 + std::max(0, 2 - (int)_olap_scanners.size() / 5); std::list olap_scanners; @@ -1413,17 +1415,30 @@ void OlapScanNode::transfer_thread(RuntimeState* state) { } auto iter = olap_scanners.begin(); - while (iter != olap_scanners.end()) { - PriorityThreadPool::Task task; - task.work_function = std::bind(&OlapScanNode::scanner_thread, this, *iter); - task.priority = _nice; - (*iter)->start_wait_worker_timer(); - if (thread_pool->offer(task)) { - olap_scanners.erase(iter++); - } else { - LOG(FATAL) << "Failed to assign scanner task to thread pool!"; + if (thread_token != nullptr) { + while (iter != olap_scanners.end()) { + auto s = thread_token->submit_func(std::bind(&OlapScanNode::scanner_thread, this, *iter)); + if (s.ok()) { + (*iter)->start_wait_worker_timer(); + olap_scanners.erase(iter++); + } else { + LOG(FATAL) << "Failed to assign scanner task to thread pool! " << s.get_error_msg(); + } + ++_total_assign_num; + } + } else { + while (iter != olap_scanners.end()) { + PriorityThreadPool::Task task; + task.work_function = std::bind(&OlapScanNode::scanner_thread, this, *iter); + task.priority = _nice; + (*iter)->start_wait_worker_timer(); + if (thread_pool->offer(task)) { + olap_scanners.erase(iter++); + } else { + LOG(FATAL) << "Failed to assign scanner task to thread pool!"; + } + ++_total_assign_num; } - ++_total_assign_num; } RowBatchInterface* scan_batch = NULL; @@ -1466,7 +1481,7 @@ void OlapScanNode::transfer_thread(RuntimeState* state) { if (NULL != scan_batch) { add_one_batch(scan_batch); } - } + } // end of transfer while state->resource_pool()->release_thread_token(true); VLOG_CRITICAL << "TransferThread finish."; diff --git a/be/src/runtime/exec_env.h b/be/src/runtime/exec_env.h index efb073befba3c1..f92f3b676f93f3 100644 --- a/be/src/runtime/exec_env.h +++ b/be/src/runtime/exec_env.h @@ -20,6 +20,7 @@ #include "common/status.h" #include "olap/options.h" +#include "util/threadpool.h" namespace doris { namespace vectorized { @@ -113,7 +114,8 @@ class ExecEnv { std::shared_ptr process_mem_tracker() { return _mem_tracker; } PoolMemTrackerRegistry* pool_mem_trackers() { return _pool_mem_trackers; } ThreadResourceMgr* thread_mgr() { return _thread_mgr; } - PriorityThreadPool* thread_pool() { return _thread_pool; } + PriorityThreadPool* scan_thread_pool() { return _scan_thread_pool; } + ThreadPool* limited_scan_thread_pool() { return _limited_scan_thread_pool.get(); } PriorityThreadPool* etl_thread_pool() { return _etl_thread_pool; } CgroupsMgr* cgroups_mgr() { return _cgroups_mgr; } FragmentMgr* fragment_mgr() { return _fragment_mgr; } @@ -173,7 +175,20 @@ class ExecEnv { std::shared_ptr _mem_tracker; PoolMemTrackerRegistry* _pool_mem_trackers = nullptr; ThreadResourceMgr* _thread_mgr = nullptr; - 
PriorityThreadPool* _thread_pool = nullptr; + + // The following two thread pools are used in different scenarios. + // _scan_thread_pool is a priority thread pool. + // Scanner tasks of ordinary queries are submitted to this pool, + // and the priority of each scan task is set according to the size of the query. + + // _limited_scan_thread_pool is also a thread pool for scanners. + // The difference is that it is not a priority queue; instead, the concurrency + // set by the user limits the number of threads a single query can use. + + // TODO(cmy): find a better way to unify these 2 pools. + PriorityThreadPool* _scan_thread_pool = nullptr; + std::unique_ptr<ThreadPool> _limited_scan_thread_pool; + PriorityThreadPool* _etl_thread_pool = nullptr; CgroupsMgr* _cgroups_mgr = nullptr; FragmentMgr* _fragment_mgr = nullptr; diff --git a/be/src/runtime/exec_env_init.cpp b/be/src/runtime/exec_env_init.cpp index f6e920a9200987..d4a9d2d259d8f8 100644 --- a/be/src/runtime/exec_env_init.cpp +++ b/be/src/runtime/exec_env_init.cpp @@ -91,8 +91,15 @@ Status ExecEnv::_init(const std::vector<StorePath>& store_paths) { new ExtDataSourceServiceClientCache(config::max_client_cache_size_per_host); _pool_mem_trackers = new PoolMemTrackerRegistry(); _thread_mgr = new ThreadResourceMgr(); - _thread_pool = new PriorityThreadPool(config::doris_scanner_thread_pool_thread_num, + _scan_thread_pool = new PriorityThreadPool(config::doris_scanner_thread_pool_thread_num, config::doris_scanner_thread_pool_queue_size); + + ThreadPoolBuilder("LimitedScanThreadPool") + .set_min_threads(1) + .set_max_threads(config::doris_scanner_thread_pool_thread_num) + .set_max_queue_size(config::doris_scanner_thread_pool_queue_size) + .build(&_limited_scan_thread_pool); + _etl_thread_pool = new PriorityThreadPool(config::etl_thread_pool_size, config::etl_thread_pool_queue_size); _cgroups_mgr = new CgroupsMgr(this, config::doris_cgroups); @@ -233,7 +240,7 @@ void ExecEnv::_init_buffer_pool(int64_t min_page_size, int64_t capacity, void ExecEnv::_register_metrics() { REGISTER_HOOK_METRIC(scanner_thread_pool_queue_size, [this]() { - return _thread_pool->get_queue_size(); + return _scan_thread_pool->get_queue_size(); }); REGISTER_HOOK_METRIC(etl_thread_pool_queue_size, [this]() { @@ -266,7 +273,7 @@ void ExecEnv::_destroy() { SAFE_DELETE(_fold_constant_mgr); SAFE_DELETE(_cgroups_mgr); SAFE_DELETE(_etl_thread_pool); - SAFE_DELETE(_thread_pool); + SAFE_DELETE(_scan_thread_pool); SAFE_DELETE(_thread_mgr); SAFE_DELETE(_pool_mem_trackers); SAFE_DELETE(_broker_client_cache); diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp index 9e51009b3fc7ec..038e9f0199956b 100644 --- a/be/src/runtime/fragment_mgr.cpp +++ b/be/src/runtime/fragment_mgr.cpp @@ -562,7 +562,7 @@ Status FragmentMgr::exec_plan_fragment(const TExecPlanFragmentParams& params, Fi } else { // This may be a first fragment request of the query. // Create the query fragments context.
- fragments_ctx.reset(new QueryFragmentsCtx(params.fragment_num_on_host)); + fragments_ctx.reset(new QueryFragmentsCtx(params.fragment_num_on_host, _exec_env)); fragments_ctx->query_id = params.params.query_id; RETURN_IF_ERROR(DescriptorTbl::create(&(fragments_ctx->obj_pool), params.desc_tbl, &(fragments_ctx->desc_tbl))); @@ -577,6 +577,9 @@ Status FragmentMgr::exec_plan_fragment(const TExecPlanFragmentParams& params, Fi if (params.__isset.query_options) { fragments_ctx->timeout_second = params.query_options.query_timeout; + if (params.query_options.__isset.resource_limit) { + fragments_ctx->set_thread_token(params.query_options.resource_limit.cpu_limit); + } } { diff --git a/be/src/runtime/plan_fragment_executor.cpp b/be/src/runtime/plan_fragment_executor.cpp index a50744bbde8059..e0ea04db59f77b 100644 --- a/be/src/runtime/plan_fragment_executor.cpp +++ b/be/src/runtime/plan_fragment_executor.cpp @@ -67,7 +67,7 @@ PlanFragmentExecutor::~PlanFragmentExecutor() { } Status PlanFragmentExecutor::prepare(const TExecPlanFragmentParams& request, - const QueryFragmentsCtx* fragments_ctx) { + QueryFragmentsCtx* fragments_ctx) { const TPlanFragmentExecParams& params = request.params; _query_id = params.query_id; @@ -79,6 +79,7 @@ Status PlanFragmentExecutor::prepare(const TExecPlanFragmentParams& request, const TQueryGlobals& query_globals = fragments_ctx == nullptr ? request.query_globals : fragments_ctx->query_globals; _runtime_state.reset(new RuntimeState(params, request.query_options, query_globals, _exec_env)); + _runtime_state->set_query_fragments_ctx(fragments_ctx); RETURN_IF_ERROR(_runtime_state->init_mem_trackers(_query_id)); _runtime_state->set_be_number(request.backend_num); diff --git a/be/src/runtime/plan_fragment_executor.h b/be/src/runtime/plan_fragment_executor.h index 96e6170e3f6e38..d3fb2d58a56537 100644 --- a/be/src/runtime/plan_fragment_executor.h +++ b/be/src/runtime/plan_fragment_executor.h @@ -26,6 +26,7 @@ #include "common/object_pool.h" #include "common/status.h" #include "runtime/datetime_value.h" +#include "runtime/query_fragments_ctx.h" #include "runtime/query_statistics.h" #include "runtime/runtime_state.h" #include "util/hash_util.hpp" @@ -97,7 +98,7 @@ class PlanFragmentExecutor { // The query will be aborted (MEM_LIMIT_EXCEEDED) if it goes over that limit. // If fragments_ctx is not null, some components will be got from fragments_ctx. Status prepare(const TExecPlanFragmentParams& request, - const QueryFragmentsCtx* fragments_ctx = nullptr); + QueryFragmentsCtx* fragments_ctx = nullptr); // Start execution. Call this prior to get_next(). // If this fragment has a sink, open() will send all rows produced @@ -264,52 +265,6 @@ class PlanFragmentExecutor { void _collect_query_statistics(); }; -// Save the common components of fragments in a query. -// Some components like DescriptorTbl may be very large -// that will slow down each execution of fragments when DeSer them every time. 
-class QueryFragmentsCtx { -public: - QueryFragmentsCtx(int total_fragment_num) - : fragment_num(total_fragment_num), timeout_second(-1) { - _start_time = DateTimeValue::local_time(); - } - - bool countdown() { return fragment_num.fetch_sub(1) == 1; } - - bool is_timeout(const DateTimeValue& now) const { - if (timeout_second <= 0) { - return false; - } - if (now.second_diff(_start_time) > timeout_second) { - return true; - } - return false; - } - -public: - TUniqueId query_id; - DescriptorTbl* desc_tbl; - bool set_rsc_info = false; - std::string user; - std::string group; - TNetworkAddress coord_addr; - TQueryGlobals query_globals; - - /// In the current implementation, for multiple fragments executed by a query on the same BE node, - /// we store some common components in QueryFragmentsCtx, and save QueryFragmentsCtx in FragmentMgr. - /// When all Fragments are executed, QueryFragmentsCtx needs to be deleted from FragmentMgr. - /// Here we use a counter to store the number of Fragments that have not yet been completed, - /// and after each Fragment is completed, this value will be reduced by one. - /// When the last Fragment is completed, the counter is cleared, and the worker thread of the last Fragment - /// will clean up QueryFragmentsCtx. - std::atomic fragment_num; - int timeout_second; - ObjectPool obj_pool; - -private: - DateTimeValue _start_time; -}; - } // namespace doris #endif diff --git a/be/src/runtime/query_fragments_ctx.h b/be/src/runtime/query_fragments_ctx.h new file mode 100644 index 00000000000000..720cda414655b9 --- /dev/null +++ b/be/src/runtime/query_fragments_ctx.h @@ -0,0 +1,100 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include +#include + +#include "common/object_pool.h" +#include "gen_cpp/PaloInternalService_types.h" // for TQueryOptions +#include "gen_cpp/Types_types.h" // for TUniqueId +#include "runtime/datetime_value.h" +#include "runtime/exec_env.h" +#include "util/threadpool.h" + +namespace doris { + +// Save the common components of fragments in a query. +// Some components like DescriptorTbl may be very large +// that will slow down each execution of fragments when DeSer them every time. 
+class DescriptorTbl; +class QueryFragmentsCtx { +public: + QueryFragmentsCtx(int total_fragment_num, ExecEnv* exec_env) + : fragment_num(total_fragment_num), timeout_second(-1), _exec_env(exec_env) { + _start_time = DateTimeValue::local_time(); + } + + bool countdown() { return fragment_num.fetch_sub(1) == 1; } + + bool is_timeout(const DateTimeValue& now) const { + if (timeout_second <= 0) { + return false; + } + if (now.second_diff(_start_time) > timeout_second) { + return true; + } + return false; + } + + void set_thread_token(int cpu_limit) { + if (cpu_limit > 0) { + // For now, cpu_limit will be the max concurrency of the scan thread pool token. + _thread_token = _exec_env->limited_scan_thread_pool()->new_token( + ThreadPool::ExecutionMode::CONCURRENT, + cpu_limit); + } + } + + ThreadPoolToken* get_token() { + return _thread_token.get(); + } + +public: + TUniqueId query_id; + DescriptorTbl* desc_tbl; + bool set_rsc_info = false; + std::string user; + std::string group; + TNetworkAddress coord_addr; + TQueryGlobals query_globals; + + /// In the current implementation, for multiple fragments executed by a query on the same BE node, + /// we store some common components in QueryFragmentsCtx, and save QueryFragmentsCtx in FragmentMgr. + /// When all Fragments are executed, QueryFragmentsCtx needs to be deleted from FragmentMgr. + /// Here we use a counter to store the number of Fragments that have not yet been completed, + /// and after each Fragment is completed, this value will be reduced by one. + /// When the last Fragment is completed, the counter is cleared, and the worker thread of the last Fragment + /// will clean up QueryFragmentsCtx. + std::atomic<int> fragment_num; + int timeout_second; + ObjectPool obj_pool; +private: + ExecEnv* _exec_env; + DateTimeValue _start_time; + + // A token used to submit olap scanners to the "_limited_scan_thread_pool". + // This thread pool token is created from the "_limited_scan_thread_pool" of the exec env + // and is shared by all instances of this query, + // so that we can control the maximum number of threads a single query can use. + // If this token is not set, the scanners will be executed in the "_scan_thread_pool" of the exec env. + std::unique_ptr<ThreadPoolToken> _thread_token; +}; + +} // end of namespace + diff --git a/be/src/runtime/runtime_state.h b/be/src/runtime/runtime_state.h index 659741e640af2e..c30baf72f0f319 100644 --- a/be/src/runtime/runtime_state.h +++ b/be/src/runtime/runtime_state.h @@ -33,6 +33,7 @@ #include "gen_cpp/PaloInternalService_types.h" // for TQueryOptions #include "gen_cpp/Types_types.h" // for TUniqueId #include "runtime/mem_pool.h" +#include "runtime/query_fragments_ctx.h" #include "runtime/thread_resource_mgr.h" #include "util/logging.h" #include "util/runtime_profile.h" @@ -371,6 +372,14 @@ class RuntimeState { int64_t get_load_mem_limit(); RuntimeFilterMgr* runtime_filter_mgr() { return _runtime_filter_mgr.get(); } + + void set_query_fragments_ctx(QueryFragmentsCtx* ctx) { + _query_ctx = ctx; + } + + QueryFragmentsCtx* get_query_fragments_ctx() { + return _query_ctx; + } private: // Use a custom block manager for the query for testing purposes. @@ -516,6 +525,7 @@ /// TODO: not needed if we call ReleaseResources() in a timely manner (IMPALA-1575).
AtomicInt32 _initial_reservation_refcnt; + QueryFragmentsCtx* _query_ctx = nullptr; // prohibit copies RuntimeState(const RuntimeState&); }; diff --git a/be/test/runtime/fragment_mgr_test.cpp b/be/test/runtime/fragment_mgr_test.cpp index 419a922ee3d7d7..ffe11b44eb2c82 100644 --- a/be/test/runtime/fragment_mgr_test.cpp +++ b/be/test/runtime/fragment_mgr_test.cpp @@ -38,7 +38,7 @@ PlanFragmentExecutor::PlanFragmentExecutor(ExecEnv* exec_env, PlanFragmentExecutor::~PlanFragmentExecutor() {} Status PlanFragmentExecutor::prepare(const TExecPlanFragmentParams& request, - const QueryFragmentsCtx* batch_ctx) { + QueryFragmentsCtx* batch_ctx) { return s_prepare_status; } diff --git a/be/test/runtime/test_env.cc b/be/test/runtime/test_env.cc index f7246440c1a427..e0ed0c7444ccc4 100644 --- a/be/test/runtime/test_env.cc +++ b/be/test/runtime/test_env.cc @@ -40,7 +40,7 @@ TestEnv::TestEnv() _exec_env->_mem_tracker = MemTracker::CreateTracker(-1, "TestEnv"); _exec_env->_disk_io_mgr = new DiskIoMgr(1, 1, 1, 10); _exec_env->disk_io_mgr()->init(_io_mgr_tracker); - _exec_env->_thread_pool = new PriorityThreadPool(1, 16); + _exec_env->_scan_thread_pool = new PriorityThreadPool(1, 16); _exec_env->_result_queue_mgr = new ResultQueueMgr(); // TODO may need rpc support, etc. } @@ -62,7 +62,7 @@ void TestEnv::init_buffer_pool(int64_t min_page_len, int64_t capacity, int64_t c TestEnv::~TestEnv() { SAFE_DELETE(_exec_env->_result_queue_mgr); SAFE_DELETE(_exec_env->_buffer_pool); - SAFE_DELETE(_exec_env->_thread_pool); + SAFE_DELETE(_exec_env->_scan_thread_pool); SAFE_DELETE(_exec_env->_disk_io_mgr); SAFE_DELETE(_exec_env->_buffer_reservation); SAFE_DELETE(_exec_env->_thread_mgr); diff --git a/docs/en/administrator-guide/variables.md b/docs/en/administrator-guide/variables.md index 38b6030795e09d..a9c46e485d0ddf 100644 --- a/docs/en/administrator-guide/variables.md +++ b/docs/en/administrator-guide/variables.md @@ -397,3 +397,13 @@ Note that the comment must start with /*+ and can only follow the SELECT. * `enable_fold_constant_by_be` Used to control the calculation method of constant folding. The default is `false`, that is, calculation is performed in `FE`; if it is set to `true`, it will be calculated by `BE` through `RPC` request. + +* `cpu_resource_limit` + + Used to limit the resource overhead of a query. This is an experimental feature. The current implementation limits the number of scan threads a query may use on a single node. With fewer scan threads, data is returned from the storage layer more slowly, which in turn limits the overall computational resource overhead of the query. For example, if it is set to 2, a query can use at most 2 scan threads on a single node. + + This parameter overrides the effect of `parallel_fragment_exec_instance_num`. For example, if `parallel_fragment_exec_instance_num` is set to 4 and this parameter is set to 2, the 4 execution instances on a single node will share at most 2 scan threads. + + This parameter will be overridden by the `cpu_resource_limit` setting in the user property. + + The default is -1, which means no limit.
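(To make the mechanism behind `cpu_resource_limit` concrete, here is a minimal C++ sketch, not part of this patch, of how a CONCURRENT thread-pool token caps scan concurrency. It uses the same `ThreadPoolBuilder` / `new_token` / `submit_func` API that the BE changes above rely on; the pool name, thread count, helper function, and the blocking `wait()` call are illustrative assumptions, not code from this change.)

```cpp
#include <memory>

#include "common/status.h"
#include "util/threadpool.h"

namespace doris {

// Illustrative helper: run `num_tasks` scan-like tasks, but let at most
// `cpu_limit` of them execute at the same time.
Status run_scans_with_cpu_limit(int cpu_limit, int num_tasks) {
    std::unique_ptr<ThreadPool> pool;
    RETURN_IF_ERROR(ThreadPoolBuilder("LimitedScanThreadPoolSketch")
                            .set_min_threads(1)
                            .set_max_threads(48) // stands in for doris_scanner_thread_pool_thread_num
                            .build(&pool));

    // The token is what enforces the limit: at most `cpu_limit` of the
    // submitted functions run concurrently, the rest wait in the token's queue.
    std::unique_ptr<ThreadPoolToken> token =
            pool->new_token(ThreadPool::ExecutionMode::CONCURRENT, cpu_limit);

    for (int i = 0; i < num_tasks; ++i) {
        RETURN_IF_ERROR(token->submit_func([i]() {
            // ... one scanner's work would go here ...
        }));
    }
    token->wait(); // block until all submitted tasks have finished
    return Status::OK();
}

} // namespace doris
```

In the patch itself, the token is created once per query in QueryFragmentsCtx::set_thread_token() and the olap scanner tasks are routed to it in OlapScanNode::transfer_thread(); queries without a limit keep using the priority-based _scan_thread_pool.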
diff --git a/docs/en/sql-reference/sql-statements/Account Management/SET PROPERTY.md b/docs/en/sql-reference/sql-statements/Account Management/SET PROPERTY.md index d70c9c415ebfa6..dcf0abf108a488 100644 --- a/docs/en/sql-reference/sql-statements/Account Management/SET PROPERTY.md +++ b/docs/en/sql-reference/sql-statements/Account Management/SET PROPERTY.md @@ -40,7 +40,9 @@ key: Super user rights: max_user_connections: Maximum number of connections. max_query_instances: Maximum number of query instance user can use when query. -resource.cpu_share: cpu resource assignment. +sql_block_rules: set sql block rules. After setting, if a query executed by the user matches any of the rules, it will be rejected. +cpu_resource_limit: limit the cpu resource usage of a query. See the session variable `cpu_resource_limit`. +resource.cpu_share: cpu resource assignment. (Deprecated) Load_cluster. {cluster_name}. priority: assigns priority to a specified cluster, which can be HIGH or NORMAL Ordinary user rights: @@ -81,6 +83,12 @@ SET PROPERTY FOR 'jack' 'load_cluster.{cluster_name}.priority' = 'HIGH'; 8. Modify the maximum number of query instance for jack to 3000 SET PROPERTY FOR 'jack' 'max_query_instances' = '3000'; +9. Modify the sql block rule for jack +SET PROPERTY FOR 'jack' 'sql_block_rules' = 'rule1, rule2'; + +10. Modify the cpu resource usage limit for jack +SET PROPERTY FOR 'jack' 'cpu_resource_limit' = '2'; + ## keyword SET, PROPERTY diff --git a/docs/zh-CN/administrator-guide/variables.md b/docs/zh-CN/administrator-guide/variables.md index a4d05de5ebd738..3cd87aab827e16 100644 --- a/docs/zh-CN/administrator-guide/variables.md +++ b/docs/zh-CN/administrator-guide/variables.md @@ -392,3 +392,13 @@ SELECT /*+ SET_VAR(query_timeout = 1, enable_partition_cache=true) */ sleep(3); * `enable_fold_constant_by_be` 用于控制常量折叠的计算方式。默认是 `false`,即在 `FE` 进行计算;若设置为 `true`,则通过 `RPC` 请求经 `BE` 计算。 + +* `cpu_resource_limit` + + 用于限制一个查询的资源开销。这是一个实验性质的功能。目前的实现是限制一个查询在单个节点上的scan线程数量。限制了scan线程数,从底层返回的数据速度变慢,从而限制了查询整体的计算资源开销。假设设置为 2,则一个查询在单节点上最多使用2个scan线程。 + + 该参数会覆盖 `parallel_fragment_exec_instance_num` 的效果。即假设 `parallel_fragment_exec_instance_num` 设置为4,而该参数设置为2。则单个节点上的4个执行实例会共享最多2个扫描线程。 + + 该参数会被 user property 中的 `cpu_resource_limit` 配置覆盖。 + + 默认 -1,即不限制。 diff --git a/docs/zh-CN/sql-reference/sql-statements/Account Management/SET PROPERTY.md b/docs/zh-CN/sql-reference/sql-statements/Account Management/SET PROPERTY.md index 1f29a6cb6905ee..95d3954fae8c7f 100644 --- a/docs/zh-CN/sql-reference/sql-statements/Account Management/SET PROPERTY.md +++ b/docs/zh-CN/sql-reference/sql-statements/Account Management/SET PROPERTY.md @@ -40,7 +40,9 @@ under the License. 超级用户权限: max_user_connections: 最大连接数。 max_query_instances: 用户同一时间点执行查询可以使用的instance个数。 - resource.cpu_share: cpu资源分配。 + sql_block_rules: 设置 sql block rules。设置后,该用户发送的查询如果匹配规则,则会被拒绝。 + cpu_resource_limit: 限制查询的cpu资源。详见会话变量 `cpu_resource_limit` 的介绍。 + resource.cpu_share: cpu资源分配。(已废弃) load_cluster.{cluster_name}.priority: 为指定的cluster分配优先级,可以为 HIGH 或 NORMAL 普通用户权限: @@ -81,6 +83,12 @@ under the License. 8. 修改用户jack的查询可用instance个数为3000 SET PROPERTY FOR 'jack' 'max_query_instances' = '3000'; + 9. 修改用户jack的sql block rule + SET PROPERTY FOR 'jack' 'sql_block_rules' = 'rule1, rule2'; + + 10.
修改用户jack的 cpu 使用限制 + SET PROPERTY FOR 'jack' 'cpu_resource_limit' = '2'; + ## keyword SET, PROPERTY diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/sync/SyncChecker.java b/fe/fe-core/src/main/java/org/apache/doris/load/sync/SyncChecker.java index 4bee3f571a7a7c..830c449f41f636 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/sync/SyncChecker.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/sync/SyncChecker.java @@ -43,7 +43,7 @@ public class SyncChecker extends MasterDaemon { private static Map executors = Maps.newHashMap(); private SyncChecker(JobState jobState, long intervalMs) { - super("sync checker " + jobState.name().toLowerCase(), intervalMs); + super("sync checker " + jobState.name().toLowerCase(), intervalMs * 1000); this.jobState = jobState; } @@ -92,4 +92,4 @@ private void runPendingJobs() { } } } -} \ No newline at end of file +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/mysql/privilege/CommonUserProperties.java b/fe/fe-core/src/main/java/org/apache/doris/mysql/privilege/CommonUserProperties.java index 8ab5ebee68e06b..9206e6ab2644f6 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/mysql/privilege/CommonUserProperties.java +++ b/fe/fe-core/src/main/java/org/apache/doris/mysql/privilege/CommonUserProperties.java @@ -37,6 +37,9 @@ public class CommonUserProperties implements Writable { private long maxQueryInstances = -1; @SerializedName("sqlBlockRules") private String sqlBlockRules = ""; + @SerializedName("cpuResourceLimit") + private int cpuResourceLimit = -1; + private String[] sqlBlockRulesSplit = {}; long getMaxConn() { @@ -73,6 +76,14 @@ void setSqlBlockRulesSplit(String sqlBlockRules) { this.sqlBlockRulesSplit = sqlBlockRules.replace(" ", "").split(","); } + public int getCpuResourceLimit() { + return cpuResourceLimit; + } + + public void setCpuResourceLimit(int cpuResourceLimit) { + this.cpuResourceLimit = cpuResourceLimit; + } + public static CommonUserProperties read(DataInput in) throws IOException { String json = Text.readString(in); CommonUserProperties commonUserProperties = GsonUtils.GSON.fromJson(json, CommonUserProperties.class); diff --git a/fe/fe-core/src/main/java/org/apache/doris/mysql/privilege/PaloAuth.java b/fe/fe-core/src/main/java/org/apache/doris/mysql/privilege/PaloAuth.java index 836ce39128ec1b..80cd5b6c267635 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/mysql/privilege/PaloAuth.java +++ b/fe/fe-core/src/main/java/org/apache/doris/mysql/privilege/PaloAuth.java @@ -24,8 +24,8 @@ import org.apache.doris.analysis.GrantStmt; import org.apache.doris.analysis.ResourcePattern; import org.apache.doris.analysis.RevokeStmt; -import org.apache.doris.analysis.SetPassVar; import org.apache.doris.analysis.SetLdapPassVar; +import org.apache.doris.analysis.SetPassVar; import org.apache.doris.analysis.SetUserPropertyStmt; import org.apache.doris.analysis.TablePattern; import org.apache.doris.analysis.UserIdentity; @@ -50,14 +50,14 @@ import org.apache.doris.thrift.TFetchResourceResult; import org.apache.doris.thrift.TPrivilegeStatus; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + import com.google.common.base.Joiner; import com.google.common.base.Preconditions; import com.google.common.collect.Lists; import com.google.common.collect.Sets; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; - import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; @@ -1143,6 +1143,15 @@ public String[] getSqlBlockRules(String 
qualifiedUser) { } } + public int getCpuResourceLimit(String qualifiedUser) { + readLock(); + try { + return propertyMgr.getCpuResourceLimit(qualifiedUser); + } finally { + readUnlock(); + } + } + public void getAllDomains(Set allDomains) { readLock(); try { diff --git a/fe/fe-core/src/main/java/org/apache/doris/mysql/privilege/UserProperty.java b/fe/fe-core/src/main/java/org/apache/doris/mysql/privilege/UserProperty.java index f3a7867ad8eb4c..689b872e98ba4d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/mysql/privilege/UserProperty.java +++ b/fe/fe-core/src/main/java/org/apache/doris/mysql/privilege/UserProperty.java @@ -32,14 +32,14 @@ import org.apache.doris.load.DppConfig; import org.apache.doris.system.SystemInfoService; +import org.apache.commons.lang.StringUtils; + import com.google.common.base.Joiner; import com.google.common.base.Strings; import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.collect.Sets; -import org.apache.commons.lang.StringUtils; - import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; @@ -57,16 +57,14 @@ */ public class UserProperty implements Writable { - // common properties private static final String PROP_MAX_USER_CONNECTIONS = "max_user_connections"; - private static final String PROP_MAX_QUERY_INSTANCES = "max_query_instances"; - // common properties end - private static final String PROP_RESOURCE = "resource"; + private static final String PROP_LOAD_CLUSTER = "load_cluster"; private static final String PROP_QUOTA = "quota"; private static final String PROP_DEFAULT_LOAD_CLUSTER = "default_load_cluster"; - private static final String PROP_LOAD_CLUSTER = "load_cluster"; private static final String PROP_SQL_BLOCK_RULES = "sql_block_rules"; + private static final String PROP_MAX_QUERY_INSTANCES = "max_query_instances"; + private static final String PROP_CPU_RESOURCE_LIMIT = "cpu_resource_limit"; // for system user public static final Set ADVANCED_PROPERTIES = Sets.newHashSet(); @@ -97,6 +95,7 @@ public class UserProperty implements Writable { + DppConfig.PRIORITY + "$", Pattern.CASE_INSENSITIVE)); ADVANCED_PROPERTIES.add(Pattern.compile("^" + PROP_MAX_QUERY_INSTANCES + "$", Pattern.CASE_INSENSITIVE)); ADVANCED_PROPERTIES.add(Pattern.compile("^" + PROP_SQL_BLOCK_RULES + "$", Pattern.CASE_INSENSITIVE)); + ADVANCED_PROPERTIES.add(Pattern.compile("^" + PROP_CPU_RESOURCE_LIMIT + "$", Pattern.CASE_INSENSITIVE)); COMMON_PROPERTIES.add(Pattern.compile("^" + PROP_QUOTA + ".", Pattern.CASE_INSENSITIVE)); COMMON_PROPERTIES.add(Pattern.compile("^" + PROP_DEFAULT_LOAD_CLUSTER + "$", Pattern.CASE_INSENSITIVE)); @@ -127,6 +126,10 @@ public String[] getSqlBlockRules() { return commonProperties.getSqlBlockRulesSplit(); } + public int getCpuResourceLimit() { + return commonProperties.getCpuResourceLimit(); + } + public WhiteList getWhiteList() { return whiteList; } @@ -150,6 +153,8 @@ public void update(List> properties) throws DdlException { long newMaxConn = this.commonProperties.getMaxConn(); long newMaxQueryInstances = this.commonProperties.getMaxQueryInstances(); String sqlBlockRules = this.commonProperties.getSqlBlockRules(); + int cpuResourceLimit = this.commonProperties.getCpuResourceLimit(); + UserResource newResource = resource.getCopiedUserResource(); String newDefaultLoadCluster = defaultLoadCluster; Map newDppConfigs = Maps.newHashMap(clusterToDppConfig); @@ -240,6 +245,23 @@ public void update(List> properties) throws DdlException { throw new DdlException(PROP_SQL_BLOCK_RULES + " 
format error"); } sqlBlockRules = value; + } else if (keyArr[0].equalsIgnoreCase(PROP_CPU_RESOURCE_LIMIT)) { + // set property "cpu_resource_limit" = "2"; + if (keyArr.length != 1) { + throw new DdlException(PROP_CPU_RESOURCE_LIMIT + " format error"); + } + int limit = -1; + try { + limit = Integer.parseInt(value); + } catch (NumberFormatException e) { + throw new DdlException(key + " is not number"); + } + + if (limit <= 0) { + throw new DdlException(key + " is not valid"); + } + + cpuResourceLimit = limit; } else { throw new DdlException("Unknown user property(" + key + ")"); } @@ -249,6 +271,8 @@ public void update(List> properties) throws DdlException { this.commonProperties.setMaxConn(newMaxConn); this.commonProperties.setMaxQueryInstances(newMaxQueryInstances); this.commonProperties.setSqlBlockRules(sqlBlockRules); + this.commonProperties.setCpuResourceLimit(cpuResourceLimit); + resource = newResource; if (newDppConfigs.containsKey(newDefaultLoadCluster)) { defaultLoadCluster = newDefaultLoadCluster; @@ -339,6 +363,9 @@ public List> fetchProperty() { // sql block rules result.add(Lists.newArrayList(PROP_SQL_BLOCK_RULES, commonProperties.getSqlBlockRules())); + // cpu resource limit + result.add(Lists.newArrayList(PROP_CPU_RESOURCE_LIMIT, String.valueOf(commonProperties.getCpuResourceLimit()))); + // resource ResourceGroup group = resource.getResource(); for (Map.Entry entry : group.getQuotaMap().entrySet()) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/mysql/privilege/UserPropertyMgr.java b/fe/fe-core/src/main/java/org/apache/doris/mysql/privilege/UserPropertyMgr.java index c681d3808e6ea8..1f53dd784bcbd0 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/mysql/privilege/UserPropertyMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/mysql/privilege/UserPropertyMgr.java @@ -27,12 +27,12 @@ import org.apache.doris.thrift.TAgentServiceVersion; import org.apache.doris.thrift.TFetchResourceResult; -import com.google.common.base.Preconditions; -import com.google.common.collect.Maps; - import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import com.google.common.base.Preconditions; +import com.google.common.collect.Maps; + import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; @@ -228,6 +228,14 @@ public String[] getSqlBlockRules(String qualifiedUser) { return existProperty.getSqlBlockRules(); } + public int getCpuResourceLimit(String qualifiedUser) { + UserProperty existProperty = propertyMap.get(qualifiedUser); + if (existProperty == null) { + return -1; + } + return existProperty.getCpuResourceLimit(); + } + public UserProperty getUserProperty(String qualifiedUserName) { return propertyMap.get(qualifiedUserName); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java index a3095bd615013b..4f38687c36f7aa 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java @@ -74,6 +74,7 @@ import org.apache.doris.thrift.TQueryType; import org.apache.doris.thrift.TReportExecStatusParams; import org.apache.doris.thrift.TResourceInfo; +import org.apache.doris.thrift.TResourceLimit; import org.apache.doris.thrift.TRuntimeFilterParams; import org.apache.doris.thrift.TRuntimeFilterTargetParams; import org.apache.doris.thrift.TScanRangeLocation; @@ -83,6 +84,11 @@ import org.apache.doris.thrift.TTabletCommitInfo; import org.apache.doris.thrift.TUniqueId; +import 
org.apache.commons.collections.map.HashedMap; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.apache.thrift.TException; + import com.google.common.base.Preconditions; import com.google.common.base.Strings; import com.google.common.collect.HashMultiset; @@ -92,11 +98,6 @@ import com.google.common.collect.Multiset; import com.google.common.collect.Sets; -import org.apache.commons.collections.map.HashedMap; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; -import org.apache.thrift.TException; - import java.text.DateFormat; import java.text.SimpleDateFormat; import java.util.ArrayList; @@ -224,6 +225,9 @@ public Coordinator(ConnectContext context, Analyzer analyzer, Planner planner) { this.descTable = analyzer.getDescTbl().toThrift(); this.returnedAllResults = false; this.queryOptions = context.getSessionVariable().toThrift(); + + setFromUserProperty(analyzer); + this.queryGlobals.setNowString(DATE_FORMAT.format(new Date())); this.queryGlobals.setTimestampMs(new Date().getTime()); if (context.getSessionVariable().getTimeZone().equals("CST")) { @@ -260,6 +264,18 @@ public Coordinator(Long jobId, TUniqueId queryId, DescriptorTable descTable, nextInstanceId.setLo(queryId.lo + 1); } + private void setFromUserProperty(Analyzer analyzer) { + // set cpu resource limit + String qualifiedUser = analyzer.getQualifiedUser(); + int limit = Catalog.getCurrentCatalog().getAuth().getCpuResourceLimit(qualifiedUser); + if (limit > 0) { + // overwrite the cpu resource limit from session variable; + TResourceLimit resourceLimit = new TResourceLimit(); + resourceLimit.setCpuLimit(limit); + this.queryOptions.setResourceLimit(resourceLimit); + } + } + public long getJobId() { return jobId; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java index 78698fb12d8091..0e15a319690c9c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java @@ -24,6 +24,7 @@ import org.apache.doris.common.util.TimeUtils; import org.apache.doris.qe.VariableMgr.VarAttr; import org.apache.doris.thrift.TQueryOptions; +import org.apache.doris.thrift.TResourceLimit; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -149,6 +150,8 @@ public class SessionVariable implements Serializable, Writable { public static final String ENABLE_VECTORIZED_ENGINE = "enable_vectorized_engine"; + public static final String CPU_RESOURCE_LIMIT = "cpu_resource_limit"; + // session origin value public Map sessionOriginValue = new HashMap(); // check stmt is or not [select /*+ SET_VAR(...)*/ ...] 
@@ -361,6 +364,9 @@ public class SessionVariable implements Serializable, Writable { @VariableMgr.VarAttr(name = ENABLE_VECTORIZED_ENGINE) public boolean enableVectorizedEngine = false; + @VariableMgr.VarAttr(name = CPU_RESOURCE_LIMIT) + public int cpuResourceLimit = -1; + public long getMaxExecMemByte() { return maxExecMemByte; } @@ -757,6 +763,10 @@ public boolean isExtractWideRangeExpr() { return extractWideRangeExpr; } + public int getCpuResourceLimit() { + return cpuResourceLimit; + } + // Serialize to thrift object // used for rest api public TQueryOptions toThrift() { @@ -790,6 +800,13 @@ public TQueryOptions toThrift() { tResult.setRuntimeFilterWaitTimeMs(runtimeFilterWaitTimeMs); tResult.setRuntimeFilterMaxInNum(runtimeFilterMaxInNum); + + if (cpuResourceLimit > 0) { + TResourceLimit resourceLimit = new TResourceLimit(); + resourceLimit.setCpuLimit(cpuResourceLimit); + tResult.setResourceLimit(resourceLimit); + } + return tResult; } diff --git a/fe/fe-core/src/test/java/org/apache/doris/catalog/UserPropertyTest.java b/fe/fe-core/src/test/java/org/apache/doris/catalog/UserPropertyTest.java index a98049da3af8c1..9fbfd5bd4043e2 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/catalog/UserPropertyTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/catalog/UserPropertyTest.java @@ -23,11 +23,11 @@ import org.apache.doris.load.DppConfig; import org.apache.doris.mysql.privilege.UserProperty; -import com.google.common.collect.Lists; - import org.junit.Assert; import org.junit.Test; +import com.google.common.collect.Lists; + import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.io.DataInputStream; @@ -65,6 +65,8 @@ public void testUpdate() throws DdlException { properties.add(Pair.create("load_cluster.dpp-cluster.hadoop_palo_path", "/user/palo2")); properties.add(Pair.create("default_load_cluster", "dpp-cluster")); properties.add(Pair.create("max_qUERY_instances", "3000")); + properties.add(Pair.create("sql_block_rules", "rule1,rule2")); + properties.add(Pair.create("cpu_resource_limit", "2")); UserProperty userProperty = new UserProperty(); userProperty.update(properties); @@ -74,6 +76,8 @@ public void testUpdate() throws DdlException { Assert.assertEquals("/user/palo2", userProperty.getLoadClusterInfo("dpp-cluster").second.getPaloPath()); Assert.assertEquals("dpp-cluster", userProperty.getDefaultLoadCluster()); Assert.assertEquals(3000, userProperty.getMaxQueryInstances()); + Assert.assertEquals(new String[]{"rule1", "rule2"}, userProperty.getSqlBlockRules()); + Assert.assertEquals(2, userProperty.getCpuResourceLimit()); // fetch property List> rows = userProperty.fetchProperty(); @@ -93,6 +97,10 @@ public void testUpdate() throws DdlException { Assert.assertEquals("dpp-cluster", value); } else if (key.equalsIgnoreCase("max_query_instances")) { Assert.assertEquals("3000", value); + } else if (key.equalsIgnoreCase("sql_block_rules")) { + Assert.assertEquals("rule1,rule2", value); + } else if (key.equalsIgnoreCase("cpu_resource_limit")) { + Assert.assertEquals("2", value); } } diff --git a/fe/fe-core/src/test/java/org/apache/doris/qe/CoordinatorTest.java b/fe/fe-core/src/test/java/org/apache/doris/qe/CoordinatorTest.java index 97143fac2e9013..3f7e923a88951e 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/qe/CoordinatorTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/qe/CoordinatorTest.java @@ -17,8 +17,6 @@ package org.apache.doris.qe; -import mockit.Mocked; - import org.apache.doris.analysis.Analyzer; import 
org.apache.doris.analysis.BinaryPredicate; import org.apache.doris.analysis.Expr; @@ -39,7 +37,6 @@ import org.apache.doris.planner.PlanNodeId; import org.apache.doris.planner.Planner; import org.apache.doris.planner.ScanNode; - import org.apache.doris.service.FrontendOptions; import org.apache.doris.system.Backend; import org.apache.doris.thrift.TNetworkAddress; @@ -49,27 +46,29 @@ import org.apache.doris.thrift.TScanRangeParams; import org.apache.doris.thrift.TUniqueId; +import org.apache.commons.collections.map.HashedMap; +import org.junit.Assert; +import org.junit.Test; + import com.google.common.collect.ArrayListMultimap; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Maps; -import org.junit.Assert; -import org.junit.Test; - import java.util.ArrayList; import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; -import com.google.common.collect.Maps; -import org.apache.commons.collections.map.HashedMap; + +import mockit.Mocked; public class CoordinatorTest extends Coordinator { static Planner planner = new Planner(); static ConnectContext context = new ConnectContext(null); static { context.setQueryId(new TUniqueId(1, 2)); + context.setQualifiedUser("root"); } @Mocked static Catalog catalog; @@ -77,9 +76,8 @@ public class CoordinatorTest extends Coordinator { static EditLog editLog; @Mocked static FrontendOptions frontendOptions; - static Analyzer analyzer = new Analyzer(catalog, null); - - + static Analyzer analyzer = new Analyzer(catalog, context); + public CoordinatorTest() { super(context, analyzer, planner); } @@ -839,3 +837,4 @@ public void testComputeScanRangeAssignment() { } } + diff --git a/gensrc/thrift/PaloInternalService.thrift b/gensrc/thrift/PaloInternalService.thrift index c89ca86770f50b..e3384b4c1c48a1 100644 --- a/gensrc/thrift/PaloInternalService.thrift +++ b/gensrc/thrift/PaloInternalService.thrift @@ -77,6 +77,10 @@ struct TLoadErrorHubInfo { 3: optional TBrokerErrorHubInfo broker_info; } +struct TResourceLimit { + 1: optional i32 cpu_limit +} + // Query options that correspond to PaloService.PaloQueryOptions, // with their respective defaults struct TQueryOptions { @@ -149,6 +153,9 @@ struct TQueryOptions { // whether enable vectorized engine 41: optional bool enable_vectorized_engine = false + + // the resource limitation of this query + 42: optional TResourceLimit resource_limit }
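(As a closing note on the new Thrift field: the sketch below, not part of this patch, shows how the optional `TQueryOptions.resource_limit` field is typically produced and consumed through the Thrift-generated C++ API. The helper function names are illustrative; the `__set_*` setters and `__isset` flags are the standard members Thrift generates for optional fields, and the consumer-side check mirrors the `__isset.resource_limit` test added in FragmentMgr::exec_plan_fragment above.)

```cpp
#include "gen_cpp/PaloInternalService_types.h"

namespace doris {

// Producer side. In the real flow this happens in the FE (SessionVariable.toThrift()
// or Coordinator.setFromUserProperty(), both in Java); shown here in C++ for brevity.
TQueryOptions make_query_options_with_cpu_limit(int cpu_limit) {
    TQueryOptions options;
    if (cpu_limit > 0) {
        TResourceLimit limit;
        limit.__set_cpu_limit(cpu_limit);
        options.__set_resource_limit(limit);
    }
    return options;
}

// Consumer side: only honor the field when it was actually set, matching the
// __isset.resource_limit check in the FragmentMgr change above.
int effective_cpu_limit(const TQueryOptions& options) {
    if (options.__isset.resource_limit && options.resource_limit.cpu_limit > 0) {
        return options.resource_limit.cpu_limit;
    }
    return -1; // -1 means "no limit", matching the session variable's default
}

} // namespace doris
```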