Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 24 additions & 0 deletions docs/en/administrator-guide/config/fe_config.md
Original file line number Diff line number Diff line change
Expand Up @@ -626,6 +626,30 @@ When this error is encountered, it means that the load jobs currently running in

Generally it is not recommended to increase this configuration value. An excessively high number of concurrency may cause excessive system load

### max_running_query_num

Default:Long.MAX_VALUE

IsMutable:true

This configuration is used to control the number of current query number of the system.

### max_running_txn_num

Default:Long.MAX_VALUE

IsMutable:true

This configuration is used to control the number of current load job number of the system.

### report_stats_period

Default:10 * 1000

IsMutable:true

This config should be used with `max_running_query_num` together. It represent the period of statistics sychronized between FE.

### enable_metric_calculator

Default:true
Expand Down
35 changes: 34 additions & 1 deletion docs/en/administrator-guide/multi-tenant.md
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ FE is mainly responsible for metadata management, cluster management, user reque

BE is mainly responsible for data storage and execution of query plans.

FE does not participate in the processing and calculation of user data, so it is a node with low resource consumption. The BE is responsible for all data calculations and task processing, and is a resource-consuming node. Therefore, the resource division and resource restriction schemes introduced in this article are all aimed at BE nodes. Because the FE node consumes relatively low resources and can also be scaled horizontally, there is usually no need to isolate and restrict resources, and the FE node can be shared by all users.
FE does not participate in the processing and calculation of user data, so it is a node with low resource consumption. The BE is responsible for all data calculations and task processing, and is a resource-consuming node. Therefore, the resource division and resource restriction schemes introduced in this article are all aimed at BE nodes. Because the FE node consumes relatively low resources and can also be scaled horizontally, there is usually no need to isolate and restrict resources, and the FE node can be shared by all users. But if you think the max query number you can request is a kind of resource, we also provide a way to limit it.

## Node resource division

Expand Down Expand Up @@ -220,3 +220,36 @@ Here we give an example of the steps to start using the resource division functi
After the data is redistributed. We can start to set the user's resource label permissions. Because by default, the user's `resource_tags.location` attribute is empty, that is, the BE of any tag can be accessed. Therefore, in the previous steps, the normal query of existing users will not be affected. When the `resource_tags.location` property is not empty, the user will be restricted from accessing the BE of the specified Tag.

Through the above 4 steps, we can smoothly use the resource division function after the original cluster is upgraded.

## Limit operation number of FE

We find that the Doris system stability is often affected by the number of operations. If there is too many operations in
certain time, some nodes will restart or the service will get stuck. So we think there should be a limit of operation to
protect system from outside pressure.

1. limit of query number
limit the number of acceptted query in certain time. If there is too many queries in one period, all query will be rejected in next period.

```
ADMIN SET FRONTEND CONFIG ("max_running_query_num" = "20");
ADMIN SET FRONTEND CONFIG ("report_stats_period" = "10000");
```

The config above means that if there is more than 20 queries in 10 seconds, all query in next 10 seconds will be rejected.
`report_stats_period` shouldn't be too small, because in order to achieve global operation limit, every FE will send
query number to Master in every period, and all FE will get query statistics through metadata synchronize mechanism. If
period is too small, there will be too many unnecessary RPC in system.

2. limit of load number
Include the system level limit and database level limit. The example of system level limit is:

```
ADMIN SET FRONTEND CONFIG ("max_running_txn_num" = "300");
```

The example of database level limit is:
```
ADMIN SET FRONTEND CONFIG ("max_running_txn_num_per_db" = "100");
```

If there is more load than threshold, the new comming load transaction will be rejected.
24 changes: 24 additions & 0 deletions docs/zh-CN/administrator-guide/config/fe_config.md
Original file line number Diff line number Diff line change
Expand Up @@ -614,6 +614,30 @@ current running txns on db xxx is xx, larger than limit xx

