From 49cadf86a26ec049e5f2843a608ed40399c0e9d0 Mon Sep 17 00:00:00 2001 From: wangbo Date: Tue, 5 Mar 2024 13:51:36 +0800 Subject: [PATCH] Refactor active queries (#31742) --- .../table-functions/active_queries.md | 45 +++++++--------- .../table-functions/active_queries.md | 45 +++++++--------- .../trees/plans/commands/InsertExecutor.java | 4 +- .../java/org/apache/doris/qe/Coordinator.java | 4 ++ .../org/apache/doris/qe/StmtExecutor.java | 5 +- .../ActiveQueriesTableValuedFunction.java | 9 ---- .../tablefunction/MetadataGenerator.java | 54 +++++++++---------- 7 files changed, 74 insertions(+), 92 deletions(-) diff --git a/docs/en/docs/sql-manual/sql-functions/table-functions/active_queries.md b/docs/en/docs/sql-manual/sql-functions/table-functions/active_queries.md index 35a71b5eb60bce..cbc0e20845d00a 100644 --- a/docs/en/docs/sql-manual/sql-functions/table-functions/active_queries.md +++ b/docs/en/docs/sql-manual/sql-functions/table-functions/active_queries.md @@ -45,37 +45,30 @@ This function is used in FROM clauses. active_queries() table schema: ``` -mysql [(none)]> desc function active_queries(); -+------------------------+--------+------+-------+---------+-------+ -| Field | Type | Null | Key | Default | Extra | -+------------------------+--------+------+-------+---------+-------+ -| BeHost | TEXT | No | false | NULL | NONE | -| BePort | BIGINT | No | false | NULL | NONE | -| QueryId | TEXT | No | false | NULL | NONE | -| StartTime | TEXT | No | false | NULL | NONE | -| QueryTimeMs | BIGINT | No | false | NULL | NONE | -| WorkloadGroupId | BIGINT | No | false | NULL | NONE | -| QueryCpuTimeMs | BIGINT | No | false | NULL | NONE | -| ScanRows | BIGINT | No | false | NULL | NONE | -| ScanBytes | BIGINT | No | false | NULL | NONE | -| BePeakMemoryBytes | BIGINT | No | false | NULL | NONE | -| CurrentUsedMemoryBytes | BIGINT | No | false | NULL | NONE | -| Database | TEXT | No | false | NULL | NONE | -| FrontendInstance | TEXT | No | false | NULL | NONE | -| Sql | TEXT | No | false | NULL | NONE | -+------------------------+--------+------+-------+---------+-------+ -14 rows in set (0.00 sec) +mysql [(none)]>desc function active_queries(); ++------------------+--------+------+-------+---------+-------+ +| Field | Type | Null | Key | Default | Extra | ++------------------+--------+------+-------+---------+-------+ +| QueryId | TEXT | No | false | NULL | NONE | +| StartTime | TEXT | No | false | NULL | NONE | +| QueryTimeMs | BIGINT | No | false | NULL | NONE | +| WorkloadGroupId | BIGINT | No | false | NULL | NONE | +| Database | TEXT | No | false | NULL | NONE | +| FrontendInstance | TEXT | No | false | NULL | NONE | +| Sql | TEXT | No | false | NULL | NONE | ++------------------+--------+------+-------+---------+-------+ +7 rows in set (0.00 sec) ``` ### example ``` mysql [(none)]>select * from active_queries(); -+------------+--------+----------------------------------+---------------------+-------------+-----------------+----------------+----------+------------+-------------------+------------------------+----------+------------------+-------+ -| BeHost | BePort | QueryId | StartTime | QueryTimeMs | WorkloadGroupId | QueryCpuTimeMs | ScanRows | ScanBytes | BePeakMemoryBytes | CurrentUsedMemoryBytes | Database | FrontendInstance | Sql | -+------------+--------+----------------------------------+---------------------+-------------+-----------------+----------------+----------+------------+-------------------+------------------------+----------+------------------+-------+ -| 127.0.0.1 | 6090 | 71fd11b7b0e438c-bc98434b97b8cb98 | 2024-01-16 16:21:15 | 7260 | 10002 | 8392 | 16082249 | 4941889536 | 360470040 | 360420915 | hits | localhost | SELECT xxxx | -+------------+--------+----------------------------------+---------------------+-------------+-----------------+----------------+----------+------------+-------------------+------------------------+----------+------------------+-------+ -1 row in set (0.01 sec) ++-----------------------------------+---------------------+-------------+-----------------+----------+------------------+--------------------------------+ +| QueryId | StartTime | QueryTimeMs | WorkloadGroupId | Database | FrontendInstance | Sql | ++-----------------------------------+---------------------+-------------+-----------------+----------+------------------+--------------------------------+ +| a84bf9f3ea6348e1-ac542839f8f2af5d | 2024-03-04 17:33:09 | 9 | 10002 | | localhost | select * from active_queries() | ++-----------------------------------+---------------------+-------------+-----------------+----------+------------------+--------------------------------+ +1 row in set (0.03 sec) ``` ### keywords diff --git a/docs/zh-CN/docs/sql-manual/sql-functions/table-functions/active_queries.md b/docs/zh-CN/docs/sql-manual/sql-functions/table-functions/active_queries.md index bdae08285f285c..feda3c128ca2fe 100644 --- a/docs/zh-CN/docs/sql-manual/sql-functions/table-functions/active_queries.md +++ b/docs/zh-CN/docs/sql-manual/sql-functions/table-functions/active_queries.md @@ -45,37 +45,30 @@ active_queries active_queries()表结构: ``` -mysql [(none)]> desc function active_queries(); -+------------------------+--------+------+-------+---------+-------+ -| Field | Type | Null | Key | Default | Extra | -+------------------------+--------+------+-------+---------+-------+ -| BeHost | TEXT | No | false | NULL | NONE | -| BePort | BIGINT | No | false | NULL | NONE | -| QueryId | TEXT | No | false | NULL | NONE | -| StartTime | TEXT | No | false | NULL | NONE | -| QueryTimeMs | BIGINT | No | false | NULL | NONE | -| WorkloadGroupId | BIGINT | No | false | NULL | NONE | -| QueryCpuTimeMs | BIGINT | No | false | NULL | NONE | -| ScanRows | BIGINT | No | false | NULL | NONE | -| ScanBytes | BIGINT | No | false | NULL | NONE | -| BePeakMemoryBytes | BIGINT | No | false | NULL | NONE | -| CurrentUsedMemoryBytes | BIGINT | No | false | NULL | NONE | -| Database | TEXT | No | false | NULL | NONE | -| FrontendInstance | TEXT | No | false | NULL | NONE | -| Sql | TEXT | No | false | NULL | NONE | -+------------------------+--------+------+-------+---------+-------+ -14 rows in set (0.00 sec) +mysql [(none)]>desc function active_queries(); ++------------------+--------+------+-------+---------+-------+ +| Field | Type | Null | Key | Default | Extra | ++------------------+--------+------+-------+---------+-------+ +| QueryId | TEXT | No | false | NULL | NONE | +| StartTime | TEXT | No | false | NULL | NONE | +| QueryTimeMs | BIGINT | No | false | NULL | NONE | +| WorkloadGroupId | BIGINT | No | false | NULL | NONE | +| Database | TEXT | No | false | NULL | NONE | +| FrontendInstance | TEXT | No | false | NULL | NONE | +| Sql | TEXT | No | false | NULL | NONE | ++------------------+--------+------+-------+---------+-------+ +7 rows in set (0.00 sec) ``` ### example ``` mysql [(none)]>select * from active_queries(); -+------------+--------+----------------------------------+---------------------+-------------+-----------------+----------------+----------+------------+-------------------+------------------------+----------+------------------+-------+ -| BeHost | BePort | QueryId | StartTime | QueryTimeMs | WorkloadGroupId | QueryCpuTimeMs | ScanRows | ScanBytes | BePeakMemoryBytes | CurrentUsedMemoryBytes | Database | FrontendInstance | Sql | -+------------+--------+----------------------------------+---------------------+-------------+-----------------+----------------+----------+------------+-------------------+------------------------+----------+------------------+-------+ -| 127.0.0.1 | 6090 | 71fd11b7b0e438c-bc98434b97b8cb98 | 2024-01-16 16:21:15 | 7260 | 10002 | 8392 | 16082249 | 4941889536 | 360470040 | 360420915 | hits | localhost | SELECT xxxx | -+------------+--------+----------------------------------+---------------------+-------------+-----------------+----------------+----------+------------+-------------------+------------------------+----------+------------------+-------+ -1 row in set (0.01 sec) ++-----------------------------------+---------------------+-------------+-----------------+----------+------------------+--------------------------------+ +| QueryId | StartTime | QueryTimeMs | WorkloadGroupId | Database | FrontendInstance | Sql | ++-----------------------------------+---------------------+-------------+-----------------+----------+------------------+--------------------------------+ +| a84bf9f3ea6348e1-ac542839f8f2af5d | 2024-03-04 17:33:09 | 9 | 10002 | | localhost | select * from active_queries() | ++-----------------------------------+---------------------+-------------+-----------------+----------+------------------+--------------------------------+ +1 row in set (0.03 sec) ``` ### keywords diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/InsertExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/InsertExecutor.java index 2c5ee559f3da7b..da4a1c6ac687e4 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/InsertExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/InsertExecutor.java @@ -69,6 +69,7 @@ import org.apache.doris.qe.Coordinator; import org.apache.doris.qe.InsertStreamTxnExecutor; import org.apache.doris.qe.QeProcessorImpl; +import org.apache.doris.qe.QeProcessorImpl.QueryInfo; import org.apache.doris.qe.QueryState.MysqlStateType; import org.apache.doris.qe.SessionVariable; import org.apache.doris.qe.StmtExecutor; @@ -222,7 +223,8 @@ public void executeSingleInsertTransaction(StmtExecutor executor, long jobId) { coordinator.setLoadZeroTolerance(ctx.getSessionVariable().getEnableInsertStrict()); coordinator.setQueryType(TQueryType.LOAD); executor.getProfile().setExecutionProfile(coordinator.getExecutionProfile()); - QeProcessorImpl.INSTANCE.registerQuery(ctx.queryId(), coordinator); + QueryInfo queryInfo = new QueryInfo(ConnectContext.get(), executor.getOriginStmtInString(), coordinator); + QeProcessorImpl.INSTANCE.registerQuery(ctx.queryId(), queryInfo); coordinator.exec(); int execTimeout = ctx.getExecTimeout(); if (LOG.isDebugEnabled()) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java index 8bf1b3813badae..6e016d94e2a5c1 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java @@ -267,6 +267,10 @@ public void setTWorkloadGroups(List tWorkloadGroups) { this.tWorkloadGroups = tWorkloadGroups; } + public List gettWorkloadGroups() { + return tWorkloadGroups; + } + private List tWorkloadGroups = Lists.newArrayList(); private final ExecutionProfile executionProfile; diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java index 808ec0de09d17b..063cbf0ad710ff 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java @@ -147,6 +147,7 @@ import org.apache.doris.proto.Types; import org.apache.doris.qe.CommonResultSet.CommonResultSetMetaData; import org.apache.doris.qe.ConnectContext.ConnectType; +import org.apache.doris.qe.QeProcessorImpl.QueryInfo; import org.apache.doris.qe.QueryState.MysqlStateType; import org.apache.doris.qe.cache.Cache; import org.apache.doris.qe.cache.CacheAnalyzer; @@ -2053,8 +2054,8 @@ private void handleInsertStmt() throws Exception { coord.setLoadZeroTolerance(context.getSessionVariable().getEnableInsertStrict()); coord.setQueryType(TQueryType.LOAD); profile.setExecutionProfile(coord.getExecutionProfile()); - - QeProcessorImpl.INSTANCE.registerQuery(context.queryId(), coord); + QueryInfo queryInfo = new QueryInfo(ConnectContext.get(), this.getOriginStmtInString(), coord); + QeProcessorImpl.INSTANCE.registerQuery(context.queryId(), queryInfo); Table table = insertStmt.getTargetTable(); if (table instanceof OlapTable) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/ActiveQueriesTableValuedFunction.java b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/ActiveQueriesTableValuedFunction.java index 41dd5484dd569c..ebc0ffa1121038 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/ActiveQueriesTableValuedFunction.java +++ b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/ActiveQueriesTableValuedFunction.java @@ -35,19 +35,10 @@ public class ActiveQueriesTableValuedFunction extends MetadataTableValuedFunctio public static final String NAME = "active_queries"; private static final ImmutableList SCHEMA = ImmutableList.of( - new Column("BeHost", ScalarType.createStringType()), - new Column("BePort", PrimitiveType.BIGINT), new Column("QueryId", ScalarType.createStringType()), new Column("StartTime", ScalarType.createStringType()), new Column("QueryTimeMs", PrimitiveType.BIGINT), new Column("WorkloadGroupId", PrimitiveType.BIGINT), - new Column("QueryCpuTimeMs", PrimitiveType.BIGINT), - new Column("ScanRows", PrimitiveType.BIGINT), - new Column("ScanBytes", PrimitiveType.BIGINT), - new Column("BePeakMemoryBytes", PrimitiveType.BIGINT), - new Column("CurrentUsedMemoryBytes", PrimitiveType.BIGINT), - new Column("ShuffleSendBytes", PrimitiveType.BIGINT), - new Column("ShuffleSendRows", PrimitiveType.BIGINT), new Column("Database", ScalarType.createStringType()), new Column("FrontendInstance", ScalarType.createStringType()), new Column("Sql", ScalarType.createStringType())); diff --git a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataGenerator.java b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataGenerator.java index 87aabe5bc092c2..24cb4a365a99af 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataGenerator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataGenerator.java @@ -56,6 +56,7 @@ import org.apache.doris.thrift.TMetadataTableRequestParams; import org.apache.doris.thrift.TMetadataType; import org.apache.doris.thrift.TNetworkAddress; +import org.apache.doris.thrift.TPipelineWorkloadGroup; import org.apache.doris.thrift.TQueriesMetadataParams; import org.apache.doris.thrift.TQueryStatistics; import org.apache.doris.thrift.TRow; @@ -83,7 +84,6 @@ import java.util.Date; import java.util.List; import java.util.Map; -import java.util.Set; import java.util.concurrent.TimeUnit; public class MetadataGenerator { @@ -517,7 +517,7 @@ private static TRow makeQueryStatisticsTRow(SimpleDateFormat sdf, String queryId } private static TFetchSchemaTableDataResult queriesMetadataResult(TMetadataTableRequestParams params, - TFetchSchemaTableDataRequest parentRequest) { + TFetchSchemaTableDataRequest parentRequest) { if (!params.isSetQueriesMetadataParams()) { return errorResult("queries metadata param is not set."); } @@ -531,37 +531,35 @@ private static TFetchSchemaTableDataResult queriesMetadataResult(TMetadataTableR } selfNode = NetUtils.getHostnameByIp(selfNode); - // get query - Map> beQsMap = Env.getCurrentEnv().getWorkloadRuntimeStatusMgr() - .getBeQueryStatsMap(); - Set beIdSet = beQsMap.keySet(); - List dataBatch = Lists.newArrayList(); Map queryInfoMap = QeProcessorImpl.INSTANCE.getQueryInfoMap(); - SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); - for (Long beId : beIdSet) { - Map qsMap = beQsMap.get(beId); - if (qsMap == null) { - continue; + for (Map.Entry entry : queryInfoMap.entrySet()) { + String queryId = entry.getKey(); + QueryInfo queryInfo = entry.getValue(); + + TRow trow = new TRow(); + trow.addToColumnValue(new TCell().setStringVal(queryId)); + + String strDate = sdf.format(new Date(queryInfo.getStartExecTime())); + trow.addToColumnValue(new TCell().setStringVal(strDate)); + trow.addToColumnValue(new TCell().setLongVal(System.currentTimeMillis() - queryInfo.getStartExecTime())); + + List tgroupList = queryInfo.getCoord().gettWorkloadGroups(); + if (tgroupList != null && tgroupList.size() == 1) { + trow.addToColumnValue(new TCell().setLongVal(tgroupList.get(0).id)); + } else { + trow.addToColumnValue(new TCell().setLongVal(-1)); } - Set queryIdSet = qsMap.keySet(); - for (String queryId : queryIdSet) { - QueryInfo queryInfo = queryInfoMap.get(queryId); - if (queryInfo == null) { - continue; - } - //todo(wb) add connect context for insert select - if (queryInfo.getConnectContext() != null && !Env.getCurrentEnv().getAccessManager() - .checkDbPriv(queryInfo.getConnectContext(), queryInfo.getConnectContext().getDatabase(), - PrivPredicate.SELECT)) { - continue; - } - TQueryStatistics qs = qsMap.get(queryId); - Backend be = Env.getCurrentEnv().getClusterInfo().getBackend(beId); - TRow tRow = makeQueryStatisticsTRow(sdf, queryId, be, selfNode, queryInfo, qs); - dataBatch.add(tRow); + + if (queryInfo.getConnectContext() != null) { + trow.addToColumnValue(new TCell().setStringVal(queryInfo.getConnectContext().getDatabase())); + } else { + trow.addToColumnValue(new TCell().setStringVal("")); } + trow.addToColumnValue(new TCell().setStringVal(selfNode)); + trow.addToColumnValue(new TCell().setStringVal(queryInfo.getSql())); + dataBatch.add(trow); } /* Get the query results from other FE also */