diff --git a/be/src/exec/schema_scanner.cpp b/be/src/exec/schema_scanner.cpp index 375252501edab4..32197e37e1e21b 100644 --- a/be/src/exec/schema_scanner.cpp +++ b/be/src/exec/schema_scanner.cpp @@ -35,6 +35,7 @@ #include "exec/schema_scanner/schema_files_scanner.h" #include "exec/schema_scanner/schema_metadata_name_ids_scanner.h" #include "exec/schema_scanner/schema_partitions_scanner.h" +#include "exec/schema_scanner/schema_processlist_scanner.h" #include "exec/schema_scanner/schema_profiling_scanner.h" #include "exec/schema_scanner/schema_rowsets_scanner.h" #include "exec/schema_scanner/schema_schema_privileges_scanner.h" @@ -158,6 +159,8 @@ std::unique_ptr SchemaScanner::create(TSchemaTableType::type type return SchemaActiveQueriesScanner::create_unique(); case TSchemaTableType::SCH_WORKLOAD_GROUPS: return SchemaWorkloadGroupsScanner::create_unique(); + case TSchemaTableType::SCH_PROCESSLIST: + return SchemaProcessListScanner::create_unique(); default: return SchemaDummyScanner::create_unique(); break; diff --git a/be/src/exec/schema_scanner/schema_helper.cpp b/be/src/exec/schema_scanner/schema_helper.cpp index b1d6900e0f5c2f..965dc7a6eb4556 100644 --- a/be/src/exec/schema_scanner/schema_helper.cpp +++ b/be/src/exec/schema_scanner/schema_helper.cpp @@ -35,6 +35,8 @@ class TListPrivilegesResult; class TListTableStatusResult; class TShowVariableRequest; class TShowVariableResult; +class TShowProcessListRequest; +class TShowProcessListResult; Status SchemaHelper::get_db_names(const std::string& ip, const int32_t port, const TGetDbsParams& request, TGetDbsResult* result) { @@ -123,4 +125,13 @@ std::string SchemaHelper::extract_db_name(const std::string& full_name) { return std::string(full_name.c_str() + found, full_name.size() - found); } +Status SchemaHelper::show_process_list(const std::string& ip, const int32_t port, + const TShowProcessListRequest& request, + TShowProcessListResult* result) { + return ThriftRpcHelper::rpc( + ip, port, [&request, &result](FrontendServiceConnection& client) { + client->showProcessList(*result, request); + }); +} + } // namespace doris diff --git a/be/src/exec/schema_scanner/schema_helper.h b/be/src/exec/schema_scanner/schema_helper.h index c0143136115bf1..5602e3c005f081 100644 --- a/be/src/exec/schema_scanner/schema_helper.h +++ b/be/src/exec/schema_scanner/schema_helper.h @@ -37,6 +37,8 @@ class TListTableStatusResult; class TListTableMetadataNameIdsResult; class TShowVariableRequest; class TShowVariableResult; +class TShowProcessListRequest; +class TShowProcessListResult; // this class is a helper for getting schema info from FE class SchemaHelper { @@ -76,6 +78,10 @@ class SchemaHelper { TListPrivilegesResult* privileges_result); static std::string extract_db_name(const std::string& full_name); + + static Status show_process_list(const std::string& ip, const int32_t port, + const TShowProcessListRequest& request, + TShowProcessListResult* result); }; } // namespace doris diff --git a/be/src/exec/schema_scanner/schema_processlist_scanner.cpp b/be/src/exec/schema_scanner/schema_processlist_scanner.cpp new file mode 100644 index 00000000000000..2ecc2be9e01978 --- /dev/null +++ b/be/src/exec/schema_scanner/schema_processlist_scanner.cpp @@ -0,0 +1,149 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "exec/schema_scanner/schema_processlist_scanner.h" + +#include + +#include + +#include "exec/schema_scanner/schema_helper.h" +#include "runtime/runtime_state.h" +#include "vec/common/string_ref.h" +#include "vec/core/block.h" +#include "vec/data_types/data_type_factory.hpp" + +namespace doris { + +std::vector SchemaProcessListScanner::_s_processlist_columns = { + {"ID", TYPE_LARGEINT, sizeof(int128_t), false}, + {"USER", TYPE_VARCHAR, sizeof(StringRef), false}, + {"HOST", TYPE_VARCHAR, sizeof(StringRef), false}, + {"CATALOG", TYPE_VARCHAR, sizeof(StringRef), false}, + {"DB", TYPE_VARCHAR, sizeof(StringRef), false}, + {"COMMAND", TYPE_VARCHAR, sizeof(StringRef), false}, + {"TIME", TYPE_INT, sizeof(int32_t), false}, + {"STATE", TYPE_VARCHAR, sizeof(StringRef), false}, + {"INFO", TYPE_VARCHAR, sizeof(StringRef), false}}; + +SchemaProcessListScanner::SchemaProcessListScanner() + : SchemaScanner(_s_processlist_columns, TSchemaTableType::SCH_PROCESSLIST) {} + +SchemaProcessListScanner::~SchemaProcessListScanner() = default; + +Status SchemaProcessListScanner::start(RuntimeState* state) { + TShowProcessListRequest request; + request.__set_show_full_sql(true); + + RETURN_IF_ERROR(SchemaHelper::show_process_list(*(_param->common_param->ip), + _param->common_param->port, request, + &_process_list_result)); + + return Status::OK(); +} + +Status SchemaProcessListScanner::get_next_block(vectorized::Block* block, bool* eos) { + if (!_is_init) { + return Status::InternalError("call this before initial."); + } + if (block == nullptr || eos == nullptr) { + return Status::InternalError("invalid parameter."); + } + + *eos = true; + if (_process_list_result.process_list.empty()) { + return Status::OK(); + } + + return _fill_block_impl(block); +} + +Status SchemaProcessListScanner::_fill_block_impl(vectorized::Block* block) { + SCOPED_TIMER(_fill_block_timer); + + const auto& process_list = _process_list_result.process_list; + size_t row_num = process_list.size(); + if (row_num == 0) { + return Status::OK(); + } + + for (size_t col_idx = 0; col_idx < _s_processlist_columns.size(); ++col_idx) { + std::vector str_refs(row_num); + std::vector int_vals(row_num); + std::vector datas(row_num); + std::vector column_values( + row_num); // Store the strings to ensure their lifetime + + for (size_t row_idx = 0; row_idx < row_num; ++row_idx) { + const auto& row = process_list[row_idx]; + + // Fetch and store the column value based on its index + std::string& column_value = + column_values[row_idx]; // Reference to the actual string in the vector + + switch (col_idx) { + case 0: + column_value = row.size() > 1 ? row[1] : ""; + break; // ID + case 1: + column_value = row.size() > 2 ? row[2] : ""; + break; // USER + case 2: + column_value = row.size() > 3 ? row[3] : ""; + break; // HOST + case 3: + column_value = row.size() > 5 ? row[5] : ""; + break; // CATALOG + case 4: + column_value = row.size() > 6 ? row[6] : ""; + break; // DB + case 5: + column_value = row.size() > 7 ? row[7] : ""; + break; // COMMAND + case 6: + column_value = row.size() > 8 ? row[8] : ""; + break; // TIME + case 7: + column_value = row.size() > 9 ? row[9] : ""; + break; // STATE + case 8: + column_value = row.size() > 11 ? row[11] : ""; + break; // INFO + default: + column_value = ""; + break; + } + + if (_s_processlist_columns[col_idx].type == TYPE_LARGEINT || + _s_processlist_columns[col_idx].type == TYPE_INT) { + int128_t val = !column_value.empty() ? std::stoll(column_value) : 0; + int_vals[row_idx] = val; + datas[row_idx] = &int_vals[row_idx]; + } else { + str_refs[row_idx] = + StringRef(column_values[row_idx].data(), column_values[row_idx].size()); + datas[row_idx] = &str_refs[row_idx]; + } + } + + RETURN_IF_ERROR(fill_dest_column_for_range(block, col_idx, datas)); + } + + return Status::OK(); +} + +} // namespace doris diff --git a/be/src/exec/schema_scanner/schema_processlist_scanner.h b/be/src/exec/schema_scanner/schema_processlist_scanner.h new file mode 100644 index 00000000000000..8aae87e1ef6d0f --- /dev/null +++ b/be/src/exec/schema_scanner/schema_processlist_scanner.h @@ -0,0 +1,53 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include +#include + +#include "common/status.h" +#include "exec/schema_scanner.h" +#include "gen_cpp/FrontendService_types.h" + +namespace doris { + +class RuntimeState; + +namespace vectorized { +class Block; +} + +class SchemaProcessListScanner : public SchemaScanner { + ENABLE_FACTORY_CREATOR(SchemaProcessListScanner); + +public: + SchemaProcessListScanner(); + ~SchemaProcessListScanner() override; + + Status start(RuntimeState* state) override; + Status get_next_block(vectorized::Block* block, bool* eos) override; + + static std::vector _s_processlist_columns; + +private: + Status _fill_block_impl(vectorized::Block* block); + + TShowProcessListResult _process_list_result; +}; + +} // namespace doris diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/SchemaTable.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/SchemaTable.java index 5654805619f2bf..4747bd3bf536c3 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/SchemaTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/SchemaTable.java @@ -485,6 +485,17 @@ public class SchemaTable extends Table { .column("MAX_REMOTE_SCAN_THREAD_NUM", ScalarType.createType(PrimitiveType.BIGINT)) .column("MIN_REMOTE_SCAN_THREAD_NUM", ScalarType.createType(PrimitiveType.BIGINT)) .build())) + .put("processlist", new SchemaTable(SystemIdGenerator.getNextId(), "processlist", TableType.SCHEMA, + builder().column("ID", ScalarType.createType(PrimitiveType.LARGEINT)) + .column("USER", ScalarType.createVarchar(32)) + .column("HOST", ScalarType.createVarchar(261)) + .column("CATALOG", ScalarType.createVarchar(64)) + .column("DB", ScalarType.createVarchar(64)) + .column("COMMAND", ScalarType.createVarchar(16)) + .column("TIME", ScalarType.createType(PrimitiveType.INT)) + .column("STATE", ScalarType.createVarchar(64)) + .column("INFO", ScalarType.createVarchar(ScalarType.MAX_VARCHAR_LENGTH)) + .build())) .build(); protected SchemaTable(long id, String name, TableType type, List baseSchema) { diff --git a/fe/fe-core/src/test/java/org/apache/doris/datasource/RefreshCatalogTest.java b/fe/fe-core/src/test/java/org/apache/doris/datasource/RefreshCatalogTest.java index 7e8e8c3322484d..8480154f243e51 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/datasource/RefreshCatalogTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/datasource/RefreshCatalogTest.java @@ -87,7 +87,7 @@ public void testRefreshCatalog() throws Exception { List dbNames2 = test1.getDbNames(); Assertions.assertEquals(4, dbNames2.size()); ExternalInfoSchemaDatabase infoDb = (ExternalInfoSchemaDatabase) test1.getDb(InfoSchemaDb.DATABASE_NAME).get(); - Assertions.assertEquals(30, infoDb.getTables().size()); + Assertions.assertEquals(31, infoDb.getTables().size()); TestExternalDatabase testDb = (TestExternalDatabase) test1.getDb("db1").get(); Assertions.assertEquals(2, testDb.getTables().size()); @@ -96,7 +96,7 @@ public void testRefreshCatalog() throws Exception { CatalogMgr mgr2 = GsonUtils.GSON.fromJson(json, CatalogMgr.class); test1 = mgr2.getCatalog("test1"); infoDb = (ExternalInfoSchemaDatabase) test1.getDb(InfoSchemaDb.DATABASE_NAME).get(); - Assertions.assertEquals(30, infoDb.getTables().size()); + Assertions.assertEquals(31, infoDb.getTables().size()); testDb = (TestExternalDatabase) test1.getDb("db1").get(); Assertions.assertEquals(2, testDb.getTables().size()); } diff --git a/regression-test/data/external_table_p0/jdbc/test_mariadb_jdbc_catalog.out b/regression-test/data/external_table_p0/jdbc/test_mariadb_jdbc_catalog.out index 2a009b0bd8fb2f..cea4cfe4aef802 100644 --- a/regression-test/data/external_table_p0/jdbc/test_mariadb_jdbc_catalog.out +++ b/regression-test/data/external_table_p0/jdbc/test_mariadb_jdbc_catalog.out @@ -43,6 +43,7 @@ key_column_usage metadata_name_ids parameters partitions +processlist profiling referential_constraints routines diff --git a/regression-test/data/external_table_p0/jdbc/test_mysql_jdbc_catalog.out b/regression-test/data/external_table_p0/jdbc/test_mysql_jdbc_catalog.out index 95b2ac788f842d..470826a7016b71 100644 --- a/regression-test/data/external_table_p0/jdbc/test_mysql_jdbc_catalog.out +++ b/regression-test/data/external_table_p0/jdbc/test_mysql_jdbc_catalog.out @@ -207,6 +207,7 @@ key_column_usage metadata_name_ids parameters partitions +processlist profiling referential_constraints routines diff --git a/regression-test/data/external_table_p0/jdbc/test_mysql_jdbc_catalog_nereids.out b/regression-test/data/external_table_p0/jdbc/test_mysql_jdbc_catalog_nereids.out index 6b895c059f1e99..09714901c14e5d 100644 --- a/regression-test/data/external_table_p0/jdbc/test_mysql_jdbc_catalog_nereids.out +++ b/regression-test/data/external_table_p0/jdbc/test_mysql_jdbc_catalog_nereids.out @@ -175,6 +175,7 @@ key_column_usage metadata_name_ids parameters partitions +processlist profiling referential_constraints routines diff --git a/regression-test/data/external_table_p0/jdbc/test_mysql_jdbc_driver5_catalog.out b/regression-test/data/external_table_p0/jdbc/test_mysql_jdbc_driver5_catalog.out index 8fb34e0e336724..b3c63311cc3765 100644 --- a/regression-test/data/external_table_p0/jdbc/test_mysql_jdbc_driver5_catalog.out +++ b/regression-test/data/external_table_p0/jdbc/test_mysql_jdbc_driver5_catalog.out @@ -217,6 +217,7 @@ key_column_usage metadata_name_ids parameters partitions +processlist profiling referential_constraints routines