一般来说不推荐增大这个配置值。过高的并发数可能导致系统负载过大

### max_running_query_num

默认值:Long.MAX_VALUE

是否可以动态配置:true

这个配置主要是用来控制整个系统同时运行的查询数量的。

### max_running_txn_num

默认值:Long.MAX_VALUE

是否可以动态配置:true

这个配置是用来控制系统级别同时运行的导入任务数量的。

### report_stats_period

默认值:10 * 1000

是否可以动态配置:true

这个配置应该和`max_running_query_num`配置一起使用,代表FE之间同步统计信息的周期。

### enable_metric_calculator

默认值:true
Expand Down
33 changes: 32 additions & 1 deletion docs/zh-CN/administrator-guide/multi-tenant.md
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ FE 主要负责元数据管理、集群管理、用户请求的接入和查询

BE 主要负责数据存储、查询计划的执行等工作。

FE 不参与用户数据的处理计算等工作,因此是一个资源消耗较低的节点。而 BE 负责所有的数据计算、任务处理,属于资源消耗型的节点。因此,本文所介绍的资源划分及资源限制方案,都是针对 BE 节点的。FE 节点因为资源消耗相对较低,并且还可以横向扩展,因此通常无需做资源上的隔离和限制,FE 节点由所有用户共享即可。
FE 不参与用户数据的处理计算等工作,因此是一个资源消耗较低的节点。而 BE 负责所有的数据计算、任务处理,属于资源消耗型的节点。因此,本文所介绍的资源划分及资源限制方案,都是针对 BE 节点的。FE 节点因为资源消耗相对较低,并且还可以横向扩展,因此通常无需做资源上的隔离和限制,FE 节点由所有用户共享即可。不过如果我们将允许接入的查询数量等也看作一种资源,那么对于这类资源的限制我们也提供了相应的方式。

## 节点资源划分

Expand Down Expand Up @@ -220,3 +220,34 @@ Tag 划分和 CPU 限制是 0.15 版本中的新功能。为了保证可以从
等数据重分布完毕后。我们就可以开始设置用户的资源标签权限了。因为默认情况下,用户的 `resource_tags.location` 属性为空,即可以访问任意 Tag 的 BE。所以在前面步骤中,不会影响到已有用户的正常查询。当 `resource_tags.location` 属性非空时,用户将被限制访问指定 Tag 的 BE。

通过以上4步,我们可以较为平滑的在原有集群升级后,使用资源划分功能。

## FE 操作数量限制

在实际使用Doris系统的过程中,我们发现该系统的稳定性往往会受到外部操作的影响。比如一段时间内如果查询或导入数量过多,一般就会引发节点重启或者服务卡住等问题。所以我们希望限制单位时间的操作数量,从而在一定程度上保护系统避免受到过大的外部压力。

1. 查询数量限制

限制一定时间周期内允许执行的查询数量,如果在某个周期内执行的查询数量过多,那么下一个时间周期内所有查询均将被拒绝执行。

```
ADMIN SET FRONTEND CONFIG ("max_running_query_num" = "20");
ADMIN SET FRONTEND CONFIG ("report_stats_period" = "10000");
```

上述配置需要在每个FE上执行,表示如果10s内执行的查询数量超过20个,那么下一个10s内的查询将被拒绝执行。

`report_stats_period`的数值不宜配置的过小,因为为了实现全局的查询数量限制,每个FE节点都会依此周期向Master汇总查询数量信息,并通过元数据同步机制将查询数量信息同步给所有的FE,如果周期过短将带来过多的RPC开销,为系统带来不必要的压力。

2. 导入数量限制

包括系统级别的限制和数据库级别的限制。系统级别的限制为:
```
ADMIN SET FRONTEND CONFIG ("max_running_txn_num" = "300");
```

数据库级别的限制为:
```
ADMIN SET FRONTEND CONFIG ("max_running_txn_num_per_db" = "100");
```

