Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -45,37 +45,30 @@ This function is used in FROM clauses.

active_queries() table schema:
```
mysql [(none)]> desc function active_queries();
+------------------------+--------+------+-------+---------+-------+
| Field | Type | Null | Key | Default | Extra |
+------------------------+--------+------+-------+---------+-------+
| BeHost | TEXT | No | false | NULL | NONE |
| BePort | BIGINT | No | false | NULL | NONE |
| QueryId | TEXT | No | false | NULL | NONE |
| StartTime | TEXT | No | false | NULL | NONE |
| QueryTimeMs | BIGINT | No | false | NULL | NONE |
| WorkloadGroupId | BIGINT | No | false | NULL | NONE |
| QueryCpuTimeMs | BIGINT | No | false | NULL | NONE |
| ScanRows | BIGINT | No | false | NULL | NONE |
| ScanBytes | BIGINT | No | false | NULL | NONE |
| BePeakMemoryBytes | BIGINT | No | false | NULL | NONE |
| CurrentUsedMemoryBytes | BIGINT | No | false | NULL | NONE |
| Database | TEXT | No | false | NULL | NONE |
| FrontendInstance | TEXT | No | false | NULL | NONE |
| Sql | TEXT | No | false | NULL | NONE |
+------------------------+--------+------+-------+---------+-------+
14 rows in set (0.00 sec)
mysql [(none)]>desc function active_queries();
+------------------+--------+------+-------+---------+-------+
| Field | Type | Null | Key | Default | Extra |
+------------------+--------+------+-------+---------+-------+
| QueryId | TEXT | No | false | NULL | NONE |
| StartTime | TEXT | No | false | NULL | NONE |
| QueryTimeMs | BIGINT | No | false | NULL | NONE |
| WorkloadGroupId | BIGINT | No | false | NULL | NONE |
| Database | TEXT | No | false | NULL | NONE |
| FrontendInstance | TEXT | No | false | NULL | NONE |
| Sql | TEXT | No | false | NULL | NONE |
+------------------+--------+------+-------+---------+-------+
7 rows in set (0.00 sec)
```

### example
```
mysql [(none)]>select * from active_queries();
+------------+--------+----------------------------------+---------------------+-------------+-----------------+----------------+----------+------------+-------------------+------------------------+----------+------------------+-------+
| BeHost | BePort | QueryId | StartTime | QueryTimeMs | WorkloadGroupId | QueryCpuTimeMs | ScanRows | ScanBytes | BePeakMemoryBytes | CurrentUsedMemoryBytes | Database | FrontendInstance | Sql |
+------------+--------+----------------------------------+---------------------+-------------+-----------------+----------------+----------+------------+-------------------+------------------------+----------+------------------+-------+
| 127.0.0.1 | 6090 | 71fd11b7b0e438c-bc98434b97b8cb98 | 2024-01-16 16:21:15 | 7260 | 10002 | 8392 | 16082249 | 4941889536 | 360470040 | 360420915 | hits | localhost | SELECT xxxx |
+------------+--------+----------------------------------+---------------------+-------------+-----------------+----------------+----------+------------+-------------------+------------------------+----------+------------------+-------+
1 row in set (0.01 sec)
+-----------------------------------+---------------------+-------------+-----------------+----------+------------------+--------------------------------+
| QueryId | StartTime | QueryTimeMs | WorkloadGroupId | Database | FrontendInstance | Sql |
+-----------------------------------+---------------------+-------------+-----------------+----------+------------------+--------------------------------+
| a84bf9f3ea6348e1-ac542839f8f2af5d | 2024-03-04 17:33:09 | 9 | 10002 | | localhost | select * from active_queries() |
+-----------------------------------+---------------------+-------------+-----------------+----------+------------------+--------------------------------+
1 row in set (0.03 sec)
```

### keywords
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,37 +45,30 @@ active_queries

