From 348d8d14f903890626ad8de6cf41a3cc8bd428d9 Mon Sep 17 00:00:00 2001 From: Yulei-Yang Date: Tue, 21 Feb 2023 18:29:16 +0800 Subject: [PATCH 1/7] [Improvement](meta) support return total statistics of all databases --- .../Show-Statements/SHOW-PROC.md | 2 +- .../Show-Statements/SHOW-PROC.md | 2 +- .../doris/common/proc/JobsDbProcDir.java | 3 +- .../apache/doris/common/proc/JobsProcDir.java | 75 +++++++++++++++++++ .../java/org/apache/doris/load/ExportMgr.java | 15 ++++ .../main/java/org/apache/doris/load/Load.java | 20 +++++ .../apache/doris/load/loadv2/LoadManager.java | 19 +++++ 7 files changed, 133 insertions(+), 3 deletions(-) diff --git a/docs/en/docs/sql-manual/sql-reference/Show-Statements/SHOW-PROC.md b/docs/en/docs/sql-manual/sql-reference/Show-Statements/SHOW-PROC.md index 6b555cbb07afa4..f170da70e035a4 100644 --- a/docs/en/docs/sql-manual/sql-reference/Show-Statements/SHOW-PROC.md +++ b/docs/en/docs/sql-manual/sql-reference/Show-Statements/SHOW-PROC.md @@ -87,7 +87,7 @@ illustrate: 3. frontends: Display all FE node information in the cluster, including IP address, role, status, whether it is a mater, etc., equivalent to [SHOW FRONTENDS](./SHOW-FRONTENDS.md) 4. routine_loads: Display all routine load job information, including job name, status, etc. 5. auth: User name and corresponding permission information -6. jobs: +6. jobs: show statistics of all kind of jobs. If a specific `dbId` is given, will return statistics data of the database. If `dbId` is -1, will return total statistics data of all databases 7. bdbje: To view the bdbje database list, you need to modify the `fe.conf` file to add `enable_bdbje_debug_mode=true`, and then start `FE` through `sh start_fe.sh --daemon` to enter the `debug` mode. After entering `debug` mode, only `http server` and `MySQLServer` will be started and the `BDBJE` instance will be opened, but no metadata loading and subsequent startup processes will be entered. 8. dbs: Mainly used to view the metadata information of each database and the tables in the Doris cluster. This information includes table structure, partitions, materialized views, data shards and replicas, and more. Through this directory and its subdirectories, you can clearly display the table metadata in the cluster, and locate some problems such as data skew, replica failure, etc. 9. resources : View system resources, ordinary accounts can only see resources that they have USAGE_PRIV permission to use. Only the root and admin accounts can see all resources. Equivalent to [SHOW RESOURCES](./SHOW-RESOURCES.md) diff --git a/docs/zh-CN/docs/sql-manual/sql-reference/Show-Statements/SHOW-PROC.md b/docs/zh-CN/docs/sql-manual/sql-reference/Show-Statements/SHOW-PROC.md index ba46b4b630e4b7..7ddee944ae599b 100644 --- a/docs/zh-CN/docs/sql-manual/sql-reference/Show-Statements/SHOW-PROC.md +++ b/docs/zh-CN/docs/sql-manual/sql-reference/Show-Statements/SHOW-PROC.md @@ -87,7 +87,7 @@ mysql> show proc "/"; 3. frontends :显示集群中所有的 FE 节点信息,包括IP地址、角色、状态、是否是mater等,等同于 [SHOW FRONTENDS](./SHOW-FRONTENDS.md) 4. routine_loads : 显示所有的 routine load 作业信息,包括作业名称、状态等 5. auth:用户名称及对应的权限信息 -6. jobs : +6. jobs :各类任务的统计信息,可查看指定数据库的 Job 的统计信息,如果 `dbId` = -1, 则返回所有库的汇总信息 7. bdbje:查看 bdbje 数据库列表,需要修改 `fe.conf` 文件增加 `enable_bdbje_debug_mode=true` , 然后通过 `sh start_fe.sh --daemon` 启动 `FE` 即可进入 `debug` 模式。 进入 `debug` 模式之后,仅会启动 `http server` 和 `MySQLServer` 并打开 `BDBJE` 实例,但不会进入任何元数据的加载及后续其他启动流程, 8. dbs : 主要用于查看 Doris 集群中各个数据库以及其中的表的元数据信息。这些信息包括表结构、分区、物化视图、数据分片和副本等等。通过这个目录和其子目录,可以清楚的展示集群中的表元数据情况,以及定位一些如数据倾斜、副本故障等问题 9. resources : 查看系统资源,普通账户只能看到自己有 USAGE_PRIV 使用权限的资源。只有root和admin账户可以看到所有的资源。等同于 [SHOW RESOURCES](./SHOW-RESOURCES.md) diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/proc/JobsDbProcDir.java b/fe/fe-core/src/main/java/org/apache/doris/common/proc/JobsDbProcDir.java index 2957d7133eb862..414d6912d9ec5f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/proc/JobsDbProcDir.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/proc/JobsDbProcDir.java @@ -60,7 +60,8 @@ public ProcNodeInterface lookup(String dbIdStr) throws AnalysisException { throw new AnalysisException("Invalid db id format: " + dbIdStr); } - Database db = env.getInternalCatalog().getDbOrAnalysisException(dbId); + // dbId = -1 means need total result of all databases + Database db = dbId == -1 ? null : env.getInternalCatalog().getDbOrAnalysisException(dbId); return new JobsProcDir(env, db); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/proc/JobsProcDir.java b/fe/fe-core/src/main/java/org/apache/doris/common/proc/JobsProcDir.java index 3d93d31e7dcd32..ea23a8aa4739ce 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/proc/JobsProcDir.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/proc/JobsProcDir.java @@ -90,7 +90,13 @@ public ProcResult fetchResult() throws AnalysisException { result.setNames(TITLE_NAMES); + // db is null means need total result of all databases + if (db == null) { + return fetchResultForAllDbs(); + } + long dbId = db.getId(); + // load Load load = Env.getCurrentEnv().getLoadInstance(); LoadManager loadManager = Env.getCurrentEnv().getLoadManager(); @@ -155,4 +161,73 @@ public ProcResult fetchResult() throws AnalysisException { return result; } + + public ProcResult fetchResultForAllDbs() { + BaseProcResult result = new BaseProcResult(); + + result.setNames(TITLE_NAMES); + // load + Load load = Env.getCurrentEnv().getLoadInstance(); + LoadManager loadManager = Env.getCurrentEnv().getLoadManager(); + Long pendingNum = load.getLoadJobNum(org.apache.doris.load.LoadJob.JobState.PENDING) + + loadManager.getLoadJobNum(org.apache.doris.load.loadv2.JobState.PENDING); + Long runningNum = load.getLoadJobNum(org.apache.doris.load.LoadJob.JobState.ETL) + + load.getLoadJobNum(org.apache.doris.load.LoadJob.JobState.LOADING) + + loadManager.getLoadJobNum(org.apache.doris.load.loadv2.JobState.LOADING); + Long finishedNum = load.getLoadJobNum(org.apache.doris.load.LoadJob.JobState.QUORUM_FINISHED) + + load.getLoadJobNum(org.apache.doris.load.LoadJob.JobState.FINISHED) + + loadManager.getLoadJobNum(org.apache.doris.load.loadv2.JobState.FINISHED); + Long cancelledNum = load.getLoadJobNum(org.apache.doris.load.LoadJob.JobState.CANCELLED) + + loadManager.getLoadJobNum(org.apache.doris.load.loadv2.JobState.CANCELLED); + Long totalNum = pendingNum + runningNum + finishedNum + cancelledNum; + result.addRow(Lists.newArrayList(LOAD, pendingNum.toString(), runningNum.toString(), finishedNum.toString(), + cancelledNum.toString(), totalNum.toString())); + + // delete + // TODO: find it from delete handler + pendingNum = 0L; + runningNum = 0L; + finishedNum = 0L; + cancelledNum = 0L; + totalNum = pendingNum + runningNum + finishedNum + cancelledNum; + result.addRow(Lists.newArrayList(DELETE, pendingNum.toString(), runningNum.toString(), finishedNum.toString(), + cancelledNum.toString(), totalNum.toString())); + + // rollup + MaterializedViewHandler materializedViewHandler = Env.getCurrentEnv().getMaterializedViewHandler(); + pendingNum = materializedViewHandler.getAlterJobV2Num(org.apache.doris.alter.AlterJobV2.JobState.PENDING); + runningNum = materializedViewHandler.getAlterJobV2Num( + org.apache.doris.alter.AlterJobV2.JobState.WAITING_TXN) + + materializedViewHandler.getAlterJobV2Num(org.apache.doris.alter.AlterJobV2.JobState.RUNNING); + finishedNum = materializedViewHandler.getAlterJobV2Num( + org.apache.doris.alter.AlterJobV2.JobState.FINISHED); + cancelledNum = materializedViewHandler.getAlterJobV2Num( + org.apache.doris.alter.AlterJobV2.JobState.CANCELLED); + totalNum = pendingNum + runningNum + finishedNum + cancelledNum; + result.addRow(Lists.newArrayList(ROLLUP, pendingNum.toString(), runningNum.toString(), finishedNum.toString(), + cancelledNum.toString(), totalNum.toString())); + + // schema change + SchemaChangeHandler schemaChangeHandler = Env.getCurrentEnv().getSchemaChangeHandler(); + pendingNum = schemaChangeHandler.getAlterJobV2Num(org.apache.doris.alter.AlterJobV2.JobState.PENDING); + runningNum = schemaChangeHandler.getAlterJobV2Num(org.apache.doris.alter.AlterJobV2.JobState.WAITING_TXN) + + schemaChangeHandler.getAlterJobV2Num(org.apache.doris.alter.AlterJobV2.JobState.RUNNING); + finishedNum = schemaChangeHandler.getAlterJobV2Num(org.apache.doris.alter.AlterJobV2.JobState.FINISHED); + cancelledNum = schemaChangeHandler.getAlterJobV2Num(org.apache.doris.alter.AlterJobV2.JobState.CANCELLED); + totalNum = pendingNum + runningNum + finishedNum + cancelledNum; + result.addRow(Lists.newArrayList(SCHEMA_CHANGE, pendingNum.toString(), runningNum.toString(), + finishedNum.toString(), cancelledNum.toString(), totalNum.toString())); + + // export + ExportMgr exportMgr = Env.getCurrentEnv().getExportMgr(); + pendingNum = exportMgr.getJobNum(ExportJob.JobState.PENDING); + runningNum = exportMgr.getJobNum(ExportJob.JobState.EXPORTING); + finishedNum = exportMgr.getJobNum(ExportJob.JobState.FINISHED); + cancelledNum = exportMgr.getJobNum(ExportJob.JobState.CANCELLED); + totalNum = pendingNum + runningNum + finishedNum + cancelledNum; + result.addRow(Lists.newArrayList(EXPORT, pendingNum.toString(), runningNum.toString(), finishedNum.toString(), + cancelledNum.toString(), totalNum.toString())); + + return result; + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/ExportMgr.java b/fe/fe-core/src/main/java/org/apache/doris/load/ExportMgr.java index a6862a9b2b024f..a5581b5ce3fa99 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/ExportMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/ExportMgr.java @@ -376,4 +376,19 @@ public long getJobNum(ExportJob.JobState state, long dbId) { } return size; } + + public long getJobNum(ExportJob.JobState state) { + int size = 0; + readLock(); + try { + for (ExportJob job : idToJob.values()) { + if (job.getState() == state) { + ++size; + } + } + } finally { + readUnlock(); + } + return size; + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/Load.java b/fe/fe-core/src/main/java/org/apache/doris/load/Load.java index 854d96cb2aaf1e..10792586d91624 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/Load.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/Load.java @@ -1326,6 +1326,26 @@ public long getLoadJobNum(JobState jobState, long dbId) { } } + public long getLoadJobNum(JobState jobState) { + readLock(); + try { + List loadJobs = new ArrayList<>(); + for (Long dbId : dbToLoadJobs.keySet()) { + loadJobs.addAll(this.dbToLoadJobs.get(dbId)); + } + + int jobNum = 0; + for (LoadJob job : loadJobs) { + if (job.getState() == jobState) { + ++jobNum; + } + } + return jobNum; + } finally { + readUnlock(); + } + } + public LoadJob getLoadJob(long jobId) { readLock(); try { diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadManager.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadManager.java index 75d65b6225c5f8..c649fc0d9c5fa7 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadManager.java @@ -61,6 +61,7 @@ import java.util.ArrayList; import java.util.Collection; import java.util.EnumSet; +import java.util.HashMap; import java.util.Iterator; import java.util.LinkedList; import java.util.List; @@ -339,6 +340,24 @@ public int getLoadJobNum(JobState jobState, long dbId) { } } + /** + * Get load job num, used by proc. + **/ + public int getLoadJobNum(JobState jobState) { + readLock(); + try { + Map> labelToLoadJobs = new HashMap<>(); + for (Long dbId : dbIdToLabelToLoadJobs.keySet()) { + labelToLoadJobs.putAll(dbIdToLabelToLoadJobs.get(dbId)); + } + + List loadJobList = + labelToLoadJobs.values().stream().flatMap(entity -> entity.stream()).collect(Collectors.toList()); + return (int) loadJobList.stream().filter(entity -> entity.getState() == jobState).count(); + } finally { + readUnlock(); + } + } /** * Get load job num, used by metric. From fbeb384f1ce10a8e3da723aea2cfafbffa1bb2ea Mon Sep 17 00:00:00 2001 From: Yulei-Yang Date: Thu, 23 Feb 2023 13:29:44 +0800 Subject: [PATCH 2/7] support show proc '/jobs/-1/[load/delete/rollup/schema_change/export]' --- .../org/apache/doris/alter/AlterHandler.java | 17 +- .../doris/alter/MaterializedViewHandler.java | 19 ++ .../doris/alter/SchemaChangeHandler.java | 20 ++ .../doris/common/proc/ExportProcNode.java | 8 +- .../apache/doris/common/proc/JobsProcDir.java | 3 +- .../apache/doris/common/proc/LoadProcDir.java | 18 +- .../doris/common/proc/RollupProcDir.java | 9 +- .../common/proc/SchemaChangeProcDir.java | 9 +- .../org/apache/doris/load/DeleteHandler.java | 15 +- .../java/org/apache/doris/load/ExportMgr.java | 179 ++++++++----- .../main/java/org/apache/doris/load/Load.java | 240 +++++++++++------- .../apache/doris/load/loadv2/LoadManager.java | 42 +++ 12 files changed, 408 insertions(+), 171 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/AlterHandler.java b/fe/fe-core/src/main/java/org/apache/doris/alter/AlterHandler.java index c408f9690d4955..bba06ef994ddb0 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/alter/AlterHandler.java +++ b/fe/fe-core/src/main/java/org/apache/doris/alter/AlterHandler.java @@ -34,8 +34,10 @@ import org.apache.doris.common.UserException; import org.apache.doris.common.util.MasterDaemon; import org.apache.doris.common.util.TimeUtils; +import org.apache.doris.mysql.privilege.PrivPredicate; import org.apache.doris.persist.RemoveAlterJobV2OperationLog; import org.apache.doris.persist.ReplicaPersistInfo; +import org.apache.doris.qe.ConnectContext; import org.apache.doris.task.AlterReplicaTask; import com.google.common.base.Preconditions; @@ -140,7 +142,20 @@ public Long getAlterJobV2Num(org.apache.doris.alter.AlterJobV2.JobState state, l } public Long getAlterJobV2Num(org.apache.doris.alter.AlterJobV2.JobState state) { - return alterJobsV2.values().stream().filter(e -> e.getJobState() == state).count(); + Long counter = 0L; + + for (AlterJobV2 job : alterJobsV2.values()) { + if (!Env.getCurrentEnv().getAccessManager().checkDbPriv(ConnectContext.get(), + Env.getCurrentEnv().getCatalogMgr().getDbNullable(job.getDbId()).getFullName(), + PrivPredicate.LOAD)) { + continue; + } + + if (job.getJobState() == state) { + counter++; + } + } + return counter; } @Override diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/MaterializedViewHandler.java b/fe/fe-core/src/main/java/org/apache/doris/alter/MaterializedViewHandler.java index c2b654e23f6e40..0c84f53a6860bc 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/alter/MaterializedViewHandler.java +++ b/fe/fe-core/src/main/java/org/apache/doris/alter/MaterializedViewHandler.java @@ -65,6 +65,7 @@ import com.google.common.base.Preconditions; import com.google.common.base.Strings; +import com.google.common.collect.ImmutableList; import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.collect.Sets; @@ -1144,6 +1145,24 @@ public List> getAlterJobInfosByDb(Database db) { return rollupJobInfos; } + public List> getAllAlterJobInfos() { + List> rollupJobInfos = new LinkedList>(); + + ConnectContext ctx = ConnectContext.get(); + for (AlterJobV2 alterJob : ImmutableList.copyOf(alterJobsV2.values())) { + if (ctx != null) { + if (!Env.getCurrentEnv().getAccessManager().checkTblPriv(ctx, + Env.getCurrentEnv().getCatalogMgr().getDbNullable(alterJob.getDbId()).getFullName(), + alterJob.getTableName(), PrivPredicate.ALTER)) { + continue; + } + } + alterJob.getInfo(rollupJobInfos); + } + + return rollupJobInfos; + } + private void getAlterJobV2Infos(Database db, List> rollupJobInfos) { ConnectContext ctx = ConnectContext.get(); for (AlterJobV2 alterJob : alterJobsV2.values()) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java index a675db5215cf09..b7518cfd88b28d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java +++ b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java @@ -1597,6 +1597,26 @@ private void runAlterJobV2() { }); } + public List> getAllAlterJobInfos() { + List> schemaChangeJobInfos = new LinkedList<>(); + ConnectContext ctx = ConnectContext.get(); + for (AlterJobV2 alterJob : ImmutableList.copyOf(alterJobsV2.values())) { + if (ctx != null) { + if (!Env.getCurrentEnv().getAccessManager().checkTblPriv(ctx, + Env.getCurrentEnv().getCatalogMgr().getDbNullable(alterJob.getDbId()).getFullName(), + alterJob.getTableName(), PrivPredicate.ALTER)) { + continue; + } + } + alterJob.getInfo(schemaChangeJobInfos); + } + + // sort by "JobId", "PartitionName", "CreateTime", "FinishTime", "IndexName", "IndexState" + ListComparator> comparator = new ListComparator>(0, 1, 2, 3, 4, 5); + schemaChangeJobInfos.sort(comparator); + return schemaChangeJobInfos; + } + @Override public List> getAlterJobInfosByDb(Database db) { List> schemaChangeJobInfos = new LinkedList<>(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/proc/ExportProcNode.java b/fe/fe-core/src/main/java/org/apache/doris/common/proc/ExportProcNode.java index 4c469a3039d304..1a154c8fd92265 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/proc/ExportProcNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/proc/ExportProcNode.java @@ -51,14 +51,18 @@ public ExportProcNode(ExportMgr exportMgr, Database db) { @Override public ProcResult fetchResult() throws AnalysisException { - Preconditions.checkNotNull(db); Preconditions.checkNotNull(exportMgr); BaseProcResult result = new BaseProcResult(); result.setNames(TITLE_NAMES); - List> jobInfos = exportMgr.getExportJobInfosByIdOrState( + List> jobInfos; + if (db == null) { + jobInfos = exportMgr.getExportJobInfos(LIMIT); + } else { + jobInfos = exportMgr.getExportJobInfosByIdOrState( db.getId(), 0, "", false, null, null, LIMIT); + } result.setRows(jobInfos); return result; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/proc/JobsProcDir.java b/fe/fe-core/src/main/java/org/apache/doris/common/proc/JobsProcDir.java index ea23a8aa4739ce..13baba524c1508 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/proc/JobsProcDir.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/proc/JobsProcDir.java @@ -70,7 +70,8 @@ public ProcNodeInterface lookup(String jobTypeName) throws AnalysisException { if (jobTypeName.equals(LOAD)) { return new LoadProcDir(env.getLoadInstance(), db); } else if (jobTypeName.equals(DELETE)) { - return new DeleteInfoProcDir(env.getDeleteHandler(), env.getLoadInstance(), db.getId()); + Long dbId = db == null ? -1 : db.getId(); + return new DeleteInfoProcDir(env.getDeleteHandler(), env.getLoadInstance(), dbId); } else if (jobTypeName.equals(ROLLUP)) { return new RollupProcDir(env.getMaterializedViewHandler(), db); } else if (jobTypeName.equals(SCHEMA_CHANGE)) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/proc/LoadProcDir.java b/fe/fe-core/src/main/java/org/apache/doris/common/proc/LoadProcDir.java index fa4ce9567b77ec..cb6c42d7e38cc5 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/proc/LoadProcDir.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/proc/LoadProcDir.java @@ -57,18 +57,26 @@ public LoadProcDir(Load load, Database db) { @Override public ProcResult fetchResult() throws AnalysisException { - Preconditions.checkNotNull(db); Preconditions.checkNotNull(load); BaseProcResult result = new BaseProcResult(); result.setNames(TITLE_NAMES); // merge load job from load and loadManager - LinkedList> loadJobInfos = load.getLoadJobInfosByDb(db.getId(), db.getFullName(), + LinkedList> loadJobInfos; + + // db is null means need total result of all databases + if (db == null) { + loadJobInfos = load.getAllLoadJobInfos(); + loadJobInfos.addAll(Env.getCurrentEnv().getLoadManager().getAllLoadJobInfos()); + } else { + loadJobInfos = load.getLoadJobInfosByDb(db.getId(), db.getFullName(), null, false, null); - loadJobInfos.addAll(Env.getCurrentEnv().getLoadManager().getLoadJobInfosByDb(db.getId(), null, - false, - null)); + loadJobInfos.addAll(Env.getCurrentEnv().getLoadManager().getLoadJobInfosByDb(db.getId(), null, + false, + null)); + } + int counter = 0; Iterator> iterator = loadJobInfos.descendingIterator(); while (iterator.hasNext()) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/proc/RollupProcDir.java b/fe/fe-core/src/main/java/org/apache/doris/common/proc/RollupProcDir.java index fb65f5231cdb9b..896d6349be946d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/proc/RollupProcDir.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/proc/RollupProcDir.java @@ -62,13 +62,18 @@ public RollupProcDir(MaterializedViewHandler materializedViewHandler, Database d @Override public ProcResult fetchResult() throws AnalysisException { - Preconditions.checkNotNull(db); Preconditions.checkNotNull(materializedViewHandler); BaseProcResult result = new BaseProcResult(); result.setNames(TITLE_NAMES); - List> rollupJobInfos = materializedViewHandler.getAlterJobInfosByDb(db); + List> rollupJobInfos; + // db is null means need total result of all databases + if (db == null) { + rollupJobInfos = materializedViewHandler.getAllAlterJobInfos(); + } else { + rollupJobInfos = materializedViewHandler.getAlterJobInfosByDb(db); + } for (List infoStr : rollupJobInfos) { List oneInfo = new ArrayList(TITLE_NAMES.size()); for (Comparable element : infoStr) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/proc/SchemaChangeProcDir.java b/fe/fe-core/src/main/java/org/apache/doris/common/proc/SchemaChangeProcDir.java index 6bf05a02aefa13..d89792ddf59471 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/proc/SchemaChangeProcDir.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/proc/SchemaChangeProcDir.java @@ -176,13 +176,18 @@ public ProcResult fetchResultByFilter(HashMap filter, ArrayList> schemaChangeJobInfos = schemaChangeHandler.getAlterJobInfosByDb(db); + List> schemaChangeJobInfos; + // db is null means need total result of all databases + if (db == null) { + schemaChangeJobInfos = schemaChangeHandler.getAllAlterJobInfos(); + } else { + schemaChangeJobInfos = schemaChangeHandler.getAlterJobInfosByDb(db); + } for (List infoStr : schemaChangeJobInfos) { List oneInfo = new ArrayList(TITLE_NAMES.size()); for (Comparable element : infoStr) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/DeleteHandler.java b/fe/fe-core/src/main/java/org/apache/doris/load/DeleteHandler.java index 3e8d6c631830b7..bd793e0808ce5a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/DeleteHandler.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/DeleteHandler.java @@ -833,7 +833,20 @@ public List> getDeleteInfosByDb(long dbId) { } String dbName = db.getFullName(); - List deleteInfoList = dbToDeleteInfos.get(dbId); + List deleteInfoList = new ArrayList<>(); + if (dbId == -1) { + for (Long tempDbId : dbToDeleteInfos.keySet()) { + if (!Env.getCurrentEnv().getAccessManager().checkDbPriv(ConnectContext.get(), + Env.getCurrentEnv().getCatalogMgr().getDbNullable(tempDbId).getFullName(), + PrivPredicate.LOAD)) { + continue; + } + + deleteInfoList.addAll(dbToDeleteInfos.get(tempDbId)); + } + } else { + deleteInfoList = dbToDeleteInfos.get(dbId); + } readLock(); try { diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/ExportMgr.java b/fe/fe-core/src/main/java/org/apache/doris/load/ExportMgr.java index a5581b5ce3fa99..a56337ce9bdd8d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/ExportMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/ExportMgr.java @@ -223,78 +223,17 @@ public List> getExportJobInfosByIdOrState( } } - // check auth - TableName tableName = job.getTableName(); - if (tableName == null || tableName.getTbl().equals("DUMMY")) { - // forward compatibility, no table name is saved before - Database db = Env.getCurrentInternalCatalog().getDbNullable(dbId); - if (db == null) { - continue; - } - if (!Env.getCurrentEnv().getAccessManager().checkDbPriv(ConnectContext.get(), - db.getFullName(), PrivPredicate.SHOW)) { - continue; - } - } else { - if (!Env.getCurrentEnv().getAccessManager().checkTblPriv(ConnectContext.get(), - tableName.getDb(), tableName.getTbl(), - PrivPredicate.SHOW)) { - continue; - } - } - if (states != null) { if (!states.contains(state)) { continue; } } - List jobInfo = new ArrayList(); - - jobInfo.add(id); - jobInfo.add(jobLabel); - jobInfo.add(state.name()); - jobInfo.add(job.getProgress() + "%"); - - // task infos - Map infoMap = Maps.newHashMap(); - List partitions = job.getPartitions(); - if (partitions == null) { - partitions = Lists.newArrayList(); - partitions.add("*"); - } - infoMap.put("db", job.getTableName().getDb()); - infoMap.put("tbl", job.getTableName().getTbl()); - if (job.getWhereExpr() != null) { - infoMap.put("where expr", job.getWhereExpr().toMySql()); - } - infoMap.put("partitions", partitions); - infoMap.put("broker", job.getBrokerDesc().getName()); - infoMap.put("column separator", job.getColumnSeparator()); - infoMap.put("line delimiter", job.getLineDelimiter()); - infoMap.put("exec mem limit", job.getExecMemLimit()); - infoMap.put("columns", job.getColumns()); - infoMap.put("coord num", job.getCoordList().size()); - infoMap.put("tablet num", job.getTabletLocations() == null ? -1 : job.getTabletLocations().size()); - jobInfo.add(new Gson().toJson(infoMap)); - // path - jobInfo.add(job.getShowExportPath()); - - jobInfo.add(TimeUtils.longToTimeString(job.getCreateTimeMs())); - jobInfo.add(TimeUtils.longToTimeString(job.getStartTimeMs())); - jobInfo.add(TimeUtils.longToTimeString(job.getFinishTimeMs())); - jobInfo.add(job.getTimeoutSecond()); - - // error msg - if (job.getState() == ExportJob.JobState.CANCELLED) { - ExportFailMsg failMsg = job.getFailMsg(); - jobInfo.add("type:" + failMsg.getCancelType() + "; msg:" + failMsg.getMsg()); - } else { - jobInfo.add(FeConstants.null_string); + // check auth + if (isJobShowable(job)) { + exportJobInfos.add(composeExportJobInfo(job)); } - exportJobInfos.add(jobInfo); - if (++counter >= resultNum) { break; } @@ -322,6 +261,112 @@ public List> getExportJobInfosByIdOrState( return results; } + public List> getExportJobInfos(long limit) { + long resultNum = limit == -1L ? Integer.MAX_VALUE : limit; + LinkedList> exportJobInfos = new LinkedList>(); + + readLock(); + try { + int counter = 0; + for (ExportJob job : idToJob.values()) { + // check auth + if (isJobShowable(job)) { + exportJobInfos.add(composeExportJobInfo(job)); + } + + if (++counter >= resultNum) { + break; + } + } + } finally { + readUnlock(); + } + + // order by + ListComparator> comparator = null; + // sort by id asc + comparator = new ListComparator>(0); + Collections.sort(exportJobInfos, comparator); + + List> results = Lists.newArrayList(); + for (List list : exportJobInfos) { + results.add(list.stream().map(e -> e.toString()).collect(Collectors.toList())); + } + + return results; + } + + public boolean isJobShowable(ExportJob job) { + TableName tableName = job.getTableName(); + if (tableName == null || tableName.getTbl().equals("DUMMY")) { + // forward compatibility, no table name is saved before + Database db = Env.getCurrentInternalCatalog().getDbNullable(job.getDbId()); + if (db == null) { + return false; + } + if (!Env.getCurrentEnv().getAccessManager().checkDbPriv(ConnectContext.get(), + db.getFullName(), PrivPredicate.SHOW)) { + return false; + } + } else { + if (!Env.getCurrentEnv().getAccessManager().checkTblPriv(ConnectContext.get(), + tableName.getDb(), tableName.getTbl(), + PrivPredicate.SHOW)) { + return false; + } + } + + return true; + } + + private List composeExportJobInfo(ExportJob job) { + List jobInfo = new ArrayList(); + + jobInfo.add(job.getId()); + jobInfo.add(job.getLabel()); + jobInfo.add(job.getState().name()); + jobInfo.add(job.getProgress() + "%"); + + // task infos + Map infoMap = Maps.newHashMap(); + List partitions = job.getPartitions(); + if (partitions == null) { + partitions = Lists.newArrayList(); + partitions.add("*"); + } + infoMap.put("db", job.getTableName().getDb()); + infoMap.put("tbl", job.getTableName().getTbl()); + if (job.getWhereExpr() != null) { + infoMap.put("where expr", job.getWhereExpr().toMySql()); + } + infoMap.put("partitions", partitions); + infoMap.put("broker", job.getBrokerDesc().getName()); + infoMap.put("column separator", job.getColumnSeparator()); + infoMap.put("line delimiter", job.getLineDelimiter()); + infoMap.put("exec mem limit", job.getExecMemLimit()); + infoMap.put("columns", job.getColumns()); + infoMap.put("coord num", job.getCoordList().size()); + infoMap.put("tablet num", job.getTabletLocations() == null ? -1 : job.getTabletLocations().size()); + jobInfo.add(new Gson().toJson(infoMap)); + // path + jobInfo.add(job.getShowExportPath()); + + jobInfo.add(TimeUtils.longToTimeString(job.getCreateTimeMs())); + jobInfo.add(TimeUtils.longToTimeString(job.getStartTimeMs())); + jobInfo.add(TimeUtils.longToTimeString(job.getFinishTimeMs())); + jobInfo.add(job.getTimeoutSecond()); + + // error msg + if (job.getState() == ExportJob.JobState.CANCELLED) { + ExportFailMsg failMsg = job.getFailMsg(); + jobInfo.add("type:" + failMsg.getCancelType() + "; msg:" + failMsg.getMsg()); + } else { + jobInfo.add(FeConstants.null_string); + } + + return jobInfo; + } + public void removeOldExportJobs() { long currentTimeMs = System.currentTimeMillis(); @@ -382,6 +427,12 @@ public long getJobNum(ExportJob.JobState state) { readLock(); try { for (ExportJob job : idToJob.values()) { + if (!Env.getCurrentEnv().getAccessManager().checkDbPriv(ConnectContext.get(), + Env.getCurrentEnv().getCatalogMgr().getDbNullable(job.getDbId()).getFullName(), + PrivPredicate.LOAD)) { + continue; + } + if (job.getState() == state) { ++size; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/Load.java b/fe/fe-core/src/main/java/org/apache/doris/load/Load.java index 10792586d91624..7de591e4b0ac82 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/Load.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/Load.java @@ -1331,6 +1331,11 @@ public long getLoadJobNum(JobState jobState) { try { List loadJobs = new ArrayList<>(); for (Long dbId : dbToLoadJobs.keySet()) { + if (!Env.getCurrentEnv().getAccessManager().checkDbPriv(ConnectContext.get(), + Env.getCurrentEnv().getCatalogMgr().getDbNullable(dbId).getFullName(), + PrivPredicate.LOAD)) { + continue; + } loadJobs.addAll(this.dbToLoadJobs.get(dbId)); } @@ -1355,6 +1360,147 @@ public LoadJob getLoadJob(long jobId) { } } + public LinkedList> getAllLoadJobInfos() { + LinkedList> loadJobInfos = new LinkedList>(); + readLock(); + try { + List loadJobs = new ArrayList<>(); + for (Long dbId : dbToLoadJobs.keySet()) { + if (!Env.getCurrentEnv().getAccessManager().checkDbPriv(ConnectContext.get(), + Env.getCurrentEnv().getCatalogMgr().getDbNullable(dbId).getFullName(), + PrivPredicate.LOAD)) { + continue; + } + + loadJobs.addAll(this.dbToLoadJobs.get(dbId)); + } + if (loadJobs.size() == 0) { + return loadJobInfos; + } + + long start = System.currentTimeMillis(); + LOG.debug("begin to get load job info, size: {}", loadJobs.size()); + + for (LoadJob loadJob : loadJobs) { + // filter first + String dbName = Env.getCurrentEnv().getCatalogMgr().getDbNullable(loadJob.getDbId()).getFullName(); + // check auth + Set tableNames = loadJob.getTableNames(); + boolean auth = true; + for (String tblName : tableNames) { + if (!Env.getCurrentEnv().getAccessManager().checkTblPriv(ConnectContext.get(), dbName, + tblName, PrivPredicate.LOAD)) { + auth = false; + break; + } + } + if (!auth) { + continue; + } + + loadJobInfos.add(composeJobInfoByLoadJob(loadJob)); + } // end for loadJobs + + LOG.debug("finished to get load job info, cost: {}", (System.currentTimeMillis() - start)); + } finally { + readUnlock(); + } + + return loadJobInfos; + } + + private List composeJobInfoByLoadJob(LoadJob loadJob) { + List jobInfo = new ArrayList(); + + // jobId + jobInfo.add(loadJob.getId()); + // label + jobInfo.add(loadJob.getLabel()); + // state + jobInfo.add(loadJob.getState().name()); + + // progress + switch (loadJob.getState()) { + case PENDING: + jobInfo.add("ETL:0%; LOAD:0%"); + break; + case ETL: + jobInfo.add("ETL:" + loadJob.getProgress() + "%; LOAD:0%"); + break; + case LOADING: + jobInfo.add("ETL:100%; LOAD:" + loadJob.getProgress() + "%"); + break; + case QUORUM_FINISHED: + case FINISHED: + jobInfo.add("ETL:100%; LOAD:100%"); + break; + case CANCELLED: + default: + jobInfo.add("ETL:N/A; LOAD:N/A"); + break; + } + + // type + jobInfo.add(loadJob.getEtlJobType().name()); + + // etl info + EtlStatus status = loadJob.getEtlJobStatus(); + if (status == null || status.getState() == TEtlState.CANCELLED) { + jobInfo.add(FeConstants.null_string); + } else { + Map counters = status.getCounters(); + List info = Lists.newArrayList(); + for (String key : counters.keySet()) { + // XXX: internal etl job return all counters + if (key.equalsIgnoreCase("HDFS bytes read") + || key.equalsIgnoreCase("Map input records") + || key.startsWith("dpp.") + || loadJob.getEtlJobType() == EtlJobType.MINI) { + info.add(key + "=" + counters.get(key)); + } + } // end for counters + if (info.isEmpty()) { + jobInfo.add(FeConstants.null_string); + } else { + jobInfo.add(StringUtils.join(info, "; ")); + } + } + + // task info + jobInfo.add("cluster:" + loadJob.getHadoopCluster() + + "; timeout(s):" + loadJob.getTimeoutSecond() + + "; max_filter_ratio:" + loadJob.getMaxFilterRatio()); + + // error msg + if (loadJob.getState() == JobState.CANCELLED) { + FailMsg failMsg = loadJob.getFailMsg(); + jobInfo.add("type:" + failMsg.getCancelType() + "; msg:" + failMsg.getMsg()); + } else { + jobInfo.add(FeConstants.null_string); + } + + // create time + jobInfo.add(TimeUtils.longToTimeString(loadJob.getCreateTimeMs())); + // etl start time + jobInfo.add(TimeUtils.longToTimeString(loadJob.getEtlStartTimeMs())); + // etl end time + jobInfo.add(TimeUtils.longToTimeString(loadJob.getEtlFinishTimeMs())); + // load start time + jobInfo.add(TimeUtils.longToTimeString(loadJob.getLoadStartTimeMs())); + // load end time + jobInfo.add(TimeUtils.longToTimeString(loadJob.getLoadFinishTimeMs())); + // tracking url + jobInfo.add(status.getTrackingUrl()); + // job detail(not used for hadoop load, just return an empty string) + jobInfo.add(""); + // transaction id + jobInfo.add(loadJob.getTransactionId()); + // error tablets(not used for hadoop load, just return an empty string) + jobInfo.add(""); + + return jobInfo; + } + public LinkedList> getLoadJobInfosByDb(long dbId, String dbName, String labelValue, boolean accurateMatch, Set states) throws AnalysisException { LinkedList> loadJobInfos = new LinkedList>(); @@ -1418,99 +1564,7 @@ public LinkedList> getLoadJobInfosByDb(long dbId, String dbName } } - List jobInfo = new ArrayList(); - - // jobId - jobInfo.add(loadJob.getId()); - // label - jobInfo.add(label); - // state - jobInfo.add(state.name()); - - // progress - switch (loadJob.getState()) { - case PENDING: - jobInfo.add("ETL:0%; LOAD:0%"); - break; - case ETL: - jobInfo.add("ETL:" + loadJob.getProgress() + "%; LOAD:0%"); - break; - case LOADING: - jobInfo.add("ETL:100%; LOAD:" + loadJob.getProgress() + "%"); - break; - case QUORUM_FINISHED: - jobInfo.add("ETL:100%; LOAD:100%"); - break; - case FINISHED: - jobInfo.add("ETL:100%; LOAD:100%"); - break; - case CANCELLED: - jobInfo.add("ETL:N/A; LOAD:N/A"); - break; - default: - jobInfo.add("ETL:N/A; LOAD:N/A"); - break; - } - - // type - jobInfo.add(loadJob.getEtlJobType().name()); - - // etl info - EtlStatus status = loadJob.getEtlJobStatus(); - if (status == null || status.getState() == TEtlState.CANCELLED) { - jobInfo.add(FeConstants.null_string); - } else { - Map counters = status.getCounters(); - List info = Lists.newArrayList(); - for (String key : counters.keySet()) { - // XXX: internal etl job return all counters - if (key.equalsIgnoreCase("HDFS bytes read") - || key.equalsIgnoreCase("Map input records") - || key.startsWith("dpp.") - || loadJob.getEtlJobType() == EtlJobType.MINI) { - info.add(key + "=" + counters.get(key)); - } - } // end for counters - if (info.isEmpty()) { - jobInfo.add(FeConstants.null_string); - } else { - jobInfo.add(StringUtils.join(info, "; ")); - } - } - - // task info - jobInfo.add("cluster:" + loadJob.getHadoopCluster() - + "; timeout(s):" + loadJob.getTimeoutSecond() - + "; max_filter_ratio:" + loadJob.getMaxFilterRatio()); - - // error msg - if (loadJob.getState() == JobState.CANCELLED) { - FailMsg failMsg = loadJob.getFailMsg(); - jobInfo.add("type:" + failMsg.getCancelType() + "; msg:" + failMsg.getMsg()); - } else { - jobInfo.add(FeConstants.null_string); - } - - // create time - jobInfo.add(TimeUtils.longToTimeString(loadJob.getCreateTimeMs())); - // etl start time - jobInfo.add(TimeUtils.longToTimeString(loadJob.getEtlStartTimeMs())); - // etl end time - jobInfo.add(TimeUtils.longToTimeString(loadJob.getEtlFinishTimeMs())); - // load start time - jobInfo.add(TimeUtils.longToTimeString(loadJob.getLoadStartTimeMs())); - // load end time - jobInfo.add(TimeUtils.longToTimeString(loadJob.getLoadFinishTimeMs())); - // tracking url - jobInfo.add(status.getTrackingUrl()); - // job detail(not used for hadoop load, just return an empty string) - jobInfo.add(""); - // transaction id - jobInfo.add(loadJob.getTransactionId()); - // error tablets(not used for hadoop load, just return an empty string) - jobInfo.add(""); - - loadJobInfos.add(jobInfo); + loadJobInfos.add(composeJobInfoByLoadJob(loadJob)); } // end for loadJobs LOG.debug("finished to get load job info, cost: {}", (System.currentTimeMillis() - start)); diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadManager.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadManager.java index c649fc0d9c5fa7..183792b06872bc 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadManager.java @@ -41,7 +41,9 @@ import org.apache.doris.load.FailMsg; import org.apache.doris.load.FailMsg.CancelType; import org.apache.doris.load.Load; +import org.apache.doris.mysql.privilege.PrivPredicate; import org.apache.doris.persist.CleanLabelOperationLog; +import org.apache.doris.qe.ConnectContext; import org.apache.doris.thrift.TUniqueId; import org.apache.doris.transaction.DatabaseTransactionMgr; import org.apache.doris.transaction.TransactionState; @@ -348,6 +350,12 @@ public int getLoadJobNum(JobState jobState) { try { Map> labelToLoadJobs = new HashMap<>(); for (Long dbId : dbIdToLabelToLoadJobs.keySet()) { + if (!Env.getCurrentEnv().getAccessManager().checkDbPriv(ConnectContext.get(), + Env.getCurrentEnv().getCatalogMgr().getDbNullable(dbId).getFullName(), + PrivPredicate.LOAD)) { + continue; + } + labelToLoadJobs.putAll(dbIdToLabelToLoadJobs.get(dbId)); } @@ -518,6 +526,40 @@ public List> getLoadJobInfosByDb(long dbId, String labelValue, } } + public List> getAllLoadJobInfos() { + LinkedList> loadJobInfos = new LinkedList>(); + + readLock(); + try { + Map> labelToLoadJobs = new HashMap<>(); + for (Long dbId : dbIdToLabelToLoadJobs.keySet()) { + if (!Env.getCurrentEnv().getAccessManager().checkDbPriv(ConnectContext.get(), + Env.getCurrentEnv().getCatalogMgr().getDbNullable(dbId).getFullName(), + PrivPredicate.LOAD)) { + continue; + } + + labelToLoadJobs.putAll(dbIdToLabelToLoadJobs.get(dbId)); + } + List loadJobList = Lists.newArrayList(); + loadJobList.addAll( + labelToLoadJobs.values().stream().flatMap(Collection::stream).collect(Collectors.toList())); + + // check state + for (LoadJob loadJob : loadJobList) { + try { + // add load job info + loadJobInfos.add(loadJob.getShowInfo()); + } catch (DdlException e) { + continue; + } + } + return loadJobInfos; + } finally { + readUnlock(); + } + } + /** * Get load job info. **/ From 49ec7b9b620c7d989ee651c02064f108527bd5a1 Mon Sep 17 00:00:00 2001 From: Yulei-Yang Date: Tue, 21 Feb 2023 18:29:16 +0800 Subject: [PATCH 3/7] [Improvement](meta) support return total statistics of all databases --- .../Show-Statements/SHOW-PROC.md | 2 +- .../Show-Statements/SHOW-PROC.md | 2 +- .../doris/common/proc/JobsDbProcDir.java | 3 +- .../apache/doris/common/proc/JobsProcDir.java | 75 +++++++++++++++++++ .../java/org/apache/doris/load/ExportMgr.java | 15 ++++ .../main/java/org/apache/doris/load/Load.java | 20 +++++ .../apache/doris/load/loadv2/LoadManager.java | 19 +++++ 7 files changed, 133 insertions(+), 3 deletions(-) diff --git a/docs/en/docs/sql-manual/sql-reference/Show-Statements/SHOW-PROC.md b/docs/en/docs/sql-manual/sql-reference/Show-Statements/SHOW-PROC.md index 6b555cbb07afa4..f170da70e035a4 100644 --- a/docs/en/docs/sql-manual/sql-reference/Show-Statements/SHOW-PROC.md +++ b/docs/en/docs/sql-manual/sql-reference/Show-Statements/SHOW-PROC.md @@ -87,7 +87,7 @@ illustrate: 3. frontends: Display all FE node information in the cluster, including IP address, role, status, whether it is a mater, etc., equivalent to [SHOW FRONTENDS](./SHOW-FRONTENDS.md) 4. routine_loads: Display all routine load job information, including job name, status, etc. 5. auth: User name and corresponding permission information -6. jobs: +6. jobs: show statistics of all kind of jobs. If a specific `dbId` is given, will return statistics data of the database. If `dbId` is -1, will return total statistics data of all databases 7. bdbje: To view the bdbje database list, you need to modify the `fe.conf` file to add `enable_bdbje_debug_mode=true`, and then start `FE` through `sh start_fe.sh --daemon` to enter the `debug` mode. After entering `debug` mode, only `http server` and `MySQLServer` will be started and the `BDBJE` instance will be opened, but no metadata loading and subsequent startup processes will be entered. 8. dbs: Mainly used to view the metadata information of each database and the tables in the Doris cluster. This information includes table structure, partitions, materialized views, data shards and replicas, and more. Through this directory and its subdirectories, you can clearly display the table metadata in the cluster, and locate some problems such as data skew, replica failure, etc. 9. resources : View system resources, ordinary accounts can only see resources that they have USAGE_PRIV permission to use. Only the root and admin accounts can see all resources. Equivalent to [SHOW RESOURCES](./SHOW-RESOURCES.md) diff --git a/docs/zh-CN/docs/sql-manual/sql-reference/Show-Statements/SHOW-PROC.md b/docs/zh-CN/docs/sql-manual/sql-reference/Show-Statements/SHOW-PROC.md index ba46b4b630e4b7..7ddee944ae599b 100644 --- a/docs/zh-CN/docs/sql-manual/sql-reference/Show-Statements/SHOW-PROC.md +++ b/docs/zh-CN/docs/sql-manual/sql-reference/Show-Statements/SHOW-PROC.md @@ -87,7 +87,7 @@ mysql> show proc "/"; 3. frontends :显示集群中所有的 FE 节点信息,包括IP地址、角色、状态、是否是mater等,等同于 [SHOW FRONTENDS](./SHOW-FRONTENDS.md) 4. routine_loads : 显示所有的 routine load 作业信息,包括作业名称、状态等 5. auth:用户名称及对应的权限信息 -6. jobs : +6. jobs :各类任务的统计信息,可查看指定数据库的 Job 的统计信息,如果 `dbId` = -1, 则返回所有库的汇总信息 7. bdbje:查看 bdbje 数据库列表,需要修改 `fe.conf` 文件增加 `enable_bdbje_debug_mode=true` , 然后通过 `sh start_fe.sh --daemon` 启动 `FE` 即可进入 `debug` 模式。 进入 `debug` 模式之后,仅会启动 `http server` 和 `MySQLServer` 并打开 `BDBJE` 实例,但不会进入任何元数据的加载及后续其他启动流程, 8. dbs : 主要用于查看 Doris 集群中各个数据库以及其中的表的元数据信息。这些信息包括表结构、分区、物化视图、数据分片和副本等等。通过这个目录和其子目录,可以清楚的展示集群中的表元数据情况,以及定位一些如数据倾斜、副本故障等问题 9. resources : 查看系统资源,普通账户只能看到自己有 USAGE_PRIV 使用权限的资源。只有root和admin账户可以看到所有的资源。等同于 [SHOW RESOURCES](./SHOW-RESOURCES.md) diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/proc/JobsDbProcDir.java b/fe/fe-core/src/main/java/org/apache/doris/common/proc/JobsDbProcDir.java index 2957d7133eb862..414d6912d9ec5f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/proc/JobsDbProcDir.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/proc/JobsDbProcDir.java @@ -60,7 +60,8 @@ public ProcNodeInterface lookup(String dbIdStr) throws AnalysisException { throw new AnalysisException("Invalid db id format: " + dbIdStr); } - Database db = env.getInternalCatalog().getDbOrAnalysisException(dbId); + // dbId = -1 means need total result of all databases + Database db = dbId == -1 ? null : env.getInternalCatalog().getDbOrAnalysisException(dbId); return new JobsProcDir(env, db); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/proc/JobsProcDir.java b/fe/fe-core/src/main/java/org/apache/doris/common/proc/JobsProcDir.java index 3d93d31e7dcd32..ea23a8aa4739ce 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/proc/JobsProcDir.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/proc/JobsProcDir.java @@ -90,7 +90,13 @@ public ProcResult fetchResult() throws AnalysisException { result.setNames(TITLE_NAMES); + // db is null means need total result of all databases + if (db == null) { + return fetchResultForAllDbs(); + } + long dbId = db.getId(); + // load Load load = Env.getCurrentEnv().getLoadInstance(); LoadManager loadManager = Env.getCurrentEnv().getLoadManager(); @@ -155,4 +161,73 @@ public ProcResult fetchResult() throws AnalysisException { return result; } + + public ProcResult fetchResultForAllDbs() { + BaseProcResult result = new BaseProcResult(); + + result.setNames(TITLE_NAMES); + // load + Load load = Env.getCurrentEnv().getLoadInstance(); + LoadManager loadManager = Env.getCurrentEnv().getLoadManager(); + Long pendingNum = load.getLoadJobNum(org.apache.doris.load.LoadJob.JobState.PENDING) + + loadManager.getLoadJobNum(org.apache.doris.load.loadv2.JobState.PENDING); + Long runningNum = load.getLoadJobNum(org.apache.doris.load.LoadJob.JobState.ETL) + + load.getLoadJobNum(org.apache.doris.load.LoadJob.JobState.LOADING) + + loadManager.getLoadJobNum(org.apache.doris.load.loadv2.JobState.LOADING); + Long finishedNum = load.getLoadJobNum(org.apache.doris.load.LoadJob.JobState.QUORUM_FINISHED) + + load.getLoadJobNum(org.apache.doris.load.LoadJob.JobState.FINISHED) + + loadManager.getLoadJobNum(org.apache.doris.load.loadv2.JobState.FINISHED); + Long cancelledNum = load.getLoadJobNum(org.apache.doris.load.LoadJob.JobState.CANCELLED) + + loadManager.getLoadJobNum(org.apache.doris.load.loadv2.JobState.CANCELLED); + Long totalNum = pendingNum + runningNum + finishedNum + cancelledNum; + result.addRow(Lists.newArrayList(LOAD, pendingNum.toString(), runningNum.toString(), finishedNum.toString(), + cancelledNum.toString(), totalNum.toString())); + + // delete + // TODO: find it from delete handler + pendingNum = 0L; + runningNum = 0L; + finishedNum = 0L; + cancelledNum = 0L; + totalNum = pendingNum + runningNum + finishedNum + cancelledNum; + result.addRow(Lists.newArrayList(DELETE, pendingNum.toString(), runningNum.toString(), finishedNum.toString(), + cancelledNum.toString(), totalNum.toString())); + + // rollup + MaterializedViewHandler materializedViewHandler = Env.getCurrentEnv().getMaterializedViewHandler(); + pendingNum = materializedViewHandler.getAlterJobV2Num(org.apache.doris.alter.AlterJobV2.JobState.PENDING); + runningNum = materializedViewHandler.getAlterJobV2Num( + org.apache.doris.alter.AlterJobV2.JobState.WAITING_TXN) + + materializedViewHandler.getAlterJobV2Num(org.apache.doris.alter.AlterJobV2.JobState.RUNNING); + finishedNum = materializedViewHandler.getAlterJobV2Num( + org.apache.doris.alter.AlterJobV2.JobState.FINISHED); + cancelledNum = materializedViewHandler.getAlterJobV2Num( + org.apache.doris.alter.AlterJobV2.JobState.CANCELLED); + totalNum = pendingNum + runningNum + finishedNum + cancelledNum; + result.addRow(Lists.newArrayList(ROLLUP, pendingNum.toString(), runningNum.toString(), finishedNum.toString(), + cancelledNum.toString(), totalNum.toString())); + + // schema change + SchemaChangeHandler schemaChangeHandler = Env.getCurrentEnv().getSchemaChangeHandler(); + pendingNum = schemaChangeHandler.getAlterJobV2Num(org.apache.doris.alter.AlterJobV2.JobState.PENDING); + runningNum = schemaChangeHandler.getAlterJobV2Num(org.apache.doris.alter.AlterJobV2.JobState.WAITING_TXN) + + schemaChangeHandler.getAlterJobV2Num(org.apache.doris.alter.AlterJobV2.JobState.RUNNING); + finishedNum = schemaChangeHandler.getAlterJobV2Num(org.apache.doris.alter.AlterJobV2.JobState.FINISHED); + cancelledNum = schemaChangeHandler.getAlterJobV2Num(org.apache.doris.alter.AlterJobV2.JobState.CANCELLED); + totalNum = pendingNum + runningNum + finishedNum + cancelledNum; + result.addRow(Lists.newArrayList(SCHEMA_CHANGE, pendingNum.toString(), runningNum.toString(), + finishedNum.toString(), cancelledNum.toString(), totalNum.toString())); + + // export + ExportMgr exportMgr = Env.getCurrentEnv().getExportMgr(); + pendingNum = exportMgr.getJobNum(ExportJob.JobState.PENDING); + runningNum = exportMgr.getJobNum(ExportJob.JobState.EXPORTING); + finishedNum = exportMgr.getJobNum(ExportJob.JobState.FINISHED); + cancelledNum = exportMgr.getJobNum(ExportJob.JobState.CANCELLED); + totalNum = pendingNum + runningNum + finishedNum + cancelledNum; + result.addRow(Lists.newArrayList(EXPORT, pendingNum.toString(), runningNum.toString(), finishedNum.toString(), + cancelledNum.toString(), totalNum.toString())); + + return result; + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/ExportMgr.java b/fe/fe-core/src/main/java/org/apache/doris/load/ExportMgr.java index a6862a9b2b024f..a5581b5ce3fa99 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/ExportMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/ExportMgr.java @@ -376,4 +376,19 @@ public long getJobNum(ExportJob.JobState state, long dbId) { } return size; } + + public long getJobNum(ExportJob.JobState state) { + int size = 0; + readLock(); + try { + for (ExportJob job : idToJob.values()) { + if (job.getState() == state) { + ++size; + } + } + } finally { + readUnlock(); + } + return size; + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/Load.java b/fe/fe-core/src/main/java/org/apache/doris/load/Load.java index c6d08b305456c6..64201f74399a39 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/Load.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/Load.java @@ -1331,6 +1331,26 @@ public long getLoadJobNum(JobState jobState, long dbId) { } } + public long getLoadJobNum(JobState jobState) { + readLock(); + try { + List loadJobs = new ArrayList<>(); + for (Long dbId : dbToLoadJobs.keySet()) { + loadJobs.addAll(this.dbToLoadJobs.get(dbId)); + } + + int jobNum = 0; + for (LoadJob job : loadJobs) { + if (job.getState() == jobState) { + ++jobNum; + } + } + return jobNum; + } finally { + readUnlock(); + } + } + public LoadJob getLoadJob(long jobId) { readLock(); try { diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadManager.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadManager.java index 1eb5d24a757872..0e2ec94d55f408 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadManager.java @@ -66,6 +66,7 @@ import java.util.ArrayList; import java.util.Collection; import java.util.EnumSet; +import java.util.HashMap; import java.util.Iterator; import java.util.LinkedList; import java.util.List; @@ -345,6 +346,24 @@ public int getLoadJobNum(JobState jobState, long dbId) { } } + /** + * Get load job num, used by proc. + **/ + public int getLoadJobNum(JobState jobState) { + readLock(); + try { + Map> labelToLoadJobs = new HashMap<>(); + for (Long dbId : dbIdToLabelToLoadJobs.keySet()) { + labelToLoadJobs.putAll(dbIdToLabelToLoadJobs.get(dbId)); + } + + List loadJobList = + labelToLoadJobs.values().stream().flatMap(entity -> entity.stream()).collect(Collectors.toList()); + return (int) loadJobList.stream().filter(entity -> entity.getState() == jobState).count(); + } finally { + readUnlock(); + } + } /** * Get load job num, used by metric. From c34f32a033dd90711be8ef102732c842426fb549 Mon Sep 17 00:00:00 2001 From: Yulei-Yang Date: Thu, 23 Feb 2023 13:29:44 +0800 Subject: [PATCH 4/7] support show proc '/jobs/-1/[load/delete/rollup/schema_change/export]' --- .../org/apache/doris/alter/AlterHandler.java | 17 +- .../doris/alter/MaterializedViewHandler.java | 19 ++ .../doris/alter/SchemaChangeHandler.java | 20 ++ .../doris/common/proc/ExportProcNode.java | 8 +- .../apache/doris/common/proc/JobsProcDir.java | 3 +- .../apache/doris/common/proc/LoadProcDir.java | 18 +- .../doris/common/proc/RollupProcDir.java | 9 +- .../common/proc/SchemaChangeProcDir.java | 9 +- .../org/apache/doris/load/DeleteHandler.java | 15 +- .../java/org/apache/doris/load/ExportMgr.java | 179 ++++++++----- .../main/java/org/apache/doris/load/Load.java | 248 +++++++++++------- .../apache/doris/load/loadv2/LoadManager.java | 42 +++ 12 files changed, 412 insertions(+), 175 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/AlterHandler.java b/fe/fe-core/src/main/java/org/apache/doris/alter/AlterHandler.java index c408f9690d4955..bba06ef994ddb0 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/alter/AlterHandler.java +++ b/fe/fe-core/src/main/java/org/apache/doris/alter/AlterHandler.java @@ -34,8 +34,10 @@ import org.apache.doris.common.UserException; import org.apache.doris.common.util.MasterDaemon; import org.apache.doris.common.util.TimeUtils; +import org.apache.doris.mysql.privilege.PrivPredicate; import org.apache.doris.persist.RemoveAlterJobV2OperationLog; import org.apache.doris.persist.ReplicaPersistInfo; +import org.apache.doris.qe.ConnectContext; import org.apache.doris.task.AlterReplicaTask; import com.google.common.base.Preconditions; @@ -140,7 +142,20 @@ public Long getAlterJobV2Num(org.apache.doris.alter.AlterJobV2.JobState state, l } public Long getAlterJobV2Num(org.apache.doris.alter.AlterJobV2.JobState state) { - return alterJobsV2.values().stream().filter(e -> e.getJobState() == state).count(); + Long counter = 0L; + + for (AlterJobV2 job : alterJobsV2.values()) { + if (!Env.getCurrentEnv().getAccessManager().checkDbPriv(ConnectContext.get(), + Env.getCurrentEnv().getCatalogMgr().getDbNullable(job.getDbId()).getFullName(), + PrivPredicate.LOAD)) { + continue; + } + + if (job.getJobState() == state) { + counter++; + } + } + return counter; } @Override diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/MaterializedViewHandler.java b/fe/fe-core/src/main/java/org/apache/doris/alter/MaterializedViewHandler.java index 5a7ff739cacb3f..8c0f21b2c14d97 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/alter/MaterializedViewHandler.java +++ b/fe/fe-core/src/main/java/org/apache/doris/alter/MaterializedViewHandler.java @@ -65,6 +65,7 @@ import com.google.common.base.Preconditions; import com.google.common.base.Strings; +import com.google.common.collect.ImmutableList; import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.collect.Sets; @@ -1146,6 +1147,24 @@ public List> getAlterJobInfosByDb(Database db) { return rollupJobInfos; } + public List> getAllAlterJobInfos() { + List> rollupJobInfos = new LinkedList>(); + + ConnectContext ctx = ConnectContext.get(); + for (AlterJobV2 alterJob : ImmutableList.copyOf(alterJobsV2.values())) { + if (ctx != null) { + if (!Env.getCurrentEnv().getAccessManager().checkTblPriv(ctx, + Env.getCurrentEnv().getCatalogMgr().getDbNullable(alterJob.getDbId()).getFullName(), + alterJob.getTableName(), PrivPredicate.ALTER)) { + continue; + } + } + alterJob.getInfo(rollupJobInfos); + } + + return rollupJobInfos; + } + private void getAlterJobV2Infos(Database db, List> rollupJobInfos) { ConnectContext ctx = ConnectContext.get(); for (AlterJobV2 alterJob : alterJobsV2.values()) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java index 11fb77c3ee0d79..5682d7556a2b28 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java +++ b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java @@ -1599,6 +1599,26 @@ private void runAlterJobV2() { }); } + public List> getAllAlterJobInfos() { + List> schemaChangeJobInfos = new LinkedList<>(); + ConnectContext ctx = ConnectContext.get(); + for (AlterJobV2 alterJob : ImmutableList.copyOf(alterJobsV2.values())) { + if (ctx != null) { + if (!Env.getCurrentEnv().getAccessManager().checkTblPriv(ctx, + Env.getCurrentEnv().getCatalogMgr().getDbNullable(alterJob.getDbId()).getFullName(), + alterJob.getTableName(), PrivPredicate.ALTER)) { + continue; + } + } + alterJob.getInfo(schemaChangeJobInfos); + } + + // sort by "JobId", "PartitionName", "CreateTime", "FinishTime", "IndexName", "IndexState" + ListComparator> comparator = new ListComparator>(0, 1, 2, 3, 4, 5); + schemaChangeJobInfos.sort(comparator); + return schemaChangeJobInfos; + } + @Override public List> getAlterJobInfosByDb(Database db) { List> schemaChangeJobInfos = new LinkedList<>(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/proc/ExportProcNode.java b/fe/fe-core/src/main/java/org/apache/doris/common/proc/ExportProcNode.java index 4c469a3039d304..1a154c8fd92265 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/proc/ExportProcNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/proc/ExportProcNode.java @@ -51,14 +51,18 @@ public ExportProcNode(ExportMgr exportMgr, Database db) { @Override public ProcResult fetchResult() throws AnalysisException { - Preconditions.checkNotNull(db); Preconditions.checkNotNull(exportMgr); BaseProcResult result = new BaseProcResult(); result.setNames(TITLE_NAMES); - List> jobInfos = exportMgr.getExportJobInfosByIdOrState( + List> jobInfos; + if (db == null) { + jobInfos = exportMgr.getExportJobInfos(LIMIT); + } else { + jobInfos = exportMgr.getExportJobInfosByIdOrState( db.getId(), 0, "", false, null, null, LIMIT); + } result.setRows(jobInfos); return result; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/proc/JobsProcDir.java b/fe/fe-core/src/main/java/org/apache/doris/common/proc/JobsProcDir.java index ea23a8aa4739ce..13baba524c1508 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/proc/JobsProcDir.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/proc/JobsProcDir.java @@ -70,7 +70,8 @@ public ProcNodeInterface lookup(String jobTypeName) throws AnalysisException { if (jobTypeName.equals(LOAD)) { return new LoadProcDir(env.getLoadInstance(), db); } else if (jobTypeName.equals(DELETE)) { - return new DeleteInfoProcDir(env.getDeleteHandler(), env.getLoadInstance(), db.getId()); + Long dbId = db == null ? -1 : db.getId(); + return new DeleteInfoProcDir(env.getDeleteHandler(), env.getLoadInstance(), dbId); } else if (jobTypeName.equals(ROLLUP)) { return new RollupProcDir(env.getMaterializedViewHandler(), db); } else if (jobTypeName.equals(SCHEMA_CHANGE)) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/proc/LoadProcDir.java b/fe/fe-core/src/main/java/org/apache/doris/common/proc/LoadProcDir.java index c2cd21d81575eb..7021f45b04c4c5 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/proc/LoadProcDir.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/proc/LoadProcDir.java @@ -57,18 +57,26 @@ public LoadProcDir(Load load, Database db) { @Override public ProcResult fetchResult() throws AnalysisException { - Preconditions.checkNotNull(db); Preconditions.checkNotNull(load); BaseProcResult result = new BaseProcResult(); result.setNames(TITLE_NAMES); // merge load job from load and loadManager - LinkedList> loadJobInfos = load.getLoadJobInfosByDb(db.getId(), db.getFullName(), + LinkedList> loadJobInfos; + + // db is null means need total result of all databases + if (db == null) { + loadJobInfos = load.getAllLoadJobInfos(); + loadJobInfos.addAll(Env.getCurrentEnv().getLoadManager().getAllLoadJobInfos()); + } else { + loadJobInfos = load.getLoadJobInfosByDb(db.getId(), db.getFullName(), null, false, null); - loadJobInfos.addAll(Env.getCurrentEnv().getLoadManager().getLoadJobInfosByDb(db.getId(), null, - false, - null)); + loadJobInfos.addAll(Env.getCurrentEnv().getLoadManager().getLoadJobInfosByDb(db.getId(), null, + false, + null)); + } + int counter = 0; Iterator> iterator = loadJobInfos.descendingIterator(); while (iterator.hasNext()) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/proc/RollupProcDir.java b/fe/fe-core/src/main/java/org/apache/doris/common/proc/RollupProcDir.java index fb65f5231cdb9b..896d6349be946d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/proc/RollupProcDir.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/proc/RollupProcDir.java @@ -62,13 +62,18 @@ public RollupProcDir(MaterializedViewHandler materializedViewHandler, Database d @Override public ProcResult fetchResult() throws AnalysisException { - Preconditions.checkNotNull(db); Preconditions.checkNotNull(materializedViewHandler); BaseProcResult result = new BaseProcResult(); result.setNames(TITLE_NAMES); - List> rollupJobInfos = materializedViewHandler.getAlterJobInfosByDb(db); + List> rollupJobInfos; + // db is null means need total result of all databases + if (db == null) { + rollupJobInfos = materializedViewHandler.getAllAlterJobInfos(); + } else { + rollupJobInfos = materializedViewHandler.getAlterJobInfosByDb(db); + } for (List infoStr : rollupJobInfos) { List oneInfo = new ArrayList(TITLE_NAMES.size()); for (Comparable element : infoStr) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/proc/SchemaChangeProcDir.java b/fe/fe-core/src/main/java/org/apache/doris/common/proc/SchemaChangeProcDir.java index 6bf05a02aefa13..d89792ddf59471 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/proc/SchemaChangeProcDir.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/proc/SchemaChangeProcDir.java @@ -176,13 +176,18 @@ public ProcResult fetchResultByFilter(HashMap filter, ArrayList> schemaChangeJobInfos = schemaChangeHandler.getAlterJobInfosByDb(db); + List> schemaChangeJobInfos; + // db is null means need total result of all databases + if (db == null) { + schemaChangeJobInfos = schemaChangeHandler.getAllAlterJobInfos(); + } else { + schemaChangeJobInfos = schemaChangeHandler.getAlterJobInfosByDb(db); + } for (List infoStr : schemaChangeJobInfos) { List oneInfo = new ArrayList(TITLE_NAMES.size()); for (Comparable element : infoStr) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/DeleteHandler.java b/fe/fe-core/src/main/java/org/apache/doris/load/DeleteHandler.java index 3e8d6c631830b7..bd793e0808ce5a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/DeleteHandler.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/DeleteHandler.java @@ -833,7 +833,20 @@ public List> getDeleteInfosByDb(long dbId) { } String dbName = db.getFullName(); - List deleteInfoList = dbToDeleteInfos.get(dbId); + List deleteInfoList = new ArrayList<>(); + if (dbId == -1) { + for (Long tempDbId : dbToDeleteInfos.keySet()) { + if (!Env.getCurrentEnv().getAccessManager().checkDbPriv(ConnectContext.get(), + Env.getCurrentEnv().getCatalogMgr().getDbNullable(tempDbId).getFullName(), + PrivPredicate.LOAD)) { + continue; + } + + deleteInfoList.addAll(dbToDeleteInfos.get(tempDbId)); + } + } else { + deleteInfoList = dbToDeleteInfos.get(dbId); + } readLock(); try { diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/ExportMgr.java b/fe/fe-core/src/main/java/org/apache/doris/load/ExportMgr.java index a5581b5ce3fa99..a56337ce9bdd8d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/ExportMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/ExportMgr.java @@ -223,78 +223,17 @@ public List> getExportJobInfosByIdOrState( } } - // check auth - TableName tableName = job.getTableName(); - if (tableName == null || tableName.getTbl().equals("DUMMY")) { - // forward compatibility, no table name is saved before - Database db = Env.getCurrentInternalCatalog().getDbNullable(dbId); - if (db == null) { - continue; - } - if (!Env.getCurrentEnv().getAccessManager().checkDbPriv(ConnectContext.get(), - db.getFullName(), PrivPredicate.SHOW)) { - continue; - } - } else { - if (!Env.getCurrentEnv().getAccessManager().checkTblPriv(ConnectContext.get(), - tableName.getDb(), tableName.getTbl(), - PrivPredicate.SHOW)) { - continue; - } - } - if (states != null) { if (!states.contains(state)) { continue; } } - List jobInfo = new ArrayList(); - - jobInfo.add(id); - jobInfo.add(jobLabel); - jobInfo.add(state.name()); - jobInfo.add(job.getProgress() + "%"); - - // task infos - Map infoMap = Maps.newHashMap(); - List partitions = job.getPartitions(); - if (partitions == null) { - partitions = Lists.newArrayList(); - partitions.add("*"); - } - infoMap.put("db", job.getTableName().getDb()); - infoMap.put("tbl", job.getTableName().getTbl()); - if (job.getWhereExpr() != null) { - infoMap.put("where expr", job.getWhereExpr().toMySql()); - } - infoMap.put("partitions", partitions); - infoMap.put("broker", job.getBrokerDesc().getName()); - infoMap.put("column separator", job.getColumnSeparator()); - infoMap.put("line delimiter", job.getLineDelimiter()); - infoMap.put("exec mem limit", job.getExecMemLimit()); - infoMap.put("columns", job.getColumns()); - infoMap.put("coord num", job.getCoordList().size()); - infoMap.put("tablet num", job.getTabletLocations() == null ? -1 : job.getTabletLocations().size()); - jobInfo.add(new Gson().toJson(infoMap)); - // path - jobInfo.add(job.getShowExportPath()); - - jobInfo.add(TimeUtils.longToTimeString(job.getCreateTimeMs())); - jobInfo.add(TimeUtils.longToTimeString(job.getStartTimeMs())); - jobInfo.add(TimeUtils.longToTimeString(job.getFinishTimeMs())); - jobInfo.add(job.getTimeoutSecond()); - - // error msg - if (job.getState() == ExportJob.JobState.CANCELLED) { - ExportFailMsg failMsg = job.getFailMsg(); - jobInfo.add("type:" + failMsg.getCancelType() + "; msg:" + failMsg.getMsg()); - } else { - jobInfo.add(FeConstants.null_string); + // check auth + if (isJobShowable(job)) { + exportJobInfos.add(composeExportJobInfo(job)); } - exportJobInfos.add(jobInfo); - if (++counter >= resultNum) { break; } @@ -322,6 +261,112 @@ public List> getExportJobInfosByIdOrState( return results; } + public List> getExportJobInfos(long limit) { + long resultNum = limit == -1L ? Integer.MAX_VALUE : limit; + LinkedList> exportJobInfos = new LinkedList>(); + + readLock(); + try { + int counter = 0; + for (ExportJob job : idToJob.values()) { + // check auth + if (isJobShowable(job)) { + exportJobInfos.add(composeExportJobInfo(job)); + } + + if (++counter >= resultNum) { + break; + } + } + } finally { + readUnlock(); + } + + // order by + ListComparator> comparator = null; + // sort by id asc + comparator = new ListComparator>(0); + Collections.sort(exportJobInfos, comparator); + + List> results = Lists.newArrayList(); + for (List list : exportJobInfos) { + results.add(list.stream().map(e -> e.toString()).collect(Collectors.toList())); + } + + return results; + } + + public boolean isJobShowable(ExportJob job) { + TableName tableName = job.getTableName(); + if (tableName == null || tableName.getTbl().equals("DUMMY")) { + // forward compatibility, no table name is saved before + Database db = Env.getCurrentInternalCatalog().getDbNullable(job.getDbId()); + if (db == null) { + return false; + } + if (!Env.getCurrentEnv().getAccessManager().checkDbPriv(ConnectContext.get(), + db.getFullName(), PrivPredicate.SHOW)) { + return false; + } + } else { + if (!Env.getCurrentEnv().getAccessManager().checkTblPriv(ConnectContext.get(), + tableName.getDb(), tableName.getTbl(), + PrivPredicate.SHOW)) { + return false; + } + } + + return true; + } + + private List composeExportJobInfo(ExportJob job) { + List jobInfo = new ArrayList(); + + jobInfo.add(job.getId()); + jobInfo.add(job.getLabel()); + jobInfo.add(job.getState().name()); + jobInfo.add(job.getProgress() + "%"); + + // task infos + Map infoMap = Maps.newHashMap(); + List partitions = job.getPartitions(); + if (partitions == null) { + partitions = Lists.newArrayList(); + partitions.add("*"); + } + infoMap.put("db", job.getTableName().getDb()); + infoMap.put("tbl", job.getTableName().getTbl()); + if (job.getWhereExpr() != null) { + infoMap.put("where expr", job.getWhereExpr().toMySql()); + } + infoMap.put("partitions", partitions); + infoMap.put("broker", job.getBrokerDesc().getName()); + infoMap.put("column separator", job.getColumnSeparator()); + infoMap.put("line delimiter", job.getLineDelimiter()); + infoMap.put("exec mem limit", job.getExecMemLimit()); + infoMap.put("columns", job.getColumns()); + infoMap.put("coord num", job.getCoordList().size()); + infoMap.put("tablet num", job.getTabletLocations() == null ? -1 : job.getTabletLocations().size()); + jobInfo.add(new Gson().toJson(infoMap)); + // path + jobInfo.add(job.getShowExportPath()); + + jobInfo.add(TimeUtils.longToTimeString(job.getCreateTimeMs())); + jobInfo.add(TimeUtils.longToTimeString(job.getStartTimeMs())); + jobInfo.add(TimeUtils.longToTimeString(job.getFinishTimeMs())); + jobInfo.add(job.getTimeoutSecond()); + + // error msg + if (job.getState() == ExportJob.JobState.CANCELLED) { + ExportFailMsg failMsg = job.getFailMsg(); + jobInfo.add("type:" + failMsg.getCancelType() + "; msg:" + failMsg.getMsg()); + } else { + jobInfo.add(FeConstants.null_string); + } + + return jobInfo; + } + public void removeOldExportJobs() { long currentTimeMs = System.currentTimeMillis(); @@ -382,6 +427,12 @@ public long getJobNum(ExportJob.JobState state) { readLock(); try { for (ExportJob job : idToJob.values()) { + if (!Env.getCurrentEnv().getAccessManager().checkDbPriv(ConnectContext.get(), + Env.getCurrentEnv().getCatalogMgr().getDbNullable(job.getDbId()).getFullName(), + PrivPredicate.LOAD)) { + continue; + } + if (job.getState() == state) { ++size; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/Load.java b/fe/fe-core/src/main/java/org/apache/doris/load/Load.java index 64201f74399a39..25a17f58656d3c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/Load.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/Load.java @@ -1336,6 +1336,11 @@ public long getLoadJobNum(JobState jobState) { try { List loadJobs = new ArrayList<>(); for (Long dbId : dbToLoadJobs.keySet()) { + if (!Env.getCurrentEnv().getAccessManager().checkDbPriv(ConnectContext.get(), + Env.getCurrentEnv().getCatalogMgr().getDbNullable(dbId).getFullName(), + PrivPredicate.LOAD)) { + continue; + } loadJobs.addAll(this.dbToLoadJobs.get(dbId)); } @@ -1360,6 +1365,151 @@ public LoadJob getLoadJob(long jobId) { } } + public LinkedList> getAllLoadJobInfos() { + LinkedList> loadJobInfos = new LinkedList>(); + readLock(); + try { + List loadJobs = new ArrayList<>(); + for (Long dbId : dbToLoadJobs.keySet()) { + if (!Env.getCurrentEnv().getAccessManager().checkDbPriv(ConnectContext.get(), + Env.getCurrentEnv().getCatalogMgr().getDbNullable(dbId).getFullName(), + PrivPredicate.LOAD)) { + continue; + } + + loadJobs.addAll(this.dbToLoadJobs.get(dbId)); + } + if (loadJobs.size() == 0) { + return loadJobInfos; + } + + long start = System.currentTimeMillis(); + LOG.debug("begin to get load job info, size: {}", loadJobs.size()); + + for (LoadJob loadJob : loadJobs) { + // filter first + String dbName = Env.getCurrentEnv().getCatalogMgr().getDbNullable(loadJob.getDbId()).getFullName(); + // check auth + Set tableNames = loadJob.getTableNames(); + boolean auth = true; + for (String tblName : tableNames) { + if (!Env.getCurrentEnv().getAccessManager().checkTblPriv(ConnectContext.get(), dbName, + tblName, PrivPredicate.LOAD)) { + auth = false; + break; + } + } + if (!auth) { + continue; + } + + loadJobInfos.add(composeJobInfoByLoadJob(loadJob)); + } // end for loadJobs + + LOG.debug("finished to get load job info, cost: {}", (System.currentTimeMillis() - start)); + } finally { + readUnlock(); + } + + return loadJobInfos; + } + + private List composeJobInfoByLoadJob(LoadJob loadJob) { + List jobInfo = new ArrayList(); + + // jobId + jobInfo.add(loadJob.getId()); + // label + jobInfo.add(loadJob.getLabel()); + // state + jobInfo.add(loadJob.getState().name()); + + // progress + switch (loadJob.getState()) { + case PENDING: + jobInfo.add("ETL:0%; LOAD:0%"); + break; + case ETL: + jobInfo.add("ETL:" + loadJob.getProgress() + "%; LOAD:0%"); + break; + case LOADING: + jobInfo.add("ETL:100%; LOAD:" + loadJob.getProgress() + "%"); + break; + case QUORUM_FINISHED: + case FINISHED: + jobInfo.add("ETL:100%; LOAD:100%"); + break; + case CANCELLED: + default: + jobInfo.add("ETL:N/A; LOAD:N/A"); + break; + } + + // type + jobInfo.add(loadJob.getEtlJobType().name()); + + // etl info + EtlStatus status = loadJob.getEtlJobStatus(); + if (status == null || status.getState() == TEtlState.CANCELLED) { + jobInfo.add(FeConstants.null_string); + } else { + Map counters = status.getCounters(); + List info = Lists.newArrayList(); + for (String key : counters.keySet()) { + // XXX: internal etl job return all counters + if (key.equalsIgnoreCase("HDFS bytes read") + || key.equalsIgnoreCase("Map input records") + || key.startsWith("dpp.") + || loadJob.getEtlJobType() == EtlJobType.MINI) { + info.add(key + "=" + counters.get(key)); + } + } // end for counters + if (info.isEmpty()) { + jobInfo.add(FeConstants.null_string); + } else { + jobInfo.add(StringUtils.join(info, "; ")); + } + } + + // task info + jobInfo.add("cluster:" + loadJob.getHadoopCluster() + + "; timeout(s):" + loadJob.getTimeoutSecond() + + "; max_filter_ratio:" + loadJob.getMaxFilterRatio()); + + // error msg + if (loadJob.getState() == JobState.CANCELLED) { + FailMsg failMsg = loadJob.getFailMsg(); + jobInfo.add("type:" + failMsg.getCancelType() + "; msg:" + failMsg.getMsg()); + } else { + jobInfo.add(FeConstants.null_string); + } + + // create time + jobInfo.add(TimeUtils.longToTimeString(loadJob.getCreateTimeMs())); + // etl start time + jobInfo.add(TimeUtils.longToTimeString(loadJob.getEtlStartTimeMs())); + // etl end time + jobInfo.add(TimeUtils.longToTimeString(loadJob.getEtlFinishTimeMs())); + // load start time + jobInfo.add(TimeUtils.longToTimeString(loadJob.getLoadStartTimeMs())); + // load end time + jobInfo.add(TimeUtils.longToTimeString(loadJob.getLoadFinishTimeMs())); + // tracking url + jobInfo.add(status.getTrackingUrl()); + // job detail(not used for hadoop load, just return an empty string) + jobInfo.add(""); + // transaction id + jobInfo.add(loadJob.getTransactionId()); + // error tablets(not used for hadoop load, just return an empty string) + jobInfo.add(""); + // user + jobInfo.add(loadJob.getUser()); + // comment + jobInfo.add(loadJob.getComment()); + + return jobInfo; + } + public LinkedList> getLoadJobInfosByDb(long dbId, String dbName, String labelValue, boolean accurateMatch, Set states) throws AnalysisException { LinkedList> loadJobInfos = new LinkedList>(); @@ -1423,103 +1573,7 @@ public LinkedList> getLoadJobInfosByDb(long dbId, String dbName } } - List jobInfo = new ArrayList(); - - // jobId - jobInfo.add(loadJob.getId()); - // label - jobInfo.add(label); - // state - jobInfo.add(state.name()); - - // progress - switch (loadJob.getState()) { - case PENDING: - jobInfo.add("ETL:0%; LOAD:0%"); - break; - case ETL: - jobInfo.add("ETL:" + loadJob.getProgress() + "%; LOAD:0%"); - break; - case LOADING: - jobInfo.add("ETL:100%; LOAD:" + loadJob.getProgress() + "%"); - break; - case QUORUM_FINISHED: - jobInfo.add("ETL:100%; LOAD:100%"); - break; - case FINISHED: - jobInfo.add("ETL:100%; LOAD:100%"); - break; - case CANCELLED: - jobInfo.add("ETL:N/A; LOAD:N/A"); - break; - default: - jobInfo.add("ETL:N/A; LOAD:N/A"); - break; - } - - // type - jobInfo.add(loadJob.getEtlJobType().name()); - - // etl info - EtlStatus status = loadJob.getEtlJobStatus(); - if (status == null || status.getState() == TEtlState.CANCELLED) { - jobInfo.add(FeConstants.null_string); - } else { - Map counters = status.getCounters(); - List info = Lists.newArrayList(); - for (String key : counters.keySet()) { - // XXX: internal etl job return all counters - if (key.equalsIgnoreCase("HDFS bytes read") - || key.equalsIgnoreCase("Map input records") - || key.startsWith("dpp.") - || loadJob.getEtlJobType() == EtlJobType.MINI) { - info.add(key + "=" + counters.get(key)); - } - } // end for counters - if (info.isEmpty()) { - jobInfo.add(FeConstants.null_string); - } else { - jobInfo.add(StringUtils.join(info, "; ")); - } - } - - // task info - jobInfo.add("cluster:" + loadJob.getHadoopCluster() - + "; timeout(s):" + loadJob.getTimeoutSecond() - + "; max_filter_ratio:" + loadJob.getMaxFilterRatio()); - - // error msg - if (loadJob.getState() == JobState.CANCELLED) { - FailMsg failMsg = loadJob.getFailMsg(); - jobInfo.add("type:" + failMsg.getCancelType() + "; msg:" + failMsg.getMsg()); - } else { - jobInfo.add(FeConstants.null_string); - } - - // create time - jobInfo.add(TimeUtils.longToTimeString(loadJob.getCreateTimeMs())); - // etl start time - jobInfo.add(TimeUtils.longToTimeString(loadJob.getEtlStartTimeMs())); - // etl end time - jobInfo.add(TimeUtils.longToTimeString(loadJob.getEtlFinishTimeMs())); - // load start time - jobInfo.add(TimeUtils.longToTimeString(loadJob.getLoadStartTimeMs())); - // load end time - jobInfo.add(TimeUtils.longToTimeString(loadJob.getLoadFinishTimeMs())); - // tracking url - jobInfo.add(status.getTrackingUrl()); - // job detail(not used for hadoop load, just return an empty string) - jobInfo.add(""); - // transaction id - jobInfo.add(loadJob.getTransactionId()); - // error tablets(not used for hadoop load, just return an empty string) - jobInfo.add(""); - // user - jobInfo.add(loadJob.getUser()); - // comment - jobInfo.add(loadJob.getComment()); - - loadJobInfos.add(jobInfo); + loadJobInfos.add(composeJobInfoByLoadJob(loadJob)); } // end for loadJobs LOG.debug("finished to get load job info, cost: {}", (System.currentTimeMillis() - start)); diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadManager.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadManager.java index 0e2ec94d55f408..5488c5b4028fc2 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadManager.java @@ -43,8 +43,10 @@ import org.apache.doris.load.FailMsg; import org.apache.doris.load.FailMsg.CancelType; import org.apache.doris.load.Load; +import org.apache.doris.mysql.privilege.PrivPredicate; import org.apache.doris.persist.CleanLabelOperationLog; import org.apache.doris.qe.OriginStatement; +import org.apache.doris.qe.ConnectContext; import org.apache.doris.thrift.TUniqueId; import org.apache.doris.transaction.DatabaseTransactionMgr; import org.apache.doris.transaction.TransactionState; @@ -354,6 +356,12 @@ public int getLoadJobNum(JobState jobState) { try { Map> labelToLoadJobs = new HashMap<>(); for (Long dbId : dbIdToLabelToLoadJobs.keySet()) { + if (!Env.getCurrentEnv().getAccessManager().checkDbPriv(ConnectContext.get(), + Env.getCurrentEnv().getCatalogMgr().getDbNullable(dbId).getFullName(), + PrivPredicate.LOAD)) { + continue; + } + labelToLoadJobs.putAll(dbIdToLabelToLoadJobs.get(dbId)); } @@ -557,6 +565,40 @@ public List> getLoadJobInfosByDb(long dbId, String labelValue, } } + public List> getAllLoadJobInfos() { + LinkedList> loadJobInfos = new LinkedList>(); + + readLock(); + try { + Map> labelToLoadJobs = new HashMap<>(); + for (Long dbId : dbIdToLabelToLoadJobs.keySet()) { + if (!Env.getCurrentEnv().getAccessManager().checkDbPriv(ConnectContext.get(), + Env.getCurrentEnv().getCatalogMgr().getDbNullable(dbId).getFullName(), + PrivPredicate.LOAD)) { + continue; + } + + labelToLoadJobs.putAll(dbIdToLabelToLoadJobs.get(dbId)); + } + List loadJobList = Lists.newArrayList(); + loadJobList.addAll( + labelToLoadJobs.values().stream().flatMap(Collection::stream).collect(Collectors.toList())); + + // check state + for (LoadJob loadJob : loadJobList) { + try { + // add load job info + loadJobInfos.add(loadJob.getShowInfo()); + } catch (DdlException e) { + continue; + } + } + return loadJobInfos; + } finally { + readUnlock(); + } + } + /** * Get load job info. **/ From 9934a86e040adb37c17496db1ae01a42f9737863 Mon Sep 17 00:00:00 2001 From: Yulei-Yang Date: Thu, 2 Mar 2023 17:26:24 +0800 Subject: [PATCH 5/7] fix code style issue --- .../src/main/java/org/apache/doris/load/loadv2/LoadManager.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadManager.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadManager.java index 5488c5b4028fc2..2457dacdcade4d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadManager.java @@ -45,8 +45,8 @@ import org.apache.doris.load.Load; import org.apache.doris.mysql.privilege.PrivPredicate; import org.apache.doris.persist.CleanLabelOperationLog; -import org.apache.doris.qe.OriginStatement; import org.apache.doris.qe.ConnectContext; +import org.apache.doris.qe.OriginStatement; import org.apache.doris.thrift.TUniqueId; import org.apache.doris.transaction.DatabaseTransactionMgr; import org.apache.doris.transaction.TransactionState; From 120a06c70a06fed907226ac114fb40fba08cf880 Mon Sep 17 00:00:00 2001 From: Yulei-Yang Date: Sun, 5 Mar 2023 15:43:38 +0800 Subject: [PATCH 6/7] update --- .../org/apache/doris/alter/AlterHandler.java | 10 ++---- .../doris/alter/MaterializedViewHandler.java | 10 ++---- .../doris/alter/SchemaChangeHandler.java | 10 ++---- .../apache/doris/common/proc/JobsProcDir.java | 33 +++++-------------- 4 files changed, 15 insertions(+), 48 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/AlterHandler.java b/fe/fe-core/src/main/java/org/apache/doris/alter/AlterHandler.java index bba06ef994ddb0..80a0e33393d188 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/alter/AlterHandler.java +++ b/fe/fe-core/src/main/java/org/apache/doris/alter/AlterHandler.java @@ -34,10 +34,8 @@ import org.apache.doris.common.UserException; import org.apache.doris.common.util.MasterDaemon; import org.apache.doris.common.util.TimeUtils; -import org.apache.doris.mysql.privilege.PrivPredicate; import org.apache.doris.persist.RemoveAlterJobV2OperationLog; import org.apache.doris.persist.ReplicaPersistInfo; -import org.apache.doris.qe.ConnectContext; import org.apache.doris.task.AlterReplicaTask; import com.google.common.base.Preconditions; @@ -145,12 +143,8 @@ public Long getAlterJobV2Num(org.apache.doris.alter.AlterJobV2.JobState state) { Long counter = 0L; for (AlterJobV2 job : alterJobsV2.values()) { - if (!Env.getCurrentEnv().getAccessManager().checkDbPriv(ConnectContext.get(), - Env.getCurrentEnv().getCatalogMgr().getDbNullable(job.getDbId()).getFullName(), - PrivPredicate.LOAD)) { - continue; - } - + // no need to check priv here. This method is only called in show proc stmt, + // which already check the ADMIN priv. if (job.getJobState() == state) { counter++; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/MaterializedViewHandler.java b/fe/fe-core/src/main/java/org/apache/doris/alter/MaterializedViewHandler.java index 8c0f21b2c14d97..f6fc944c925f8f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/alter/MaterializedViewHandler.java +++ b/fe/fe-core/src/main/java/org/apache/doris/alter/MaterializedViewHandler.java @@ -1150,15 +1150,9 @@ public List> getAlterJobInfosByDb(Database db) { public List> getAllAlterJobInfos() { List> rollupJobInfos = new LinkedList>(); - ConnectContext ctx = ConnectContext.get(); for (AlterJobV2 alterJob : ImmutableList.copyOf(alterJobsV2.values())) { - if (ctx != null) { - if (!Env.getCurrentEnv().getAccessManager().checkTblPriv(ctx, - Env.getCurrentEnv().getCatalogMgr().getDbNullable(alterJob.getDbId()).getFullName(), - alterJob.getTableName(), PrivPredicate.ALTER)) { - continue; - } - } + // no need to check priv here. This method is only called in show proc stmt, + // which already check the ADMIN priv. alterJob.getInfo(rollupJobInfos); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java index 5682d7556a2b28..a2b4156e38561c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java +++ b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java @@ -1601,15 +1601,9 @@ private void runAlterJobV2() { public List> getAllAlterJobInfos() { List> schemaChangeJobInfos = new LinkedList<>(); - ConnectContext ctx = ConnectContext.get(); for (AlterJobV2 alterJob : ImmutableList.copyOf(alterJobsV2.values())) { - if (ctx != null) { - if (!Env.getCurrentEnv().getAccessManager().checkTblPriv(ctx, - Env.getCurrentEnv().getCatalogMgr().getDbNullable(alterJob.getDbId()).getFullName(), - alterJob.getTableName(), PrivPredicate.ALTER)) { - continue; - } - } + // no need to check priv here. This method is only called in show proc stmt, + // which already check the ADMIN priv. alterJob.getInfo(schemaChangeJobInfos); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/proc/JobsProcDir.java b/fe/fe-core/src/main/java/org/apache/doris/common/proc/JobsProcDir.java index 13baba524c1508..3605a4ae032916 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/proc/JobsProcDir.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/proc/JobsProcDir.java @@ -24,7 +24,6 @@ import org.apache.doris.common.AnalysisException; import org.apache.doris.load.ExportJob; import org.apache.doris.load.ExportMgr; -import org.apache.doris.load.Load; import org.apache.doris.load.loadv2.LoadManager; import com.google.common.base.Preconditions; @@ -99,21 +98,14 @@ public ProcResult fetchResult() throws AnalysisException { long dbId = db.getId(); // load - Load load = Env.getCurrentEnv().getLoadInstance(); LoadManager loadManager = Env.getCurrentEnv().getLoadManager(); - Long pendingNum = load.getLoadJobNum(org.apache.doris.load.LoadJob.JobState.PENDING, dbId) - + loadManager.getLoadJobNum(org.apache.doris.load.loadv2.JobState.PENDING, dbId); - Long runningNum = load.getLoadJobNum(org.apache.doris.load.LoadJob.JobState.ETL, dbId) - + load.getLoadJobNum(org.apache.doris.load.LoadJob.JobState.LOADING, dbId) - + loadManager.getLoadJobNum(org.apache.doris.load.loadv2.JobState.LOADING, dbId); - Long finishedNum = load.getLoadJobNum(org.apache.doris.load.LoadJob.JobState.QUORUM_FINISHED, dbId) - + load.getLoadJobNum(org.apache.doris.load.LoadJob.JobState.FINISHED, dbId) - + loadManager.getLoadJobNum(org.apache.doris.load.loadv2.JobState.FINISHED, dbId); - Long cancelledNum = load.getLoadJobNum(org.apache.doris.load.LoadJob.JobState.CANCELLED, dbId) - + loadManager.getLoadJobNum(org.apache.doris.load.loadv2.JobState.CANCELLED, dbId); + Long pendingNum = new Long(loadManager.getLoadJobNum(org.apache.doris.load.loadv2.JobState.PENDING, dbId)); + Long runningNum = new Long(loadManager.getLoadJobNum(org.apache.doris.load.loadv2.JobState.LOADING, dbId)); + Long finishedNum = new Long(loadManager.getLoadJobNum(org.apache.doris.load.loadv2.JobState.FINISHED, dbId)); + Long cancelledNum = new Long(loadManager.getLoadJobNum(org.apache.doris.load.loadv2.JobState.CANCELLED, dbId)); Long totalNum = pendingNum + runningNum + finishedNum + cancelledNum; result.addRow(Lists.newArrayList(LOAD, pendingNum.toString(), runningNum.toString(), finishedNum.toString(), - cancelledNum.toString(), totalNum.toString())); + cancelledNum.toString(), totalNum.toString())); // delete // TODO: find it from delete handler @@ -168,18 +160,11 @@ public ProcResult fetchResultForAllDbs() { result.setNames(TITLE_NAMES); // load - Load load = Env.getCurrentEnv().getLoadInstance(); LoadManager loadManager = Env.getCurrentEnv().getLoadManager(); - Long pendingNum = load.getLoadJobNum(org.apache.doris.load.LoadJob.JobState.PENDING) - + loadManager.getLoadJobNum(org.apache.doris.load.loadv2.JobState.PENDING); - Long runningNum = load.getLoadJobNum(org.apache.doris.load.LoadJob.JobState.ETL) - + load.getLoadJobNum(org.apache.doris.load.LoadJob.JobState.LOADING) - + loadManager.getLoadJobNum(org.apache.doris.load.loadv2.JobState.LOADING); - Long finishedNum = load.getLoadJobNum(org.apache.doris.load.LoadJob.JobState.QUORUM_FINISHED) - + load.getLoadJobNum(org.apache.doris.load.LoadJob.JobState.FINISHED) - + loadManager.getLoadJobNum(org.apache.doris.load.loadv2.JobState.FINISHED); - Long cancelledNum = load.getLoadJobNum(org.apache.doris.load.LoadJob.JobState.CANCELLED) - + loadManager.getLoadJobNum(org.apache.doris.load.loadv2.JobState.CANCELLED); + Long pendingNum = new Long(loadManager.getLoadJobNum(org.apache.doris.load.loadv2.JobState.PENDING)); + Long runningNum = new Long(loadManager.getLoadJobNum(org.apache.doris.load.loadv2.JobState.LOADING)); + Long finishedNum = new Long(loadManager.getLoadJobNum(org.apache.doris.load.loadv2.JobState.FINISHED)); + Long cancelledNum = new Long(loadManager.getLoadJobNum(org.apache.doris.load.loadv2.JobState.CANCELLED)); Long totalNum = pendingNum + runningNum + finishedNum + cancelledNum; result.addRow(Lists.newArrayList(LOAD, pendingNum.toString(), runningNum.toString(), finishedNum.toString(), cancelledNum.toString(), totalNum.toString())); From f54e0d965bc97d4f54b689cc2f55ec0b8637200f Mon Sep 17 00:00:00 2001 From: Yulei-Yang Date: Mon, 6 Mar 2023 17:01:25 +0800 Subject: [PATCH 7/7] update --- .../apache/doris/common/proc/JobsProcDir.java | 2 +- .../doris/common/proc/LoadJobProcNode.java | 23 ++++----------- .../apache/doris/common/proc/LoadProcDir.java | 29 ++++++------------- 3 files changed, 15 insertions(+), 39 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/proc/JobsProcDir.java b/fe/fe-core/src/main/java/org/apache/doris/common/proc/JobsProcDir.java index 3605a4ae032916..597066536b8dc9 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/proc/JobsProcDir.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/proc/JobsProcDir.java @@ -67,7 +67,7 @@ public ProcNodeInterface lookup(String jobTypeName) throws AnalysisException { } if (jobTypeName.equals(LOAD)) { - return new LoadProcDir(env.getLoadInstance(), db); + return new LoadProcDir(env.getCurrentEnv().getLoadManager(), db); } else if (jobTypeName.equals(DELETE)) { Long dbId = db == null ? -1 : db.getId(); return new DeleteInfoProcDir(env.getDeleteHandler(), env.getLoadInstance(), dbId); diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/proc/LoadJobProcNode.java b/fe/fe-core/src/main/java/org/apache/doris/common/proc/LoadJobProcNode.java index 194349f2c60790..4b3056ab6c2ec7 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/proc/LoadJobProcNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/proc/LoadJobProcNode.java @@ -18,13 +18,10 @@ package org.apache.doris.common.proc; import org.apache.doris.common.AnalysisException; -import org.apache.doris.load.Load; +import org.apache.doris.load.loadv2.LoadManager; import com.google.common.collect.ImmutableList; -import java.util.ArrayList; -import java.util.List; - public class LoadJobProcNode implements ProcNodeInterface { public static final ImmutableList TITLE_NAMES = new ImmutableList.Builder() @@ -32,11 +29,11 @@ public class LoadJobProcNode implements ProcNodeInterface { .add("PartitionId").add("LoadVersion") .build(); - private Load load; + private LoadManager loadManager; private long jobId; - public LoadJobProcNode(Load load, long jobId) { - this.load = load; + public LoadJobProcNode(LoadManager loadManager, long jobId) { + this.loadManager = loadManager; this.jobId = jobId; } @@ -45,17 +42,7 @@ public ProcResult fetchResult() throws AnalysisException { BaseProcResult result = new BaseProcResult(); result.setNames(TITLE_NAMES); - List> infos = load.getLoadJobUnfinishedInfo(jobId); - // In this step, the detail of load job which is belongs to LoadManager will not be presented. - // The reason is that there are no detail info in load job which is streaming during loading. - // So it don't need to invoke the LoadManager here. - for (List info : infos) { - List oneInfo = new ArrayList(TITLE_NAMES.size()); - for (Comparable element : info) { - oneInfo.add(element.toString()); - } - result.addRow(oneInfo); - } + // TODO get results from LoadManager. Before do that, update implement of LoadManager:record detail info return result; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/proc/LoadProcDir.java b/fe/fe-core/src/main/java/org/apache/doris/common/proc/LoadProcDir.java index 7021f45b04c4c5..86cd01dcc96139 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/proc/LoadProcDir.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/proc/LoadProcDir.java @@ -18,16 +18,13 @@ package org.apache.doris.common.proc; import org.apache.doris.catalog.Database; -import org.apache.doris.catalog.Env; import org.apache.doris.common.AnalysisException; -import org.apache.doris.load.Load; +import org.apache.doris.load.loadv2.LoadManager; -import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableList; import java.util.ArrayList; import java.util.Iterator; -import java.util.LinkedList; import java.util.List; public class LoadProcDir implements ProcDirInterface { @@ -47,38 +44,30 @@ public class LoadProcDir implements ProcDirInterface { private static final int LIMIT = 2000; - private Load load; + private LoadManager loadManager; private Database db; - public LoadProcDir(Load load, Database db) { - this.load = load; + public LoadProcDir(LoadManager loadManager, Database db) { + this.loadManager = loadManager; this.db = db; } @Override public ProcResult fetchResult() throws AnalysisException { - Preconditions.checkNotNull(load); - BaseProcResult result = new BaseProcResult(); result.setNames(TITLE_NAMES); - // merge load job from load and loadManager - LinkedList> loadJobInfos; + List> loadJobInfos; // db is null means need total result of all databases if (db == null) { - loadJobInfos = load.getAllLoadJobInfos(); - loadJobInfos.addAll(Env.getCurrentEnv().getLoadManager().getAllLoadJobInfos()); + loadJobInfos = loadManager.getAllLoadJobInfos(); } else { - loadJobInfos = load.getLoadJobInfosByDb(db.getId(), db.getFullName(), - null, false, null); - loadJobInfos.addAll(Env.getCurrentEnv().getLoadManager().getLoadJobInfosByDb(db.getId(), null, - false, - null)); + loadJobInfos = loadManager.getLoadJobInfosByDb(db.getId(), null, false, null); } int counter = 0; - Iterator> iterator = loadJobInfos.descendingIterator(); + Iterator> iterator = loadJobInfos.iterator(); while (iterator.hasNext()) { List infoStr = iterator.next(); List oneInfo = new ArrayList(TITLE_NAMES.size()); @@ -107,7 +96,7 @@ public ProcNodeInterface lookup(String jobIdStr) throws AnalysisException { throw new AnalysisException("Invalid job id format: " + jobIdStr); } - return new LoadJobProcNode(load, jobId); + return new LoadJobProcNode(loadManager, jobId); } public static int analyzeColumn(String columnName) throws AnalysisException {