From 32afb7c103fc361d428ca86388f1286255240dbc Mon Sep 17 00:00:00 2001 From: Vallish Date: Sat, 24 Aug 2024 12:30:25 +0000 Subject: [PATCH 1/5] [Enhancement] support information_schema.partitions Co-authored-by: Mingyu Chen --- .../schema_partitions_scanner.cpp | 134 ++++++++++-- .../schema_partitions_scanner.h | 15 +- .../schema_scanner/schema_scanner_helper.cpp | 62 ++++++ .../schema_scanner/schema_scanner_helper.h | 42 ++++ .../doris/catalog/ListPartitionItem.java | 4 + .../org/apache/doris/catalog/Partition.java | 19 ++ .../apache/doris/catalog/PartitionInfo.java | 13 ++ .../apache/doris/catalog/PartitionItem.java | 4 + .../doris/catalog/RangePartitionItem.java | 4 + .../tablefunction/MetadataGenerator.java | 148 ++++++++++++- gensrc/thrift/FrontendService.thrift | 1 + .../system/test_partitions_schema.out | 48 +++++ .../system/test_partitions_schema.groovy | 195 ++++++++++++++++++ 13 files changed, 667 insertions(+), 22 deletions(-) create mode 100644 be/src/exec/schema_scanner/schema_scanner_helper.cpp create mode 100644 be/src/exec/schema_scanner/schema_scanner_helper.h create mode 100644 regression-test/data/query_p0/system/test_partitions_schema.out create mode 100644 regression-test/suites/query_p0/system/test_partitions_schema.groovy diff --git a/be/src/exec/schema_scanner/schema_partitions_scanner.cpp b/be/src/exec/schema_scanner/schema_partitions_scanner.cpp index ea7394e15e12d2..9f86fe6feb49d9 100644 --- a/be/src/exec/schema_scanner/schema_partitions_scanner.cpp +++ b/be/src/exec/schema_scanner/schema_partitions_scanner.cpp @@ -22,10 +22,14 @@ #include #include "exec/schema_scanner/schema_helper.h" -#include "runtime/decimalv2_value.h" -#include "runtime/define_primitive_type.h" -#include "util/runtime_profile.h" +#include "exec/schema_scanner/schema_scanner_helper.h" +#include "runtime/client_cache.h" +#include "runtime/exec_env.h" +#include "runtime/runtime_state.h" +#include "util/thrift_rpc_helper.h" #include "vec/common/string_ref.h" +#include "vec/core/block.h" +#include "vec/data_types/data_type_factory.hpp" namespace doris { class RuntimeState; @@ -63,9 +67,7 @@ std::vector SchemaPartitionsScanner::_s_tbls_columns }; SchemaPartitionsScanner::SchemaPartitionsScanner() - : SchemaScanner(_s_tbls_columns, TSchemaTableType::SCH_PARTITIONS), - _db_index(0), - _table_index(0) {} + : SchemaScanner(_s_tbls_columns, TSchemaTableType::SCH_PARTITIONS) {} SchemaPartitionsScanner::~SchemaPartitionsScanner() {} @@ -75,21 +77,14 @@ Status SchemaPartitionsScanner::start(RuntimeState* state) { } SCOPED_TIMER(_get_db_timer); TGetDbsParams db_params; - if (nullptr != _param->common_param->db) { + if (_param->common_param->db) { db_params.__set_pattern(*(_param->common_param->db)); } - if (nullptr != _param->common_param->catalog) { + if (_param->common_param->catalog) { db_params.__set_catalog(*(_param->common_param->catalog)); } - if (nullptr != _param->common_param->current_user_ident) { + if (_param->common_param->current_user_ident) { db_params.__set_current_user_ident(*(_param->common_param->current_user_ident)); - } else { - if (nullptr != _param->common_param->user) { - db_params.__set_user(*(_param->common_param->user)); - } - if (nullptr != _param->common_param->user_ip) { - db_params.__set_user_ip(*(_param->common_param->user_ip)); - } } if (nullptr != _param->common_param->ip && 0 != _param->common_param->port) { @@ -98,17 +93,122 @@ Status SchemaPartitionsScanner::start(RuntimeState* state) { } else { return Status::InternalError("IP or port doesn't exists"); } + 
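+    // start() only collects the database list from the FE; partition rows are fetched
+    // lazily in get_next_block_internal(), one database at a time, and returned in
+    // blocks of at most _block_rows_limit rows.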
+    _block_rows_limit = state->batch_size();
+    _rpc_timeout_ms = state->execution_timeout() * 1000;
+    return Status::OK();
+}
+
+Status SchemaPartitionsScanner::get_onedb_info_from_fe(int64_t dbId) {
+    TNetworkAddress master_addr = ExecEnv::GetInstance()->master_info()->network_address;
+
+    TSchemaTableRequestParams schema_table_request_params;
+    for (int i = 0; i < _s_tbls_columns.size(); i++) {
+        schema_table_request_params.__isset.columns_name = true;
+        schema_table_request_params.columns_name.emplace_back(_s_tbls_columns[i].name);
+    }
+    schema_table_request_params.__set_current_user_ident(*_param->common_param->current_user_ident);
+    schema_table_request_params.__set_catalog(*_param->common_param->catalog);
+    schema_table_request_params.__set_dbId(dbId);
+
+    TFetchSchemaTableDataRequest request;
+    request.__set_schema_table_name(TSchemaTableName::PARTITIONS);
+    request.__set_schema_table_params(schema_table_request_params);
+
+    TFetchSchemaTableDataResult result;
+
+    RETURN_IF_ERROR(ThriftRpcHelper::rpc(
+            master_addr.hostname, master_addr.port,
+            [&request, &result](FrontendServiceConnection& client) {
+                client->fetchSchemaTableData(result, request);
+            },
+            _rpc_timeout_ms));
+
+    Status status(Status::create(result.status));
+    if (!status.ok()) {
+        LOG(WARNING) << "fetch partitions from FE failed, errmsg=" << status;
+        return status;
+    }
+    std::vector result_data = result.data_batch;
+
+    _partitions_block = vectorized::Block::create_unique();
+    for (int i = 0; i < _s_tbls_columns.size(); ++i) {
+        TypeDescriptor descriptor(_s_tbls_columns[i].type);
+        auto data_type = vectorized::DataTypeFactory::instance().create_data_type(descriptor, true);
+        _partitions_block->insert(vectorized::ColumnWithTypeAndName(
+                data_type->create_column(), data_type, _s_tbls_columns[i].name));
+    }
+    _partitions_block->reserve(_block_rows_limit);
+    if (result_data.size() > 0) {
+        int col_size = result_data[0].column_value.size();
+        if (col_size != _s_tbls_columns.size()) {
+            return Status::InternalError("partitions schema does not match between FE and BE");
+        }
+    }
+
+    for (int i = 0; i < result_data.size(); i++) {
+        TRow row = result_data[i];
+
+        for (int j = 0; j < _s_tbls_columns.size(); j++) {
+            if ((_s_tbls_columns[j].type == TYPE_BIGINT) || _s_tbls_columns[j].type == TYPE_INT) {
+                SchemaScannerHelper::insert_int_value(j, row.column_value[j].longVal,
+                                                      _partitions_block.get());
+            } else if (_s_tbls_columns[j].type == TYPE_DATETIME) {
+                std::vector datas(1);
+                VecDateTimeValue src[1];
+                src[0].from_date_str(row.column_value[j].stringVal.data(),
+                                     row.column_value[j].stringVal.size());
+                datas[0] = src;
+                SchemaScannerHelper::insert_datetime_value(j, datas, _partitions_block.get());
+            } else {
+                SchemaScannerHelper::insert_string_value(j, row.column_value[j].stringVal,
+                                                         _partitions_block.get());
+            }
+        }
+    }
     return Status::OK();
 }
+
+bool SchemaPartitionsScanner::check_and_mark_eos(bool* eos) const {
+    if (_row_idx == _total_rows) {
+        *eos = true;
+        if (_db_index < _db_result.db_ids.size()) {
+            *eos = false;
+        }
+        return true;
+    }
+    return false;
+}
+
 Status SchemaPartitionsScanner::get_next_block_internal(vectorized::Block* block, bool* eos) {
     if (!_is_init) {
         return Status::InternalError("Used before initialized.");
     }
+
     if (nullptr == block || nullptr == eos) {
         return Status::InternalError("input pointer is nullptr.");
     }
-    *eos = true;
+
+    if ((_partitions_block == nullptr) || (_row_idx == _total_rows)) {
+        if (_db_index < _db_result.db_ids.size()) {
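+            // Pull the next database's partition rows from the FE into _partitions_block;
+            // they are then copied into the output block below in _block_rows_limit-sized
+            // batches across successive calls.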
RETURN_IF_ERROR(get_onedb_info_from_fe(_db_result.db_ids[_db_index])); + _row_idx = 0; // reset row index so that it start filling for next block. + _total_rows = _partitions_block->rows(); + _db_index++; + } + } + + if (check_and_mark_eos(eos)) { + return Status::OK(); + } + + int current_batch_rows = std::min(_block_rows_limit, _total_rows - _row_idx); + vectorized::MutableBlock mblock = vectorized::MutableBlock::build_mutable_block(block); + RETURN_IF_ERROR(mblock.add_rows(_partitions_block.get(), _row_idx, current_batch_rows)); + _row_idx += current_batch_rows; + + if (!check_and_mark_eos(eos)) { + *eos = false; + } return Status::OK(); } diff --git a/be/src/exec/schema_scanner/schema_partitions_scanner.h b/be/src/exec/schema_scanner/schema_partitions_scanner.h index 87e55db984a3de..3c246f36eecb93 100644 --- a/be/src/exec/schema_scanner/schema_partitions_scanner.h +++ b/be/src/exec/schema_scanner/schema_partitions_scanner.h @@ -40,11 +40,18 @@ class SchemaPartitionsScanner : public SchemaScanner { Status start(RuntimeState* state) override; Status get_next_block_internal(vectorized::Block* block, bool* eos) override; - int _db_index; - int _table_index; - TGetDbsResult _db_result; - TListTableStatusResult _table_result; static std::vector _s_tbls_columns; + +private: + Status get_onedb_info_from_fe(int64_t dbId); + bool check_and_mark_eos(bool* eos) const; + int _block_rows_limit = 4096; + int _db_index = 0; + TGetDbsResult _db_result; + int _row_idx = 0; + int _total_rows = 0; + std::unique_ptr _partitions_block = nullptr; + int _rpc_timeout_ms = 3000; }; } // namespace doris diff --git a/be/src/exec/schema_scanner/schema_scanner_helper.cpp b/be/src/exec/schema_scanner/schema_scanner_helper.cpp new file mode 100644 index 00000000000000..b7d7b085f78cdc --- /dev/null +++ b/be/src/exec/schema_scanner/schema_scanner_helper.cpp @@ -0,0 +1,62 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
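+
+// Common value-insertion helpers shared by schema scanners. A later patch in this
+// series ("[chore] remove duplicate code") switches the other scanners' hand-rolled
+// lambdas over to these functions.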
+ +#include "schema_scanner_helper.h" + +#include "runtime/client_cache.h" +#include "runtime/exec_env.h" +#include "runtime/runtime_state.h" +#include "util/thrift_rpc_helper.h" +#include "vec/common/string_ref.h" +#include "vec/core/block.h" +#include "vec/data_types/data_type_factory.hpp" + +namespace doris { + +void SchemaScannerHelper::insert_string_value(int col_index, std::string str_val, + vectorized::Block* block) { + vectorized::MutableColumnPtr mutable_col_ptr; + mutable_col_ptr = std::move(*block->get_by_position(col_index).column).assume_mutable(); + auto* nullable_column = reinterpret_cast(mutable_col_ptr.get()); + vectorized::IColumn* col_ptr = &nullable_column->get_nested_column(); + reinterpret_cast(col_ptr)->insert_data(str_val.data(), + str_val.size()); + nullable_column->get_null_map_data().emplace_back(0); +} + +void SchemaScannerHelper::insert_datetime_value(int col_index, const std::vector& datas, + vectorized::Block* block) { + vectorized::MutableColumnPtr mutable_col_ptr; + mutable_col_ptr = std::move(*block->get_by_position(col_index).column).assume_mutable(); + auto* nullable_column = reinterpret_cast(mutable_col_ptr.get()); + vectorized::IColumn* col_ptr = &nullable_column->get_nested_column(); + auto data = datas[0]; + reinterpret_cast*>(col_ptr)->insert_data( + reinterpret_cast(data), 0); + nullable_column->get_null_map_data().emplace_back(0); +} + +void SchemaScannerHelper::insert_int_value(int col_index, int64_t int_val, + vectorized::Block* block) { + vectorized::MutableColumnPtr mutable_col_ptr; + mutable_col_ptr = std::move(*block->get_by_position(col_index).column).assume_mutable(); + auto* nullable_column = reinterpret_cast(mutable_col_ptr.get()); + vectorized::IColumn* col_ptr = &nullable_column->get_nested_column(); + reinterpret_cast*>(col_ptr)->insert_value(int_val); + nullable_column->get_null_map_data().emplace_back(0); +} +} // namespace doris diff --git a/be/src/exec/schema_scanner/schema_scanner_helper.h b/be/src/exec/schema_scanner/schema_scanner_helper.h new file mode 100644 index 00000000000000..126ad33e5f28a3 --- /dev/null +++ b/be/src/exec/schema_scanner/schema_scanner_helper.h @@ -0,0 +1,42 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#ifndef _SCHEMA_SCANNER_HELPER_H_ + +#include + +#include +#include + +// this is a util class which can be used by all shema scanner +// all common functions are added in this class. 
+namespace doris { + +namespace vectorized { +class Block; +} // namespace vectorized +class SchemaScannerHelper { +public: + static void insert_string_value(int col_index, std::string str_val, vectorized::Block* block); + static void insert_datetime_value(int col_index, const std::vector& datas, + vectorized::Block* block); + + static void insert_int_value(int col_index, int64_t int_val, vectorized::Block* block); +}; + +} // namespace doris +#endif diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/ListPartitionItem.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/ListPartitionItem.java index 491bcc74c6d7c9..dba109a9539876 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/ListPartitionItem.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/ListPartitionItem.java @@ -64,6 +64,10 @@ public String getItemsString() { return toString(); } + public String getItemsSql() { + return toSql(); + } + @Override public boolean isDefaultPartition() { return isDefaultPartition; diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Partition.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Partition.java index ab2b646ce39ee6..83bb8a18cbd827 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Partition.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Partition.java @@ -450,4 +450,23 @@ public void convertHashDistributionToRandomDistribution() { public boolean isRollupIndex(long id) { return idToVisibleRollupIndex.containsKey(id); } + + + public long getRowCount() { + return getBaseIndex().getRowCount(); + } + + public long getAvgRowLength() { + long rowCount = getBaseIndex().getRowCount(); + long dataSize = getBaseIndex().getDataSize(false); + if (rowCount > 0) { + return dataSize / rowCount; + } else { + return 0; + } + } + + public long getDataLength() { + return getBaseIndex().getDataSize(false); + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/PartitionInfo.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/PartitionInfo.java index 448c03f1b3a46e..304a105b8cf92d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/PartitionInfo.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/PartitionInfo.java @@ -128,6 +128,19 @@ public List getPartitionColumns() { return partitionColumns; } + public String getDisplayPartitionColumns() { + StringBuilder sb = new StringBuilder(); + int index = 0; + for (Column c : partitionColumns) { + if (index != 0) { + sb.append(", "); + } + sb.append(c.getDisplayName()); + index++; + } + return sb.toString(); + } + public Map getIdToItem(boolean isTemp) { if (isTemp) { return idToTempItem; diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/PartitionItem.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/PartitionItem.java index 5b24b1651c3ae1..d75769382180ad 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/PartitionItem.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/PartitionItem.java @@ -62,4 +62,8 @@ public boolean isDefaultPartition() { public abstract boolean isGreaterThanSpecifiedTime(int pos, Optional dateFormatOptional, long nowTruncSubSec) throws AnalysisException; + + + //get the unique string of the partition item in sql format + public abstract String getItemsSql(); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/RangePartitionItem.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/RangePartitionItem.java index 760e3111b6dbf1..b5e90634b8bbb1 100644 --- 
a/fe/fe-core/src/main/java/org/apache/doris/catalog/RangePartitionItem.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/RangePartitionItem.java @@ -49,6 +49,10 @@ public String getItemsString() { return toString(); } + public String getItemsSql() { + return toPartitionKeyDesc().toSql(); + } + @Override public boolean isDefaultPartition() { return false; diff --git a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataGenerator.java b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataGenerator.java index 66794331377558..4d07d219dc4882 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataGenerator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataGenerator.java @@ -27,6 +27,9 @@ import org.apache.doris.catalog.HashDistributionInfo; import org.apache.doris.catalog.MTMV; import org.apache.doris.catalog.OlapTable; +import org.apache.doris.catalog.Partition; +import org.apache.doris.catalog.PartitionItem; +import org.apache.doris.catalog.PartitionType; import org.apache.doris.catalog.SchemaTable; import org.apache.doris.catalog.Table; import org.apache.doris.catalog.TableIf; @@ -99,6 +102,7 @@ import java.time.Instant; import java.time.LocalDateTime; import java.util.ArrayList; +import java.util.Collection; import java.util.Date; import java.util.List; import java.util.Map; @@ -123,6 +127,8 @@ public class MetadataGenerator { private static final ImmutableMap META_CACHE_STATS_COLUMN_TO_INDEX; + private static final ImmutableMap PARTITIONS_COLUMN_TO_INDEX; + static { ImmutableMap.Builder activeQueriesbuilder = new ImmutableMap.Builder(); List activeQueriesColList = SchemaTable.TABLE_MAP.get("active_queries").getFullSchema(); @@ -177,6 +183,13 @@ public class MetadataGenerator { metaCacheBuilder.put(metaCacheColList.get(i).getName().toLowerCase(), i); } META_CACHE_STATS_COLUMN_TO_INDEX = metaCacheBuilder.build(); + + ImmutableMap.Builder partitionsBuilder = new ImmutableMap.Builder(); + List partitionsColList = SchemaTable.TABLE_MAP.get("partitions").getFullSchema(); + for (int i = 0; i < partitionsColList.size(); i++) { + partitionsBuilder.put(partitionsColList.get(i).getName().toLowerCase(), i); + } + PARTITIONS_COLUMN_TO_INDEX = partitionsBuilder.build(); } public static TFetchSchemaTableDataResult getMetadataTable(TFetchSchemaTableDataRequest request) throws TException { @@ -272,6 +285,10 @@ public static TFetchSchemaTableDataResult getSchemaTableData(TFetchSchemaTableDa result = metaCacheStatsMetadataResult(schemaTableParams); columnIndex = META_CACHE_STATS_COLUMN_TO_INDEX; break; + case PARTITIONS: + result = partitionsMetadataResult(schemaTableParams); + columnIndex = PARTITIONS_COLUMN_TO_INDEX; + break; default: return errorResult("invalid schema table name."); } @@ -1151,6 +1168,12 @@ private static TFetchSchemaTableDataResult tableOptionsMetadataResult(TSchemaTab Long dbId = params.getDbId(); String clg = params.getCatalog(); CatalogIf catalog = Env.getCurrentEnv().getCatalogMgr().getCatalog(clg); + if (catalog == null) { + // catalog is NULL let return empty to BE + result.setDataBatch(dataBatch); + result.setStatus(new TStatus(TStatusCode.OK)); + return result; + } DatabaseIf database = catalog.getDbNullable(dbId); if (database == null) { // BE gets the database id list from FE and then invokes this interface @@ -1245,8 +1268,14 @@ private static TFetchSchemaTableDataResult tablePropertiesMetadataResult(TSchema TFetchSchemaTableDataResult result = new TFetchSchemaTableDataResult(); Long dbId = 
params.getDbId(); String clg = params.getCatalog(); - CatalogIf catalog = Env.getCurrentEnv().getCatalogMgr().getCatalog(clg); List dataBatch = Lists.newArrayList(); + CatalogIf catalog = Env.getCurrentEnv().getCatalogMgr().getCatalog(clg); + if (catalog == null) { + // catalog is NULL let return empty to BE + result.setDataBatch(dataBatch); + result.setStatus(new TStatus(TStatusCode.OK)); + return result; + } DatabaseIf database = catalog.getDbNullable(dbId); if (database == null) { // BE gets the database id list from FE and then invokes this interface @@ -1290,7 +1319,124 @@ private static TFetchSchemaTableDataResult metaCacheStatsMetadataResult(TSchemaT fillBatch(dataBatch, icebergCache.getCacheStats(), catalogIf.getName()); } } + result.setDataBatch(dataBatch); + result.setStatus(new TStatus(TStatusCode.OK)); + return result; + } + + private static void partitionsForInternalCatalog(UserIdentity currentUserIdentity, + CatalogIf catalog, DatabaseIf database, List tables, List dataBatch) { + for (TableIf table : tables) { + if (!(table instanceof OlapTable)) { + continue; + } + if (!Env.getCurrentEnv().getAccessManager().checkTblPriv(currentUserIdentity, catalog.getName(), + database.getFullName(), table.getName(), PrivPredicate.SHOW)) { + continue; + } + + OlapTable olapTable = (OlapTable) table; + Collection allPartitions = olapTable.getAllPartitions(); + + for (Partition partition : allPartitions) { + TRow trow = new TRow(); + trow.addToColumnValue(new TCell().setStringVal(catalog.getName())); // TABLE_CATALOG + trow.addToColumnValue(new TCell().setStringVal(database.getFullName())); // TABLE_SCHEMA + trow.addToColumnValue(new TCell().setStringVal(table.getName())); // TABLE_NAME + trow.addToColumnValue(new TCell().setStringVal(partition.getName())); // PARTITION_NAME + trow.addToColumnValue(new TCell().setStringVal("NULL")); // SUBPARTITION_NAME (always null) + + trow.addToColumnValue(new TCell().setIntVal(0)); //PARTITION_ORDINAL_POSITION (not available) + trow.addToColumnValue(new TCell().setIntVal(0)); //SUBPARTITION_ORDINAL_POSITION (not available) + trow.addToColumnValue(new TCell().setStringVal( + olapTable.getPartitionInfo().getType().toString())); // PARTITION_METHOD + trow.addToColumnValue(new TCell().setStringVal("NULL")); // SUBPARTITION_METHOD(always null) + PartitionItem item = olapTable.getPartitionInfo().getItem(partition.getId()); + if ((olapTable.getPartitionInfo().getType() == PartitionType.UNPARTITIONED) || (item == null)) { + trow.addToColumnValue(new TCell().setStringVal("NULL")); // if unpartitioned, its null + trow.addToColumnValue(new TCell().setStringVal("NULL")); // SUBPARTITION_EXPRESSION (always null) + trow.addToColumnValue(new TCell().setStringVal("NULL")); // PARITION DESC, its null + } else { + trow.addToColumnValue(new TCell().setStringVal( + olapTable.getPartitionInfo() + .getDisplayPartitionColumns().toString())); // PARTITION_EXPRESSION + trow.addToColumnValue(new TCell().setStringVal("NULL")); // SUBPARTITION_EXPRESSION (always null) + trow.addToColumnValue(new TCell().setStringVal( + item.getItemsSql())); // PARITION DESC + } + trow.addToColumnValue(new TCell().setLongVal(partition.getRowCount())); //TABLE_ROWS (PARTITION row) + trow.addToColumnValue(new TCell().setLongVal(partition.getAvgRowLength())); //AVG_ROW_LENGTH + trow.addToColumnValue(new TCell().setLongVal(partition.getDataLength())); //DATA_LENGTH + trow.addToColumnValue(new TCell().setIntVal(0)); //MAX_DATA_LENGTH (not available) + trow.addToColumnValue(new TCell().setIntVal(0)); 
//INDEX_LENGTH (not available) + trow.addToColumnValue(new TCell().setIntVal(0)); //DATA_FREE (not available) + trow.addToColumnValue(new TCell().setStringVal("NULL")); //CREATE_TIME (not available) + trow.addToColumnValue(new TCell().setStringVal( + TimeUtils.longToTimeString(partition.getVisibleVersionTime()))); //UPDATE_TIME + trow.addToColumnValue(new TCell().setStringVal("NULL")); // CHECK_TIME (not available) + trow.addToColumnValue(new TCell().setIntVal(0)); //CHECKSUM (not available) + trow.addToColumnValue(new TCell().setStringVal("")); // PARTITION_COMMENT (not available) + trow.addToColumnValue(new TCell().setStringVal("")); // NODEGROUP (not available) + trow.addToColumnValue(new TCell().setStringVal("")); // TABLESPACE_NAME (not available) + dataBatch.add(trow); + } + } // for table + } + + private static void partitionsForExternalCatalog(UserIdentity currentUserIdentity, + CatalogIf catalog, DatabaseIf database, List tables, List dataBatch) { + for (TableIf table : tables) { + if (!Env.getCurrentEnv().getAccessManager().checkTblPriv(currentUserIdentity, catalog.getName(), + database.getFullName(), table.getName(), PrivPredicate.SHOW)) { + continue; + } + // TODO + } // for table + } + + private static TFetchSchemaTableDataResult partitionsMetadataResult(TSchemaTableRequestParams params) { + if (!params.isSetCurrentUserIdent()) { + return errorResult("current user ident is not set."); + } + if (!params.isSetDbId()) { + return errorResult("current db id is not set."); + } + + if (!params.isSetCatalog()) { + return errorResult("current catalog is not set."); + } + + TUserIdentity tcurrentUserIdentity = params.getCurrentUserIdent(); + UserIdentity currentUserIdentity = UserIdentity.fromThrift(tcurrentUserIdentity); + TFetchSchemaTableDataResult result = new TFetchSchemaTableDataResult(); + Long dbId = params.getDbId(); + String clg = params.getCatalog(); + List dataBatch = Lists.newArrayList(); + CatalogIf catalog = Env.getCurrentEnv().getCatalogMgr().getCatalog(clg); + if (catalog == null) { + // catalog is NULL let return empty to BE + result.setDataBatch(dataBatch); + result.setStatus(new TStatus(TStatusCode.OK)); + return result; + } + DatabaseIf database = catalog.getDbNullable(dbId); + if (database == null) { + // BE gets the database id list from FE and then invokes this interface + // per database. there is a chance that in between database can be dropped. + // so need to handle database not exist case and return ok so that BE continue the + // loop with next database. 
+ result.setDataBatch(dataBatch); + result.setStatus(new TStatus(TStatusCode.OK)); + return result; + } + List tables = database.getTables(); + if (catalog instanceof InternalCatalog) { + // only olap tables + partitionsForInternalCatalog(currentUserIdentity, catalog, database, tables, dataBatch); + } else if (catalog instanceof ExternalCatalog) { + partitionsForExternalCatalog(currentUserIdentity, catalog, database, tables, dataBatch); + } result.setDataBatch(dataBatch); result.setStatus(new TStatus(TStatusCode.OK)); return result; diff --git a/gensrc/thrift/FrontendService.thrift b/gensrc/thrift/FrontendService.thrift index e6152bd2aa40ec..9c7aa210fec1c2 100644 --- a/gensrc/thrift/FrontendService.thrift +++ b/gensrc/thrift/FrontendService.thrift @@ -1014,6 +1014,7 @@ enum TSchemaTableName { WORKLOAD_GROUP_PRIVILEGES = 7, TABLE_PROPERTIES = 8, CATALOG_META_CACHE_STATS = 9, + PARTITIONS = 10, } struct TMetadataTableRequestParams { diff --git a/regression-test/data/query_p0/system/test_partitions_schema.out b/regression-test/data/query_p0/system/test_partitions_schema.out new file mode 100644 index 00000000000000..ea82818d2ab2b0 --- /dev/null +++ b/regression-test/data/query_p0/system/test_partitions_schema.out @@ -0,0 +1,48 @@ +-- This file is automatically generated. You should know what you did if you want to edit this +-- !select_check_0 -- +test_range_table p0 9 +test_range_table p1 2 +test_range_table p100 4 +test_range_table p2 1 +test_range_table p3 1 +test_range_table p4 3 +test_range_table p5 0 + +-- !select_check_1 -- +internal test_partitions_schema_db duplicate_table duplicate_table NULL 0 0 UNPARTITIONED NULL NULL NULL NULL -1 0 0 0 0 0 0 +internal test_partitions_schema_db listtable p1_city NULL 0 0 LIST NULL user_id, city NULL (("1", "Beijing"),("1", "Shanghai")) -1 0 0 0 0 0 0 +internal test_partitions_schema_db listtable p2_city NULL 0 0 LIST NULL user_id, city NULL (("2", "Beijing"),("2", "Shanghai")) -1 0 0 0 0 0 0 +internal test_partitions_schema_db listtable p3_city NULL 0 0 LIST NULL user_id, city NULL (("3", "Beijing"),("3", "Shanghai")) -1 0 0 0 0 0 0 +internal test_partitions_schema_db randomtable randomtable NULL 0 0 UNPARTITIONED NULL NULL NULL NULL -1 0 0 0 0 0 0 +internal test_partitions_schema_db test_range_table p0 NULL 0 0 RANGE NULL col_1 NULL [('-2147483648'), ('4')) 9 636 5728 0 0 0 0 +internal test_partitions_schema_db test_range_table p1 NULL 0 0 RANGE NULL col_1 NULL [('4'), ('6')) 2 959 1919 0 0 0 0 +internal test_partitions_schema_db test_range_table p100 NULL 0 0 RANGE NULL col_1 NULL [('83647'), ('2147483647')) 4 735 2941 0 0 0 0 +internal test_partitions_schema_db test_range_table p2 NULL 0 0 RANGE NULL col_1 NULL [('6'), ('7')) 1 975 975 0 0 0 0 +internal test_partitions_schema_db test_range_table p3 NULL 0 0 RANGE NULL col_1 NULL [('7'), ('8')) 1 959 959 0 0 0 0 +internal test_partitions_schema_db test_range_table p4 NULL 0 0 RANGE NULL col_1 NULL [('8'), ('10')) 3 948 2846 0 0 0 0 +internal test_partitions_schema_db test_range_table p5 NULL 0 0 RANGE NULL col_1 NULL [('10'), ('83647')) 0 0 0 0 0 0 0 +internal test_partitions_schema_db test_row_column_page_size1 test_row_column_page_size1 NULL 0 0 UNPARTITIONED NULL NULL NULL NULL -1 0 0 0 0 0 0 +internal test_partitions_schema_db test_row_column_page_size2 test_row_column_page_size2 NULL 0 0 UNPARTITIONED NULL NULL NULL NULL -1 0 0 0 0 0 0 + +-- !select_check_2 -- +internal test_partitions_schema_db duplicate_table duplicate_table NULL 0 0 UNPARTITIONED NULL NULL NULL NULL -1 0 0 0 0 0 0 
+internal test_partitions_schema_db listtable p1_city NULL 0 0 LIST NULL user_id, city NULL (("1", "Beijing"),("1", "Shanghai")) -1 0 0 0 0 0 0 +internal test_partitions_schema_db listtable p2_city NULL 0 0 LIST NULL user_id, city NULL (("2", "Beijing"),("2", "Shanghai")) -1 0 0 0 0 0 0 +internal test_partitions_schema_db listtable p3_city NULL 0 0 LIST NULL user_id, city NULL (("3", "Beijing"),("3", "Shanghai")) -1 0 0 0 0 0 0 +internal test_partitions_schema_db randomtable randomtable NULL 0 0 UNPARTITIONED NULL NULL NULL NULL -1 0 0 0 0 0 0 +internal test_partitions_schema_db test_range_table p0 NULL 0 0 RANGE NULL col_1 NULL [('-2147483648'), ('4')) 9 636 5728 0 0 0 0 +internal test_partitions_schema_db test_range_table p1 NULL 0 0 RANGE NULL col_1 NULL [('4'), ('6')) 2 959 1919 0 0 0 0 +internal test_partitions_schema_db test_range_table p100 NULL 0 0 RANGE NULL col_1 NULL [('83647'), ('2147483647')) 4 735 2941 0 0 0 0 +internal test_partitions_schema_db test_range_table p2 NULL 0 0 RANGE NULL col_1 NULL [('6'), ('7')) 1 975 975 0 0 0 0 +internal test_partitions_schema_db test_range_table p3 NULL 0 0 RANGE NULL col_1 NULL [('7'), ('8')) 1 959 959 0 0 0 0 +internal test_partitions_schema_db test_range_table p4 NULL 0 0 RANGE NULL col_1 NULL [('8'), ('10')) 3 948 2846 0 0 0 0 +internal test_partitions_schema_db test_range_table p5 NULL 0 0 RANGE NULL col_1 NULL [('10'), ('83647')) 0 0 0 0 0 0 0 +internal test_partitions_schema_db test_row_column_page_size1 test_row_column_page_size1 NULL 0 0 UNPARTITIONED NULL NULL NULL NULL -1 0 0 0 0 0 0 + +-- !select_check_3 -- + +-- !select_check_4 -- +internal test_partitions_schema_db duplicate_table duplicate_table NULL 0 0 UNPARTITIONED NULL NULL NULL NULL -1 0 0 0 0 0 0 + +-- !select_check_5 -- + diff --git a/regression-test/suites/query_p0/system/test_partitions_schema.groovy b/regression-test/suites/query_p0/system/test_partitions_schema.groovy new file mode 100644 index 00000000000000..ac73d3315d0dfb --- /dev/null +++ b/regression-test/suites/query_p0/system/test_partitions_schema.groovy @@ -0,0 +1,195 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
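+
+// This suite creates range-partitioned, list-partitioned and unpartitioned tables and
+// checks information_schema.partitions against them, including privilege filtering for
+// a restricted user. The core query it exercises is of the form:
+//   select table_name, partition_name, table_rows
+//   from information_schema.partitions where table_schema = "test_partitions_schema_db";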
+ +import org.awaitility.Awaitility +import static java.util.concurrent.TimeUnit.SECONDS + +suite("test_partitions_schema") { + def dbName = "test_partitions_schema_db" + def listOfColum = "TABLE_CATALOG,TABLE_SCHEMA,TABLE_NAME,PARTITION_NAME,SUBPARTITION_NAME,PARTITION_ORDINAL_POSITION,SUBPARTITION_ORDINAL_POSITION,PARTITION_METHOD,SUBPARTITION_METHOD,PARTITION_EXPRESSION,SUBPARTITION_EXPRESSION,PARTITION_DESCRIPTION,TABLE_ROWS,AVG_ROW_LENGTH,DATA_LENGTH,MAX_DATA_LENGTH,INDEX_LENGTH,DATA_FREE,CHECKSUM,PARTITION_COMMENT,NODEGROUP,TABLESPACE_NAME"; + sql "drop database if exists ${dbName}" + sql "CREATE DATABASE IF NOT EXISTS ${dbName}" + sql "use ${dbName}" + + def checkRowCount = { expectedRowCount -> + Awaitility.await().atMost(180, SECONDS).pollInterval(1, SECONDS).until( + { + def result = sql "select table_rows from information_schema.partitions where table_name='test_range_table' and partition_name='p0'" + logger.info("table: table_name, rowCount: ${result}") + return result[0][0] == expectedRowCount + } + ) + } + + sql """ + create table test_range_table ( + col_1 int, + col_2 int, + col_3 int, + col_4 int, + pk int + ) engine=olap + DUPLICATE KEY(col_1, col_2) + PARTITION BY RANGE(col_1) ( + PARTITION p0 VALUES LESS THAN ('4'), + PARTITION p1 VALUES LESS THAN ('6'), + PARTITION p2 VALUES LESS THAN ('7'), + PARTITION p3 VALUES LESS THAN ('8'), + PARTITION p4 VALUES LESS THAN ('10'), + PARTITION p5 VALUES LESS THAN ('83647'), + PARTITION p100 VALUES LESS THAN ('2147483647') + ) + + distributed by hash(pk) buckets 10 + properties("replication_num" = "1"); + """ + sql """ + insert into test_range_table(pk,col_1,col_2,col_3,col_4) values (0,6,-179064,5213411,5),(1,3,5,2,6),(2,4226261,7,null,3),(3,9,null,4,4),(4,-1003770,2,1,1),(5,8,7,null,8176864),(6,3388266,5,8,8),(7,5,1,2,null),(8,9,2064412,0,null),(9,1489553,8,-446412,6),(10,1,3,0,1),(11,null,3,4621304,null),(12,null,-3058026,-262645,9),(13,null,null,9,3),(14,null,null,5037128,7),(15,299896,-1444893,8,1480339),(16,7,7,0,1470826),(17,-7378014,5,null,5),(18,0,3,6,5),(19,5,3,-4403612,-3103249); + """ + sql """ + sync + """ + checkRowCount(9); + + qt_select_check_0 """select table_name,partition_name,table_rows from information_schema.partitions where table_schema=\"${dbName}\" order by $listOfColum""" + sql """ + CREATE TABLE IF NOT EXISTS listtable + ( + `user_id` LARGEINT NOT NULL COMMENT "User id", + `date` DATE NOT NULL COMMENT "Data fill in date time", + `timestamp` DATETIME NOT NULL COMMENT "Timestamp of data being poured", + `city` VARCHAR(20) COMMENT "The city where the user is located", + `age` SMALLINT COMMENT "User Age", + `sex` TINYINT COMMENT "User gender", + `last_visit_date` DATETIME REPLACE DEFAULT "1970-01-01 00:00:00" COMMENT "User last visit time", + `cost` BIGINT SUM DEFAULT "0" COMMENT "Total user consumption", + `max_dwell_time` INT MAX DEFAULT "0" COMMENT "User maximum dwell time", + `min_dwell_time` INT MIN DEFAULT "99999" COMMENT "User minimum dwell time" + ) + ENGINE=olap + AGGREGATE KEY(`user_id`, `date`, `timestamp`, `city`, `age`, `sex`) + PARTITION BY LIST(user_id, city) + ( + PARTITION p1_city VALUES IN (("1", "Beijing"), ("1", "Shanghai")), + PARTITION p2_city VALUES IN (("2", "Beijing"), ("2", "Shanghai")), + PARTITION p3_city VALUES IN (("3", "Beijing"), ("3", "Shanghai")) + ) + DISTRIBUTED BY HASH(`user_id`) BUCKETS 16 + PROPERTIES + ( + "replication_num" = "1" + ); + """ + sql """ + CREATE TABLE IF NOT EXISTS randomtable + ( + `user_id` LARGEINT NOT NULL COMMENT "User id", + `date` DATE NOT NULL 
COMMENT "Data fill in date time", + `timestamp` DATETIME NOT NULL COMMENT "Timestamp of data being poured", + `city` VARCHAR(20) COMMENT "The city where the user is located", + `age` SMALLINT COMMENT "User Age", + `sex` TINYINT COMMENT "User gender" + ) + ENGINE=olap + DISTRIBUTED BY RANDOM BUCKETS 16 + PROPERTIES + ( + "replication_num" = "1" + ); + """ + sql """ + CREATE TABLE IF NOT EXISTS duplicate_table + ( + `timestamp` DATETIME NOT NULL COMMENT "Log time", + `type` INT NOT NULL COMMENT "Log type", + `error_code` INT COMMENT "Error code", + `error_msg` VARCHAR(1024) COMMENT "Error detail message", + `op_id` BIGINT COMMENT "Operator ID", + `op_time` DATETIME COMMENT "Operation time" + ) + DISTRIBUTED BY HASH(`type`) BUCKETS 1 + PROPERTIES ( + "replication_allocation" = "tag.location.default: 1" + ); + """ + + // test row column page size + sql """ + CREATE TABLE IF NOT EXISTS test_row_column_page_size1 ( + `aaa` varchar(170) NOT NULL COMMENT "", + `bbb` varchar(20) NOT NULL COMMENT "", + `ccc` INT NULL COMMENT "", + `ddd` SMALLINT NULL COMMENT "" + ) + DISTRIBUTED BY HASH(`aaa`) BUCKETS 1 + PROPERTIES ( + "replication_num" = "1", + "store_row_column" = "true" + ); + """ + + sql """ + CREATE TABLE IF NOT EXISTS test_row_column_page_size2 ( + `aaa` varchar(170) NOT NULL COMMENT "", + `bbb` varchar(20) NOT NULL COMMENT "", + `ccc` INT NULL COMMENT "", + `ddd` SMALLINT NULL COMMENT "" + ) + DISTRIBUTED BY HASH(`aaa`) BUCKETS 1 + PROPERTIES ( + "replication_num" = "1", + "store_row_column" = "true", + "row_store_page_size" = "8190" + ); + """ + qt_select_check_1 """select $listOfColum from information_schema.partitions where table_schema=\"${dbName}\" order by $listOfColum""" + sql """ + drop table test_row_column_page_size2; + """ + qt_select_check_2 """select $listOfColum from information_schema.partitions where table_schema=\"${dbName}\" order by $listOfColum""" + + def user = "partitions_user" + sql "DROP USER IF EXISTS ${user}" + sql "CREATE USER ${user} IDENTIFIED BY '123abc!@#'" + //cloud-mode + if (isCloudMode()) { + def clusters = sql " SHOW CLUSTERS; " + assertTrue(!clusters.isEmpty()) + def validCluster = clusters[0][0] + sql """GRANT USAGE_PRIV ON CLUSTER ${validCluster} TO ${user}"""; + } + + sql "GRANT SELECT_PRIV ON information_schema.partitions TO ${user}" + + def tokens = context.config.jdbcUrl.split('/') + def url=tokens[0] + "//" + tokens[2] + "/" + "information_schema" + "?" 
+ + connect(user=user, password='123abc!@#', url=url) { + qt_select_check_3 """select $listOfColum from information_schema.partitions where table_schema=\"${dbName}\" order by $listOfColum""" + } + + sql "GRANT SELECT_PRIV ON ${dbName}.duplicate_table TO ${user}" + connect(user=user, password='123abc!@#', url=url) { + qt_select_check_4 """select $listOfColum from information_schema.partitions where table_schema=\"${dbName}\" order by $listOfColum""" + } + + sql "REVOKE SELECT_PRIV ON ${dbName}.duplicate_table FROM ${user}" + connect(user=user, password='123abc!@#', url=url) { + qt_select_check_5 """select $listOfColum from information_schema.partitions where table_schema=\"${dbName}\" order by $listOfColum""" + } + +} From 5717ad6894ecd0f2db2925e1d7142ee03aa330f8 Mon Sep 17 00:00:00 2001 From: Vallish Date: Tue, 3 Sep 2024 16:22:41 +0000 Subject: [PATCH 2/5] [fix] adjust testcase --- .../data/query_p0/system/test_query_sys_tables.out | 2 -- .../suites/query_p0/system/test_query_sys_tables.groovy | 6 +++--- 2 files changed, 3 insertions(+), 5 deletions(-) diff --git a/regression-test/data/query_p0/system/test_query_sys_tables.out b/regression-test/data/query_p0/system/test_query_sys_tables.out index 3073ae0bf53e74..cdd89914994c63 100644 --- a/regression-test/data/query_p0/system/test_query_sys_tables.out +++ b/regression-test/data/query_p0/system/test_query_sys_tables.out @@ -152,8 +152,6 @@ PARTITION_COMMENT text Yes false \N NODEGROUP varchar(256) Yes false \N TABLESPACE_NAME varchar(268) Yes false \N --- !select_partitions -- - -- !schemata -- internal test_query_sys_db_1 \N internal test_query_sys_db_2 \N diff --git a/regression-test/suites/query_p0/system/test_query_sys_tables.groovy b/regression-test/suites/query_p0/system/test_query_sys_tables.groovy index 72198b32eadfd2..a0f246f03f66ee 100644 --- a/regression-test/suites/query_p0/system/test_query_sys_tables.groovy +++ b/regression-test/suites/query_p0/system/test_query_sys_tables.groovy @@ -135,9 +135,9 @@ suite("test_query_sys_tables", "query,p0") { // test partitions - // have no impl + // have impl now, partition based on time and date so not doing data validation. + // data validation taken care in another regression test. 
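+    // (data validation for information_schema.partitions is covered by
+    // regression-test/suites/query_p0/system/test_partitions_schema.groovy)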
qt_desc_partitions """ desc `information_schema`.`partitions` """ - order_qt_select_partitions """ select * from `information_schema`.`partitions`; """ // test schemata // create test dbs @@ -253,4 +253,4 @@ suite("test_query_sys_tables", "query,p0") { qt_sql "select * from triggers" qt_sql "select * from parameters" qt_sql "select * from profiling" -} \ No newline at end of file +} From edebf02876e75e339397954f24eea5e8332d461c Mon Sep 17 00:00:00 2001 From: Vallish Date: Thu, 5 Sep 2024 15:24:02 +0000 Subject: [PATCH 3/5] [chore] remove duplicate code --- .../schema_active_queries_scanner.cpp | 53 ++++++++----------- .../schema_scanner/schema_routine_scanner.cpp | 28 ++-------- .../schema_scanner/schema_scanner_helper.cpp | 13 ++++- .../schema_scanner/schema_scanner_helper.h | 1 + .../runtime/runtime_query_statistics_mgr.cpp | 47 +++++----------- .../runtime/workload_group/workload_group.cpp | 1 + .../workload_group/workload_group_manager.cpp | 35 +++--------- 7 files changed, 59 insertions(+), 119 deletions(-) diff --git a/be/src/exec/schema_scanner/schema_active_queries_scanner.cpp b/be/src/exec/schema_scanner/schema_active_queries_scanner.cpp index 46522a36242fc1..2b516fc6fdac2b 100644 --- a/be/src/exec/schema_scanner/schema_active_queries_scanner.cpp +++ b/be/src/exec/schema_scanner/schema_active_queries_scanner.cpp @@ -17,6 +17,7 @@ #include "exec/schema_scanner/schema_active_queries_scanner.h" +#include "exec/schema_scanner/schema_scanner_helper.h" #include "runtime/client_cache.h" #include "runtime/exec_env.h" #include "runtime/runtime_state.h" @@ -98,41 +99,29 @@ Status SchemaActiveQueriesScanner::_get_active_queries_block_from_fe() { } } - // todo(wb) reuse this callback function - auto insert_string_value = [&](int col_index, std::string str_val, vectorized::Block* block) { - vectorized::MutableColumnPtr mutable_col_ptr; - mutable_col_ptr = std::move(*block->get_by_position(col_index).column).assume_mutable(); - auto* nullable_column = - reinterpret_cast(mutable_col_ptr.get()); - vectorized::IColumn* col_ptr = &nullable_column->get_nested_column(); - reinterpret_cast(col_ptr)->insert_data(str_val.data(), - str_val.size()); - nullable_column->get_null_map_data().emplace_back(0); - }; - auto insert_int_value = [&](int col_index, int64_t int_val, vectorized::Block* block) { - vectorized::MutableColumnPtr mutable_col_ptr; - mutable_col_ptr = std::move(*block->get_by_position(col_index).column).assume_mutable(); - auto* nullable_column = - reinterpret_cast(mutable_col_ptr.get()); - vectorized::IColumn* col_ptr = &nullable_column->get_nested_column(); - reinterpret_cast*>(col_ptr)->insert_value( - int_val); - nullable_column->get_null_map_data().emplace_back(0); - }; - for (int i = 0; i < result_data.size(); i++) { TRow row = result_data[i]; - insert_string_value(0, row.column_value[0].stringVal, _active_query_block.get()); - insert_string_value(1, row.column_value[1].stringVal, _active_query_block.get()); - insert_int_value(2, row.column_value[2].longVal, _active_query_block.get()); - insert_int_value(3, row.column_value[3].longVal, _active_query_block.get()); - insert_string_value(4, row.column_value[4].stringVal, _active_query_block.get()); - insert_string_value(5, row.column_value[5].stringVal, _active_query_block.get()); - insert_string_value(6, row.column_value[6].stringVal, _active_query_block.get()); - insert_string_value(7, row.column_value[7].stringVal, _active_query_block.get()); - insert_string_value(8, row.column_value[8].stringVal, _active_query_block.get()); - 
insert_string_value(9, row.column_value[9].stringVal, _active_query_block.get()); + SchemaScannerHelper::insert_string_value(0, row.column_value[0].stringVal, + _active_query_block.get()); + SchemaScannerHelper::insert_string_value(1, row.column_value[1].stringVal, + _active_query_block.get()); + SchemaScannerHelper::insert_int_value(2, row.column_value[2].longVal, + _active_query_block.get()); + SchemaScannerHelper::insert_int_value(3, row.column_value[3].longVal, + _active_query_block.get()); + SchemaScannerHelper::insert_string_value(4, row.column_value[4].stringVal, + _active_query_block.get()); + SchemaScannerHelper::insert_string_value(5, row.column_value[5].stringVal, + _active_query_block.get()); + SchemaScannerHelper::insert_string_value(6, row.column_value[6].stringVal, + _active_query_block.get()); + SchemaScannerHelper::insert_string_value(7, row.column_value[7].stringVal, + _active_query_block.get()); + SchemaScannerHelper::insert_string_value(8, row.column_value[8].stringVal, + _active_query_block.get()); + SchemaScannerHelper::insert_string_value(9, row.column_value[9].stringVal, + _active_query_block.get()); } return Status::OK(); } diff --git a/be/src/exec/schema_scanner/schema_routine_scanner.cpp b/be/src/exec/schema_scanner/schema_routine_scanner.cpp index 8c263c99d2d6c8..adb18450f26490 100644 --- a/be/src/exec/schema_scanner/schema_routine_scanner.cpp +++ b/be/src/exec/schema_scanner/schema_routine_scanner.cpp @@ -17,6 +17,7 @@ #include "exec/schema_scanner/schema_routine_scanner.h" +#include "exec/schema_scanner/schema_scanner_helper.h" #include "runtime/client_cache.h" #include "runtime/exec_env.h" #include "runtime/runtime_state.h" @@ -99,28 +100,6 @@ Status SchemaRoutinesScanner::get_block_from_fe() { return Status::InternalError("routine table schema is not match for FE and BE"); } } - auto insert_string_value = [&](int col_index, std::string str_val, vectorized::Block* block) { - vectorized::MutableColumnPtr mutable_col_ptr; - mutable_col_ptr = std::move(*block->get_by_position(col_index).column).assume_mutable(); - auto* nullable_column = - reinterpret_cast(mutable_col_ptr.get()); - vectorized::IColumn* col_ptr = &nullable_column->get_nested_column(); - reinterpret_cast(col_ptr)->insert_data(str_val.data(), - str_val.size()); - nullable_column->get_null_map_data().emplace_back(0); - }; - auto insert_datetime_value = [&](int col_index, const std::vector& datas, - vectorized::Block* block) { - vectorized::MutableColumnPtr mutable_col_ptr; - mutable_col_ptr = std::move(*block->get_by_position(col_index).column).assume_mutable(); - auto* nullable_column = - reinterpret_cast(mutable_col_ptr.get()); - vectorized::IColumn* col_ptr = &nullable_column->get_nested_column(); - auto data = datas[0]; - reinterpret_cast*>(col_ptr)->insert_data( - reinterpret_cast(data), 0); - nullable_column->get_null_map_data().emplace_back(0); - }; for (int i = 0; i < result_data.size(); i++) { TRow row = result_data[i]; @@ -132,9 +111,10 @@ Status SchemaRoutinesScanner::get_block_from_fe() { src[0].from_date_str(row.column_value[j].stringVal.data(), row.column_value[j].stringVal.size()); datas[0] = src; - insert_datetime_value(j, datas, _routines_block.get()); + SchemaScannerHelper::insert_datetime_value(j, datas, _routines_block.get()); } else { - insert_string_value(j, row.column_value[j].stringVal, _routines_block.get()); + SchemaScannerHelper::insert_string_value(j, row.column_value[j].stringVal, + _routines_block.get()); } } } diff --git 
a/be/src/exec/schema_scanner/schema_scanner_helper.cpp b/be/src/exec/schema_scanner/schema_scanner_helper.cpp index b7d7b085f78cdc..fc42044a29c63f 100644 --- a/be/src/exec/schema_scanner/schema_scanner_helper.cpp +++ b/be/src/exec/schema_scanner/schema_scanner_helper.cpp @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -#include "schema_scanner_helper.h" +#include "exec/schema_scanner/schema_scanner_helper.h" #include "runtime/client_cache.h" #include "runtime/exec_env.h" @@ -59,4 +59,15 @@ void SchemaScannerHelper::insert_int_value(int col_index, int64_t int_val, reinterpret_cast*>(col_ptr)->insert_value(int_val); nullable_column->get_null_map_data().emplace_back(0); } + +void SchemaScannerHelper::insert_double_value(int col_index, double double_val, + vectorized::Block* block) { + vectorized::MutableColumnPtr mutable_col_ptr; + mutable_col_ptr = std::move(*block->get_by_position(col_index).column).assume_mutable(); + auto* nullable_column = reinterpret_cast(mutable_col_ptr.get()); + vectorized::IColumn* col_ptr = &nullable_column->get_nested_column(); + reinterpret_cast*>(col_ptr)->insert_value( + double_val); + nullable_column->get_null_map_data().emplace_back(0); +} } // namespace doris diff --git a/be/src/exec/schema_scanner/schema_scanner_helper.h b/be/src/exec/schema_scanner/schema_scanner_helper.h index 126ad33e5f28a3..c9fe8881ddb06e 100644 --- a/be/src/exec/schema_scanner/schema_scanner_helper.h +++ b/be/src/exec/schema_scanner/schema_scanner_helper.h @@ -36,6 +36,7 @@ class SchemaScannerHelper { vectorized::Block* block); static void insert_int_value(int col_index, int64_t int_val, vectorized::Block* block); + static void insert_double_value(int col_index, double double_val, vectorized::Block* block); }; } // namespace doris diff --git a/be/src/runtime/runtime_query_statistics_mgr.cpp b/be/src/runtime/runtime_query_statistics_mgr.cpp index 5c83a6d8041a74..84a552e45173c4 100644 --- a/be/src/runtime/runtime_query_statistics_mgr.cpp +++ b/be/src/runtime/runtime_query_statistics_mgr.cpp @@ -35,6 +35,7 @@ #include #include "common/logging.h" +#include "exec/schema_scanner/schema_scanner_helper.h" #include "runtime/client_cache.h" #include "runtime/exec_env.h" #include "runtime/query_context.h" @@ -516,51 +517,29 @@ void RuntimeQueryStatisticsMgr::get_active_be_tasks_block(vectorized::Block* blo std::shared_lock read_lock(_qs_ctx_map_lock); int64_t be_id = ExecEnv::GetInstance()->master_info()->backend_id; - auto insert_int_value = [&](int col_index, int64_t int_val, vectorized::Block* block) { - vectorized::MutableColumnPtr mutable_col_ptr; - mutable_col_ptr = std::move(*block->get_by_position(col_index).column).assume_mutable(); - auto* nullable_column = - reinterpret_cast(mutable_col_ptr.get()); - vectorized::IColumn* col_ptr = &nullable_column->get_nested_column(); - reinterpret_cast*>(col_ptr)->insert_value( - int_val); - nullable_column->get_null_map_data().emplace_back(0); - }; - - auto insert_string_value = [&](int col_index, std::string str_val, vectorized::Block* block) { - vectorized::MutableColumnPtr mutable_col_ptr; - mutable_col_ptr = std::move(*block->get_by_position(col_index).column).assume_mutable(); - auto* nullable_column = - reinterpret_cast(mutable_col_ptr.get()); - vectorized::IColumn* col_ptr = &nullable_column->get_nested_column(); - reinterpret_cast(col_ptr)->insert_data(str_val.data(), - str_val.size()); - nullable_column->get_null_map_data().emplace_back(0); - }; - // block's schema come from 
SchemaBackendActiveTasksScanner::_s_tbls_columns for (auto& [query_id, qs_ctx_ptr] : _query_statistics_ctx_map) { TQueryStatistics tqs; qs_ctx_ptr->collect_query_statistics(&tqs); - insert_int_value(0, be_id, block); - insert_string_value(1, qs_ctx_ptr->_fe_addr.hostname, block); - insert_string_value(2, query_id, block); + SchemaScannerHelper::insert_int_value(0, be_id, block); + SchemaScannerHelper::insert_string_value(1, qs_ctx_ptr->_fe_addr.hostname, block); + SchemaScannerHelper::insert_string_value(2, query_id, block); int64_t task_time = qs_ctx_ptr->_is_query_finished ? qs_ctx_ptr->_query_finish_time - qs_ctx_ptr->_query_start_time : MonotonicMillis() - qs_ctx_ptr->_query_start_time; - insert_int_value(3, task_time, block); - insert_int_value(4, tqs.cpu_ms, block); - insert_int_value(5, tqs.scan_rows, block); - insert_int_value(6, tqs.scan_bytes, block); - insert_int_value(7, tqs.max_peak_memory_bytes, block); - insert_int_value(8, tqs.current_used_memory_bytes, block); - insert_int_value(9, tqs.shuffle_send_bytes, block); - insert_int_value(10, tqs.shuffle_send_rows, block); + SchemaScannerHelper::insert_int_value(3, task_time, block); + SchemaScannerHelper::insert_int_value(4, tqs.cpu_ms, block); + SchemaScannerHelper::insert_int_value(5, tqs.scan_rows, block); + SchemaScannerHelper::insert_int_value(6, tqs.scan_bytes, block); + SchemaScannerHelper::insert_int_value(7, tqs.max_peak_memory_bytes, block); + SchemaScannerHelper::insert_int_value(8, tqs.current_used_memory_bytes, block); + SchemaScannerHelper::insert_int_value(9, tqs.shuffle_send_bytes, block); + SchemaScannerHelper::insert_int_value(10, tqs.shuffle_send_rows, block); std::stringstream ss; ss << qs_ctx_ptr->_query_type; - insert_string_value(11, ss.str(), block); + SchemaScannerHelper::insert_string_value(11, ss.str(), block); } } diff --git a/be/src/runtime/workload_group/workload_group.cpp b/be/src/runtime/workload_group/workload_group.cpp index e37f83a00e828b..f8e822bfd0a04f 100644 --- a/be/src/runtime/workload_group/workload_group.cpp +++ b/be/src/runtime/workload_group/workload_group.cpp @@ -27,6 +27,7 @@ #include #include "common/logging.h" +#include "exec/schema_scanner/schema_scanner_helper.h" #include "io/fs/local_file_reader.h" #include "olap/storage_engine.h" #include "pipeline/task_queue.h" diff --git a/be/src/runtime/workload_group/workload_group_manager.cpp b/be/src/runtime/workload_group/workload_group_manager.cpp index 314a2b87841e1d..393d327e7c42ca 100644 --- a/be/src/runtime/workload_group/workload_group_manager.cpp +++ b/be/src/runtime/workload_group/workload_group_manager.cpp @@ -22,6 +22,7 @@ #include #include +#include "exec/schema_scanner/schema_scanner_helper.h" #include "pipeline/task_scheduler.h" #include "runtime/memory/mem_tracker_limiter.h" #include "runtime/workload_group/workload_group.h" @@ -246,28 +247,6 @@ void WorkloadGroupMgr::refresh_wg_weighted_memory_limit() { } void WorkloadGroupMgr::get_wg_resource_usage(vectorized::Block* block) { - auto insert_int_value = [&](int col_index, int64_t int_val, vectorized::Block* block) { - vectorized::MutableColumnPtr mutable_col_ptr; - mutable_col_ptr = std::move(*block->get_by_position(col_index).column).assume_mutable(); - auto* nullable_column = - reinterpret_cast(mutable_col_ptr.get()); - vectorized::IColumn* col_ptr = &nullable_column->get_nested_column(); - reinterpret_cast*>(col_ptr)->insert_value( - int_val); - nullable_column->get_null_map_data().emplace_back(0); - }; - - auto insert_double_value = [&](int col_index, double double_val, 
vectorized::Block* block) { - vectorized::MutableColumnPtr mutable_col_ptr; - mutable_col_ptr = std::move(*block->get_by_position(col_index).column).assume_mutable(); - auto* nullable_column = - reinterpret_cast(mutable_col_ptr.get()); - vectorized::IColumn* col_ptr = &nullable_column->get_nested_column(); - reinterpret_cast*>(col_ptr)->insert_value( - double_val); - nullable_column->get_null_map_data().emplace_back(0); - }; - int64_t be_id = ExecEnv::GetInstance()->master_info()->backend_id; int cpu_num = CpuInfo::num_cores(); cpu_num = cpu_num <= 0 ? 1 : cpu_num; @@ -276,18 +255,18 @@ void WorkloadGroupMgr::get_wg_resource_usage(vectorized::Block* block) { std::shared_lock r_lock(_group_mutex); block->reserve(_workload_groups.size()); for (const auto& [id, wg] : _workload_groups) { - insert_int_value(0, be_id, block); - insert_int_value(1, wg->id(), block); - insert_int_value(2, wg->get_mem_used(), block); + SchemaScannerHelper::insert_int_value(0, be_id, block); + SchemaScannerHelper::insert_int_value(1, wg->id(), block); + SchemaScannerHelper::insert_int_value(2, wg->get_mem_used(), block); double cpu_usage_p = (double)wg->get_cpu_usage() / (double)total_cpu_time_ns_per_second * 100; cpu_usage_p = std::round(cpu_usage_p * 100.0) / 100.0; - insert_double_value(3, cpu_usage_p, block); + SchemaScannerHelper::insert_double_value(3, cpu_usage_p, block); - insert_int_value(4, wg->get_local_scan_bytes_per_second(), block); - insert_int_value(5, wg->get_remote_scan_bytes_per_second(), block); + SchemaScannerHelper::insert_int_value(4, wg->get_local_scan_bytes_per_second(), block); + SchemaScannerHelper::insert_int_value(5, wg->get_remote_scan_bytes_per_second(), block); } } From 3387825fcbfb266df78b85059de3b2ccd52f0f24 Mon Sep 17 00:00:00 2001 From: Vallish Date: Mon, 2 Sep 2024 11:30:00 +0000 Subject: [PATCH 4/5] [Enhancement] add row count in show partition --- .../java/org/apache/doris/common/proc/PartitionsProcDir.java | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/proc/PartitionsProcDir.java b/fe/fe-core/src/main/java/org/apache/doris/common/proc/PartitionsProcDir.java index 3e7c6bdebc762b..f1aed5b6278960 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/proc/PartitionsProcDir.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/proc/PartitionsProcDir.java @@ -76,6 +76,7 @@ public class PartitionsProcDir implements ProcDirInterface { .add("Buckets").add("ReplicationNum").add("StorageMedium").add("CooldownTime").add("RemoteStoragePolicy") .add("LastConsistencyCheckTime").add("DataSize").add("IsInMemory").add("ReplicaAllocation") .add("IsMutable").add("SyncWithBaseTables").add("UnsyncTables").add("CommittedVersion") + .add("RowCount") .build(); private Database db; @@ -383,6 +384,9 @@ private List, TRow>> getPartitionInfosInrernal() throws An partitionInfo.add(partition.getCommittedVersion()); trow.addToColumnValue(new TCell().setLongVal(partition.getCommittedVersion())); + partitionInfo.add(partition.getRowCount()); + trow.addToColumnValue(new TCell().setLongVal(partition.getRowCount())); + partitionInfos.add(Pair.of(partitionInfo, trow)); } } finally { From f764c6912eead6d0785df7270275d2a085f6781e Mon Sep 17 00:00:00 2001 From: Vallish Date: Mon, 9 Sep 2024 13:59:36 +0000 Subject: [PATCH 5/5] [fix](systable) refactor code to avoid wrong usage --- be/src/exec/schema_scanner.cpp | 11 ++++++++ .../schema_active_queries_scanner.cpp | 26 +++---------------- .../schema_partitions_scanner.cpp | 18 ++----------- 
 .../schema_scanner/schema_routine_scanner.cpp  | 15 ++---------
 4 files changed, 19 insertions(+), 51 deletions(-)

diff --git a/be/src/exec/schema_scanner.cpp b/be/src/exec/schema_scanner.cpp
index 9a4b198aa3d27d..7f64387b2204b3 100644
--- a/be/src/exec/schema_scanner.cpp
+++ b/be/src/exec/schema_scanner.cpp
@@ -449,6 +449,17 @@ Status SchemaScanner::insert_block_column(TCell cell, int col_index, vectorized:
         break;
     }
+    case TYPE_DATETIME: {
+        std::vector<void*> datas(1);
+        VecDateTimeValue src[1];
+        src[0].from_date_str(cell.stringVal.data(), cell.stringVal.size());
+        datas[0] = src;
+        auto data = datas[0];
+        reinterpret_cast<vectorized::ColumnVector<vectorized::Int64>*>(col_ptr)->insert_data(
+                reinterpret_cast<char*>(data), 0);
+        nullable_column->get_null_map_data().emplace_back(0);
+        break;
+    }
     default: {
         std::stringstream ss;
         ss << "unsupported column type:" << type;
diff --git a/be/src/exec/schema_scanner/schema_active_queries_scanner.cpp b/be/src/exec/schema_scanner/schema_active_queries_scanner.cpp
index 2b516fc6fdac2b..6aa6e758999fb0 100644
--- a/be/src/exec/schema_scanner/schema_active_queries_scanner.cpp
+++ b/be/src/exec/schema_scanner/schema_active_queries_scanner.cpp
@@ -17,7 +17,6 @@
 
 #include "exec/schema_scanner/schema_active_queries_scanner.h"
 
-#include "exec/schema_scanner/schema_scanner_helper.h"
 #include "runtime/client_cache.h"
 #include "runtime/exec_env.h"
 #include "runtime/runtime_state.h"
@@ -101,27 +100,10 @@ Status SchemaActiveQueriesScanner::_get_active_queries_block_from_fe() {
 
     for (int i = 0; i < result_data.size(); i++) {
         TRow row = result_data[i];
-
-        SchemaScannerHelper::insert_string_value(0, row.column_value[0].stringVal,
-                                                 _active_query_block.get());
-        SchemaScannerHelper::insert_string_value(1, row.column_value[1].stringVal,
-                                                 _active_query_block.get());
-        SchemaScannerHelper::insert_int_value(2, row.column_value[2].longVal,
-                                              _active_query_block.get());
-        SchemaScannerHelper::insert_int_value(3, row.column_value[3].longVal,
-                                              _active_query_block.get());
-        SchemaScannerHelper::insert_string_value(4, row.column_value[4].stringVal,
-                                                 _active_query_block.get());
-        SchemaScannerHelper::insert_string_value(5, row.column_value[5].stringVal,
-                                                 _active_query_block.get());
-        SchemaScannerHelper::insert_string_value(6, row.column_value[6].stringVal,
-                                                 _active_query_block.get());
-        SchemaScannerHelper::insert_string_value(7, row.column_value[7].stringVal,
-                                                 _active_query_block.get());
-        SchemaScannerHelper::insert_string_value(8, row.column_value[8].stringVal,
-                                                 _active_query_block.get());
-        SchemaScannerHelper::insert_string_value(9, row.column_value[9].stringVal,
-                                                 _active_query_block.get());
+        for (int j = 0; j < _s_tbls_columns.size(); j++) {
+            RETURN_IF_ERROR(insert_block_column(row.column_value[j], j, _active_query_block.get(),
+                                                _s_tbls_columns[j].type));
+        }
     }
     return Status::OK();
 }
diff --git a/be/src/exec/schema_scanner/schema_partitions_scanner.cpp b/be/src/exec/schema_scanner/schema_partitions_scanner.cpp
index 9f86fe6feb49d9..ebe2bd3b70ec0e 100644
--- a/be/src/exec/schema_scanner/schema_partitions_scanner.cpp
+++ b/be/src/exec/schema_scanner/schema_partitions_scanner.cpp
@@ -22,7 +22,6 @@
 #include 
 
 #include "exec/schema_scanner/schema_helper.h"
-#include "exec/schema_scanner/schema_scanner_helper.h"
 #include "runtime/client_cache.h"
 #include "runtime/exec_env.h"
 #include "runtime/runtime_state.h"
@@ -147,22 +146,9 @@ Status SchemaPartitionsScanner::get_onedb_info_from_fe(int64_t dbId) {
     for (int i = 0; i < result_data.size(); i++) {
         TRow row = result_data[i];
-
         for (int j = 0; j < _s_tbls_columns.size(); j++) {
-            if ((_s_tbls_columns[j].type == TYPE_BIGINT) || _s_tbls_columns[j].type == TYPE_INT) {
-                SchemaScannerHelper::insert_int_value(j, row.column_value[j].longVal,
-                                                      _partitions_block.get());
-            } else if (_s_tbls_columns[j].type == TYPE_DATETIME) {
-                std::vector<void*> datas(1);
-                VecDateTimeValue src[1];
-                src[0].from_date_str(row.column_value[j].stringVal.data(),
-                                     row.column_value[j].stringVal.size());
-                datas[0] = src;
-                SchemaScannerHelper::insert_datetime_value(j, datas, _partitions_block.get());
-            } else {
-                SchemaScannerHelper::insert_string_value(j, row.column_value[j].stringVal,
-                                                         _partitions_block.get());
-            }
+            RETURN_IF_ERROR(insert_block_column(row.column_value[j], j, _partitions_block.get(),
+                                                _s_tbls_columns[j].type));
         }
     }
     return Status::OK();
 }
diff --git a/be/src/exec/schema_scanner/schema_routine_scanner.cpp b/be/src/exec/schema_scanner/schema_routine_scanner.cpp
index adb18450f26490..e8d95f0abd6d36 100644
--- a/be/src/exec/schema_scanner/schema_routine_scanner.cpp
+++ b/be/src/exec/schema_scanner/schema_routine_scanner.cpp
@@ -17,7 +17,6 @@
 
 #include "exec/schema_scanner/schema_routine_scanner.h"
 
-#include "exec/schema_scanner/schema_scanner_helper.h"
 #include "runtime/client_cache.h"
 #include "runtime/exec_env.h"
 #include "runtime/runtime_state.h"
@@ -103,19 +102,9 @@ Status SchemaRoutinesScanner::get_block_from_fe() {
     for (int i = 0; i < result_data.size(); i++) {
         TRow row = result_data[i];
-
         for (int j = 0; j < _s_tbls_columns.size(); j++) {
-            if (_s_tbls_columns[j].type == TYPE_DATETIME) {
-                std::vector<void*> datas(1);
-                VecDateTimeValue src[1];
-                src[0].from_date_str(row.column_value[j].stringVal.data(),
-                                     row.column_value[j].stringVal.size());
-                datas[0] = src;
-                SchemaScannerHelper::insert_datetime_value(j, datas, _routines_block.get());
-            } else {
-                SchemaScannerHelper::insert_string_value(j, row.column_value[j].stringVal,
-                                                         _routines_block.get());
-            }
+            RETURN_IF_ERROR(insert_block_column(row.column_value[j], j, _routines_block.get(),
+                                                _s_tbls_columns[j].type));
         }
     }
     return Status::OK();
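Note on [PATCH 5/5]: after this refactor, a schema scanner that materializes a FE result set loops over the returned TRow cells and lets SchemaScanner::insert_block_column() dispatch on the declared column type, instead of hand-writing per-column SchemaScannerHelper::insert_* calls. The sketch below is a minimal illustration of that pattern for a hypothetical scanner; the class name, the fill_block_from_fe() helper, and the _example_block member are placeholders and are not part of this patch series.

// Illustrative only: the intended fill pattern after this refactor. _example_block
// and _s_tbls_columns stand in for the concrete scanner's own members.
Status SchemaExampleScanner::fill_block_from_fe(const std::vector<TRow>& result_data) {
    for (int i = 0; i < result_data.size(); i++) {
        TRow row = result_data[i];
        for (int j = 0; j < _s_tbls_columns.size(); j++) {
            // insert_block_column() handles TYPE_INT/TYPE_BIGINT, TYPE_DATETIME
            // (parsed from the string cell) and string columns uniformly.
            RETURN_IF_ERROR(insert_block_column(row.column_value[j], j, _example_block.get(),
                                                _s_tbls_columns[j].type));
        }
    }
    return Status::OK();
}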