Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 22 additions & 0 deletions be/src/vec/exec/scan/vmeta_scanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,9 @@ Status VMetaScanner::_fetch_metadata(const TMetaScanRange& meta_scan_range) {
case TMetadataType::CATALOGS:
RETURN_IF_ERROR(_build_catalogs_metadata_request(meta_scan_range, &request));
break;
case TMetadataType::QUERIES:
RETURN_IF_ERROR(_build_queries_metadata_request(meta_scan_range, &request));
break;
default:
_meta_eos = true;
return Status::OK();
Expand Down Expand Up @@ -372,6 +375,25 @@ Status VMetaScanner::_build_catalogs_metadata_request(const TMetaScanRange& meta
return Status::OK();
}

Status VMetaScanner::_build_queries_metadata_request(const TMetaScanRange& meta_scan_range,
TFetchSchemaTableDataRequest* request) {
VLOG_CRITICAL << "VMetaScanner::_build_queries_metadata_request";
if (!meta_scan_range.__isset.queries_params) {
return Status::InternalError("Can not find TQueriesMetadataParams from meta_scan_range.");
}
// create request
request->__set_cluster_name("");
request->__set_schema_table_name(TSchemaTableName::METADATA_TABLE);

// create TMetadataTableRequestParams
TMetadataTableRequestParams metadata_table_params;
metadata_table_params.__set_metadata_type(TMetadataType::QUERIES);
metadata_table_params.__set_queries_metadata_params(meta_scan_range.queries_params);

request->__set_metada_table_params(metadata_table_params);
return Status::OK();
}

Status VMetaScanner::close(RuntimeState* state) {
VLOG_CRITICAL << "VMetaScanner::close";
RETURN_IF_ERROR(VScanner::close(state));
Expand Down
2 changes: 2 additions & 0 deletions be/src/vec/exec/scan/vmeta_scanner.h
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,8 @@ class VMetaScanner : public VScanner {
TFetchSchemaTableDataRequest* request);
Status _build_catalogs_metadata_request(const TMetaScanRange& meta_scan_range,
TFetchSchemaTableDataRequest* request);
Status _build_queries_metadata_request(const TMetaScanRange& meta_scan_range,
TFetchSchemaTableDataRequest* request);
bool _meta_eos;
TupleId _tuple_id;
TUserIdentity _user_identity;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import org.apache.doris.catalog.Env;
import org.apache.doris.common.Config;
import org.apache.doris.common.Pair;
import org.apache.doris.common.io.DiskUtils;
import org.apache.doris.common.util.TimeUtils;
import org.apache.doris.qe.ConnectContext;
Expand Down Expand Up @@ -87,6 +88,23 @@ public static void getFrontendsInfo(Env env, String detailType, List<List<String
}
}

public static List<Pair<String, Integer>> getFrontendWithRpcPort(Env env, boolean includeSelf) {
List<Pair<String, Integer>> allFe = new ArrayList<>();
List<Frontend> frontends = env.getFrontends(null);

String selfNode = Env.getCurrentEnv().getSelfNode().getHost();
if (ConnectContext.get() != null && !Strings.isNullOrEmpty(ConnectContext.get().getCurrentConnectedFEIp())) {
selfNode = ConnectContext.get().getCurrentConnectedFEIp();
}

String finalSelfNode = selfNode;
frontends.stream()
.filter(fe -> (!fe.getHost().equals(finalSelfNode) || includeSelf))
.map(fe -> Pair.of(fe.getHost(), fe.getRpcPort()))
.forEach(allFe::add);
return allFe;
}

public static void getFrontendsInfo(Env env, List<List<String>> infos) {
InetSocketAddress master = null;
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,23 @@

import org.apache.doris.catalog.Env;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.ClientPool;
import org.apache.doris.common.MetaNotFoundException;
import org.apache.doris.common.Pair;
import org.apache.doris.common.UserException;
import org.apache.doris.common.proc.FrontendsProcNode;
import org.apache.doris.common.util.NetUtils;
import org.apache.doris.common.util.TimeUtils;
import org.apache.doris.datasource.CatalogIf;
import org.apache.doris.datasource.HMSExternalCatalog;
import org.apache.doris.datasource.property.constants.HMSProperties;
import org.apache.doris.planner.external.iceberg.IcebergMetadataCache;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.QueryDetail;
import org.apache.doris.qe.QueryDetailQueue;
import org.apache.doris.system.Backend;
import org.apache.doris.system.SystemInfoService;
import org.apache.doris.thrift.FrontendService;
import org.apache.doris.thrift.TBackendsMetadataParams;
import org.apache.doris.thrift.TCell;
import org.apache.doris.thrift.TFetchSchemaTableDataRequest;
Expand All @@ -37,6 +44,8 @@
import org.apache.doris.thrift.TIcebergQueryType;
import org.apache.doris.thrift.TMetadataTableRequestParams;
import org.apache.doris.thrift.TMetadataType;
import org.apache.doris.thrift.TNetworkAddress;
import org.apache.doris.thrift.TQueriesMetadataParams;
import org.apache.doris.thrift.TRow;
import org.apache.doris.thrift.TStatus;
import org.apache.doris.thrift.TStatusCode;
Expand All @@ -57,6 +66,7 @@

import java.time.Instant;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -92,6 +102,9 @@ public static TFetchSchemaTableDataResult getMetadataTable(TFetchSchemaTableData
case CATALOGS:
result = catalogsMetadataResult(params);
break;
case QUERIES:
result = queriesMetadataResult(params, request);
break;
default:
return errorResult("Metadata table params is not set.");
}
Expand Down Expand Up @@ -352,6 +365,96 @@ private static TFetchSchemaTableDataResult workloadGroupsMetadataResult(TMetadat
return result;
}

private static TFetchSchemaTableDataResult queriesMetadataResult(TMetadataTableRequestParams params,
TFetchSchemaTableDataRequest parentRequest) {
if (!params.isSetQueriesMetadataParams()) {
return errorResult("queries metadata param is not set.");
}

TQueriesMetadataParams queriesMetadataParams = params.getQueriesMetadataParams();
TFetchSchemaTableDataResult result = new TFetchSchemaTableDataResult();

String selfNode = Env.getCurrentEnv().getSelfNode().getHost();
if (ConnectContext.get() != null && !Strings.isNullOrEmpty(ConnectContext.get().getCurrentConnectedFEIp())) {
selfNode = ConnectContext.get().getCurrentConnectedFEIp();
}
selfNode = NetUtils.getHostnameByIp(selfNode);

List<TRow> dataBatch = Lists.newArrayList();
List<QueryDetail> queries = QueryDetailQueue.getQueryDetails(0L);
for (QueryDetail query : queries) {
TRow trow = new TRow();
trow.addToColumnValue(new TCell().setStringVal(query.getQueryId()));
trow.addToColumnValue(new TCell().setLongVal(query.getStartTime()));
trow.addToColumnValue(new TCell().setLongVal(query.getEndTime()));
trow.addToColumnValue(new TCell().setLongVal(query.getEventTime()));
if (query.getState() == QueryDetail.QueryMemState.RUNNING) {
trow.addToColumnValue(new TCell().setLongVal(System.currentTimeMillis() - query.getStartTime()));
} else {
trow.addToColumnValue(new TCell().setLongVal(query.getLatency()));
}
trow.addToColumnValue(new TCell().setStringVal(query.getState().toString()));
trow.addToColumnValue(new TCell().setStringVal(query.getDatabase()));
trow.addToColumnValue(new TCell().setStringVal(query.getSql()));
trow.addToColumnValue(new TCell().setStringVal(selfNode));
dataBatch.add(trow);
}

/* Get the query results from other FE also */
if (queriesMetadataParams.isRelayToOtherFe()) {
TFetchSchemaTableDataRequest relayRequest = new TFetchSchemaTableDataRequest(parentRequest);
TMetadataTableRequestParams relayParams = new TMetadataTableRequestParams(params);
TQueriesMetadataParams relayQueryParams = new TQueriesMetadataParams(queriesMetadataParams);

relayQueryParams.setRelayToOtherFe(false);
relayParams.setQueriesMetadataParams(relayQueryParams);
relayRequest.setMetadaTableParams(relayParams);

List<TFetchSchemaTableDataResult> relayResults = forwardToOtherFrontends(relayRequest);
relayResults
.forEach(rs -> rs.getDataBatch()
.forEach(row -> dataBatch.add(row)));
}

result.setDataBatch(dataBatch);
result.setStatus(new TStatus(TStatusCode.OK));
return result;
}

private static List<TFetchSchemaTableDataResult> forwardToOtherFrontends(TFetchSchemaTableDataRequest request) {
List<TFetchSchemaTableDataResult> results = new ArrayList<>();
List<Pair<String, Integer>> frontends = FrontendsProcNode.getFrontendWithRpcPort(Env.getCurrentEnv(), false);

FrontendService.Client client = null;
int waitTimeOut = ConnectContext.get() == null ? 300 : ConnectContext.get().getExecTimeout();
for (Pair<String, Integer> fe : frontends) {
TNetworkAddress thriftAddress = new TNetworkAddress(fe.key(), fe.value());
try {
client = ClientPool.frontendPool.borrowObject(thriftAddress, waitTimeOut * 1000);
} catch (Exception e) {
LOG.warn("Failed to get frontend {} client. exception: {}", fe.key(), e);
continue;
}

boolean isReturnToPool = false;
try {
TFetchSchemaTableDataResult result = client.fetchSchemaTableData(request);
results.add(result);
isReturnToPool = true;
} catch (Exception e) {
LOG.warn("Failed to finish forward fetch operation to fe: {} . exception: {}", fe.key(), e);
} finally {
if (isReturnToPool) {
ClientPool.frontendPool.returnObject(thriftAddress, client);
} else {
ClientPool.frontendPool.invalidateObject(thriftAddress, client);
}
}
}

return results;
}

private static void filterColumns(TFetchSchemaTableDataResult result,
List<String> columnNames, TMetadataType type) throws TException {
List<TRow> fullColumnsRow = result.getDataBatch();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ public static Integer getColumnIndexFromColumnName(TMetadataType type, String co
return WorkloadGroupsTableValuedFunction.getColumnIndexFromColumnName(columnName);
case CATALOGS:
return CatalogsTableValuedFunction.getColumnIndexFromColumnName(columnName);
case QUERIES:
return QueriesTableValuedFunction.getColumnIndexFromColumnName(columnName);
default:
throw new AnalysisException("Unknown Metadata TableValuedFunction type");
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.tablefunction;

import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.PrimitiveType;
import org.apache.doris.catalog.ScalarType;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.thrift.TMetaScanRange;
import org.apache.doris.thrift.TMetadataType;
import org.apache.doris.thrift.TQueriesMetadataParams;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;

import java.util.List;
import java.util.Map;

public class QueriesTableValuedFunction extends MetadataTableValuedFunction {
public static final String NAME = "queries";

private static final ImmutableList<Column> SCHEMA = ImmutableList.of(
new Column("QueryId", ScalarType.createStringType()),
new Column("StartTime", PrimitiveType.BIGINT),
new Column("EndTime", PrimitiveType.BIGINT, true),
new Column("EventTime", PrimitiveType.BIGINT, true),
new Column("Latency", PrimitiveType.BIGINT),
new Column("State", ScalarType.createStringType()),
new Column("Database", ScalarType.createStringType(), true),
new Column("Sql", ScalarType.createStringType()),
new Column("FrontendInstance", ScalarType.createStringType()));

private static final ImmutableMap<String, Integer> COLUMN_TO_INDEX;

static {
ImmutableMap.Builder<String, Integer> builder = new ImmutableMap.Builder();
for (int i = 0; i < SCHEMA.size(); i++) {
builder.put(SCHEMA.get(i).getName().toLowerCase(), i);
}
COLUMN_TO_INDEX = builder.build();
}

public static Integer getColumnIndexFromColumnName(String columnName) {
return COLUMN_TO_INDEX.get(columnName.toLowerCase());
}

public QueriesTableValuedFunction(Map<String, String> params) throws AnalysisException {
if (params.size() != 0) {
throw new AnalysisException("Queries table-valued-function does not support any params");
}
}

@Override
public TMetadataType getMetadataType() {
return TMetadataType.QUERIES;
}

@Override
public TMetaScanRange getMetaScanRange() {
TMetaScanRange metaScanRange = new TMetaScanRange();
metaScanRange.setMetadataType(TMetadataType.QUERIES);
TQueriesMetadataParams queriesMetadataParams = new TQueriesMetadataParams();
queriesMetadataParams.setClusterName("");
queriesMetadataParams.setRelayToOtherFe(true);
metaScanRange.setQueriesParams(queriesMetadataParams);
return metaScanRange;
}

@Override
public String getTableName() {
return "QueriesTableValuedFunction";
}

@Override
public List<Column> getTableColumns() throws AnalysisException {
return SCHEMA;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,8 @@ public static TableValuedFunctionIf getTableFunction(String funcName, Map<String
return new CatalogsTableValuedFunction(params);
case GroupCommitTableValuedFunction.NAME:
return new GroupCommitTableValuedFunction(params);
case QueriesTableValuedFunction.NAME:
return new QueriesTableValuedFunction(params);
default:
throw new AnalysisException("Could not find table function " + funcName);
}
Expand Down
1 change: 1 addition & 0 deletions gensrc/thrift/FrontendService.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -884,6 +884,7 @@ struct TMetadataTableRequestParams {
4: optional list<string> columns_name
5: optional PlanNodes.TFrontendsMetadataParams frontends_metadata_params
6: optional Types.TUserIdentity current_user_ident
7: optional PlanNodes.TQueriesMetadataParams queries_metadata_params
}

struct TFetchSchemaTableDataRequest {
Expand Down
6 changes: 6 additions & 0 deletions gensrc/thrift/PlanNodes.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -469,11 +469,17 @@ struct TFrontendsMetadataParams {
1: optional string cluster_name
}

struct TQueriesMetadataParams {
1: optional string cluster_name
2: optional bool relay_to_other_fe
}

struct TMetaScanRange {
1: optional Types.TMetadataType metadata_type
2: optional TIcebergMetadataParams iceberg_params
3: optional TBackendsMetadataParams backends_params
4: optional TFrontendsMetadataParams frontends_params
5: optional TQueriesMetadataParams queries_params;
}

// Specification of an individual data range which is held in its entirety
Expand Down
1 change: 1 addition & 0 deletions gensrc/thrift/Types.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -692,6 +692,7 @@ enum TMetadataType {
FRONTENDS,
CATALOGS,
FRONTENDS_DISKS,
QUERIES,
}

enum TIcebergQueryType {
Expand Down