表示某一时刻的导入任务数量上限,超过上限的任务将被拒绝。
63 changes: 62 additions & 1 deletion fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java
Original file line number Diff line number Diff line change
Expand Up @@ -236,7 +236,9 @@
import org.apache.doris.task.DropReplicaTask;
import org.apache.doris.task.MasterTaskExecutor;
import org.apache.doris.thrift.BackendService;
import org.apache.doris.thrift.FrontendService;
import org.apache.doris.thrift.TNetworkAddress;
import org.apache.doris.thrift.TReportStatsRequest;
import org.apache.doris.thrift.TStorageFormat;
import org.apache.doris.thrift.TStorageMedium;
import org.apache.doris.thrift.TStorageType;
Expand Down Expand Up @@ -292,7 +294,7 @@
import com.sleepycat.je.rep.InsufficientLogException;
import com.sleepycat.je.rep.NetworkRestore;
import com.sleepycat.je.rep.NetworkRestoreConfig;

import org.apache.thrift.TException;
import org.codehaus.jackson.map.ObjectMapper;

public class Catalog {
Expand Down Expand Up @@ -347,6 +349,7 @@ public class Catalog {
private MasterDaemon txnCleaner; // To clean aborted or timeout txns
private Daemon replayer;
private Daemon timePrinter;
private Daemon statsSyncer;
private Daemon listener;
private EsRepository esRepository; // it is a daemon, so add it here

Expand Down Expand Up @@ -409,6 +412,7 @@ public class Catalog {

private BrokerMgr brokerMgr;
private ResourceMgr resourceMgr;
private StatsMgr statsMgr;

private GlobalTransactionMgr globalTransactionMgr;

Expand Down Expand Up @@ -569,6 +573,7 @@ private Catalog(boolean isCheckpointCatalog) {

this.brokerMgr = new BrokerMgr();
this.resourceMgr = new ResourceMgr();
this.statsMgr = new StatsMgr();

this.globalTransactionMgr = new GlobalTransactionMgr(this);

Expand Down Expand Up @@ -654,6 +659,10 @@ public ResourceMgr getResourceMgr() {
return resourceMgr;
}

public StatsMgr getStatsMgr() {
return statsMgr;
}

public static GlobalTransactionMgr getCurrentGlobalTransactionMgr() {
return getCurrentCatalog().globalTransactionMgr;
}
Expand Down Expand Up @@ -1389,6 +1398,9 @@ private void startNonMasterDaemonThreads() {
esRepository.start();
// domain resolver
domainResolver.start();
// sync statistics
createStatsSyncer();
statsSyncer.start();
}

private void transferToNonMaster(FrontendNodeType newType) {
Expand Down Expand Up @@ -1955,6 +1967,14 @@ public long loadResources(DataInputStream in, long checksum) throws IOException
return checksum;
}

public long loadStats(DataInputStream in, long checksum) throws IOException {
if (Catalog.getCurrentCatalogJournalVersion() >= FeMetaVersion.VERSION_106) {
statsMgr = StatsMgr.read(in);
}
LOG.info("finished replay stats from image");
return checksum;
}

public long loadSmallFiles(DataInputStream in, long checksum) throws IOException {
if (Catalog.getCurrentCatalogJournalVersion() >= FeMetaVersion.VERSION_52) {
smallFileMgr.readFields(in);
Expand Down Expand Up @@ -2211,6 +2231,11 @@ public long saveResources(CountingDataOutputStream dos, long checksum) throws IO
return checksum;
}

public long saveStats(CountingDataOutputStream dos, long checksum) throws IOException {
Catalog.getCurrentCatalog().getStatsMgr().write(dos);
return checksum;
}

public long saveSmallFiles(CountingDataOutputStream dos, long checksum) throws IOException {
smallFileMgr.write(dos);
return checksum;
Expand Down Expand Up @@ -2495,6 +2520,42 @@ protected void runAfterCatalogReady() {
};
}

public void createStatsSyncer() {
// stats syncer will send statistics to Master through RPC every certain time.
statsSyncer = new MasterDaemon("statsSyncer", Config.report_stats_period) {
@Override
protected void runAfterCatalogReady() {
String masterHost = Catalog.getCurrentCatalog().getMasterIp();
int masterRpcPort = Catalog.getCurrentCatalog().getMasterRpcPort();
TNetworkAddress thriftAddress = new TNetworkAddress(masterHost, masterRpcPort);

FrontendService.Client client = null;
try {
client = ClientPool.frontendPool.borrowObject(thriftAddress, 300 * 1000);
} catch (Exception e) {
LOG.warn("Send statistics to Master borrow object fail!");
}
TReportStatsRequest request = new TReportStatsRequest();
String feHost = Catalog.getCurrentCatalog().getSelfNode().first;
long queryNum = Catalog.getCurrentCatalog().getStatsMgr().getAndResetQueryNum();
request.setFe(feHost);
request.setQueryNum(queryNum);

try {
client.reportStats(request);
} catch (TException e) {
LOG.warn("Send statistics to Master meet RPC fail, detail message: " + e.getMessage());
} finally {
ClientPool.frontendPool.returnObject(thriftAddress, client);
}
}
};
}

public void setStatsPeriod(long period) {
statsSyncer.setInterval(period);
}

public void addFrontend(FrontendNodeType role, String host, int editLogPort) throws DdlException {
if (!tryLock(false)) {
throw new DdlException("Failed to acquire catalog lock. Try again");
Expand Down
108 changes: 108 additions & 0 deletions fe/fe-core/src/main/java/org/apache/doris/catalog/StatsMgr.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.catalog;

import com.google.common.collect.Maps;
import com.google.gson.annotations.SerializedName;
import org.apache.doris.common.Config;
import org.apache.doris.common.io.Text;
import org.apache.doris.common.io.Writable;
import org.apache.doris.persist.gson.GsonUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;

public class StatsMgr implements Writable {
private static final Logger LOG = LogManager.getLogger(StatsMgr.class);
public StatsMgr() {
}

public static class Stats implements Writable{
@SerializedName("fe")
String fe;

@SerializedName("queryNum")
long queryNum;

public Stats(String fe, long queryNum) {
this.fe = fe;
this.queryNum = queryNum;
}

@Override
public void write(DataOutput out) throws IOException {
String json = GsonUtils.GSON.toJson(this);
Text.writeString(out, json);
}

public static Stats read(DataInput in) throws IOException {
String json = Text.readString(in);
return GsonUtils.GSON.fromJson(json, Stats.class);
}
}

@SerializedName("feToStats")
private final Map<String, Stats> feToStats = Maps.newConcurrentMap();
private final AtomicLong localQueryNum = new AtomicLong(0);
private long totalQueryNum = 0;

public boolean checkQueryAccess() {
totalQueryNum = 0;
for (Map.Entry<String, Stats> entry: feToStats.entrySet()) {
totalQueryNum += entry.getValue().queryNum;
}
return totalQueryNum < Config.max_running_query_num;
}

public long getTotalQueryNum() {
return totalQueryNum;
}

public long increaseQueryNum() {
return this.localQueryNum.getAndIncrement();
}

public long getAndResetQueryNum() {
return this.localQueryNum.getAndSet(0);
}

public void setStats(Stats stats) {
feToStats.put(stats.fe, stats);
Catalog.getCurrentCatalog().getEditLog().logSetStats(stats);
}

public void replaySetStats(Stats stats) {
feToStats.put(stats.fe, stats);
}

@Override
public void write(DataOutput out) throws IOException {
String json = GsonUtils.GSON.toJson(this);
Text.writeString(out, json);
}

public static StatsMgr read(DataInput in) throws IOException {
String json = Text.readString(in);
return GsonUtils.GSON.fromJson(json, StatsMgr.class);
}
}
Loading