active_queries()表结构:
```
mysql [(none)]> desc function active_queries();
+------------------------+--------+------+-------+---------+-------+
| Field | Type | Null | Key | Default | Extra |
+------------------------+--------+------+-------+---------+-------+
| BeHost | TEXT | No | false | NULL | NONE |
| BePort | BIGINT | No | false | NULL | NONE |
| QueryId | TEXT | No | false | NULL | NONE |
| StartTime | TEXT | No | false | NULL | NONE |
| QueryTimeMs | BIGINT | No | false | NULL | NONE |
| WorkloadGroupId | BIGINT | No | false | NULL | NONE |
| QueryCpuTimeMs | BIGINT | No | false | NULL | NONE |
| ScanRows | BIGINT | No | false | NULL | NONE |
| ScanBytes | BIGINT | No | false | NULL | NONE |
| BePeakMemoryBytes | BIGINT | No | false | NULL | NONE |
| CurrentUsedMemoryBytes | BIGINT | No | false | NULL | NONE |
| Database | TEXT | No | false | NULL | NONE |
| FrontendInstance | TEXT | No | false | NULL | NONE |
| Sql | TEXT | No | false | NULL | NONE |
+------------------------+--------+------+-------+---------+-------+
14 rows in set (0.00 sec)
mysql [(none)]>desc function active_queries();
+------------------+--------+------+-------+---------+-------+
| Field | Type | Null | Key | Default | Extra |
+------------------+--------+------+-------+---------+-------+
| QueryId | TEXT | No | false | NULL | NONE |
| StartTime | TEXT | No | false | NULL | NONE |
| QueryTimeMs | BIGINT | No | false | NULL | NONE |
| WorkloadGroupId | BIGINT | No | false | NULL | NONE |
| Database | TEXT | No | false | NULL | NONE |
| FrontendInstance | TEXT | No | false | NULL | NONE |
| Sql | TEXT | No | false | NULL | NONE |
+------------------+--------+------+-------+---------+-------+
7 rows in set (0.00 sec)
```

### example
```
mysql [(none)]>select * from active_queries();
+------------+--------+----------------------------------+---------------------+-------------+-----------------+----------------+----------+------------+-------------------+------------------------+----------+------------------+-------+
| BeHost | BePort | QueryId | StartTime | QueryTimeMs | WorkloadGroupId | QueryCpuTimeMs | ScanRows | ScanBytes | BePeakMemoryBytes | CurrentUsedMemoryBytes | Database | FrontendInstance | Sql |
+------------+--------+----------------------------------+---------------------+-------------+-----------------+----------------+----------+------------+-------------------+------------------------+----------+------------------+-------+
| 127.0.0.1 | 6090 | 71fd11b7b0e438c-bc98434b97b8cb98 | 2024-01-16 16:21:15 | 7260 | 10002 | 8392 | 16082249 | 4941889536 | 360470040 | 360420915 | hits | localhost | SELECT xxxx |
+------------+--------+----------------------------------+---------------------+-------------+-----------------+----------------+----------+------------+-------------------+------------------------+----------+------------------+-------+
1 row in set (0.01 sec)
+-----------------------------------+---------------------+-------------+-----------------+----------+------------------+--------------------------------+
| QueryId | StartTime | QueryTimeMs | WorkloadGroupId | Database | FrontendInstance | Sql |
+-----------------------------------+---------------------+-------------+-----------------+----------+------------------+--------------------------------+
| a84bf9f3ea6348e1-ac542839f8f2af5d | 2024-03-04 17:33:09 | 9 | 10002 | | localhost | select * from active_queries() |
+-----------------------------------+---------------------+-------------+-----------------+----------+------------------+--------------------------------+
1 row in set (0.03 sec)
```

