Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion be/src/exec/schema_scanner/schema_active_queries_scanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,14 @@ namespace doris {
// Column layout (name, type, size, nullable) of information_schema.active_queries
// on the BE side. Must stay in sync column-for-column with the FE definitions
// (SchemaTable "active_queries" and MetadataGenerator's ACTIVE_QUERIES_SCHEMA).
// Fix: removed the stale pre-rename "START_TIME" entry that duplicated
// "QUERY_START_TIME"; the duplicate shifted every subsequent column index.
std::vector<SchemaScanner::ColumnDesc> SchemaActiveQueriesScanner::_s_tbls_columns = {
        // name, type, size
        {"QUERY_ID", TYPE_VARCHAR, sizeof(StringRef), true},
        {"QUERY_START_TIME", TYPE_VARCHAR, sizeof(StringRef), true},
        {"QUERY_TIME_MS", TYPE_BIGINT, sizeof(int64_t), true},
        {"WORKLOAD_GROUP_ID", TYPE_BIGINT, sizeof(int64_t), true},
        {"DATABASE", TYPE_VARCHAR, sizeof(StringRef), true},
        {"FRONTEND_INSTANCE", TYPE_VARCHAR, sizeof(StringRef), true},
        {"QUEUE_START_TIME", TYPE_VARCHAR, sizeof(StringRef), true},
        {"QUEUE_END_TIME", TYPE_VARCHAR, sizeof(StringRef), true},
        {"QUERY_STATUS", TYPE_VARCHAR, sizeof(StringRef), true},
        {"SQL", TYPE_STRING, sizeof(StringRef), true}};

SchemaActiveQueriesScanner::SchemaActiveQueriesScanner()
Expand Down Expand Up @@ -127,6 +130,9 @@ Status SchemaActiveQueriesScanner::_get_active_queries_block_from_fe() {
insert_string_value(4, row.column_value[4].stringVal, _active_query_block.get());
insert_string_value(5, row.column_value[5].stringVal, _active_query_block.get());
insert_string_value(6, row.column_value[6].stringVal, _active_query_block.get());
insert_string_value(7, row.column_value[7].stringVal, _active_query_block.get());
insert_string_value(8, row.column_value[8].stringVal, _active_query_block.get());
insert_string_value(9, row.column_value[9].stringVal, _active_query_block.get());
}
return Status::OK();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -460,11 +460,14 @@ public class SchemaTable extends Table {
.build()))
.put("active_queries", new SchemaTable(SystemIdGenerator.getNextId(), "active_queries", TableType.SCHEMA,
builder().column("QUERY_ID", ScalarType.createVarchar(256))
.column("START_TIME", ScalarType.createVarchar(256))
.column("QUERY_START_TIME", ScalarType.createVarchar(256))
.column("QUERY_TIME_MS", ScalarType.createType(PrimitiveType.BIGINT))
.column("WORKLOAD_GROUP_ID", ScalarType.createType(PrimitiveType.BIGINT))
.column("DATABASE", ScalarType.createVarchar(256))
.column("FRONTEND_INSTANCE", ScalarType.createVarchar(256))
.column("QUEUE_START_TIME", ScalarType.createVarchar(256))
.column("QUEUE_END_TIME", ScalarType.createVarchar(256))
.column("QUERY_STATUS", ScalarType.createVarchar(256))
.column("SQL", ScalarType.createStringType())
.build()))
.put("workload_groups", new SchemaTable(SystemIdGenerator.getNextId(), "workload_groups", TableType.SCHEMA,
Expand Down
4 changes: 4 additions & 0 deletions fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
Original file line number Diff line number Diff line change
Expand Up @@ -3906,6 +3906,10 @@ public void appendTo(StringBuilder sb) {
}
}

/**
 * Returns the admission-control queue token of this coordinator, or null
 * if the query never went through workload-group queueing.
 */
public QueueToken getQueueToken() {
    return this.queueToken;
}

// fragment instance exec param, it is used to assemble
// the per-instance TPlanFragmentExecParams, as a member of
// FragmentExecParams
Expand Down
29 changes: 26 additions & 3 deletions fe/fe-core/src/main/java/org/apache/doris/qe/QeProcessorImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.doris.common.profile.ExecutionProfile;
import org.apache.doris.common.util.DebugUtil;
import org.apache.doris.metric.MetricRepo;
import org.apache.doris.resource.workloadgroup.QueueToken.TokenState;
import org.apache.doris.thrift.TNetworkAddress;
import org.apache.doris.thrift.TQueryType;
import org.apache.doris.thrift.TReportExecStatusParams;
Expand Down Expand Up @@ -265,7 +266,6 @@ public static final class QueryInfo {
private final ConnectContext connectContext;
private final Coordinator coord;
private final String sql;
private final long startExecTime;

// from Export, Pull load, Insert
public QueryInfo(Coordinator coord) {
Expand All @@ -277,7 +277,6 @@ public QueryInfo(ConnectContext connectContext, String sql, Coordinator coord) {
this.connectContext = connectContext;
this.coord = coord;
this.sql = sql;
this.startExecTime = System.currentTimeMillis();
}

public ConnectContext getConnectContext() {
Expand All @@ -293,7 +292,31 @@ public String getSql() {
}

/**
 * Returns the wall-clock time (ms since epoch) at which the query started
 * executing, i.e. the moment its queue token left the wait queue; -1 when
 * the query has no queue token (execution has not started / no queueing).
 * Fix: removed the stale {@code return startExecTime;} line, which both
 * referenced a deleted field and made the token-based logic unreachable.
 */
public long getStartExecTime() {
    if (coord.getQueueToken() != null) {
        return coord.getQueueToken().getQueueEndTime();
    }
    return -1;
}

/**
 * Time (ms since epoch) this query entered the workload queue, or -1 when
 * it has no queue token.
 */
public long getQueueStartTime() {
    QueueToken token = coord.getQueueToken();
    return token == null ? -1 : token.getQueueStartTime();
}

/**
 * Time (ms since epoch) this query left the workload queue, or -1 when it
 * has no queue token.
 */
public long getQueueEndTime() {
    QueueToken token = coord.getQueueToken();
    return token == null ? -1 : token.getQueueEndTime();
}

/**
 * Queue state of this query's admission token, or null when the query
 * never obtained a token.
 */
public TokenState getQueueStatus() {
    QueueToken token = coord.getQueueToken();
    return token == null ? null : token.getTokenState();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,11 @@ public String getConnId() {

/**
 * Returns the elapsed execution time in milliseconds as a string, or "-1"
 * when the query has no valid start time (not started yet).
 * Fix: removed the stale unconditional
 * {@code return String.valueOf(currentTime - queryStartTime);} line, which
 * made the start-time guard below it unreachable.
 */
public String getQueryExecTime() {
    final long currentTime = System.currentTimeMillis();
    if (queryStartTime <= 0) {
        return String.valueOf(-1);
    } else {
        return String.valueOf(currentTime - queryStartTime);
    }
}

public String getQueryId() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,21 +100,23 @@ public String debugString() {
}

public QueueToken getToken() throws UserException {

queueLock.lock();
try {
if (LOG.isDebugEnabled()) {
LOG.info(this.debugString());
}
if (currentRunningQueryNum < maxConcurrency) {
currentRunningQueryNum++;
return new QueueToken(TokenState.READY_TO_RUN, queueTimeout, "offer success");
QueueToken retToken = new QueueToken(TokenState.READY_TO_RUN, queueTimeout, "offer success");
retToken.setQueueTimeWhenOfferSuccess();
return retToken;
}
if (priorityTokenQueue.size() >= maxQueueSize) {
throw new UserException("query waiting queue is full, queue length=" + maxQueueSize);
}
QueueToken newQueryToken = new QueueToken(TokenState.ENQUEUE_SUCCESS, queueTimeout,
"query wait timeout " + queueTimeout + " ms");
newQueryToken.setQueueTimeWhenQueueSuccess();
this.priorityTokenQueue.offer(newQueryToken);
return newQueryToken;
} finally {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ public int compareTo(QueueToken other) {
return Long.compare(this.tokenId, other.getTokenId());
}

/**
 * State of a query's admission-control token. Public so that schema-table
 * code (e.g. active_queries) can map it to a QUERY_STATUS string.
 * Fix: removed the duplicated package-private {@code enum TokenState {}
 * declaration line left over from widening the visibility.
 */
public enum TokenState {
    // Query could not run immediately and was placed in the wait queue.
    ENQUEUE_SUCCESS,
    // Query was admitted and may start running.
    READY_TO_RUN
}
Expand All @@ -56,6 +56,9 @@ enum TokenState {
private final ReentrantLock tokenLock = new ReentrantLock();
private final Condition tokenCond = tokenLock.newCondition();

private long queueStartTime = -1;
private long queueEndTime = -1;

public QueueToken(TokenState tokenState, long queueWaitTimeout,
String offerResultDetail) {
this.tokenId = tokenIdGenerator.addAndGet(1);
Expand Down Expand Up @@ -94,6 +97,7 @@ public boolean waitSignal(long queryTimeoutMillis) throws InterruptedException {
return false;
} finally {
this.tokenLock.unlock();
this.setQueueTimeWhenQueueEnd();
}
}

Expand Down Expand Up @@ -126,6 +130,33 @@ public boolean isReadyToRun() {
return this.tokenState == TokenState.READY_TO_RUN;
}

/**
 * Marks a token that was admitted immediately: it never waited, so the
 * queue start and end timestamps are both set to the same instant.
 */
public void setQueueTimeWhenOfferSuccess() {
    final long now = System.currentTimeMillis();
    this.queueStartTime = now;
    this.queueEndTime = now;
}

/** Records the current time as the moment this token entered the wait queue. */
public void setQueueTimeWhenQueueSuccess() {
    this.queueStartTime = System.currentTimeMillis();
}

/** Records the current time as the moment this token left the wait queue. */
public void setQueueTimeWhenQueueEnd() {
    final long now = System.currentTimeMillis();
    this.queueEndTime = now;
}

/** Time (ms since epoch) this token entered the queue; -1 if never set. */
public long getQueueStartTime() {
    return this.queueStartTime;
}

/** Time (ms since epoch) this token left the queue; -1 if never set. */
public long getQueueEndTime() {
    return this.queueEndTime;
}

/** Current admission state of this token. */
public TokenState getTokenState() {
    return this.tokenState;
}

@Override
public boolean equals(Object obj) {
if (this == obj) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.QeProcessorImpl;
import org.apache.doris.qe.QeProcessorImpl.QueryInfo;
import org.apache.doris.resource.workloadgroup.QueueToken.TokenState;
import org.apache.doris.system.Backend;
import org.apache.doris.system.SystemInfoService;
import org.apache.doris.thrift.FrontendService;
Expand All @@ -57,7 +58,6 @@
import org.apache.doris.thrift.TMetadataType;
import org.apache.doris.thrift.TNetworkAddress;
import org.apache.doris.thrift.TPipelineWorkloadGroup;
import org.apache.doris.thrift.TQueryStatistics;
import org.apache.doris.thrift.TRow;
import org.apache.doris.thrift.TSchemaTableRequestParams;
import org.apache.doris.thrift.TStatus;
Expand Down Expand Up @@ -91,11 +91,14 @@ public class MetadataGenerator {

// Column layout of information_schema.active_queries on the FE metadata
// path; must match the BE scanner (_s_tbls_columns) and the SchemaTable
// definition column-for-column, since rows are built positionally.
// Fix: removed the stale pre-rename "START_TIME" column that duplicated
// "QUERY_START_TIME" and shifted every subsequent column index.
private static final ImmutableList<Column> ACTIVE_QUERIES_SCHEMA = ImmutableList.of(
        new Column("QUERY_ID", ScalarType.createStringType()),
        new Column("QUERY_START_TIME", ScalarType.createStringType()),
        new Column("QUERY_TIME_MS", PrimitiveType.BIGINT),
        new Column("WORKLOAD_GROUP_ID", PrimitiveType.BIGINT),
        new Column("DATABASE", ScalarType.createStringType()),
        new Column("FRONTEND_INSTANCE", ScalarType.createStringType()),
        new Column("QUEUE_START_TIME", ScalarType.createStringType()),
        new Column("QUEUE_END_TIME", ScalarType.createStringType()),
        new Column("QUERY_STATUS", ScalarType.createStringType()),
        new Column("SQL", ScalarType.createStringType()));

private static final ImmutableMap<String, Integer> ACTIVE_QUERIES_COLUMN_TO_INDEX;
Expand Down Expand Up @@ -490,53 +493,6 @@ private static TFetchSchemaTableDataResult workloadSchedPolicyMetadataResult(TMe
return result;
}

// Assembles one positional TRow for a per-backend query-statistics table.
// Cells are appended in a fixed order that must match the table's declared
// schema: BE host, BE port, query id, formatted start time, elapsed ms,
// eight statistic counters, database, FE self node, SQL text.
// NOTE(review): sdf is a SimpleDateFormat, which is not thread-safe —
// presumably each caller passes its own instance; confirm at the call site.
private static TRow makeQueryStatisticsTRow(SimpleDateFormat sdf, String queryId, Backend be,
        String selfNode, QueryInfo queryInfo, TQueryStatistics qs) {
    TRow trow = new TRow();
    // Backend address; placeholders keep the positional layout intact when
    // the backend is unknown.
    if (be != null) {
        trow.addToColumnValue(new TCell().setStringVal(be.getHost()));
        trow.addToColumnValue(new TCell().setLongVal(be.getBePort()));
    } else {
        trow.addToColumnValue(new TCell().setStringVal("invalid host"));
        trow.addToColumnValue(new TCell().setLongVal(-1));
    }
    trow.addToColumnValue(new TCell().setStringVal(queryId));

    // Start time as formatted text plus elapsed wall-clock milliseconds.
    // NOTE(review): getStartExecTime() can be -1 when the query has not
    // started, which would format as a pre-epoch date here — verify callers.
    String strDate = sdf.format(new Date(queryInfo.getStartExecTime()));
    trow.addToColumnValue(new TCell().setStringVal(strDate));
    trow.addToColumnValue(new TCell().setLongVal(System.currentTimeMillis() - queryInfo.getStartExecTime()));

    // Eight statistics columns; zero-filled when no statistics are
    // available so the row width stays constant.
    if (qs != null) {
        trow.addToColumnValue(new TCell().setLongVal(qs.workload_group_id));
        trow.addToColumnValue(new TCell().setLongVal(qs.cpu_ms));
        trow.addToColumnValue(new TCell().setLongVal(qs.scan_rows));
        trow.addToColumnValue(new TCell().setLongVal(qs.scan_bytes));
        trow.addToColumnValue(new TCell().setLongVal(qs.max_peak_memory_bytes));
        trow.addToColumnValue(new TCell().setLongVal(qs.current_used_memory_bytes));
        trow.addToColumnValue(new TCell().setLongVal(qs.shuffle_send_bytes));
        trow.addToColumnValue(new TCell().setLongVal(qs.shuffle_send_rows));
    } else {
        trow.addToColumnValue(new TCell().setLongVal(0L));
        trow.addToColumnValue(new TCell().setLongVal(0L));
        trow.addToColumnValue(new TCell().setLongVal(0L));
        trow.addToColumnValue(new TCell().setLongVal(0L));
        trow.addToColumnValue(new TCell().setLongVal(0L));
        trow.addToColumnValue(new TCell().setLongVal(0L));
        trow.addToColumnValue(new TCell().setLongVal(0L));
        trow.addToColumnValue(new TCell().setLongVal(0L));
    }

    // Database of the owning session, or empty when no context is attached.
    if (queryInfo.getConnectContext() != null) {
        trow.addToColumnValue(new TCell().setStringVal(queryInfo.getConnectContext().getDatabase()));
    } else {
        trow.addToColumnValue(new TCell().setStringVal(""));
    }
    trow.addToColumnValue(new TCell().setStringVal(selfNode));
    trow.addToColumnValue(new TCell().setStringVal(queryInfo.getSql()));

    return trow;
}

private static TFetchSchemaTableDataResult queriesMetadataResult(TSchemaTableRequestParams tSchemaTableParams,
TFetchSchemaTableDataRequest parentRequest) {
TFetchSchemaTableDataResult result = new TFetchSchemaTableDataResult();
Expand All @@ -557,9 +513,15 @@ private static TFetchSchemaTableDataResult queriesMetadataResult(TSchemaTableReq
TRow trow = new TRow();
trow.addToColumnValue(new TCell().setStringVal(queryId));

String strDate = sdf.format(new Date(queryInfo.getStartExecTime()));
trow.addToColumnValue(new TCell().setStringVal(strDate));
trow.addToColumnValue(new TCell().setLongVal(System.currentTimeMillis() - queryInfo.getStartExecTime()));
long queryStartTime = queryInfo.getStartExecTime();
if (queryStartTime > 0) {
trow.addToColumnValue(new TCell().setStringVal(sdf.format(new Date(queryStartTime))));
trow.addToColumnValue(
new TCell().setLongVal(System.currentTimeMillis() - queryInfo.getStartExecTime()));
} else {
trow.addToColumnValue(new TCell());
trow.addToColumnValue(new TCell().setLongVal(-1));
}

List<TPipelineWorkloadGroup> tgroupList = queryInfo.getCoord().gettWorkloadGroups();
if (tgroupList != null && tgroupList.size() == 1) {
Expand All @@ -574,6 +536,30 @@ private static TFetchSchemaTableDataResult queriesMetadataResult(TSchemaTableReq
trow.addToColumnValue(new TCell().setStringVal(""));
}
trow.addToColumnValue(new TCell().setStringVal(selfNode));

long queueStartTime = queryInfo.getQueueStartTime();
if (queueStartTime > 0) {
trow.addToColumnValue(new TCell().setStringVal(sdf.format(new Date(queueStartTime))));
} else {
trow.addToColumnValue(new TCell());
}

long queueEndTime = queryInfo.getQueueEndTime();
if (queueEndTime > 0) {
trow.addToColumnValue(new TCell().setStringVal(sdf.format(new Date(queueEndTime))));
} else {
trow.addToColumnValue(new TCell());
}

TokenState tokenState = queryInfo.getQueueStatus();
if (tokenState == null) {
trow.addToColumnValue(new TCell());
} else if (tokenState == TokenState.READY_TO_RUN) {
trow.addToColumnValue(new TCell().setStringVal("RUNNING"));
} else {
trow.addToColumnValue(new TCell().setStringVal("QUEUED"));
}

trow.addToColumnValue(new TCell().setStringVal(queryInfo.getSql()));
dataBatch.add(trow);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,19 +22,19 @@ suite("test_active_queries") {
sql "set experimental_enable_pipeline_engine=false"
sql "set experimental_enable_pipeline_x_engine=false"
sql "select * from information_schema.active_queries"
sql "select QUERY_ID,START_TIME,QUERY_TIME_MS,WORKLOAD_GROUP_ID,SQL from information_schema.active_queries"
sql "select QUERY_ID,QUERY_START_TIME,QUERY_TIME_MS,WORKLOAD_GROUP_ID,SQL,QUERY_STATUS from information_schema.active_queries"

// pipeline
sql "set experimental_enable_pipeline_engine=true"
sql "set experimental_enable_pipeline_x_engine=false"
sql "select * from information_schema.active_queries"
sql "select QUERY_ID,START_TIME,QUERY_TIME_MS,WORKLOAD_GROUP_ID,SQL from information_schema.active_queries"
sql "select QUERY_ID,QUERY_START_TIME,QUERY_TIME_MS,WORKLOAD_GROUP_ID,SQL,QUERY_STATUS from information_schema.active_queries"

// pipelinex
sql "set experimental_enable_pipeline_engine=true"
sql "set experimental_enable_pipeline_x_engine=true"
sql "select * from information_schema.active_queries"
sql "select QUERY_ID,START_TIME,QUERY_TIME_MS,WORKLOAD_GROUP_ID,SQL from information_schema.active_queries"
sql "select QUERY_ID,QUERY_START_TIME,QUERY_TIME_MS,WORKLOAD_GROUP_ID,SQL,QUERY_STATUS from information_schema.active_queries"
Thread.sleep(1000)
}
})
Expand Down