diff --git a/docs/en/administrator-guide/config/fe_config.md b/docs/en/administrator-guide/config/fe_config.md
index 33eaf223847f07..4f4e3377e1b38a 100644
--- a/docs/en/administrator-guide/config/fe_config.md
+++ b/docs/en/administrator-guide/config/fe_config.md
@@ -626,6 +626,30 @@ When this error is encountered, it means that the load jobs currently running in
 Generally it is not recommended to increase this configuration value. An excessively high number of concurrency may cause excessive system load
 
+### max_running_query_num
+
+Default:Long.MAX_VALUE
+
+IsMutable:true
+
+This configuration is used to limit the number of concurrently running queries in the whole system.
+
+### max_running_txn_num
+
+Default:Long.MAX_VALUE
+
+IsMutable:true
+
+This configuration is used to limit the number of concurrently running load jobs at the system level.
+
+### report_stats_period
+
+Default:10 * 1000
+
+IsMutable:true
+
+This config should be used together with `max_running_query_num`. It represents the period at which query statistics are synchronized between FEs.
+
 ### enable_metric_calculator
 
 Default:true
diff --git a/docs/en/administrator-guide/multi-tenant.md b/docs/en/administrator-guide/multi-tenant.md
index 8c37afad23a4d0..7bb69378550ebf 100644
--- a/docs/en/administrator-guide/multi-tenant.md
+++ b/docs/en/administrator-guide/multi-tenant.md
@@ -38,7 +38,7 @@ FE is mainly responsible for metadata management, cluster management, user reque
 
 BE is mainly responsible for data storage and execution of query plans.
 
-FE does not participate in the processing and calculation of user data, so it is a node with low resource consumption. The BE is responsible for all data calculations and task processing, and is a resource-consuming node. Therefore, the resource division and resource restriction schemes introduced in this article are all aimed at BE nodes.
Because the FE node consumes relatively low resources and can also be scaled horizontally, there is usually no need to isolate and restrict resources, and the FE node can be shared by all users.
+FE does not participate in the processing and calculation of user data, so it is a node with low resource consumption. The BE is responsible for all data calculations and task processing, and is a resource-consuming node. Therefore, the resource division and resource restriction schemes introduced in this article are all aimed at BE nodes. Because the FE node consumes relatively low resources and can also be scaled horizontally, there is usually no need to isolate and restrict resources, and the FE node can be shared by all users. If the number of queries that can be admitted is also regarded as a kind of resource, Doris provides a way to limit it as well.
 
 ## Node resource division
 
@@ -220,3 +220,36 @@ Here we give an example of the steps to start using the resource division functi
 After the data is redistributed. We can start to set the user's resource label permissions. Because by default, the user's `resource_tags.location` attribute is empty, that is, the BE of any tag can be accessed. Therefore, in the previous steps, the normal query of existing users will not be affected. When the `resource_tags.location` property is not empty, the user will be restricted from accessing the BE of the specified Tag.
 
 Through the above 4 steps, we can smoothly use the resource division function after the original cluster is upgraded.
+
+## Limit operation number of FE
+
+We find that the stability of the Doris system is often affected by the number of incoming operations. If too many
+operations arrive within a certain time, some nodes may restart or the service may get stuck. We therefore limit the
+number of operations per unit time to protect the system from excessive outside pressure.
+
+1. limit of query number
+   Limits the number of queries accepted within a certain period.
If too many queries arrive in one period, all queries in the next period will be rejected.
+
+   ```
+   ADMIN SET FRONTEND CONFIG ("max_running_query_num" = "20");
+   ADMIN SET FRONTEND CONFIG ("report_stats_period" = "10000");
+   ```
+
+   The configs above need to be executed on every FE. They mean that if more than 20 queries arrive within 10 seconds, all queries in the next 10 seconds will be rejected.
+   `report_stats_period` should not be too small. To enforce a global operation limit, every FE sends its
+   query count to the Master once per period, and all FEs then obtain the query statistics through the metadata
+   synchronization mechanism. If the period is too small, there will be many unnecessary RPCs in the system.
+
+2. limit of load number
+   Includes a system-level limit and a database-level limit. An example of the system-level limit is:
+
+   ```
+   ADMIN SET FRONTEND CONFIG ("max_running_txn_num" = "300");
+   ```
+
+   An example of the database-level limit is:
+   ```
+   ADMIN SET FRONTEND CONFIG ("max_running_txn_num_per_db" = "100");
+   ```
+
+   If the number of running load transactions exceeds the threshold, newly arriving load transactions will be rejected.
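To make the period-based rejection rule above concrete, here is a minimal standalone sketch, illustrative only and not Doris source (the class and method names are hypothetical): a period in which more queries arrive than the limit causes every query in the next period to be rejected.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical sketch of the period-based admission rule described above.
class PeriodQueryLimiter {
    private final long maxPerPeriod;
    private final AtomicLong current = new AtomicLong(0);
    private volatile boolean rejectThisPeriod = false;

    PeriodQueryLimiter(long maxPerPeriod) {
        this.maxPerPeriod = maxPerPeriod;
    }

    // Called once per report_stats_period: if the finished period exceeded the
    // limit, the whole next period rejects; the counter is reset either way.
    void rollPeriod() {
        rejectThisPeriod = current.getAndSet(0) > maxPerPeriod;
    }

    // Called for each incoming query; returns whether it is admitted.
    boolean admit() {
        if (rejectThisPeriod) {
            return false;
        }
        current.incrementAndGet();
        return true;
    }
}
```

Note that, as in the real mechanism, the decision is made only at period boundaries, so a burst within one period is still admitted and only throttled afterwards.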
\ No newline at end of file diff --git a/docs/zh-CN/administrator-guide/config/fe_config.md b/docs/zh-CN/administrator-guide/config/fe_config.md index 6e73f402530da7..5b537c0b5cfbd8 100644 --- a/docs/zh-CN/administrator-guide/config/fe_config.md +++ b/docs/zh-CN/administrator-guide/config/fe_config.md @@ -614,6 +614,30 @@ current running txns on db xxx is xx, larger than limit xx 一般来说不推荐增大这个配置值。过高的并发数可能导致系统负载过大 +### max_running_query_num + +默认值:Long.MAX_VALUE + +是否可以动态配置:true + +这个配置主要是用来控制整个系统同时运行的查询数量的。 + +### max_running_txn_num + +默认值:Long.MAX_VALUE + +是否可以动态配置:true + +这个配置是用来控制系统级别同时运行的导入任务数量的。 + +### report_stats_period + +默认值:10 * 1000 + +是否可以动态配置:true + +这个配置应该和`max_running_query_num`配置一起使用,代表FE之间同步统计信息的周期。 + ### enable_metric_calculator 默认值:true diff --git a/docs/zh-CN/administrator-guide/multi-tenant.md b/docs/zh-CN/administrator-guide/multi-tenant.md index fb1cac4d62f673..5b286ec7f525b4 100644 --- a/docs/zh-CN/administrator-guide/multi-tenant.md +++ b/docs/zh-CN/administrator-guide/multi-tenant.md @@ -38,7 +38,7 @@ FE 主要负责元数据管理、集群管理、用户请求的接入和查询 BE 主要负责数据存储、查询计划的执行等工作。 -FE 不参与用户数据的处理计算等工作,因此是一个资源消耗较低的节点。而 BE 负责所有的数据计算、任务处理,属于资源消耗型的节点。因此,本文所介绍的资源划分及资源限制方案,都是针对 BE 节点的。FE 节点因为资源消耗相对较低,并且还可以横向扩展,因此通常无需做资源上的隔离和限制,FE 节点由所有用户共享即可。 +FE 不参与用户数据的处理计算等工作,因此是一个资源消耗较低的节点。而 BE 负责所有的数据计算、任务处理,属于资源消耗型的节点。因此,本文所介绍的资源划分及资源限制方案,都是针对 BE 节点的。FE 节点因为资源消耗相对较低,并且还可以横向扩展,因此通常无需做资源上的隔离和限制,FE 节点由所有用户共享即可。不过如果我们将允许接入的查询数量等也看作一种资源,那么对于这类资源的限制我们也提供了相应的方式。 ## 节点资源划分 @@ -220,3 +220,34 @@ Tag 划分和 CPU 限制是 0.15 版本中的新功能。为了保证可以从 等数据重分布完毕后。我们就可以开始设置用户的资源标签权限了。因为默认情况下,用户的 `resource_tags.location` 属性为空,即可以访问任意 Tag 的 BE。所以在前面步骤中,不会影响到已有用户的正常查询。当 `resource_tags.location` 属性非空时,用户将被限制访问指定 Tag 的 BE。 通过以上4步,我们可以较为平滑的在原有集群升级后,使用资源划分功能。 + +## FE 操作数量限制 + +在实际使用Doris系统的过程中,我们发现该系统的稳定性往往会受到外部操作的影响。比如一段时间内如果查询或导入数量过多,一般就会引发节点重启或者服务卡住等问题。所以我们希望限制单位时间的操作数量,从而在一定程度上保护系统避免受到过大的外部压力。 + +1. 
查询数量限制 + + 限制一定时间周期内允许执行的查询数量,如果在某个周期内执行的查询数量过多,那么下一个时间周期内所有查询均将被拒绝执行。 + + ``` + ADMIN SET FRONTEND CONFIG ("max_running_query_num" = "20"); + ADMIN SET FRONTEND CONFIG ("report_stats_period" = "10000"); + ``` + + 上述配置需要在每个FE上执行,表示如果10s内执行的查询数量超过20个,那么下一个10s内的查询将被拒绝执行。 + + `report_stats_period`的数值不宜配置的过小,因为为了实现全局的查询数量限制,每个FE节点都会依此周期向Master汇总查询数量信息,并通过元数据同步机制将查询数量信息同步给所有的FE,如果周期过短将带来过多的RPC开销,为系统带来不必要的压力。 + +2. 导入数量限制 + + 包括系统级别的限制和数据库级别的限制。系统级别的限制为: + ``` + ADMIN SET FRONTEND CONFIG ("max_running_txn_num" = "300"); + ``` + + 数据库级别的限制为: + ``` + ADMIN SET FRONTEND CONFIG ("max_running_txn_num_per_db" = "100"); + ``` + + 表示某一时刻的导入任务数量上限,超过上限的任务将被拒绝。 diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java index 29e5dd734a4759..b3db17ee13aa57 100755 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java @@ -236,7 +236,9 @@ import org.apache.doris.task.DropReplicaTask; import org.apache.doris.task.MasterTaskExecutor; import org.apache.doris.thrift.BackendService; +import org.apache.doris.thrift.FrontendService; import org.apache.doris.thrift.TNetworkAddress; +import org.apache.doris.thrift.TReportStatsRequest; import org.apache.doris.thrift.TStorageFormat; import org.apache.doris.thrift.TStorageMedium; import org.apache.doris.thrift.TStorageType; @@ -292,7 +294,7 @@ import com.sleepycat.je.rep.InsufficientLogException; import com.sleepycat.je.rep.NetworkRestore; import com.sleepycat.je.rep.NetworkRestoreConfig; - +import org.apache.thrift.TException; import org.codehaus.jackson.map.ObjectMapper; public class Catalog { @@ -347,6 +349,7 @@ public class Catalog { private MasterDaemon txnCleaner; // To clean aborted or timeout txns private Daemon replayer; private Daemon timePrinter; + private Daemon statsSyncer; private Daemon listener; private EsRepository esRepository; // it is a daemon, so add it here @@ 
-409,6 +412,7 @@ public class Catalog { private BrokerMgr brokerMgr; private ResourceMgr resourceMgr; + private StatsMgr statsMgr; private GlobalTransactionMgr globalTransactionMgr; @@ -569,6 +573,7 @@ private Catalog(boolean isCheckpointCatalog) { this.brokerMgr = new BrokerMgr(); this.resourceMgr = new ResourceMgr(); + this.statsMgr = new StatsMgr(); this.globalTransactionMgr = new GlobalTransactionMgr(this); @@ -654,6 +659,10 @@ public ResourceMgr getResourceMgr() { return resourceMgr; } + public StatsMgr getStatsMgr() { + return statsMgr; + } + public static GlobalTransactionMgr getCurrentGlobalTransactionMgr() { return getCurrentCatalog().globalTransactionMgr; } @@ -1389,6 +1398,9 @@ private void startNonMasterDaemonThreads() { esRepository.start(); // domain resolver domainResolver.start(); + // sync statistics + createStatsSyncer(); + statsSyncer.start(); } private void transferToNonMaster(FrontendNodeType newType) { @@ -1955,6 +1967,14 @@ public long loadResources(DataInputStream in, long checksum) throws IOException return checksum; } + public long loadStats(DataInputStream in, long checksum) throws IOException { + if (Catalog.getCurrentCatalogJournalVersion() >= FeMetaVersion.VERSION_106) { + statsMgr = StatsMgr.read(in); + } + LOG.info("finished replay stats from image"); + return checksum; + } + public long loadSmallFiles(DataInputStream in, long checksum) throws IOException { if (Catalog.getCurrentCatalogJournalVersion() >= FeMetaVersion.VERSION_52) { smallFileMgr.readFields(in); @@ -2211,6 +2231,11 @@ public long saveResources(CountingDataOutputStream dos, long checksum) throws IO return checksum; } + public long saveStats(CountingDataOutputStream dos, long checksum) throws IOException { + Catalog.getCurrentCatalog().getStatsMgr().write(dos); + return checksum; + } + public long saveSmallFiles(CountingDataOutputStream dos, long checksum) throws IOException { smallFileMgr.write(dos); return checksum; @@ -2495,6 +2520,42 @@ protected void 
runAfterCatalogReady() {
         };
     }
 
+    public void createStatsSyncer() {
+        // stats syncer will send statistics to Master through RPC every certain time.
+        statsSyncer = new MasterDaemon("statsSyncer", Config.report_stats_period) {
+            @Override
+            protected void runAfterCatalogReady() {
+                String masterHost = Catalog.getCurrentCatalog().getMasterIp();
+                int masterRpcPort = Catalog.getCurrentCatalog().getMasterRpcPort();
+                TNetworkAddress thriftAddress = new TNetworkAddress(masterHost, masterRpcPort);
+
+                FrontendService.Client client = null;
+                try {
+                    client = ClientPool.frontendPool.borrowObject(thriftAddress, 300 * 1000);
+                } catch (Exception e) {
+                    LOG.warn("Send statistics to Master borrow object fail!");
+                }
+                if (client == null) {
+                    // failed to borrow a client; skip this round and retry in the next period
+                    return;
+                }
+                TReportStatsRequest request = new TReportStatsRequest();
+                String feHost = Catalog.getCurrentCatalog().getSelfNode().first;
+                long queryNum = Catalog.getCurrentCatalog().getStatsMgr().getAndResetQueryNum();
+                request.setFe(feHost);
+                request.setQueryNum(queryNum);
+
+                try {
+                    client.reportStats(request);
+                } catch (TException e) {
+                    LOG.warn("Send statistics to Master meet RPC fail, detail message: " + e.getMessage());
+                } finally {
+                    ClientPool.frontendPool.returnObject(thriftAddress, client);
+                }
+            }
+        };
+    }
+
+    public void setStatsPeriod(long period) {
+        statsSyncer.setInterval(period);
+    }
+
     public void addFrontend(FrontendNodeType role, String host, int editLogPort) throws DdlException {
         if (!tryLock(false)) {
             throw new DdlException("Failed to acquire catalog lock. Try again");
         }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/StatsMgr.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/StatsMgr.java
new file mode 100644
index 00000000000000..f273f6a79f2e0a
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/StatsMgr.java
@@ -0,0 +1,108 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.
See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.catalog; + +import com.google.common.collect.Maps; +import com.google.gson.annotations.SerializedName; +import org.apache.doris.common.Config; +import org.apache.doris.common.io.Text; +import org.apache.doris.common.io.Writable; +import org.apache.doris.persist.gson.GsonUtils; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.util.Map; +import java.util.concurrent.atomic.AtomicLong; + +public class StatsMgr implements Writable { + private static final Logger LOG = LogManager.getLogger(StatsMgr.class); + public StatsMgr() { + } + + public static class Stats implements Writable{ + @SerializedName("fe") + String fe; + + @SerializedName("queryNum") + long queryNum; + + public Stats(String fe, long queryNum) { + this.fe = fe; + this.queryNum = queryNum; + } + + @Override + public void write(DataOutput out) throws IOException { + String json = GsonUtils.GSON.toJson(this); + Text.writeString(out, json); + } + + public static Stats read(DataInput in) throws IOException { + String json = Text.readString(in); + return GsonUtils.GSON.fromJson(json, Stats.class); + } + } + + @SerializedName("feToStats") + private final Map 
<String, Stats> feToStats = Maps.newConcurrentMap();
+    private final AtomicLong localQueryNum = new AtomicLong(0);
+    private long totalQueryNum = 0;
+
+    public boolean checkQueryAccess() {
+        totalQueryNum = 0;
+        for (Map.Entry<String, Stats> entry : feToStats.entrySet()) {
+            totalQueryNum += entry.getValue().queryNum;
+        }
+        return totalQueryNum < Config.max_running_query_num;
+    }
+
+    public long getTotalQueryNum() {
+        return totalQueryNum;
+    }
+
+    public long increaseQueryNum() {
+        return this.localQueryNum.getAndIncrement();
+    }
+
+    public long getAndResetQueryNum() {
+        return this.localQueryNum.getAndSet(0);
+    }
+
+    public void setStats(Stats stats) {
+        feToStats.put(stats.fe, stats);
+        Catalog.getCurrentCatalog().getEditLog().logSetStats(stats);
+    }
+
+    public void replaySetStats(Stats stats) {
+        feToStats.put(stats.fe, stats);
+    }
+
+    @Override
+    public void write(DataOutput out) throws IOException {
+        String json = GsonUtils.GSON.toJson(this);
+        Text.writeString(out, json);
+    }
+
+    public static StatsMgr read(DataInput in) throws IOException {
+        String json = Text.readString(in);
+        return GsonUtils.GSON.fromJson(json, StatsMgr.class);
+    }
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/Config.java b/fe/fe-core/src/main/java/org/apache/doris/common/Config.java
index 21a84308804b1a..1621e11b2b0630 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/Config.java
@@ -18,10 +18,21 @@
 package org.apache.doris.common;
 
 import org.apache.doris.PaloFe;
+import org.apache.doris.catalog.Catalog;
 import org.apache.doris.http.HttpServer;
 
+import java.lang.reflect.Field;
+
 public class Config extends ConfigBase {
 
+    static class StatsPeriodConfHandler implements ConfHandler {
+        @Override
+        public void handle(Field field, String confVal) throws Exception {
+            setConfigField(field, confVal);
+            Catalog.getCurrentCatalog().setStatsPeriod(Long.parseLong(confVal));
+        }
+    }
+
     /**
      * Dir of custom config file
     */
@@ -719,6
+730,26 @@ public class Config extends ConfigBase {
     @ConfField(mutable = true, masterOnly = true)
     public static int max_running_txn_num_per_db = 100;
 
+    /**
+     * period at which each FE reports its statistics to the Master.
+     */
+    @ConfField(mutable = true, callback = StatsPeriodConfHandler.class)
+    public static long report_stats_period = 10 * 1000;
+
+    /**
+     * maximum number of concurrently running queries;
+     * the query processor will reject incoming queries beyond this limit.
+     */
+    @ConfField(mutable = true)
+    public static long max_running_query_num = Long.MAX_VALUE;
+
+    /**
+     * maximum number of concurrently running stream loads;
+     * the load controller will reject incoming stream loads beyond this limit.
+     */
+    @ConfField(mutable = true)
+    public static long max_running_txn_num = Long.MAX_VALUE;
+
     /**
      * This configuration is just for compatible with old version, this config has been replaced by async_loading_load_task_pool_size,
      * it will be removed in the future.
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/ErrorCode.java b/fe/fe-core/src/main/java/org/apache/doris/common/ErrorCode.java
index 8950fca519e0f1..15b8d5030ac23b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/ErrorCode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/ErrorCode.java
@@ -1685,7 +1685,9 @@ public enum ErrorCode {
         "data cannot be inserted into table with empty partition. " +
         "Use `SHOW PARTITIONS FROM %s` to see the currently partitions of this table.
"), ERROR_SQL_AND_LIMITATIONS_SET_IN_ONE_RULE(5084, new byte[]{'4', '2', '0', '0', '0'}, - "sql/sqlHash and partition_num/tablet_num/cardinality cannot be set in one rule.") + "sql/sqlHash and partition_num/tablet_num/cardinality cannot be set in one rule."), + ERR_RUNNING_QUERY_NUM_EXCEED(5085, new byte[]{'4', '2', '0', '0', '0'}, + "Current running query num is %d exceeds threshold %d.") ; // This is error code diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/FeMetaVersion.java b/fe/fe-core/src/main/java/org/apache/doris/common/FeMetaVersion.java index e05ba86eaa5a15..43250129d4b609 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/FeMetaVersion.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/FeMetaVersion.java @@ -238,6 +238,9 @@ public final class FeMetaVersion { public static final int VERSION_105 = 105; // add ldap info public static final int VERSION_106 = 106; + // add query stats + public static final int VERSION_107 = 107; + // note: when increment meta version, should assign the latest version to VERSION_CURRENT - public static final int VERSION_CURRENT = VERSION_106; + public static final int VERSION_CURRENT = VERSION_107; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/MetaReader.java b/fe/fe-core/src/main/java/org/apache/doris/common/MetaReader.java index be646f96ff6f81..4c4b8ea19d42d8 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/MetaReader.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/MetaReader.java @@ -90,6 +90,7 @@ public static void read(File imageFile, Catalog catalog) throws IOException, Ddl checksum = catalog.loadCluster(dis, checksum); checksum = catalog.loadBrokers(dis, checksum); checksum = catalog.loadResources(dis, checksum); + checksum = catalog.loadStats(dis, checksum); checksum = catalog.loadExportJob(dis, checksum); checksum = catalog.loadSyncJobs(dis,checksum); checksum = catalog.loadBackupHandler(dis, checksum); diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/common/MetaWriter.java b/fe/fe-core/src/main/java/org/apache/doris/common/MetaWriter.java index 6b9d9aa79106ea..b0844a7a5e5664 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/MetaWriter.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/MetaWriter.java @@ -116,6 +116,7 @@ public static void write(File imageFile, Catalog catalog) throws IOException { checksum.setRef(writer.doWork("cluster", () -> catalog.saveCluster(dos, checksum.getRef()))); checksum.setRef(writer.doWork("broker", () -> catalog.saveBrokers(dos, checksum.getRef()))); checksum.setRef(writer.doWork("resources", () -> catalog.saveResources(dos, checksum.getRef()))); + checksum.setRef(writer.doWork("stats", () -> catalog.saveStats(dos, checksum.getRef()))); checksum.setRef(writer.doWork("exportJob", () -> catalog.saveExportJob(dos, checksum.getRef()))); checksum.setRef(writer.doWork("syncJob", () -> catalog.saveSyncJobs(dos, checksum.getRef()))); checksum.setRef(writer.doWork("backupHandler", () -> catalog.saveBackupHandler(dos, checksum.getRef()))); diff --git a/fe/fe-core/src/main/java/org/apache/doris/journal/JournalEntity.java b/fe/fe-core/src/main/java/org/apache/doris/journal/JournalEntity.java index 801e75d6c798a4..6549f42a537e4b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/journal/JournalEntity.java +++ b/fe/fe-core/src/main/java/org/apache/doris/journal/JournalEntity.java @@ -31,6 +31,7 @@ import org.apache.doris.catalog.Function; import org.apache.doris.catalog.FunctionSearchDesc; import org.apache.doris.catalog.Resource; +import org.apache.doris.catalog.StatsMgr; import org.apache.doris.cluster.BaseParam; import org.apache.doris.cluster.Cluster; import org.apache.doris.common.io.Text; @@ -537,6 +538,11 @@ public void readFields(DataInput in) throws IOException { isRead = true; break; } + case OperationType.OP_SET_STATS: { + data = StatsMgr.Stats.read(in); + isRead = true; + break; + } case 
OperationType.OP_CREATE_SMALL_FILE: case OperationType.OP_DROP_SMALL_FILE: { data = SmallFile.read(in); diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java b/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java index c0a591a99a6d7c..d2cde0d1595224 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java @@ -24,6 +24,7 @@ import org.apache.doris.backup.Repository; import org.apache.doris.backup.RestoreJob; import org.apache.doris.blockrule.SqlBlockRule; +import org.apache.doris.catalog.StatsMgr; import org.apache.doris.catalog.BrokerMgr; import org.apache.doris.catalog.Catalog; import org.apache.doris.catalog.Database; @@ -677,6 +678,11 @@ public static void loadJournal(Catalog catalog, JournalEntity journal) { catalog.getResourceMgr().replayDropResource(operationLog); break; } + case OperationType.OP_SET_STATS: { + final StatsMgr.Stats stats = (StatsMgr.Stats) journal.getData(); + catalog.getStatsMgr().replaySetStats(stats); + break; + } case OperationType.OP_CREATE_SMALL_FILE: { SmallFile smallFile = (SmallFile) journal.getData(); catalog.getSmallFileMgr().replayCreateFile(smallFile); @@ -1301,6 +1307,10 @@ public void logDropResource(DropResourceOperationLog operationLog) { logEdit(OperationType.OP_DROP_RESOURCE, operationLog); } + public void logSetStats(StatsMgr.Stats stats) { + logEdit(OperationType.OP_SET_STATS, stats); + } + public void logCreateSmallFile(SmallFile info) { logEdit(OperationType.OP_CREATE_SMALL_FILE, info); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/OperationType.java b/fe/fe-core/src/main/java/org/apache/doris/persist/OperationType.java index 2a5f3a23143223..7a3cb512ddfab0 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/persist/OperationType.java +++ b/fe/fe-core/src/main/java/org/apache/doris/persist/OperationType.java @@ -216,6 +216,8 @@ public class OperationType { public static final short 
OP_ALTER_SQL_BLOCK_RULE = 301;
     public static final short OP_DROP_SQL_BLOCK_RULE = 302;
 
+    public static final short OP_SET_STATS = 311;
+
     // get opcode name by op codeStri
     public static String getOpName(short opCode) {
         try {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
index 5b4be019da5401..015127f0569380 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
@@ -939,6 +939,15 @@ private void handleQueryStmt() throws Exception {
             return;
         }
 
+        // check query access
+        if (!Catalog.getCurrentCatalog().getStatsMgr().checkQueryAccess()) {
+            long totalQueryNum = Catalog.getCurrentCatalog().getStatsMgr().getTotalQueryNum();
+            ErrorReport.report(ErrorCode.ERR_RUNNING_QUERY_NUM_EXCEED, totalQueryNum, Config.max_running_query_num);
+            LOG.warn("Current running query num " + totalQueryNum + " exceeds threshold " + Config.max_running_query_num);
+            return;
+        }
+        Catalog.getCurrentCatalog().getStatsMgr().increaseQueryNum();
+
         // send result
         // 1. If this is a query with OUTFILE clause, eg: select * from tbl1 into outfile xxx,
         // We will not send real query result to client.
Instead, we only send OK to client with diff --git a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java index ac8ee57adb73e5..62312502a1bba9 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java +++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java @@ -23,6 +23,7 @@ import org.apache.doris.catalog.Column; import org.apache.doris.catalog.Database; import org.apache.doris.catalog.OlapTable; +import org.apache.doris.catalog.StatsMgr; import org.apache.doris.catalog.Table; import org.apache.doris.catalog.Table.TableType; import org.apache.doris.cluster.ClusterNamespace; @@ -94,6 +95,7 @@ import org.apache.doris.thrift.TReportExecStatusParams; import org.apache.doris.thrift.TReportExecStatusResult; import org.apache.doris.thrift.TReportRequest; +import org.apache.doris.thrift.TReportStatsRequest; import org.apache.doris.thrift.TShowVariableRequest; import org.apache.doris.thrift.TShowVariableResult; import org.apache.doris.thrift.TSnapshotLoaderReportRequest; @@ -791,6 +793,10 @@ private TLoadTxnBeginResult loadTxnBeginImpl(TLoadTxnBeginRequest request, Strin // begin long timeoutSecond = request.isSetTimeout() ? 
request.getTimeout() : Config.stream_load_default_timeout_second; MetricRepo.COUNTER_LOAD_ADD.increase(1L); + // check txn num in system level + if (Catalog.getCurrentCatalog().getGlobalTransactionMgr().getRunningTxnNum() > Config.max_running_txn_num) { + throw new UserException("Current load transaction number exceeds threshold " + Config.max_running_txn_num); + } long txnId = Catalog.getCurrentGlobalTransactionMgr().beginTransaction( db.getId(), Lists.newArrayList(table.getId()), request.getLabel(), request.getRequestId(), new TxnCoordinator(TxnSourceType.BE, clientIp), @@ -999,6 +1005,14 @@ public TStatus snapshotLoaderReport(TSnapshotLoaderReportRequest request) throws return new TStatus(TStatusCode.CANCELLED); } + @Override + public TStatus reportStats(TReportStatsRequest request) throws TException { + // run in Master only. + Catalog.getCurrentCatalog().getStatsMgr().setStats(new StatsMgr.Stats(request.getFe(), + request.getQueryNum())); + return new TStatus(TStatusCode.OK); + } + @Override public TFrontendPingFrontendResult ping(TFrontendPingFrontendRequest request) throws TException { boolean isReady = Catalog.getCurrentCatalog().isReady(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java b/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java index 87cd9812123dfa..1eaae675c5b227 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java @@ -889,6 +889,7 @@ protected void unprotectUpsertTransactionState(TransactionState transactionState runningRoutineLoadTxnNums++; } else { runningTxnNums++; + catalog.getGlobalTransactionMgr().increaseRunningTxnNum(); } } } else { @@ -897,6 +898,7 @@ protected void unprotectUpsertTransactionState(TransactionState transactionState runningRoutineLoadTxnNums--; } else { runningTxnNums--; + 
catalog.getGlobalTransactionMgr().decreaseRunningTxnNum(); } } idToFinalStatusTransactionState.put(transactionState.getTransactionId(), transactionState); diff --git a/fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java b/fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java index 6aedf62a73a7d8..ecc07f3f321052 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java @@ -56,6 +56,7 @@ import java.util.Set; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicLong; /** * Transaction Manager @@ -72,8 +73,22 @@ public class GlobalTransactionMgr implements Writable { private TransactionIdGenerator idGenerator = new TransactionIdGenerator(); private TxnStateCallbackFactory callbackFactory = new TxnStateCallbackFactory(); + private AtomicLong runningTxnNum = new AtomicLong(0); + private Catalog catalog; + public long getRunningTxnNum() { + return runningTxnNum.get(); + } + + public long increaseRunningTxnNum() { + return runningTxnNum.incrementAndGet(); + } + + public long decreaseRunningTxnNum() { + return runningTxnNum.decrementAndGet(); + } + public GlobalTransactionMgr(Catalog catalog) { this.catalog = catalog; } diff --git a/fe/fe-core/src/test/java/org/apache/doris/load/DeleteHandlerTest.java b/fe/fe-core/src/test/java/org/apache/doris/load/DeleteHandlerTest.java index 035c3c5ba2a413..226d1448376b8c 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/load/DeleteHandlerTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/load/DeleteHandlerTest.java @@ -17,6 +17,12 @@ package org.apache.doris.load; +import com.google.common.collect.Lists; +import com.google.common.collect.Sets; +import mockit.Expectations; +import mockit.Mock; +import mockit.MockUp; +import mockit.Mocked; import 
org.apache.doris.analysis.AccessTestUtil; import org.apache.doris.analysis.Analyzer; import org.apache.doris.analysis.BinaryPredicate; @@ -32,7 +38,6 @@ import org.apache.doris.catalog.Table; import org.apache.doris.catalog.TabletInvertedIndex; import org.apache.doris.catalog.TabletMeta; -import org.apache.doris.common.AnalysisException; import org.apache.doris.common.DdlException; import org.apache.doris.common.FeConstants; import org.apache.doris.common.MarkedCountDownLatch; @@ -52,25 +57,16 @@ import org.apache.doris.transaction.TransactionState; import org.apache.doris.transaction.TransactionStatus; import org.apache.doris.transaction.TxnCommitAttachment; - import org.junit.Assert; import org.junit.Before; import org.junit.Test; -import com.google.common.collect.Lists; -import com.google.common.collect.Sets; - import java.util.Collection; import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.TimeUnit; -import mockit.Expectations; -import mockit.Mock; -import mockit.MockUp; -import mockit.Mocked; - public class DeleteHandlerTest { private DeleteHandler deleteHandler; @@ -158,6 +154,10 @@ public void logInsertTransactionState(TransactionState transactionState) { catalog.getEditLog(); minTimes = 0; result = editLog; + + catalog.getGlobalTransactionMgr(); + minTimes = 0; + result = globalTransactionMgr; } }; globalTransactionMgr.addDatabaseTransactionMgr(db.getId()); diff --git a/fe/fe-core/src/test/java/org/apache/doris/task/LoadPendingTaskTest.java b/fe/fe-core/src/test/java/org/apache/doris/task/LoadPendingTaskTest.java index dec07d8f3f689f..2f95d9b3b28347 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/task/LoadPendingTaskTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/task/LoadPendingTaskTest.java @@ -25,7 +25,6 @@ import org.apache.doris.catalog.Partition; import org.apache.doris.common.Config; import org.apache.doris.common.util.UnitTestUtil; -import org.apache.doris.load.DppConfig; import 
org.apache.doris.load.DppScheduler; import org.apache.doris.load.EtlSubmitResult; import org.apache.doris.load.Load; @@ -37,7 +36,6 @@ import org.apache.doris.persist.EditLog; import org.apache.doris.thrift.TStatus; import org.apache.doris.thrift.TStatusCode; - import org.apache.doris.transaction.GlobalTransactionMgr; import org.junit.Assert; import org.junit.Before; @@ -105,6 +103,10 @@ public void testRunPendingTask() throws Exception { minTimes = 0; result = editLog; + catalog.getGlobalTransactionMgr(); + minTimes = 0; + result = globalTransactionMgr; + Catalog.getCurrentCatalog(); minTimes = 0; result = catalog; diff --git a/gensrc/thrift/FrontendService.thrift b/gensrc/thrift/FrontendService.thrift index 2818196a149b2a..0b0f3fc72bc17b 100644 --- a/gensrc/thrift/FrontendService.thrift +++ b/gensrc/thrift/FrontendService.thrift @@ -720,6 +720,11 @@ struct TWaitingTxnStatusResult { 2: optional i32 txn_status_id } +struct TReportStatsRequest { + 1: required string fe + 2: required i64 query_num +} + service FrontendService { TGetDbsResult getDbNames(1: TGetDbsParams params) TGetTablesResult getTableNames(1: TGetTablesParams params) @@ -757,6 +762,7 @@ service FrontendService { TStreamLoadPutResult streamLoadPut(1: TStreamLoadPutRequest request) Status.TStatus snapshotLoaderReport(1: TSnapshotLoaderReportRequest request) + Status.TStatus reportStats(1: TReportStatsRequest request) TFrontendPingFrontendResult ping(1: TFrontendPingFrontendRequest request) }
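As a note for reviewers, the Master-side aggregation that `reportStats` and `StatsMgr.checkQueryAccess` implement can be modeled in isolation. The sketch below is simplified and uses a hypothetical class name (`GlobalQueryStats` is not a Doris class): each FE reports its per-period query count, and admission passes while the global sum stays below the configured maximum.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Simplified model of the Master-side aggregation: each FE's latest reported
// query count is kept per FE host, and a query is admitted while the global
// sum stays under the configured maximum.
class GlobalQueryStats {
    private final Map<String, Long> queryNumByFe = new ConcurrentHashMap<>();
    private final long maxRunningQueryNum;

    GlobalQueryStats(long maxRunningQueryNum) {
        this.maxRunningQueryNum = maxRunningQueryNum;
    }

    // Invoked when a report arrives from an FE; overwrites its previous count.
    void report(String fe, long queryNum) {
        queryNumByFe.put(fe, queryNum);
    }

    // Mirrors the sum-and-compare check: true while the total stays under the limit.
    boolean checkQueryAccess() {
        long total = queryNumByFe.values().stream().mapToLong(Long::longValue).sum();
        return total < maxRunningQueryNum;
    }
}
```

Overwriting (rather than accumulating) each FE's count matches the getAndReset style of reporting, where every report already represents one full period.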