### keywords
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@
import org.apache.doris.qe.Coordinator;
import org.apache.doris.qe.InsertStreamTxnExecutor;
import org.apache.doris.qe.QeProcessorImpl;
import org.apache.doris.qe.QeProcessorImpl.QueryInfo;
import org.apache.doris.qe.QueryState.MysqlStateType;
import org.apache.doris.qe.SessionVariable;
import org.apache.doris.qe.StmtExecutor;
Expand Down Expand Up @@ -222,7 +223,8 @@ public void executeSingleInsertTransaction(StmtExecutor executor, long jobId) {
coordinator.setLoadZeroTolerance(ctx.getSessionVariable().getEnableInsertStrict());
coordinator.setQueryType(TQueryType.LOAD);
executor.getProfile().setExecutionProfile(coordinator.getExecutionProfile());
QeProcessorImpl.INSTANCE.registerQuery(ctx.queryId(), coordinator);
QueryInfo queryInfo = new QueryInfo(ConnectContext.get(), executor.getOriginStmtInString(), coordinator);
QeProcessorImpl.INSTANCE.registerQuery(ctx.queryId(), queryInfo);
coordinator.exec();
int execTimeout = ctx.getExecTimeout();
if (LOG.isDebugEnabled()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -267,6 +267,10 @@ public void setTWorkloadGroups(List<TPipelineWorkloadGroup> tWorkloadGroups) {
this.tWorkloadGroups = tWorkloadGroups;
}

public List<TPipelineWorkloadGroup> gettWorkloadGroups() {
return tWorkloadGroups;
}

private List<TPipelineWorkloadGroup> tWorkloadGroups = Lists.newArrayList();

private final ExecutionProfile executionProfile;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,7 @@
import org.apache.doris.proto.Types;
import org.apache.doris.qe.CommonResultSet.CommonResultSetMetaData;
import org.apache.doris.qe.ConnectContext.ConnectType;
import org.apache.doris.qe.QeProcessorImpl.QueryInfo;
import org.apache.doris.qe.QueryState.MysqlStateType;
import org.apache.doris.qe.cache.Cache;
import org.apache.doris.qe.cache.CacheAnalyzer;
Expand Down Expand Up @@ -2053,8 +2054,8 @@ private void handleInsertStmt() throws Exception {
coord.setLoadZeroTolerance(context.getSessionVariable().getEnableInsertStrict());
coord.setQueryType(TQueryType.LOAD);
profile.setExecutionProfile(coord.getExecutionProfile());

QeProcessorImpl.INSTANCE.registerQuery(context.queryId(), coord);
QueryInfo queryInfo = new QueryInfo(ConnectContext.get(), this.getOriginStmtInString(), coord);
QeProcessorImpl.INSTANCE.registerQuery(context.queryId(), queryInfo);

Table table = insertStmt.getTargetTable();
if (table instanceof OlapTable) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,19 +35,10 @@ public class ActiveQueriesTableValuedFunction extends MetadataTableValuedFunctio
public static final String NAME = "active_queries";

private static final ImmutableList<Column> SCHEMA = ImmutableList.of(
new Column("BeHost", ScalarType.createStringType()),
new Column("BePort", PrimitiveType.BIGINT),
new Column("QueryId", ScalarType.createStringType()),
new Column("StartTime", ScalarType.createStringType()),
new Column("QueryTimeMs", PrimitiveType.BIGINT),
new Column("WorkloadGroupId", PrimitiveType.BIGINT),
new Column("QueryCpuTimeMs", PrimitiveType.BIGINT),
new Column("ScanRows", PrimitiveType.BIGINT),
new Column("ScanBytes", PrimitiveType.BIGINT),
new Column("BePeakMemoryBytes", PrimitiveType.BIGINT),
new Column("CurrentUsedMemoryBytes", PrimitiveType.BIGINT),
new Column("ShuffleSendBytes", PrimitiveType.BIGINT),
new Column("ShuffleSendRows", PrimitiveType.BIGINT),
new Column("Database", ScalarType.createStringType()),
new Column("FrontendInstance", ScalarType.createStringType()),
new Column("Sql", ScalarType.createStringType()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
import org.apache.doris.thrift.TMetadataTableRequestParams;
import org.apache.doris.thrift.TMetadataType;
import org.apache.doris.thrift.TNetworkAddress;
import org.apache.doris.thrift.TPipelineWorkloadGroup;
import org.apache.doris.thrift.TQueriesMetadataParams;
import org.apache.doris.thrift.TQueryStatistics;
import org.apache.doris.thrift.TRow;
Expand Down Expand Up @@ -83,7 +84,6 @@
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;

public class MetadataGenerator {
Expand Down Expand Up @@ -517,7 +517,7 @@ private static TRow makeQueryStatisticsTRow(SimpleDateFormat sdf, String queryId
}

private static TFetchSchemaTableDataResult queriesMetadataResult(TMetadataTableRequestParams params,
TFetchSchemaTableDataRequest parentRequest) {
TFetchSchemaTableDataRequest parentRequest) {
if (!params.isSetQueriesMetadataParams()) {
return errorResult("queries metadata param is not set.");
}
Expand All @@ -531,37 +531,35 @@ private static TFetchSchemaTableDataResult queriesMetadataResult(TMetadataTableR
}
selfNode = NetUtils.getHostnameByIp(selfNode);

// get query
Map<Long, Map<String, TQueryStatistics>> beQsMap = Env.getCurrentEnv().getWorkloadRuntimeStatusMgr()
.getBeQueryStatsMap();
Set<Long> beIdSet = beQsMap.keySet();

List<TRow> dataBatch = Lists.newArrayList();
Map<String, QueryInfo> queryInfoMap = QeProcessorImpl.INSTANCE.getQueryInfoMap();

SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
for (Long beId : beIdSet) {
Map<String, TQueryStatistics> qsMap = beQsMap.get(beId);
if (qsMap == null) {
continue;
for (Map.Entry<String, QueryInfo> entry : queryInfoMap.entrySet()) {
String queryId = entry.getKey();
QueryInfo queryInfo = entry.getValue();

TRow trow = new TRow();
trow.addToColumnValue(new TCell().setStringVal(queryId));

String strDate = sdf.format(new Date(queryInfo.getStartExecTime()));
trow.addToColumnValue(new TCell().setStringVal(strDate));
trow.addToColumnValue(new TCell().setLongVal(System.currentTimeMillis() - queryInfo.getStartExecTime()));

List<TPipelineWorkloadGroup> tgroupList = queryInfo.getCoord().gettWorkloadGroups();
if (tgroupList != null && tgroupList.size() == 1) {
trow.addToColumnValue(new TCell().setLongVal(tgroupList.get(0).id));
} else {
trow.addToColumnValue(new TCell().setLongVal(-1));
}
Set<String> queryIdSet = qsMap.keySet();
for (String queryId : queryIdSet) {
QueryInfo queryInfo = queryInfoMap.get(queryId);
if (queryInfo == null) {
continue;
}
//todo(wb) add connect context for insert select
if (queryInfo.getConnectContext() != null && !Env.getCurrentEnv().getAccessManager()
.checkDbPriv(queryInfo.getConnectContext(), queryInfo.getConnectContext().getDatabase(),
PrivPredicate.SELECT)) {
continue;
}
TQueryStatistics qs = qsMap.get(queryId);
Backend be = Env.getCurrentEnv().getClusterInfo().getBackend(beId);
TRow tRow = makeQueryStatisticsTRow(sdf, queryId, be, selfNode, queryInfo, qs);
dataBatch.add(tRow);

if (queryInfo.getConnectContext() != null) {
trow.addToColumnValue(new TCell().setStringVal(queryInfo.getConnectContext().getDatabase()));
} else {
trow.addToColumnValue(new TCell().setStringVal(""));
}
trow.addToColumnValue(new TCell().setStringVal(selfNode));
trow.addToColumnValue(new TCell().setStringVal(queryInfo.getSql()));
dataBatch.add(trow);
}

/* Get the query results from other FE also */
Expand Down