diff --git a/docs/en/docs/sql-manual/sql-reference/Show-Statements/SHOW-PROC.md b/docs/en/docs/sql-manual/sql-reference/Show-Statements/SHOW-PROC.md index 6b555cbb07afa4..f170da70e035a4 100644 --- a/docs/en/docs/sql-manual/sql-reference/Show-Statements/SHOW-PROC.md +++ b/docs/en/docs/sql-manual/sql-reference/Show-Statements/SHOW-PROC.md @@ -87,7 +87,7 @@ illustrate: 3. frontends: Display all FE node information in the cluster, including IP address, role, status, whether it is a mater, etc., equivalent to [SHOW FRONTENDS](./SHOW-FRONTENDS.md) 4. routine_loads: Display all routine load job information, including job name, status, etc. 5. auth: User name and corresponding permission information -6. jobs: +6. jobs: show statistics of all kind of jobs. If a specific `dbId` is given, will return statistics data of the database. If `dbId` is -1, will return total statistics data of all databases 7. bdbje: To view the bdbje database list, you need to modify the `fe.conf` file to add `enable_bdbje_debug_mode=true`, and then start `FE` through `sh start_fe.sh --daemon` to enter the `debug` mode. After entering `debug` mode, only `http server` and `MySQLServer` will be started and the `BDBJE` instance will be opened, but no metadata loading and subsequent startup processes will be entered. 8. dbs: Mainly used to view the metadata information of each database and the tables in the Doris cluster. This information includes table structure, partitions, materialized views, data shards and replicas, and more. Through this directory and its subdirectories, you can clearly display the table metadata in the cluster, and locate some problems such as data skew, replica failure, etc. 9. resources : View system resources, ordinary accounts can only see resources that they have USAGE_PRIV permission to use. Only the root and admin accounts can see all resources. Equivalent to [SHOW RESOURCES](./SHOW-RESOURCES.md) diff --git a/docs/zh-CN/docs/sql-manual/sql-reference/Show-Statements/SHOW-PROC.md b/docs/zh-CN/docs/sql-manual/sql-reference/Show-Statements/SHOW-PROC.md index ba46b4b630e4b7..7ddee944ae599b 100644 --- a/docs/zh-CN/docs/sql-manual/sql-reference/Show-Statements/SHOW-PROC.md +++ b/docs/zh-CN/docs/sql-manual/sql-reference/Show-Statements/SHOW-PROC.md @@ -87,7 +87,7 @@ mysql> show proc "/"; 3. frontends :显示集群中所有的 FE 节点信息,包括IP地址、角色、状态、是否是mater等,等同于 [SHOW FRONTENDS](./SHOW-FRONTENDS.md) 4. routine_loads : 显示所有的 routine load 作业信息,包括作业名称、状态等 5. auth:用户名称及对应的权限信息 -6. jobs : +6. jobs :各类任务的统计信息,可查看指定数据库的 Job 的统计信息,如果 `dbId` = -1, 则返回所有库的汇总信息 7. bdbje:查看 bdbje 数据库列表,需要修改 `fe.conf` 文件增加 `enable_bdbje_debug_mode=true` , 然后通过 `sh start_fe.sh --daemon` 启动 `FE` 即可进入 `debug` 模式。 进入 `debug` 模式之后,仅会启动 `http server` 和 `MySQLServer` 并打开 `BDBJE` 实例,但不会进入任何元数据的加载及后续其他启动流程, 8. dbs : 主要用于查看 Doris 集群中各个数据库以及其中的表的元数据信息。这些信息包括表结构、分区、物化视图、数据分片和副本等等。通过这个目录和其子目录,可以清楚的展示集群中的表元数据情况,以及定位一些如数据倾斜、副本故障等问题 9. resources : 查看系统资源,普通账户只能看到自己有 USAGE_PRIV 使用权限的资源。只有root和admin账户可以看到所有的资源。等同于 [SHOW RESOURCES](./SHOW-RESOURCES.md) diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/AlterHandler.java b/fe/fe-core/src/main/java/org/apache/doris/alter/AlterHandler.java index c408f9690d4955..80a0e33393d188 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/alter/AlterHandler.java +++ b/fe/fe-core/src/main/java/org/apache/doris/alter/AlterHandler.java @@ -140,7 +140,16 @@ public Long getAlterJobV2Num(org.apache.doris.alter.AlterJobV2.JobState state, l } public Long getAlterJobV2Num(org.apache.doris.alter.AlterJobV2.JobState state) { - return alterJobsV2.values().stream().filter(e -> e.getJobState() == state).count(); + Long counter = 0L; + + for (AlterJobV2 job : alterJobsV2.values()) { + // no need to check priv here. This method is only called in show proc stmt, + // which already check the ADMIN priv. + if (job.getJobState() == state) { + counter++; + } + } + return counter; } @Override diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/MaterializedViewHandler.java b/fe/fe-core/src/main/java/org/apache/doris/alter/MaterializedViewHandler.java index 5a7ff739cacb3f..f6fc944c925f8f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/alter/MaterializedViewHandler.java +++ b/fe/fe-core/src/main/java/org/apache/doris/alter/MaterializedViewHandler.java @@ -65,6 +65,7 @@ import com.google.common.base.Preconditions; import com.google.common.base.Strings; +import com.google.common.collect.ImmutableList; import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.collect.Sets; @@ -1146,6 +1147,18 @@ public List> getAlterJobInfosByDb(Database db) { return rollupJobInfos; } + public List> getAllAlterJobInfos() { + List> rollupJobInfos = new LinkedList>(); + + for (AlterJobV2 alterJob : ImmutableList.copyOf(alterJobsV2.values())) { + // no need to check priv here. This method is only called in show proc stmt, + // which already check the ADMIN priv. + alterJob.getInfo(rollupJobInfos); + } + + return rollupJobInfos; + } + private void getAlterJobV2Infos(Database db, List> rollupJobInfos) { ConnectContext ctx = ConnectContext.get(); for (AlterJobV2 alterJob : alterJobsV2.values()) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java index 11fb77c3ee0d79..a2b4156e38561c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java +++ b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java @@ -1599,6 +1599,20 @@ private void runAlterJobV2() { }); } + public List> getAllAlterJobInfos() { + List> schemaChangeJobInfos = new LinkedList<>(); + for (AlterJobV2 alterJob : ImmutableList.copyOf(alterJobsV2.values())) { + // no need to check priv here. This method is only called in show proc stmt, + // which already check the ADMIN priv. + alterJob.getInfo(schemaChangeJobInfos); + } + + // sort by "JobId", "PartitionName", "CreateTime", "FinishTime", "IndexName", "IndexState" + ListComparator> comparator = new ListComparator>(0, 1, 2, 3, 4, 5); + schemaChangeJobInfos.sort(comparator); + return schemaChangeJobInfos; + } + @Override public List> getAlterJobInfosByDb(Database db) { List> schemaChangeJobInfos = new LinkedList<>(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/proc/ExportProcNode.java b/fe/fe-core/src/main/java/org/apache/doris/common/proc/ExportProcNode.java index 4c469a3039d304..1a154c8fd92265 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/proc/ExportProcNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/proc/ExportProcNode.java @@ -51,14 +51,18 @@ public ExportProcNode(ExportMgr exportMgr, Database db) { @Override public ProcResult fetchResult() throws AnalysisException { - Preconditions.checkNotNull(db); Preconditions.checkNotNull(exportMgr); BaseProcResult result = new BaseProcResult(); result.setNames(TITLE_NAMES); - List> jobInfos = exportMgr.getExportJobInfosByIdOrState( + List> jobInfos; + if (db == null) { + jobInfos = exportMgr.getExportJobInfos(LIMIT); + } else { + jobInfos = exportMgr.getExportJobInfosByIdOrState( db.getId(), 0, "", false, null, null, LIMIT); + } result.setRows(jobInfos); return result; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/proc/JobsDbProcDir.java b/fe/fe-core/src/main/java/org/apache/doris/common/proc/JobsDbProcDir.java index 2957d7133eb862..414d6912d9ec5f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/proc/JobsDbProcDir.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/proc/JobsDbProcDir.java @@ -60,7 +60,8 @@ public ProcNodeInterface lookup(String dbIdStr) throws AnalysisException { throw new AnalysisException("Invalid db id format: " + dbIdStr); } - Database db = env.getInternalCatalog().getDbOrAnalysisException(dbId); + // dbId = -1 means need total result of all databases + Database db = dbId == -1 ? null : env.getInternalCatalog().getDbOrAnalysisException(dbId); return new JobsProcDir(env, db); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/proc/JobsProcDir.java b/fe/fe-core/src/main/java/org/apache/doris/common/proc/JobsProcDir.java index 3d93d31e7dcd32..597066536b8dc9 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/proc/JobsProcDir.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/proc/JobsProcDir.java @@ -24,7 +24,6 @@ import org.apache.doris.common.AnalysisException; import org.apache.doris.load.ExportJob; import org.apache.doris.load.ExportMgr; -import org.apache.doris.load.Load; import org.apache.doris.load.loadv2.LoadManager; import com.google.common.base.Preconditions; @@ -68,9 +67,10 @@ public ProcNodeInterface lookup(String jobTypeName) throws AnalysisException { } if (jobTypeName.equals(LOAD)) { - return new LoadProcDir(env.getLoadInstance(), db); + return new LoadProcDir(env.getCurrentEnv().getLoadManager(), db); } else if (jobTypeName.equals(DELETE)) { - return new DeleteInfoProcDir(env.getDeleteHandler(), env.getLoadInstance(), db.getId()); + Long dbId = db == null ? -1 : db.getId(); + return new DeleteInfoProcDir(env.getDeleteHandler(), env.getLoadInstance(), dbId); } else if (jobTypeName.equals(ROLLUP)) { return new RollupProcDir(env.getMaterializedViewHandler(), db); } else if (jobTypeName.equals(SCHEMA_CHANGE)) { @@ -90,23 +90,22 @@ public ProcResult fetchResult() throws AnalysisException { result.setNames(TITLE_NAMES); + // db is null means need total result of all databases + if (db == null) { + return fetchResultForAllDbs(); + } + long dbId = db.getId(); + // load - Load load = Env.getCurrentEnv().getLoadInstance(); LoadManager loadManager = Env.getCurrentEnv().getLoadManager(); - Long pendingNum = load.getLoadJobNum(org.apache.doris.load.LoadJob.JobState.PENDING, dbId) - + loadManager.getLoadJobNum(org.apache.doris.load.loadv2.JobState.PENDING, dbId); - Long runningNum = load.getLoadJobNum(org.apache.doris.load.LoadJob.JobState.ETL, dbId) - + load.getLoadJobNum(org.apache.doris.load.LoadJob.JobState.LOADING, dbId) - + loadManager.getLoadJobNum(org.apache.doris.load.loadv2.JobState.LOADING, dbId); - Long finishedNum = load.getLoadJobNum(org.apache.doris.load.LoadJob.JobState.QUORUM_FINISHED, dbId) - + load.getLoadJobNum(org.apache.doris.load.LoadJob.JobState.FINISHED, dbId) - + loadManager.getLoadJobNum(org.apache.doris.load.loadv2.JobState.FINISHED, dbId); - Long cancelledNum = load.getLoadJobNum(org.apache.doris.load.LoadJob.JobState.CANCELLED, dbId) - + loadManager.getLoadJobNum(org.apache.doris.load.loadv2.JobState.CANCELLED, dbId); + Long pendingNum = new Long(loadManager.getLoadJobNum(org.apache.doris.load.loadv2.JobState.PENDING, dbId)); + Long runningNum = new Long(loadManager.getLoadJobNum(org.apache.doris.load.loadv2.JobState.LOADING, dbId)); + Long finishedNum = new Long(loadManager.getLoadJobNum(org.apache.doris.load.loadv2.JobState.FINISHED, dbId)); + Long cancelledNum = new Long(loadManager.getLoadJobNum(org.apache.doris.load.loadv2.JobState.CANCELLED, dbId)); Long totalNum = pendingNum + runningNum + finishedNum + cancelledNum; result.addRow(Lists.newArrayList(LOAD, pendingNum.toString(), runningNum.toString(), finishedNum.toString(), - cancelledNum.toString(), totalNum.toString())); + cancelledNum.toString(), totalNum.toString())); // delete // TODO: find it from delete handler @@ -155,4 +154,66 @@ public ProcResult fetchResult() throws AnalysisException { return result; } + + public ProcResult fetchResultForAllDbs() { + BaseProcResult result = new BaseProcResult(); + + result.setNames(TITLE_NAMES); + // load + LoadManager loadManager = Env.getCurrentEnv().getLoadManager(); + Long pendingNum = new Long(loadManager.getLoadJobNum(org.apache.doris.load.loadv2.JobState.PENDING)); + Long runningNum = new Long(loadManager.getLoadJobNum(org.apache.doris.load.loadv2.JobState.LOADING)); + Long finishedNum = new Long(loadManager.getLoadJobNum(org.apache.doris.load.loadv2.JobState.FINISHED)); + Long cancelledNum = new Long(loadManager.getLoadJobNum(org.apache.doris.load.loadv2.JobState.CANCELLED)); + Long totalNum = pendingNum + runningNum + finishedNum + cancelledNum; + result.addRow(Lists.newArrayList(LOAD, pendingNum.toString(), runningNum.toString(), finishedNum.toString(), + cancelledNum.toString(), totalNum.toString())); + + // delete + // TODO: find it from delete handler + pendingNum = 0L; + runningNum = 0L; + finishedNum = 0L; + cancelledNum = 0L; + totalNum = pendingNum + runningNum + finishedNum + cancelledNum; + result.addRow(Lists.newArrayList(DELETE, pendingNum.toString(), runningNum.toString(), finishedNum.toString(), + cancelledNum.toString(), totalNum.toString())); + + // rollup + MaterializedViewHandler materializedViewHandler = Env.getCurrentEnv().getMaterializedViewHandler(); + pendingNum = materializedViewHandler.getAlterJobV2Num(org.apache.doris.alter.AlterJobV2.JobState.PENDING); + runningNum = materializedViewHandler.getAlterJobV2Num( + org.apache.doris.alter.AlterJobV2.JobState.WAITING_TXN) + + materializedViewHandler.getAlterJobV2Num(org.apache.doris.alter.AlterJobV2.JobState.RUNNING); + finishedNum = materializedViewHandler.getAlterJobV2Num( + org.apache.doris.alter.AlterJobV2.JobState.FINISHED); + cancelledNum = materializedViewHandler.getAlterJobV2Num( + org.apache.doris.alter.AlterJobV2.JobState.CANCELLED); + totalNum = pendingNum + runningNum + finishedNum + cancelledNum; + result.addRow(Lists.newArrayList(ROLLUP, pendingNum.toString(), runningNum.toString(), finishedNum.toString(), + cancelledNum.toString(), totalNum.toString())); + + // schema change + SchemaChangeHandler schemaChangeHandler = Env.getCurrentEnv().getSchemaChangeHandler(); + pendingNum = schemaChangeHandler.getAlterJobV2Num(org.apache.doris.alter.AlterJobV2.JobState.PENDING); + runningNum = schemaChangeHandler.getAlterJobV2Num(org.apache.doris.alter.AlterJobV2.JobState.WAITING_TXN) + + schemaChangeHandler.getAlterJobV2Num(org.apache.doris.alter.AlterJobV2.JobState.RUNNING); + finishedNum = schemaChangeHandler.getAlterJobV2Num(org.apache.doris.alter.AlterJobV2.JobState.FINISHED); + cancelledNum = schemaChangeHandler.getAlterJobV2Num(org.apache.doris.alter.AlterJobV2.JobState.CANCELLED); + totalNum = pendingNum + runningNum + finishedNum + cancelledNum; + result.addRow(Lists.newArrayList(SCHEMA_CHANGE, pendingNum.toString(), runningNum.toString(), + finishedNum.toString(), cancelledNum.toString(), totalNum.toString())); + + // export + ExportMgr exportMgr = Env.getCurrentEnv().getExportMgr(); + pendingNum = exportMgr.getJobNum(ExportJob.JobState.PENDING); + runningNum = exportMgr.getJobNum(ExportJob.JobState.EXPORTING); + finishedNum = exportMgr.getJobNum(ExportJob.JobState.FINISHED); + cancelledNum = exportMgr.getJobNum(ExportJob.JobState.CANCELLED); + totalNum = pendingNum + runningNum + finishedNum + cancelledNum; + result.addRow(Lists.newArrayList(EXPORT, pendingNum.toString(), runningNum.toString(), finishedNum.toString(), + cancelledNum.toString(), totalNum.toString())); + + return result; + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/proc/LoadJobProcNode.java b/fe/fe-core/src/main/java/org/apache/doris/common/proc/LoadJobProcNode.java index 194349f2c60790..4b3056ab6c2ec7 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/proc/LoadJobProcNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/proc/LoadJobProcNode.java @@ -18,13 +18,10 @@ package org.apache.doris.common.proc; import org.apache.doris.common.AnalysisException; -import org.apache.doris.load.Load; +import org.apache.doris.load.loadv2.LoadManager; import com.google.common.collect.ImmutableList; -import java.util.ArrayList; -import java.util.List; - public class LoadJobProcNode implements ProcNodeInterface { public static final ImmutableList TITLE_NAMES = new ImmutableList.Builder() @@ -32,11 +29,11 @@ public class LoadJobProcNode implements ProcNodeInterface { .add("PartitionId").add("LoadVersion") .build(); - private Load load; + private LoadManager loadManager; private long jobId; - public LoadJobProcNode(Load load, long jobId) { - this.load = load; + public LoadJobProcNode(LoadManager loadManager, long jobId) { + this.loadManager = loadManager; this.jobId = jobId; } @@ -45,17 +42,7 @@ public ProcResult fetchResult() throws AnalysisException { BaseProcResult result = new BaseProcResult(); result.setNames(TITLE_NAMES); - List> infos = load.getLoadJobUnfinishedInfo(jobId); - // In this step, the detail of load job which is belongs to LoadManager will not be presented. - // The reason is that there are no detail info in load job which is streaming during loading. - // So it don't need to invoke the LoadManager here. - for (List info : infos) { - List oneInfo = new ArrayList(TITLE_NAMES.size()); - for (Comparable element : info) { - oneInfo.add(element.toString()); - } - result.addRow(oneInfo); - } + // TODO get results from LoadManager. Before do that, update implement of LoadManager:record detail info return result; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/proc/LoadProcDir.java b/fe/fe-core/src/main/java/org/apache/doris/common/proc/LoadProcDir.java index c2cd21d81575eb..86cd01dcc96139 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/proc/LoadProcDir.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/proc/LoadProcDir.java @@ -18,16 +18,13 @@ package org.apache.doris.common.proc; import org.apache.doris.catalog.Database; -import org.apache.doris.catalog.Env; import org.apache.doris.common.AnalysisException; -import org.apache.doris.load.Load; +import org.apache.doris.load.loadv2.LoadManager; -import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableList; import java.util.ArrayList; import java.util.Iterator; -import java.util.LinkedList; import java.util.List; public class LoadProcDir implements ProcDirInterface { @@ -47,30 +44,30 @@ public class LoadProcDir implements ProcDirInterface { private static final int LIMIT = 2000; - private Load load; + private LoadManager loadManager; private Database db; - public LoadProcDir(Load load, Database db) { - this.load = load; + public LoadProcDir(LoadManager loadManager, Database db) { + this.loadManager = loadManager; this.db = db; } @Override public ProcResult fetchResult() throws AnalysisException { - Preconditions.checkNotNull(db); - Preconditions.checkNotNull(load); - BaseProcResult result = new BaseProcResult(); result.setNames(TITLE_NAMES); - // merge load job from load and loadManager - LinkedList> loadJobInfos = load.getLoadJobInfosByDb(db.getId(), db.getFullName(), - null, false, null); - loadJobInfos.addAll(Env.getCurrentEnv().getLoadManager().getLoadJobInfosByDb(db.getId(), null, - false, - null)); + List> loadJobInfos; + + // db is null means need total result of all databases + if (db == null) { + loadJobInfos = loadManager.getAllLoadJobInfos(); + } else { + loadJobInfos = loadManager.getLoadJobInfosByDb(db.getId(), null, false, null); + } + int counter = 0; - Iterator> iterator = loadJobInfos.descendingIterator(); + Iterator> iterator = loadJobInfos.iterator(); while (iterator.hasNext()) { List infoStr = iterator.next(); List oneInfo = new ArrayList(TITLE_NAMES.size()); @@ -99,7 +96,7 @@ public ProcNodeInterface lookup(String jobIdStr) throws AnalysisException { throw new AnalysisException("Invalid job id format: " + jobIdStr); } - return new LoadJobProcNode(load, jobId); + return new LoadJobProcNode(loadManager, jobId); } public static int analyzeColumn(String columnName) throws AnalysisException { diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/proc/RollupProcDir.java b/fe/fe-core/src/main/java/org/apache/doris/common/proc/RollupProcDir.java index fb65f5231cdb9b..896d6349be946d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/proc/RollupProcDir.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/proc/RollupProcDir.java @@ -62,13 +62,18 @@ public RollupProcDir(MaterializedViewHandler materializedViewHandler, Database d @Override public ProcResult fetchResult() throws AnalysisException { - Preconditions.checkNotNull(db); Preconditions.checkNotNull(materializedViewHandler); BaseProcResult result = new BaseProcResult(); result.setNames(TITLE_NAMES); - List> rollupJobInfos = materializedViewHandler.getAlterJobInfosByDb(db); + List> rollupJobInfos; + // db is null means need total result of all databases + if (db == null) { + rollupJobInfos = materializedViewHandler.getAllAlterJobInfos(); + } else { + rollupJobInfos = materializedViewHandler.getAlterJobInfosByDb(db); + } for (List infoStr : rollupJobInfos) { List oneInfo = new ArrayList(TITLE_NAMES.size()); for (Comparable element : infoStr) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/proc/SchemaChangeProcDir.java b/fe/fe-core/src/main/java/org/apache/doris/common/proc/SchemaChangeProcDir.java index 6bf05a02aefa13..d89792ddf59471 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/proc/SchemaChangeProcDir.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/proc/SchemaChangeProcDir.java @@ -176,13 +176,18 @@ public ProcResult fetchResultByFilter(HashMap filter, ArrayList> schemaChangeJobInfos = schemaChangeHandler.getAlterJobInfosByDb(db); + List> schemaChangeJobInfos; + // db is null means need total result of all databases + if (db == null) { + schemaChangeJobInfos = schemaChangeHandler.getAllAlterJobInfos(); + } else { + schemaChangeJobInfos = schemaChangeHandler.getAlterJobInfosByDb(db); + } for (List infoStr : schemaChangeJobInfos) { List oneInfo = new ArrayList(TITLE_NAMES.size()); for (Comparable element : infoStr) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/DeleteHandler.java b/fe/fe-core/src/main/java/org/apache/doris/load/DeleteHandler.java index 3e8d6c631830b7..bd793e0808ce5a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/DeleteHandler.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/DeleteHandler.java @@ -833,7 +833,20 @@ public List> getDeleteInfosByDb(long dbId) { } String dbName = db.getFullName(); - List deleteInfoList = dbToDeleteInfos.get(dbId); + List deleteInfoList = new ArrayList<>(); + if (dbId == -1) { + for (Long tempDbId : dbToDeleteInfos.keySet()) { + if (!Env.getCurrentEnv().getAccessManager().checkDbPriv(ConnectContext.get(), + Env.getCurrentEnv().getCatalogMgr().getDbNullable(tempDbId).getFullName(), + PrivPredicate.LOAD)) { + continue; + } + + deleteInfoList.addAll(dbToDeleteInfos.get(tempDbId)); + } + } else { + deleteInfoList = dbToDeleteInfos.get(dbId); + } readLock(); try { diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/ExportMgr.java b/fe/fe-core/src/main/java/org/apache/doris/load/ExportMgr.java index a6862a9b2b024f..a56337ce9bdd8d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/ExportMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/ExportMgr.java @@ -223,78 +223,17 @@ public List> getExportJobInfosByIdOrState( } } - // check auth - TableName tableName = job.getTableName(); - if (tableName == null || tableName.getTbl().equals("DUMMY")) { - // forward compatibility, no table name is saved before - Database db = Env.getCurrentInternalCatalog().getDbNullable(dbId); - if (db == null) { - continue; - } - if (!Env.getCurrentEnv().getAccessManager().checkDbPriv(ConnectContext.get(), - db.getFullName(), PrivPredicate.SHOW)) { - continue; - } - } else { - if (!Env.getCurrentEnv().getAccessManager().checkTblPriv(ConnectContext.get(), - tableName.getDb(), tableName.getTbl(), - PrivPredicate.SHOW)) { - continue; - } - } - if (states != null) { if (!states.contains(state)) { continue; } } - List jobInfo = new ArrayList(); - - jobInfo.add(id); - jobInfo.add(jobLabel); - jobInfo.add(state.name()); - jobInfo.add(job.getProgress() + "%"); - - // task infos - Map infoMap = Maps.newHashMap(); - List partitions = job.getPartitions(); - if (partitions == null) { - partitions = Lists.newArrayList(); - partitions.add("*"); - } - infoMap.put("db", job.getTableName().getDb()); - infoMap.put("tbl", job.getTableName().getTbl()); - if (job.getWhereExpr() != null) { - infoMap.put("where expr", job.getWhereExpr().toMySql()); - } - infoMap.put("partitions", partitions); - infoMap.put("broker", job.getBrokerDesc().getName()); - infoMap.put("column separator", job.getColumnSeparator()); - infoMap.put("line delimiter", job.getLineDelimiter()); - infoMap.put("exec mem limit", job.getExecMemLimit()); - infoMap.put("columns", job.getColumns()); - infoMap.put("coord num", job.getCoordList().size()); - infoMap.put("tablet num", job.getTabletLocations() == null ? -1 : job.getTabletLocations().size()); - jobInfo.add(new Gson().toJson(infoMap)); - // path - jobInfo.add(job.getShowExportPath()); - - jobInfo.add(TimeUtils.longToTimeString(job.getCreateTimeMs())); - jobInfo.add(TimeUtils.longToTimeString(job.getStartTimeMs())); - jobInfo.add(TimeUtils.longToTimeString(job.getFinishTimeMs())); - jobInfo.add(job.getTimeoutSecond()); - - // error msg - if (job.getState() == ExportJob.JobState.CANCELLED) { - ExportFailMsg failMsg = job.getFailMsg(); - jobInfo.add("type:" + failMsg.getCancelType() + "; msg:" + failMsg.getMsg()); - } else { - jobInfo.add(FeConstants.null_string); + // check auth + if (isJobShowable(job)) { + exportJobInfos.add(composeExportJobInfo(job)); } - exportJobInfos.add(jobInfo); - if (++counter >= resultNum) { break; } @@ -322,6 +261,112 @@ public List> getExportJobInfosByIdOrState( return results; } + public List> getExportJobInfos(long limit) { + long resultNum = limit == -1L ? Integer.MAX_VALUE : limit; + LinkedList> exportJobInfos = new LinkedList>(); + + readLock(); + try { + int counter = 0; + for (ExportJob job : idToJob.values()) { + // check auth + if (isJobShowable(job)) { + exportJobInfos.add(composeExportJobInfo(job)); + } + + if (++counter >= resultNum) { + break; + } + } + } finally { + readUnlock(); + } + + // order by + ListComparator> comparator = null; + // sort by id asc + comparator = new ListComparator>(0); + Collections.sort(exportJobInfos, comparator); + + List> results = Lists.newArrayList(); + for (List list : exportJobInfos) { + results.add(list.stream().map(e -> e.toString()).collect(Collectors.toList())); + } + + return results; + } + + public boolean isJobShowable(ExportJob job) { + TableName tableName = job.getTableName(); + if (tableName == null || tableName.getTbl().equals("DUMMY")) { + // forward compatibility, no table name is saved before + Database db = Env.getCurrentInternalCatalog().getDbNullable(job.getDbId()); + if (db == null) { + return false; + } + if (!Env.getCurrentEnv().getAccessManager().checkDbPriv(ConnectContext.get(), + db.getFullName(), PrivPredicate.SHOW)) { + return false; + } + } else { + if (!Env.getCurrentEnv().getAccessManager().checkTblPriv(ConnectContext.get(), + tableName.getDb(), tableName.getTbl(), + PrivPredicate.SHOW)) { + return false; + } + } + + return true; + } + + private List composeExportJobInfo(ExportJob job) { + List jobInfo = new ArrayList(); + + jobInfo.add(job.getId()); + jobInfo.add(job.getLabel()); + jobInfo.add(job.getState().name()); + jobInfo.add(job.getProgress() + "%"); + + // task infos + Map infoMap = Maps.newHashMap(); + List partitions = job.getPartitions(); + if (partitions == null) { + partitions = Lists.newArrayList(); + partitions.add("*"); + } + infoMap.put("db", job.getTableName().getDb()); + infoMap.put("tbl", job.getTableName().getTbl()); + if (job.getWhereExpr() != null) { + infoMap.put("where expr", job.getWhereExpr().toMySql()); + } + infoMap.put("partitions", partitions); + infoMap.put("broker", job.getBrokerDesc().getName()); + infoMap.put("column separator", job.getColumnSeparator()); + infoMap.put("line delimiter", job.getLineDelimiter()); + infoMap.put("exec mem limit", job.getExecMemLimit()); + infoMap.put("columns", job.getColumns()); + infoMap.put("coord num", job.getCoordList().size()); + infoMap.put("tablet num", job.getTabletLocations() == null ? -1 : job.getTabletLocations().size()); + jobInfo.add(new Gson().toJson(infoMap)); + // path + jobInfo.add(job.getShowExportPath()); + + jobInfo.add(TimeUtils.longToTimeString(job.getCreateTimeMs())); + jobInfo.add(TimeUtils.longToTimeString(job.getStartTimeMs())); + jobInfo.add(TimeUtils.longToTimeString(job.getFinishTimeMs())); + jobInfo.add(job.getTimeoutSecond()); + + // error msg + if (job.getState() == ExportJob.JobState.CANCELLED) { + ExportFailMsg failMsg = job.getFailMsg(); + jobInfo.add("type:" + failMsg.getCancelType() + "; msg:" + failMsg.getMsg()); + } else { + jobInfo.add(FeConstants.null_string); + } + + return jobInfo; + } + public void removeOldExportJobs() { long currentTimeMs = System.currentTimeMillis(); @@ -376,4 +421,25 @@ public long getJobNum(ExportJob.JobState state, long dbId) { } return size; } + + public long getJobNum(ExportJob.JobState state) { + int size = 0; + readLock(); + try { + for (ExportJob job : idToJob.values()) { + if (!Env.getCurrentEnv().getAccessManager().checkDbPriv(ConnectContext.get(), + Env.getCurrentEnv().getCatalogMgr().getDbNullable(job.getDbId()).getFullName(), + PrivPredicate.LOAD)) { + continue; + } + + if (job.getState() == state) { + ++size; + } + } + } finally { + readUnlock(); + } + return size; + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/Load.java b/fe/fe-core/src/main/java/org/apache/doris/load/Load.java index c6d08b305456c6..25a17f58656d3c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/Load.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/Load.java @@ -1331,6 +1331,31 @@ public long getLoadJobNum(JobState jobState, long dbId) { } } + public long getLoadJobNum(JobState jobState) { + readLock(); + try { + List loadJobs = new ArrayList<>(); + for (Long dbId : dbToLoadJobs.keySet()) { + if (!Env.getCurrentEnv().getAccessManager().checkDbPriv(ConnectContext.get(), + Env.getCurrentEnv().getCatalogMgr().getDbNullable(dbId).getFullName(), + PrivPredicate.LOAD)) { + continue; + } + loadJobs.addAll(this.dbToLoadJobs.get(dbId)); + } + + int jobNum = 0; + for (LoadJob job : loadJobs) { + if (job.getState() == jobState) { + ++jobNum; + } + } + return jobNum; + } finally { + readUnlock(); + } + } + public LoadJob getLoadJob(long jobId) { readLock(); try { @@ -1340,6 +1365,151 @@ public LoadJob getLoadJob(long jobId) { } } + public LinkedList> getAllLoadJobInfos() { + LinkedList> loadJobInfos = new LinkedList>(); + readLock(); + try { + List loadJobs = new ArrayList<>(); + for (Long dbId : dbToLoadJobs.keySet()) { + if (!Env.getCurrentEnv().getAccessManager().checkDbPriv(ConnectContext.get(), + Env.getCurrentEnv().getCatalogMgr().getDbNullable(dbId).getFullName(), + PrivPredicate.LOAD)) { + continue; + } + + loadJobs.addAll(this.dbToLoadJobs.get(dbId)); + } + if (loadJobs.size() == 0) { + return loadJobInfos; + } + + long start = System.currentTimeMillis(); + LOG.debug("begin to get load job info, size: {}", loadJobs.size()); + + for (LoadJob loadJob : loadJobs) { + // filter first + String dbName = Env.getCurrentEnv().getCatalogMgr().getDbNullable(loadJob.getDbId()).getFullName(); + // check auth + Set tableNames = loadJob.getTableNames(); + boolean auth = true; + for (String tblName : tableNames) { + if (!Env.getCurrentEnv().getAccessManager().checkTblPriv(ConnectContext.get(), dbName, + tblName, PrivPredicate.LOAD)) { + auth = false; + break; + } + } + if (!auth) { + continue; + } + + loadJobInfos.add(composeJobInfoByLoadJob(loadJob)); + } // end for loadJobs + + LOG.debug("finished to get load job info, cost: {}", (System.currentTimeMillis() - start)); + } finally { + readUnlock(); + } + + return loadJobInfos; + } + + private List composeJobInfoByLoadJob(LoadJob loadJob) { + List jobInfo = new ArrayList(); + + // jobId + jobInfo.add(loadJob.getId()); + // label + jobInfo.add(loadJob.getLabel()); + // state + jobInfo.add(loadJob.getState().name()); + + // progress + switch (loadJob.getState()) { + case PENDING: + jobInfo.add("ETL:0%; LOAD:0%"); + break; + case ETL: + jobInfo.add("ETL:" + loadJob.getProgress() + "%; LOAD:0%"); + break; + case LOADING: + jobInfo.add("ETL:100%; LOAD:" + loadJob.getProgress() + "%"); + break; + case QUORUM_FINISHED: + case FINISHED: + jobInfo.add("ETL:100%; LOAD:100%"); + break; + case CANCELLED: + default: + jobInfo.add("ETL:N/A; LOAD:N/A"); + break; + } + + // type + jobInfo.add(loadJob.getEtlJobType().name()); + + // etl info + EtlStatus status = loadJob.getEtlJobStatus(); + if (status == null || status.getState() == TEtlState.CANCELLED) { + jobInfo.add(FeConstants.null_string); + } else { + Map counters = status.getCounters(); + List info = Lists.newArrayList(); + for (String key : counters.keySet()) { + // XXX: internal etl job return all counters + if (key.equalsIgnoreCase("HDFS bytes read") + || key.equalsIgnoreCase("Map input records") + || key.startsWith("dpp.") + || loadJob.getEtlJobType() == EtlJobType.MINI) { + info.add(key + "=" + counters.get(key)); + } + } // end for counters + if (info.isEmpty()) { + jobInfo.add(FeConstants.null_string); + } else { + jobInfo.add(StringUtils.join(info, "; ")); + } + } + + // task info + jobInfo.add("cluster:" + loadJob.getHadoopCluster() + + "; timeout(s):" + loadJob.getTimeoutSecond() + + "; max_filter_ratio:" + loadJob.getMaxFilterRatio()); + + // error msg + if (loadJob.getState() == JobState.CANCELLED) { + FailMsg failMsg = loadJob.getFailMsg(); + jobInfo.add("type:" + failMsg.getCancelType() + "; msg:" + failMsg.getMsg()); + } else { + jobInfo.add(FeConstants.null_string); + } + + // create time + jobInfo.add(TimeUtils.longToTimeString(loadJob.getCreateTimeMs())); + // etl start time + jobInfo.add(TimeUtils.longToTimeString(loadJob.getEtlStartTimeMs())); + // etl end time + jobInfo.add(TimeUtils.longToTimeString(loadJob.getEtlFinishTimeMs())); + // load start time + jobInfo.add(TimeUtils.longToTimeString(loadJob.getLoadStartTimeMs())); + // load end time + jobInfo.add(TimeUtils.longToTimeString(loadJob.getLoadFinishTimeMs())); + // tracking url + jobInfo.add(status.getTrackingUrl()); + // job detail(not used for hadoop load, just return an empty string) + jobInfo.add(""); + // transaction id + jobInfo.add(loadJob.getTransactionId()); + // error tablets(not used for hadoop load, just return an empty string) + jobInfo.add(""); + // user + jobInfo.add(loadJob.getUser()); + // comment + jobInfo.add(loadJob.getComment()); + + return jobInfo; + } + public LinkedList> getLoadJobInfosByDb(long dbId, String dbName, String labelValue, boolean accurateMatch, Set states) throws AnalysisException { LinkedList> loadJobInfos = new LinkedList>(); @@ -1403,103 +1573,7 @@ public LinkedList> getLoadJobInfosByDb(long dbId, String dbName } } - List jobInfo = new ArrayList(); - - // jobId - jobInfo.add(loadJob.getId()); - // label - jobInfo.add(label); - // state - jobInfo.add(state.name()); - - // progress - switch (loadJob.getState()) { - case PENDING: - jobInfo.add("ETL:0%; LOAD:0%"); - break; - case ETL: - jobInfo.add("ETL:" + loadJob.getProgress() + "%; LOAD:0%"); - break; - case LOADING: - jobInfo.add("ETL:100%; LOAD:" + loadJob.getProgress() + "%"); - break; - case QUORUM_FINISHED: - jobInfo.add("ETL:100%; LOAD:100%"); - break; - case FINISHED: - jobInfo.add("ETL:100%; LOAD:100%"); - break; - case CANCELLED: - jobInfo.add("ETL:N/A; LOAD:N/A"); - break; - default: - jobInfo.add("ETL:N/A; LOAD:N/A"); - break; - } - - // type - jobInfo.add(loadJob.getEtlJobType().name()); - - // etl info - EtlStatus status = loadJob.getEtlJobStatus(); - if (status == null || status.getState() == TEtlState.CANCELLED) { - jobInfo.add(FeConstants.null_string); - } else { - Map counters = status.getCounters(); - List info = Lists.newArrayList(); - for (String key : counters.keySet()) { - // XXX: internal etl job return all counters - if (key.equalsIgnoreCase("HDFS bytes read") - || key.equalsIgnoreCase("Map input records") - || key.startsWith("dpp.") - || loadJob.getEtlJobType() == EtlJobType.MINI) { - info.add(key + "=" + counters.get(key)); - } - } // end for counters - if (info.isEmpty()) { - jobInfo.add(FeConstants.null_string); - } else { - jobInfo.add(StringUtils.join(info, "; ")); - } - } - - // task info - jobInfo.add("cluster:" + loadJob.getHadoopCluster() - + "; timeout(s):" + loadJob.getTimeoutSecond() - + "; max_filter_ratio:" + loadJob.getMaxFilterRatio()); - - // error msg - if (loadJob.getState() == JobState.CANCELLED) { - FailMsg failMsg = loadJob.getFailMsg(); - jobInfo.add("type:" + failMsg.getCancelType() + "; msg:" + failMsg.getMsg()); - } else { - jobInfo.add(FeConstants.null_string); - } - - // create time - jobInfo.add(TimeUtils.longToTimeString(loadJob.getCreateTimeMs())); - // etl start time - jobInfo.add(TimeUtils.longToTimeString(loadJob.getEtlStartTimeMs())); - // etl end time - jobInfo.add(TimeUtils.longToTimeString(loadJob.getEtlFinishTimeMs())); - // load start time - jobInfo.add(TimeUtils.longToTimeString(loadJob.getLoadStartTimeMs())); - // load end time - jobInfo.add(TimeUtils.longToTimeString(loadJob.getLoadFinishTimeMs())); - // tracking url - jobInfo.add(status.getTrackingUrl()); - // job detail(not used for hadoop load, just return an empty string) - jobInfo.add(""); - // transaction id - jobInfo.add(loadJob.getTransactionId()); - // error tablets(not used for hadoop load, just return an empty string) - jobInfo.add(""); - // user - jobInfo.add(loadJob.getUser()); - // comment - jobInfo.add(loadJob.getComment()); - - loadJobInfos.add(jobInfo); + loadJobInfos.add(composeJobInfoByLoadJob(loadJob)); } // end for loadJobs LOG.debug("finished to get load job info, cost: {}", (System.currentTimeMillis() - start)); diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadManager.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadManager.java index 1eb5d24a757872..2457dacdcade4d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadManager.java @@ -43,7 +43,9 @@ import org.apache.doris.load.FailMsg; import org.apache.doris.load.FailMsg.CancelType; import org.apache.doris.load.Load; +import org.apache.doris.mysql.privilege.PrivPredicate; import org.apache.doris.persist.CleanLabelOperationLog; +import org.apache.doris.qe.ConnectContext; import org.apache.doris.qe.OriginStatement; import org.apache.doris.thrift.TUniqueId; import org.apache.doris.transaction.DatabaseTransactionMgr; @@ -66,6 +68,7 @@ import java.util.ArrayList; import java.util.Collection; import java.util.EnumSet; +import java.util.HashMap; import java.util.Iterator; import java.util.LinkedList; import java.util.List; @@ -345,6 +348,30 @@ public int getLoadJobNum(JobState jobState, long dbId) { } } + /** + * Get load job num, used by proc. + **/ + public int getLoadJobNum(JobState jobState) { + readLock(); + try { + Map> labelToLoadJobs = new HashMap<>(); + for (Long dbId : dbIdToLabelToLoadJobs.keySet()) { + if (!Env.getCurrentEnv().getAccessManager().checkDbPriv(ConnectContext.get(), + Env.getCurrentEnv().getCatalogMgr().getDbNullable(dbId).getFullName(), + PrivPredicate.LOAD)) { + continue; + } + + labelToLoadJobs.putAll(dbIdToLabelToLoadJobs.get(dbId)); + } + + List loadJobList = + labelToLoadJobs.values().stream().flatMap(entity -> entity.stream()).collect(Collectors.toList()); + return (int) loadJobList.stream().filter(entity -> entity.getState() == jobState).count(); + } finally { + readUnlock(); + } + } /** * Get load job num, used by metric. @@ -538,6 +565,40 @@ public List> getLoadJobInfosByDb(long dbId, String labelValue, } } + public List> getAllLoadJobInfos() { + LinkedList> loadJobInfos = new LinkedList>(); + + readLock(); + try { + Map> labelToLoadJobs = new HashMap<>(); + for (Long dbId : dbIdToLabelToLoadJobs.keySet()) { + if (!Env.getCurrentEnv().getAccessManager().checkDbPriv(ConnectContext.get(), + Env.getCurrentEnv().getCatalogMgr().getDbNullable(dbId).getFullName(), + PrivPredicate.LOAD)) { + continue; + } + + labelToLoadJobs.putAll(dbIdToLabelToLoadJobs.get(dbId)); + } + List loadJobList = Lists.newArrayList(); + loadJobList.addAll( + labelToLoadJobs.values().stream().flatMap(Collection::stream).collect(Collectors.toList())); + + // check state + for (LoadJob loadJob : loadJobList) { + try { + // add load job info + loadJobInfos.add(loadJob.getShowInfo()); + } catch (DdlException e) { + continue; + } + } + return loadJobInfos; + } finally { + readUnlock(); + } + } + /** * Get load job info. **/