From 42cd5b09500433f30f74cfd32a0fd520c6c77e8f Mon Sep 17 00:00:00 2001 From: Nitin Kashyap Date: Mon, 25 Sep 2023 03:38:40 +0530 Subject: [PATCH] [feature](meta) queries as table valued function (#25052) 1. Add queries view as table function. 2. Proxy result to other FEs and return merged results back to BE. --- be/src/vec/exec/scan/vmeta_scanner.cpp | 22 ++++ be/src/vec/exec/scan/vmeta_scanner.h | 2 + .../doris/common/proc/FrontendsProcNode.java | 18 +++ .../tablefunction/MetadataGenerator.java | 103 ++++++++++++++++++ .../MetadataTableValuedFunction.java | 2 + .../QueriesTableValuedFunction.java | 93 ++++++++++++++++ .../tablefunction/TableValuedFunctionIf.java | 2 + gensrc/thrift/FrontendService.thrift | 1 + gensrc/thrift/PlanNodes.thrift | 6 + gensrc/thrift/Types.thrift | 1 + 10 files changed, 250 insertions(+) create mode 100644 fe/fe-core/src/main/java/org/apache/doris/tablefunction/QueriesTableValuedFunction.java diff --git a/be/src/vec/exec/scan/vmeta_scanner.cpp b/be/src/vec/exec/scan/vmeta_scanner.cpp index acccd23042400c..66d4138d48f540 100644 --- a/be/src/vec/exec/scan/vmeta_scanner.cpp +++ b/be/src/vec/exec/scan/vmeta_scanner.cpp @@ -231,6 +231,9 @@ Status VMetaScanner::_fetch_metadata(const TMetaScanRange& meta_scan_range) { case TMetadataType::CATALOGS: RETURN_IF_ERROR(_build_catalogs_metadata_request(meta_scan_range, &request)); break; + case TMetadataType::QUERIES: + RETURN_IF_ERROR(_build_queries_metadata_request(meta_scan_range, &request)); + break; default: _meta_eos = true; return Status::OK(); @@ -372,6 +375,25 @@ Status VMetaScanner::_build_catalogs_metadata_request(const TMetaScanRange& meta return Status::OK(); } +Status VMetaScanner::_build_queries_metadata_request(const TMetaScanRange& meta_scan_range, + TFetchSchemaTableDataRequest* request) { + VLOG_CRITICAL << "VMetaScanner::_build_queries_metadata_request"; + if (!meta_scan_range.__isset.queries_params) { + return Status::InternalError("Can not find TQueriesMetadataParams from meta_scan_range."); + } + // create request + request->__set_cluster_name(""); + request->__set_schema_table_name(TSchemaTableName::METADATA_TABLE); + + // create TMetadataTableRequestParams + TMetadataTableRequestParams metadata_table_params; + metadata_table_params.__set_metadata_type(TMetadataType::QUERIES); + metadata_table_params.__set_queries_metadata_params(meta_scan_range.queries_params); + + request->__set_metada_table_params(metadata_table_params); + return Status::OK(); +} + Status VMetaScanner::close(RuntimeState* state) { VLOG_CRITICAL << "VMetaScanner::close"; RETURN_IF_ERROR(VScanner::close(state)); diff --git a/be/src/vec/exec/scan/vmeta_scanner.h b/be/src/vec/exec/scan/vmeta_scanner.h index a3297f7e9c888f..6c364b94fe2d27 100644 --- a/be/src/vec/exec/scan/vmeta_scanner.h +++ b/be/src/vec/exec/scan/vmeta_scanner.h @@ -81,6 +81,8 @@ class VMetaScanner : public VScanner { TFetchSchemaTableDataRequest* request); Status _build_catalogs_metadata_request(const TMetaScanRange& meta_scan_range, TFetchSchemaTableDataRequest* request); + Status _build_queries_metadata_request(const TMetaScanRange& meta_scan_range, + TFetchSchemaTableDataRequest* request); bool _meta_eos; TupleId _tuple_id; TUserIdentity _user_identity; diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/proc/FrontendsProcNode.java b/fe/fe-core/src/main/java/org/apache/doris/common/proc/FrontendsProcNode.java index 0e500259639c15..b49874b6e9c90f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/proc/FrontendsProcNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/proc/FrontendsProcNode.java @@ -19,6 +19,7 @@ import org.apache.doris.catalog.Env; import org.apache.doris.common.Config; +import org.apache.doris.common.Pair; import org.apache.doris.common.io.DiskUtils; import org.apache.doris.common.util.TimeUtils; import org.apache.doris.qe.ConnectContext; @@ -87,6 +88,23 @@ public static void getFrontendsInfo(Env env, String detailType, List> getFrontendWithRpcPort(Env env, boolean includeSelf) { + List> allFe = new ArrayList<>(); + List frontends = env.getFrontends(null); + + String selfNode = Env.getCurrentEnv().getSelfNode().getHost(); + if (ConnectContext.get() != null && !Strings.isNullOrEmpty(ConnectContext.get().getCurrentConnectedFEIp())) { + selfNode = ConnectContext.get().getCurrentConnectedFEIp(); + } + + String finalSelfNode = selfNode; + frontends.stream() + .filter(fe -> (!fe.getHost().equals(finalSelfNode) || includeSelf)) + .map(fe -> Pair.of(fe.getHost(), fe.getRpcPort())) + .forEach(allFe::add); + return allFe; + } + public static void getFrontendsInfo(Env env, List> infos) { InetSocketAddress master = null; try { diff --git a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataGenerator.java b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataGenerator.java index 7b9d3f892eacc1..2abb84cdfb92f0 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataGenerator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataGenerator.java @@ -19,16 +19,23 @@ import org.apache.doris.catalog.Env; import org.apache.doris.common.AnalysisException; +import org.apache.doris.common.ClientPool; import org.apache.doris.common.MetaNotFoundException; +import org.apache.doris.common.Pair; import org.apache.doris.common.UserException; import org.apache.doris.common.proc.FrontendsProcNode; +import org.apache.doris.common.util.NetUtils; import org.apache.doris.common.util.TimeUtils; import org.apache.doris.datasource.CatalogIf; import org.apache.doris.datasource.HMSExternalCatalog; import org.apache.doris.datasource.property.constants.HMSProperties; import org.apache.doris.planner.external.iceberg.IcebergMetadataCache; +import org.apache.doris.qe.ConnectContext; +import org.apache.doris.qe.QueryDetail; +import org.apache.doris.qe.QueryDetailQueue; import org.apache.doris.system.Backend; import org.apache.doris.system.SystemInfoService; +import org.apache.doris.thrift.FrontendService; import org.apache.doris.thrift.TBackendsMetadataParams; import org.apache.doris.thrift.TCell; import org.apache.doris.thrift.TFetchSchemaTableDataRequest; @@ -37,6 +44,8 @@ import org.apache.doris.thrift.TIcebergQueryType; import org.apache.doris.thrift.TMetadataTableRequestParams; import org.apache.doris.thrift.TMetadataType; +import org.apache.doris.thrift.TNetworkAddress; +import org.apache.doris.thrift.TQueriesMetadataParams; import org.apache.doris.thrift.TRow; import org.apache.doris.thrift.TStatus; import org.apache.doris.thrift.TStatusCode; @@ -57,6 +66,7 @@ import java.time.Instant; import java.time.LocalDateTime; +import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -92,6 +102,9 @@ public static TFetchSchemaTableDataResult getMetadataTable(TFetchSchemaTableData case CATALOGS: result = catalogsMetadataResult(params); break; + case QUERIES: + result = queriesMetadataResult(params, request); + break; default: return errorResult("Metadata table params is not set."); } @@ -352,6 +365,96 @@ private static TFetchSchemaTableDataResult workloadGroupsMetadataResult(TMetadat return result; } + private static TFetchSchemaTableDataResult queriesMetadataResult(TMetadataTableRequestParams params, + TFetchSchemaTableDataRequest parentRequest) { + if (!params.isSetQueriesMetadataParams()) { + return errorResult("queries metadata param is not set."); + } + + TQueriesMetadataParams queriesMetadataParams = params.getQueriesMetadataParams(); + TFetchSchemaTableDataResult result = new TFetchSchemaTableDataResult(); + + String selfNode = Env.getCurrentEnv().getSelfNode().getHost(); + if (ConnectContext.get() != null && !Strings.isNullOrEmpty(ConnectContext.get().getCurrentConnectedFEIp())) { + selfNode = ConnectContext.get().getCurrentConnectedFEIp(); + } + selfNode = NetUtils.getHostnameByIp(selfNode); + + List dataBatch = Lists.newArrayList(); + List queries = QueryDetailQueue.getQueryDetails(0L); + for (QueryDetail query : queries) { + TRow trow = new TRow(); + trow.addToColumnValue(new TCell().setStringVal(query.getQueryId())); + trow.addToColumnValue(new TCell().setLongVal(query.getStartTime())); + trow.addToColumnValue(new TCell().setLongVal(query.getEndTime())); + trow.addToColumnValue(new TCell().setLongVal(query.getEventTime())); + if (query.getState() == QueryDetail.QueryMemState.RUNNING) { + trow.addToColumnValue(new TCell().setLongVal(System.currentTimeMillis() - query.getStartTime())); + } else { + trow.addToColumnValue(new TCell().setLongVal(query.getLatency())); + } + trow.addToColumnValue(new TCell().setStringVal(query.getState().toString())); + trow.addToColumnValue(new TCell().setStringVal(query.getDatabase())); + trow.addToColumnValue(new TCell().setStringVal(query.getSql())); + trow.addToColumnValue(new TCell().setStringVal(selfNode)); + dataBatch.add(trow); + } + + /* Get the query results from other FE also */ + if (queriesMetadataParams.isRelayToOtherFe()) { + TFetchSchemaTableDataRequest relayRequest = new TFetchSchemaTableDataRequest(parentRequest); + TMetadataTableRequestParams relayParams = new TMetadataTableRequestParams(params); + TQueriesMetadataParams relayQueryParams = new TQueriesMetadataParams(queriesMetadataParams); + + relayQueryParams.setRelayToOtherFe(false); + relayParams.setQueriesMetadataParams(relayQueryParams); + relayRequest.setMetadaTableParams(relayParams); + + List relayResults = forwardToOtherFrontends(relayRequest); + relayResults + .forEach(rs -> rs.getDataBatch() + .forEach(row -> dataBatch.add(row))); + } + + result.setDataBatch(dataBatch); + result.setStatus(new TStatus(TStatusCode.OK)); + return result; + } + + private static List forwardToOtherFrontends(TFetchSchemaTableDataRequest request) { + List results = new ArrayList<>(); + List> frontends = FrontendsProcNode.getFrontendWithRpcPort(Env.getCurrentEnv(), false); + + FrontendService.Client client = null; + int waitTimeOut = ConnectContext.get() == null ? 300 : ConnectContext.get().getExecTimeout(); + for (Pair fe : frontends) { + TNetworkAddress thriftAddress = new TNetworkAddress(fe.key(), fe.value()); + try { + client = ClientPool.frontendPool.borrowObject(thriftAddress, waitTimeOut * 1000); + } catch (Exception e) { + LOG.warn("Failed to get frontend {} client. exception: {}", fe.key(), e); + continue; + } + + boolean isReturnToPool = false; + try { + TFetchSchemaTableDataResult result = client.fetchSchemaTableData(request); + results.add(result); + isReturnToPool = true; + } catch (Exception e) { + LOG.warn("Failed to finish forward fetch operation to fe: {} . exception: {}", fe.key(), e); + } finally { + if (isReturnToPool) { + ClientPool.frontendPool.returnObject(thriftAddress, client); + } else { + ClientPool.frontendPool.invalidateObject(thriftAddress, client); + } + } + } + + return results; + } + private static void filterColumns(TFetchSchemaTableDataResult result, List columnNames, TMetadataType type) throws TException { List fullColumnsRow = result.getDataBatch(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataTableValuedFunction.java b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataTableValuedFunction.java index c09687c9b4080d..cc84a29f5e9191 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataTableValuedFunction.java +++ b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataTableValuedFunction.java @@ -41,6 +41,8 @@ public static Integer getColumnIndexFromColumnName(TMetadataType type, String co return WorkloadGroupsTableValuedFunction.getColumnIndexFromColumnName(columnName); case CATALOGS: return CatalogsTableValuedFunction.getColumnIndexFromColumnName(columnName); + case QUERIES: + return QueriesTableValuedFunction.getColumnIndexFromColumnName(columnName); default: throw new AnalysisException("Unknown Metadata TableValuedFunction type"); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/QueriesTableValuedFunction.java b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/QueriesTableValuedFunction.java new file mode 100644 index 00000000000000..77aeed6c89e9d0 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/QueriesTableValuedFunction.java @@ -0,0 +1,93 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.tablefunction; + +import org.apache.doris.catalog.Column; +import org.apache.doris.catalog.PrimitiveType; +import org.apache.doris.catalog.ScalarType; +import org.apache.doris.common.AnalysisException; +import org.apache.doris.thrift.TMetaScanRange; +import org.apache.doris.thrift.TMetadataType; +import org.apache.doris.thrift.TQueriesMetadataParams; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; + +import java.util.List; +import java.util.Map; + +public class QueriesTableValuedFunction extends MetadataTableValuedFunction { + public static final String NAME = "queries"; + + private static final ImmutableList SCHEMA = ImmutableList.of( + new Column("QueryId", ScalarType.createStringType()), + new Column("StartTime", PrimitiveType.BIGINT), + new Column("EndTime", PrimitiveType.BIGINT, true), + new Column("EventTime", PrimitiveType.BIGINT, true), + new Column("Latency", PrimitiveType.BIGINT), + new Column("State", ScalarType.createStringType()), + new Column("Database", ScalarType.createStringType(), true), + new Column("Sql", ScalarType.createStringType()), + new Column("FrontendInstance", ScalarType.createStringType())); + + private static final ImmutableMap COLUMN_TO_INDEX; + + static { + ImmutableMap.Builder builder = new ImmutableMap.Builder(); + for (int i = 0; i < SCHEMA.size(); i++) { + builder.put(SCHEMA.get(i).getName().toLowerCase(), i); + } + COLUMN_TO_INDEX = builder.build(); + } + + public static Integer getColumnIndexFromColumnName(String columnName) { + return COLUMN_TO_INDEX.get(columnName.toLowerCase()); + } + + public QueriesTableValuedFunction(Map params) throws AnalysisException { + if (params.size() != 0) { + throw new AnalysisException("Queries table-valued-function does not support any params"); + } + } + + @Override + public TMetadataType getMetadataType() { + return TMetadataType.QUERIES; + } + + @Override + public TMetaScanRange getMetaScanRange() { + TMetaScanRange metaScanRange = new TMetaScanRange(); + metaScanRange.setMetadataType(TMetadataType.QUERIES); + TQueriesMetadataParams queriesMetadataParams = new TQueriesMetadataParams(); + queriesMetadataParams.setClusterName(""); + queriesMetadataParams.setRelayToOtherFe(true); + metaScanRange.setQueriesParams(queriesMetadataParams); + return metaScanRange; + } + + @Override + public String getTableName() { + return "QueriesTableValuedFunction"; + } + + @Override + public List getTableColumns() throws AnalysisException { + return SCHEMA; + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/TableValuedFunctionIf.java b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/TableValuedFunctionIf.java index ab37053c45b83c..90472aa48455db 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/TableValuedFunctionIf.java +++ b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/TableValuedFunctionIf.java @@ -68,6 +68,8 @@ public static TableValuedFunctionIf getTableFunction(String funcName, Map columns_name 5: optional PlanNodes.TFrontendsMetadataParams frontends_metadata_params 6: optional Types.TUserIdentity current_user_ident + 7: optional PlanNodes.TQueriesMetadataParams queries_metadata_params } struct TFetchSchemaTableDataRequest { diff --git a/gensrc/thrift/PlanNodes.thrift b/gensrc/thrift/PlanNodes.thrift index f38d245496cbd9..cb656d26defed7 100644 --- a/gensrc/thrift/PlanNodes.thrift +++ b/gensrc/thrift/PlanNodes.thrift @@ -469,11 +469,17 @@ struct TFrontendsMetadataParams { 1: optional string cluster_name } +struct TQueriesMetadataParams { + 1: optional string cluster_name + 2: optional bool relay_to_other_fe +} + struct TMetaScanRange { 1: optional Types.TMetadataType metadata_type 2: optional TIcebergMetadataParams iceberg_params 3: optional TBackendsMetadataParams backends_params 4: optional TFrontendsMetadataParams frontends_params + 5: optional TQueriesMetadataParams queries_params; } // Specification of an individual data range which is held in its entirety diff --git a/gensrc/thrift/Types.thrift b/gensrc/thrift/Types.thrift index baca98b228bc8a..f6a138976c4e16 100644 --- a/gensrc/thrift/Types.thrift +++ b/gensrc/thrift/Types.thrift @@ -692,6 +692,7 @@ enum TMetadataType { FRONTENDS, CATALOGS, FRONTENDS_DISKS, + QUERIES, } enum TIcebergQueryType {