diff --git a/be/src/exec/schema_scanner.h b/be/src/exec/schema_scanner.h index 874b4a34b7014c..75ca8ed7e93898 100644 --- a/be/src/exec/schema_scanner.h +++ b/be/src/exec/schema_scanner.h @@ -55,6 +55,7 @@ struct SchemaScannerParam { int64_t thread_id; const std::string* catalog; std::unique_ptr profile; + std::set fe_addr_list; SchemaScannerParam() : db(nullptr), diff --git a/be/src/exec/schema_scanner/schema_processlist_scanner.cpp b/be/src/exec/schema_scanner/schema_processlist_scanner.cpp index 964f05db9d8771..53d5b7008c5f45 100644 --- a/be/src/exec/schema_scanner/schema_processlist_scanner.cpp +++ b/be/src/exec/schema_scanner/schema_processlist_scanner.cpp @@ -55,8 +55,14 @@ Status SchemaProcessListScanner::start(RuntimeState* state) { TShowProcessListRequest request; request.__set_show_full_sql(true); - RETURN_IF_ERROR(SchemaHelper::show_process_list(*(_param->ip), _param->port, request, - &_process_list_result)); + for (const auto& fe_addr : _param->fe_addr_list) { + TShowProcessListResult tmp_ret; + RETURN_IF_ERROR( + SchemaHelper::show_process_list(fe_addr.hostname, fe_addr.port, request, &tmp_ret)); + _process_list_result.process_list.insert(_process_list_result.process_list.end(), + tmp_ret.process_list.begin(), + tmp_ret.process_list.end()); + } return Status::OK(); } diff --git a/be/src/vec/exec/vschema_scan_node.cpp b/be/src/vec/exec/vschema_scan_node.cpp index 226d1e1c0038d0..03c7aacd5d7f6e 100644 --- a/be/src/vec/exec/vschema_scan_node.cpp +++ b/be/src/vec/exec/vschema_scan_node.cpp @@ -101,6 +101,18 @@ Status VSchemaScanNode::init(const TPlanNode& tnode, RuntimeState* state) { if (tnode.schema_scan_node.__isset.catalog) { _scanner_param.catalog = _pool->add(new std::string(tnode.schema_scan_node.catalog)); } + + if (tnode.schema_scan_node.__isset.fe_addr_list) { + for (const auto& fe_addr : tnode.schema_scan_node.fe_addr_list) { + _scanner_param.fe_addr_list.insert(fe_addr); + } + } else if (tnode.schema_scan_node.__isset.ip && tnode.schema_scan_node.__isset.port) { + TNetworkAddress fe_addr; + fe_addr.hostname = tnode.schema_scan_node.ip; + fe_addr.port = tnode.schema_scan_node.port; + _scanner_param.fe_addr_list.insert(fe_addr); + } + return Status::OK(); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/SchemaTable.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/SchemaTable.java index 74d59b4cd831e2..31110af16f1592 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/SchemaTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/SchemaTable.java @@ -459,13 +459,20 @@ public class SchemaTable extends Table { .column("QUERY_ID", ScalarType.createVarchar(256)) .column("INFO", ScalarType.createVarchar(ScalarType.MAX_VARCHAR_LENGTH)) .column("FE", - ScalarType.createVarchar(64)).build())) + ScalarType.createVarchar(64)).build(), true)) .build(); + private boolean fetchAllFe = false; + protected SchemaTable(long id, String name, TableType type, List baseSchema) { super(id, name, type, baseSchema); } + protected SchemaTable(long id, String name, TableType type, List baseSchema, boolean fetchAllFe) { + this(id, name, type, baseSchema); + this.fetchAllFe = fetchAllFe; + } + @Override public void write(DataOutput out) throws IOException { throw new UnsupportedOperationException("Do not allow to write SchemaTable to image."); @@ -479,6 +486,14 @@ public static Builder builder() { return new Builder(); } + public static boolean isShouldFetchAllFe(String schemaTableName) { + Table table = TABLE_MAP.get(schemaTableName); + if (table != null && table instanceof SchemaTable) { + return ((SchemaTable) table).fetchAllFe; + } + return false; + } + /** * For TABLE_MAP. **/ diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/SchemaScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/SchemaScanNode.java index 4a8a488dfc3963..7fc0af84c2ce52 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/SchemaScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/SchemaScanNode.java @@ -19,6 +19,7 @@ import org.apache.doris.analysis.Analyzer; import org.apache.doris.analysis.TupleDescriptor; +import org.apache.doris.catalog.Env; import org.apache.doris.catalog.SchemaTable; import org.apache.doris.common.Config; import org.apache.doris.common.UserException; @@ -28,6 +29,8 @@ import org.apache.doris.qe.ConnectContext; import org.apache.doris.service.FrontendOptions; import org.apache.doris.statistics.StatisticalType; +import org.apache.doris.system.Frontend; +import org.apache.doris.thrift.TNetworkAddress; import org.apache.doris.thrift.TPlanNode; import org.apache.doris.thrift.TPlanNodeType; import org.apache.doris.thrift.TScanRangeLocations; @@ -39,6 +42,7 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import java.util.ArrayList; import java.util.List; /** @@ -91,6 +95,21 @@ public void finalizeForNereids() throws UserException { frontendPort = Config.rpc_port; } + private void setFeAddrList(TPlanNode msg) { + if (SchemaTable.isShouldFetchAllFe(tableName)) { + List feAddrList = new ArrayList(); + if (ConnectContext.get().getSessionVariable().showAllFeConnection) { + List feList = Env.getCurrentEnv().getFrontends(null); + for (Frontend fe : feList) { + feAddrList.add(new TNetworkAddress(fe.getHost(), fe.getRpcPort())); + } + } else { + feAddrList.add(new TNetworkAddress(frontendIP, frontendPort)); + } + msg.schema_scan_node.setFeAddrList(feAddrList); + } + } + @Override protected void toThrift(TPlanNode msg) { msg.node_type = TPlanNodeType.SCHEMA_SCAN_NODE; @@ -127,6 +146,7 @@ protected void toThrift(TPlanNode msg) { TUserIdentity tCurrentUser = ConnectContext.get().getCurrentUserIdentity().toThrift(); msg.schema_scan_node.setCurrentUserIdent(tCurrentUser); + setFeAddrList(msg); } @Override diff --git a/gensrc/thrift/PlanNodes.thrift b/gensrc/thrift/PlanNodes.thrift index ca608ca86ca6d8..fae675b00b985d 100644 --- a/gensrc/thrift/PlanNodes.thrift +++ b/gensrc/thrift/PlanNodes.thrift @@ -614,6 +614,7 @@ struct TSchemaScanNode { 12: optional bool show_hidden_cloumns = false // 13: optional list table_structure // deprecated 14: optional string catalog + 15: optional list fe_addr_list } struct TMetaScanNode {