From 0684c9363be1577627a694be51de124cfd68fe6e Mon Sep 17 00:00:00 2001 From: zhangdong <493738387@qq.com> Date: Wed, 26 Jun 2024 18:56:12 +0800 Subject: [PATCH 1/2] 1 --- be/src/vec/exec/scan/vmeta_scanner.cpp | 23 ++ be/src/vec/exec/scan/vmeta_scanner.h | 2 + .../catalog/BuiltinTableValuedFunctions.java | 2 + .../doris/common/proc/PartitionsProcDir.java | 77 ++++-- .../functions/table/Partitions.java | 58 +++++ .../visitor/TableValuedFunctionVisitor.java | 5 + .../tablefunction/MetadataGenerator.java | 108 ++++++++ .../MetadataTableValuedFunction.java | 2 + .../PartitionsTableValuedFunction.java | 243 ++++++++++++++++++ .../tablefunction/TableValuedFunctionIf.java | 2 + gensrc/thrift/FrontendService.thrift | 1 + gensrc/thrift/PlanNodes.thrift | 8 + gensrc/thrift/Types.thrift | 3 +- .../tvf/test_hms_partitions_tvf.out | 8 + .../tvf/test_partitions_tvf.out | 23 ++ .../tvf/test_hms_partitions_tvf.groovy | 46 ++++ .../tvf/test_partitions_tvf.groovy | 78 ++++++ 17 files changed, 673 insertions(+), 16 deletions(-) create mode 100644 fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/table/Partitions.java create mode 100644 fe/fe-core/src/main/java/org/apache/doris/tablefunction/PartitionsTableValuedFunction.java create mode 100644 regression-test/data/external_table_p0/tvf/test_hms_partitions_tvf.out create mode 100644 regression-test/data/external_table_p0/tvf/test_partitions_tvf.out create mode 100644 regression-test/suites/external_table_p0/tvf/test_hms_partitions_tvf.groovy create mode 100644 regression-test/suites/external_table_p0/tvf/test_partitions_tvf.groovy diff --git a/be/src/vec/exec/scan/vmeta_scanner.cpp b/be/src/vec/exec/scan/vmeta_scanner.cpp index 2680b2679d8380..02cf15e1af3871 100644 --- a/be/src/vec/exec/scan/vmeta_scanner.cpp +++ b/be/src/vec/exec/scan/vmeta_scanner.cpp @@ -243,6 +243,9 @@ Status VMetaScanner::_fetch_metadata(const TMetaScanRange& meta_scan_range) { case TMetadataType::MATERIALIZED_VIEWS: RETURN_IF_ERROR(_build_materialized_views_metadata_request(meta_scan_range, &request)); break; + case TMetadataType::PARTITIONS: + RETURN_IF_ERROR(_build_partitions_metadata_request(meta_scan_range, &request)); + break; case TMetadataType::JOBS: RETURN_IF_ERROR(_build_jobs_metadata_request(meta_scan_range, &request)); break; @@ -411,6 +414,26 @@ Status VMetaScanner::_build_materialized_views_metadata_request( return Status::OK(); } +Status VMetaScanner::_build_partitions_metadata_request(const TMetaScanRange& meta_scan_range, + TFetchSchemaTableDataRequest* request) { + VLOG_CRITICAL << "VMetaScanner::_build_partitions_metadata_request"; + if (!meta_scan_range.__isset.partitions_params) { + return Status::InternalError( + "Can not find TPartitionsMetadataParams from meta_scan_range."); + } + + // create request + request->__set_schema_table_name(TSchemaTableName::METADATA_TABLE); + + // create TMetadataTableRequestParams + TMetadataTableRequestParams metadata_table_params; + metadata_table_params.__set_metadata_type(TMetadataType::PARTITIONS); + metadata_table_params.__set_partitions_metadata_params(meta_scan_range.partitions_params); + + request->__set_metada_table_params(metadata_table_params); + return Status::OK(); +} + Status VMetaScanner::_build_jobs_metadata_request(const TMetaScanRange& meta_scan_range, TFetchSchemaTableDataRequest* request) { VLOG_CRITICAL << "VMetaScanner::_build_jobs_metadata_request"; diff --git a/be/src/vec/exec/scan/vmeta_scanner.h b/be/src/vec/exec/scan/vmeta_scanner.h index 518f42ffc1c697..8f952661b35b0d 100644 --- a/be/src/vec/exec/scan/vmeta_scanner.h +++ b/be/src/vec/exec/scan/vmeta_scanner.h @@ -85,6 +85,8 @@ class VMetaScanner : public VScanner { TFetchSchemaTableDataRequest* request); Status _build_materialized_views_metadata_request(const TMetaScanRange& meta_scan_range, TFetchSchemaTableDataRequest* request); + Status _build_partitions_metadata_request(const TMetaScanRange& meta_scan_range, + TFetchSchemaTableDataRequest* request); Status _build_jobs_metadata_request(const TMetaScanRange& meta_scan_range, TFetchSchemaTableDataRequest* request); Status _build_tasks_metadata_request(const TMetaScanRange& meta_scan_range, diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/BuiltinTableValuedFunctions.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/BuiltinTableValuedFunctions.java index 3becd2e102bf81..db66c260e5670c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/BuiltinTableValuedFunctions.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/BuiltinTableValuedFunctions.java @@ -29,6 +29,7 @@ import org.apache.doris.nereids.trees.expressions.functions.table.Local; import org.apache.doris.nereids.trees.expressions.functions.table.MvInfos; import org.apache.doris.nereids.trees.expressions.functions.table.Numbers; +import org.apache.doris.nereids.trees.expressions.functions.table.Partitions; import org.apache.doris.nereids.trees.expressions.functions.table.Query; import org.apache.doris.nereids.trees.expressions.functions.table.S3; import org.apache.doris.nereids.trees.expressions.functions.table.Tasks; @@ -55,6 +56,7 @@ public class BuiltinTableValuedFunctions implements FunctionHelper { tableValued(Numbers.class, "numbers"), tableValued(S3.class, "s3"), tableValued(MvInfos.class, "mv_infos"), + tableValued(Partitions.class, "partitions"), tableValued(Jobs.class, "jobs"), tableValued(Tasks.class, "tasks"), tableValued(Query.class, "query") diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/proc/PartitionsProcDir.java b/fe/fe-core/src/main/java/org/apache/doris/common/proc/PartitionsProcDir.java index 19e2ac6b6bbd35..bfca561c8c5aa2 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/proc/PartitionsProcDir.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/proc/PartitionsProcDir.java @@ -46,6 +46,8 @@ import org.apache.doris.common.util.OrderByPair; import org.apache.doris.common.util.TimeUtils; import org.apache.doris.mtmv.MTMVPartitionUtil; +import org.apache.doris.thrift.TCell; +import org.apache.doris.thrift.TRow; import com.google.common.base.Joiner; import com.google.common.base.Preconditions; @@ -222,12 +224,22 @@ public BaseProcResult getBasicProcResult(List> partitionInfos) } private List> getPartitionInfos() throws AnalysisException { + List, TRow>> partitionInfosInrernal = getPartitionInfosInrernal(); + return partitionInfosInrernal.stream().map(pair -> pair.first).collect(Collectors.toList()); + } + + public List getPartitionInfosForTvf() throws AnalysisException { + List, TRow>> partitionInfosInrernal = getPartitionInfosInrernal(); + return partitionInfosInrernal.stream().map(pair -> pair.second).collect(Collectors.toList()); + } + + private List, TRow>> getPartitionInfosInrernal() throws AnalysisException { Preconditions.checkNotNull(db); Preconditions.checkNotNull(olapTable); Preconditions.checkState(olapTable.isManagedTable()); // get info - List> partitionInfos = new ArrayList>(); + List, TRow>> partitionInfos = new ArrayList, TRow>>(); olapTable.readLock(); try { List partitionIds; @@ -259,13 +271,19 @@ private List> getPartitionInfos() throws AnalysisException { Partition partition = olapTable.getPartition(partitionId); List partitionInfo = new ArrayList(); + TRow trow = new TRow(); String partitionName = partition.getName(); partitionInfo.add(partitionId); + trow.addToColumnValue(new TCell().setLongVal(partitionId)); partitionInfo.add(partitionName); + trow.addToColumnValue(new TCell().setStringVal(partitionName)); partitionInfo.add(partition.getVisibleVersion()); - partitionInfo.add(TimeUtils.longToTimeString(partition.getVisibleVersionTime())); + trow.addToColumnValue(new TCell().setLongVal(partition.getVisibleVersion())); + String visibleTime = TimeUtils.longToTimeString(partition.getVisibleVersionTime()); + partitionInfo.add(visibleTime); + trow.addToColumnValue(new TCell().setStringVal(visibleTime)); partitionInfo.add(partition.getState()); - + trow.addToColumnValue(new TCell().setStringVal(partition.getState().toString())); if (tblPartitionInfo.getType() == PartitionType.RANGE || tblPartitionInfo.getType() == PartitionType.LIST) { List partitionColumns = tblPartitionInfo.getPartitionColumns(); @@ -273,11 +291,17 @@ private List> getPartitionInfos() throws AnalysisException { for (Column column : partitionColumns) { colNames.add(column.getName()); } - partitionInfo.add(joiner.join(colNames)); - partitionInfo.add(tblPartitionInfo.getItem(partitionId).getItems().toString()); + String colNamesStr = joiner.join(colNames); + partitionInfo.add(colNamesStr); + trow.addToColumnValue(new TCell().setStringVal(colNamesStr)); + String itemStr = tblPartitionInfo.getItem(partitionId).getItems().toString(); + partitionInfo.add(itemStr); + trow.addToColumnValue(new TCell().setStringVal(itemStr)); } else { partitionInfo.add(""); + trow.addToColumnValue(new TCell().setStringVal("")); partitionInfo.add(""); + trow.addToColumnValue(new TCell().setStringVal("")); } // distribution @@ -293,47 +317,70 @@ private List> getPartitionInfos() throws AnalysisException { sb.append(distributionColumns.get(i).getName()); } partitionInfo.add(sb.toString()); + trow.addToColumnValue(new TCell().setStringVal(sb.toString())); } else { partitionInfo.add("RANDOM"); + trow.addToColumnValue(new TCell().setStringVal("RANDOM")); } partitionInfo.add(distributionInfo.getBucketNum()); + trow.addToColumnValue(new TCell().setIntVal(distributionInfo.getBucketNum())); // replica num - partitionInfo.add(tblPartitionInfo.getReplicaAllocation(partitionId).getTotalReplicaNum()); + short totalReplicaNum = tblPartitionInfo.getReplicaAllocation(partitionId).getTotalReplicaNum(); + partitionInfo.add(totalReplicaNum); + trow.addToColumnValue(new TCell().setIntVal(totalReplicaNum)); DataProperty dataProperty = tblPartitionInfo.getDataProperty(partitionId); partitionInfo.add(dataProperty.getStorageMedium().name()); - partitionInfo.add(TimeUtils.longToTimeString(dataProperty.getCooldownTimeMs())); + trow.addToColumnValue(new TCell().setStringVal(dataProperty.getStorageMedium().name())); + String cooldownTimeStr = TimeUtils.longToTimeString(dataProperty.getCooldownTimeMs()); + partitionInfo.add(cooldownTimeStr); + trow.addToColumnValue(new TCell().setStringVal(cooldownTimeStr)); partitionInfo.add(dataProperty.getStoragePolicy()); - - partitionInfo.add(TimeUtils.longToTimeString(partition.getLastCheckTime())); - + trow.addToColumnValue(new TCell().setStringVal(dataProperty.getStoragePolicy())); + String lastCheckTime = TimeUtils.longToTimeString(partition.getLastCheckTime()); + partitionInfo.add(lastCheckTime); + trow.addToColumnValue(new TCell().setStringVal(lastCheckTime)); long dataSize = partition.getDataSize(false); Pair sizePair = DebugUtil.getByteUint(dataSize); String readableSize = DebugUtil.DECIMAL_FORMAT_SCALE_3.format(sizePair.first) + " " + sizePair.second; partitionInfo.add(readableSize); - partitionInfo.add(tblPartitionInfo.getIsInMemory(partitionId)); + trow.addToColumnValue(new TCell().setStringVal(readableSize)); + boolean isInMemory = tblPartitionInfo.getIsInMemory(partitionId); + partitionInfo.add(isInMemory); + trow.addToColumnValue(new TCell().setBoolVal(isInMemory)); // replica allocation - partitionInfo.add(tblPartitionInfo.getReplicaAllocation(partitionId).toCreateStmt()); + String replica = tblPartitionInfo.getReplicaAllocation(partitionId).toCreateStmt(); + partitionInfo.add(replica); + trow.addToColumnValue(new TCell().setStringVal(replica)); - partitionInfo.add(tblPartitionInfo.getIsMutable(partitionId)); + boolean isMutable = tblPartitionInfo.getIsMutable(partitionId); + partitionInfo.add(isMutable); + trow.addToColumnValue(new TCell().setBoolVal(isMutable)); if (olapTable instanceof MTMV) { if (StringUtils.isEmpty(mtmvPartitionSyncErrorMsg)) { List partitionUnSyncTables = partitionsUnSyncTables.getOrDefault(partitionId, Lists.newArrayList()); - partitionInfo.add(CollectionUtils.isEmpty(partitionUnSyncTables)); + boolean isSync = CollectionUtils.isEmpty(partitionUnSyncTables); + partitionInfo.add(isSync); + trow.addToColumnValue(new TCell().setBoolVal(isSync)); partitionInfo.add(partitionUnSyncTables.toString()); + trow.addToColumnValue(new TCell().setStringVal(partitionUnSyncTables.toString())); } else { partitionInfo.add(false); + trow.addToColumnValue(new TCell().setBoolVal(false)); partitionInfo.add(mtmvPartitionSyncErrorMsg); + trow.addToColumnValue(new TCell().setStringVal(mtmvPartitionSyncErrorMsg)); } } else { partitionInfo.add(true); + trow.addToColumnValue(new TCell().setBoolVal(true)); partitionInfo.add(FeConstants.null_string); + trow.addToColumnValue(new TCell().setStringVal(FeConstants.null_string)); } - partitionInfos.add(partitionInfo); + partitionInfos.add(Pair.of(partitionInfo, trow)); } } finally { olapTable.readUnlock(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/table/Partitions.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/table/Partitions.java new file mode 100644 index 00000000000000..6dcd31a9651b86 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/table/Partitions.java @@ -0,0 +1,58 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.nereids.trees.expressions.functions.table; + +import org.apache.doris.catalog.FunctionSignature; +import org.apache.doris.nereids.exceptions.AnalysisException; +import org.apache.doris.nereids.trees.expressions.Properties; +import org.apache.doris.nereids.trees.expressions.visitor.ExpressionVisitor; +import org.apache.doris.nereids.types.coercion.AnyDataType; +import org.apache.doris.tablefunction.PartitionsTableValuedFunction; +import org.apache.doris.tablefunction.TableValuedFunctionIf; + +import java.util.Map; + +/** + * partitions + */ +public class Partitions extends TableValuedFunction { + public Partitions(Properties properties) { + super("partitions", properties); + } + + @Override + public FunctionSignature customSignature() { + return FunctionSignature.of(AnyDataType.INSTANCE_WITHOUT_INDEX, getArgumentsTypes()); + } + + @Override + protected TableValuedFunctionIf toCatalogFunction() { + try { + Map arguments = getTVFProperties().getMap(); + return new PartitionsTableValuedFunction(arguments); + } catch (Throwable t) { + throw new AnalysisException("Can not build PartitionsTableValuedFunction by " + + this + ": " + t.getMessage(), t); + } + } + + @Override + public R accept(ExpressionVisitor visitor, C context) { + return visitor.visitPartitions(this, context); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/visitor/TableValuedFunctionVisitor.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/visitor/TableValuedFunctionVisitor.java index bd09a81b0114f4..ca14edd87def39 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/visitor/TableValuedFunctionVisitor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/visitor/TableValuedFunctionVisitor.java @@ -29,6 +29,7 @@ import org.apache.doris.nereids.trees.expressions.functions.table.Local; import org.apache.doris.nereids.trees.expressions.functions.table.MvInfos; import org.apache.doris.nereids.trees.expressions.functions.table.Numbers; +import org.apache.doris.nereids.trees.expressions.functions.table.Partitions; import org.apache.doris.nereids.trees.expressions.functions.table.Query; import org.apache.doris.nereids.trees.expressions.functions.table.S3; import org.apache.doris.nereids.trees.expressions.functions.table.TableValuedFunction; @@ -54,6 +55,10 @@ default R visitMvInfos(MvInfos mvInfos, C context) { return visitTableValuedFunction(mvInfos, context); } + default R visitPartitions(Partitions partitions, C context) { + return visitTableValuedFunction(partitions, context); + } + default R visitJobs(Jobs jobs, C context) { return visitTableValuedFunction(jobs, context); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataGenerator.java b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataGenerator.java index 42b76d3c05706e..d7ae1bd64f7389 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataGenerator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataGenerator.java @@ -19,21 +19,27 @@ import org.apache.doris.analysis.UserIdentity; import org.apache.doris.catalog.Column; +import org.apache.doris.catalog.Database; import org.apache.doris.catalog.DatabaseIf; import org.apache.doris.catalog.Env; import org.apache.doris.catalog.MTMV; +import org.apache.doris.catalog.OlapTable; import org.apache.doris.catalog.SchemaTable; import org.apache.doris.catalog.Table; +import org.apache.doris.catalog.TableIf; import org.apache.doris.common.AnalysisException; import org.apache.doris.common.ClientPool; import org.apache.doris.common.Pair; import org.apache.doris.common.UserException; import org.apache.doris.common.proc.FrontendsProcNode; +import org.apache.doris.common.proc.PartitionsProcDir; import org.apache.doris.common.util.NetUtils; import org.apache.doris.common.util.TimeUtils; import org.apache.doris.datasource.CatalogIf; import org.apache.doris.datasource.InternalCatalog; +import org.apache.doris.datasource.hive.HMSExternalCatalog; import org.apache.doris.datasource.iceberg.IcebergMetadataCache; +import org.apache.doris.datasource.maxcompute.MaxComputeExternalCatalog; import org.apache.doris.job.common.JobType; import org.apache.doris.job.extensions.mtmv.MTMVJob; import org.apache.doris.job.task.AbstractTask; @@ -61,6 +67,7 @@ import org.apache.doris.thrift.TMetadataTableRequestParams; import org.apache.doris.thrift.TMetadataType; import org.apache.doris.thrift.TNetworkAddress; +import org.apache.doris.thrift.TPartitionsMetadataParams; import org.apache.doris.thrift.TPipelineWorkloadGroup; import org.apache.doris.thrift.TRow; import org.apache.doris.thrift.TSchemaTableRequestParams; @@ -160,6 +167,9 @@ public static TFetchSchemaTableDataResult getMetadataTable(TFetchSchemaTableData case MATERIALIZED_VIEWS: result = mtmvMetadataResult(params); break; + case PARTITIONS: + result = partitionMetadataResult(params); + break; case JOBS: result = jobMetadataResult(params); break; @@ -756,6 +766,104 @@ private static TFetchSchemaTableDataResult mtmvMetadataResult(TMetadataTableRequ return result; } + private static TFetchSchemaTableDataResult partitionMetadataResult(TMetadataTableRequestParams params) { + if (LOG.isDebugEnabled()) { + LOG.debug("partitionMetadataResult() start"); + } + if (!params.isSetPartitionsMetadataParams()) { + if (LOG.isDebugEnabled()) { + LOG.debug("Partitions metadata params is not set."); + } + return errorResult("Partitions metadata params is not set."); + } + + TPartitionsMetadataParams partitionsMetadataParams = params.getPartitionsMetadataParams(); + String catalogName = partitionsMetadataParams.getCatalog(); + if (LOG.isDebugEnabled()) { + LOG.debug("catalogName: " + catalogName); + } + String dbName = partitionsMetadataParams.getDatabase(); + if (LOG.isDebugEnabled()) { + LOG.debug("dbName: " + dbName); + } + String tableName = partitionsMetadataParams.getTable(); + if (LOG.isDebugEnabled()) { + LOG.debug("tableName: " + tableName); + } + + CatalogIf catalog; + TableIf table; + DatabaseIf db; + try { + catalog = Env.getCurrentEnv().getCatalogMgr() + .getCatalogOrAnalysisException(catalogName); + db = catalog.getDbOrAnalysisException(dbName); + table = db.getTableOrAnalysisException(tableName); + } catch (AnalysisException e) { + LOG.warn(e.getMessage()); + return errorResult(e.getMessage()); + } + + if (catalog instanceof InternalCatalog) { + return dealInternalCatalog((Database) db, table); + } else if (catalog instanceof MaxComputeExternalCatalog) { + return dealMaxComputeCatalog((MaxComputeExternalCatalog) catalog, dbName, tableName); + } else if (catalog instanceof HMSExternalCatalog) { + return dealHMSCatalog((HMSExternalCatalog) catalog, dbName, tableName); + } + + if (LOG.isDebugEnabled()) { + LOG.debug("partitionMetadataResult() end"); + } + return errorResult("not support catalog: " + catalogName); + } + + private static TFetchSchemaTableDataResult dealHMSCatalog(HMSExternalCatalog catalog, String dbName, + String tableName) { + List dataBatch = Lists.newArrayList(); + List partitionNames = catalog.getClient().listPartitionNames(dbName, tableName); + for (String partition : partitionNames) { + TRow trow = new TRow(); + trow.addToColumnValue(new TCell().setStringVal(partition)); + dataBatch.add(trow); + } + TFetchSchemaTableDataResult result = new TFetchSchemaTableDataResult(); + result.setDataBatch(dataBatch); + result.setStatus(new TStatus(TStatusCode.OK)); + return result; + } + + private static TFetchSchemaTableDataResult dealMaxComputeCatalog(MaxComputeExternalCatalog catalog, String dbName, + String tableName) { + List dataBatch = Lists.newArrayList(); + List partitionNames = catalog.listPartitionNames(dbName, tableName); + for (String partition : partitionNames) { + TRow trow = new TRow(); + trow.addToColumnValue(new TCell().setStringVal(partition)); + dataBatch.add(trow); + } + TFetchSchemaTableDataResult result = new TFetchSchemaTableDataResult(); + result.setDataBatch(dataBatch); + result.setStatus(new TStatus(TStatusCode.OK)); + return result; + } + + private static TFetchSchemaTableDataResult dealInternalCatalog(Database db, TableIf table) { + TFetchSchemaTableDataResult result = new TFetchSchemaTableDataResult(); + if (!(table instanceof OlapTable)) { + return errorResult("not olap table"); + } + PartitionsProcDir dir = new PartitionsProcDir(db, (OlapTable) table, false); + try { + List dataBatch = dir.getPartitionInfosForTvf(); + result.setDataBatch(dataBatch); + } catch (AnalysisException e) { + return errorResult(e.getMessage()); + } + result.setStatus(new TStatus(TStatusCode.OK)); + return result; + } + private static TFetchSchemaTableDataResult jobMetadataResult(TMetadataTableRequestParams params) { if (!params.isSetJobsMetadataParams()) { return errorResult("Jobs metadata params is not set."); diff --git a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataTableValuedFunction.java b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataTableValuedFunction.java index af37bcd10e4c15..a7e25bc7f82445 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataTableValuedFunction.java +++ b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataTableValuedFunction.java @@ -44,6 +44,8 @@ public static Integer getColumnIndexFromColumnName(TMetadataType type, String co return CatalogsTableValuedFunction.getColumnIndexFromColumnName(columnName); case MATERIALIZED_VIEWS: return MvInfosTableValuedFunction.getColumnIndexFromColumnName(columnName); + case PARTITIONS: + return PartitionsTableValuedFunction.getColumnIndexFromColumnName(columnName, params); case JOBS: return JobsTableValuedFunction.getColumnIndexFromColumnName(columnName, params); case TASKS: diff --git a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/PartitionsTableValuedFunction.java b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/PartitionsTableValuedFunction.java new file mode 100644 index 00000000000000..1ceddeb89cf4a8 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/PartitionsTableValuedFunction.java @@ -0,0 +1,243 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.tablefunction; + +import org.apache.doris.catalog.Column; +import org.apache.doris.catalog.DatabaseIf; +import org.apache.doris.catalog.Env; +import org.apache.doris.catalog.PrimitiveType; +import org.apache.doris.catalog.ScalarType; +import org.apache.doris.catalog.TableIf; +import org.apache.doris.catalog.TableIf.TableType; +import org.apache.doris.common.ErrorCode; +import org.apache.doris.common.MetaNotFoundException; +import org.apache.doris.datasource.CatalogIf; +import org.apache.doris.datasource.InternalCatalog; +import org.apache.doris.datasource.hive.HMSExternalCatalog; +import org.apache.doris.datasource.hive.HMSExternalTable; +import org.apache.doris.datasource.maxcompute.MaxComputeExternalCatalog; +import org.apache.doris.datasource.maxcompute.MaxComputeExternalTable; +import org.apache.doris.mysql.privilege.PrivPredicate; +import org.apache.doris.nereids.exceptions.AnalysisException; +import org.apache.doris.qe.ConnectContext; +import org.apache.doris.thrift.TMetaScanRange; +import org.apache.doris.thrift.TMetadataTableRequestParams; +import org.apache.doris.thrift.TMetadataType; +import org.apache.doris.thrift.TPartitionsMetadataParams; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Maps; +import org.apache.commons.collections.CollectionUtils; +import org.apache.commons.lang3.StringUtils; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.util.List; +import java.util.Map; +import java.util.Optional; + +/** + * The Implement of table valued function + * partitions("database" = "db1","table" = "table1"). + */ +public class PartitionsTableValuedFunction extends MetadataTableValuedFunction { + private static final Logger LOG = LogManager.getLogger(PartitionsTableValuedFunction.class); + + public static final String NAME = "partitions"; + + private static final String CATALOG = "catalog"; + private static final String DB = "database"; + private static final String TABLE = "table"; + + private static final ImmutableSet PROPERTIES_SET = ImmutableSet.of(CATALOG, DB, TABLE); + + private static final ImmutableList SCHEMA_FOR_OLAP_TABLE = ImmutableList.of( + new Column("PartitionId", ScalarType.createType(PrimitiveType.BIGINT)), + new Column("PartitionName", ScalarType.createStringType()), + new Column("VisibleVersion", ScalarType.createType(PrimitiveType.BIGINT)), + new Column("VisibleVersionTime", ScalarType.createStringType()), + new Column("State", ScalarType.createStringType()), + new Column("PartitionKey", ScalarType.createStringType()), + new Column("Range", ScalarType.createStringType()), + new Column("DistributionKey", ScalarType.createStringType()), + new Column("Buckets", ScalarType.createType(PrimitiveType.INT)), + new Column("ReplicationNum", ScalarType.createType(PrimitiveType.INT)), + new Column("StorageMedium", ScalarType.createStringType()), + new Column("CooldownTime", ScalarType.createStringType()), + new Column("RemoteStoragePolicy", ScalarType.createStringType()), + new Column("LastConsistencyCheckTime", ScalarType.createStringType()), + new Column("DataSize", ScalarType.createStringType()), + new Column("IsInMemory", ScalarType.createType(PrimitiveType.BOOLEAN)), + new Column("ReplicaAllocation", ScalarType.createStringType()), + new Column("IsMutable", ScalarType.createType(PrimitiveType.BOOLEAN)), + new Column("SyncWithBaseTables", ScalarType.createType(PrimitiveType.BOOLEAN)), + new Column("UnsyncTables", ScalarType.createStringType())); + + private static final ImmutableList SCHEMA_FOR_EXTERNAL_TABLE = ImmutableList.of( + new Column("Partition", ScalarType.createStringType())); + + private static final ImmutableMap OLAP_TABLE_COLUMN_TO_INDEX; + private static final ImmutableMap EXTERNAL_TABLE_COLUMN_TO_INDEX; + + static { + ImmutableMap.Builder builder = new ImmutableMap.Builder(); + for (int i = 0; i < SCHEMA_FOR_OLAP_TABLE.size(); i++) { + builder.put(SCHEMA_FOR_OLAP_TABLE.get(i).getName().toLowerCase(), i); + } + OLAP_TABLE_COLUMN_TO_INDEX = builder.build(); + + ImmutableMap.Builder otherBuilder = new ImmutableMap.Builder(); + for (int i = 0; i < SCHEMA_FOR_EXTERNAL_TABLE.size(); i++) { + otherBuilder.put(SCHEMA_FOR_EXTERNAL_TABLE.get(i).getName().toLowerCase(), i); + } + EXTERNAL_TABLE_COLUMN_TO_INDEX = otherBuilder.build(); + } + + public static Integer getColumnIndexFromColumnName(String columnName, TMetadataTableRequestParams params) + throws org.apache.doris.common.AnalysisException { + if (!params.isSetPartitionsMetadataParams()) { + throw new org.apache.doris.common.AnalysisException("Partitions metadata params is not set."); + } + TPartitionsMetadataParams partitionsMetadataParams = params.getPartitionsMetadataParams(); + String catalogName = partitionsMetadataParams.getCatalog(); + if (InternalCatalog.INTERNAL_CATALOG_NAME.equals(catalogName)) { + return OLAP_TABLE_COLUMN_TO_INDEX.get(columnName.toLowerCase()); + } else { + return EXTERNAL_TABLE_COLUMN_TO_INDEX.get(columnName.toLowerCase()); + } + } + + private final String catalogName; + private final String databaseName; + private final String tableName; + + public PartitionsTableValuedFunction(Map params) throws AnalysisException { + if (LOG.isDebugEnabled()) { + LOG.debug("PartitionsTableValuedFunction() start"); + } + Map validParams = Maps.newHashMap(); + for (String key : params.keySet()) { + if (!PROPERTIES_SET.contains(key.toLowerCase())) { + throw new AnalysisException("'" + key + "' is invalid property"); + } + // check ctl, db, tbl + validParams.put(key.toLowerCase(), params.get(key)); + } + String catalogName = validParams.get(CATALOG); + String dbName = validParams.get(DB); + String tableName = validParams.get(TABLE); + if (StringUtils.isEmpty(catalogName) || StringUtils.isEmpty(dbName) || StringUtils.isEmpty(tableName)) { + throw new AnalysisException("Invalid partitions metadata query"); + } + analyze(catalogName, dbName, tableName); + this.catalogName = catalogName; + this.databaseName = dbName; + this.tableName = tableName; + if (LOG.isDebugEnabled()) { + LOG.debug("PartitionsTableValuedFunction() end"); + } + } + + private void analyze(String catalogName, String dbName, String tableName) { + if (!Env.getCurrentEnv().getAccessManager() + .checkTblPriv(ConnectContext.get(), catalogName, dbName, + tableName, PrivPredicate.SHOW)) { + String message = ErrorCode.ERR_TABLEACCESS_DENIED_ERROR.formatErrorMsg("SHOW PARTITIONS", + ConnectContext.get().getQualifiedUser(), ConnectContext.get().getRemoteIP(), + catalogName + ": " + dbName + ": " + tableName); + throw new AnalysisException(message); + } + CatalogIf catalog = Env.getCurrentEnv().getCatalogMgr().getCatalog(catalogName); + if (catalog == null) { + throw new AnalysisException("can not find catalog: " + catalogName); + } + // disallow unsupported catalog + if (!(catalog.isInternalCatalog() || catalog instanceof HMSExternalCatalog + || catalog instanceof MaxComputeExternalCatalog)) { + throw new AnalysisException(String.format("Catalog of type '%s' is not allowed in ShowPartitionsStmt", + catalog.getType())); + } + + Optional db = catalog.getDb(dbName); + if (!db.isPresent()) { + throw new AnalysisException("can not find database: " + dbName); + } + TableIf table = null; + try { + table = db.get().getTableOrMetaException(tableName, TableType.OLAP, + TableType.HMS_EXTERNAL_TABLE, TableType.MAX_COMPUTE_EXTERNAL_TABLE); + } catch (MetaNotFoundException e) { + throw new AnalysisException(e.getMessage(), e); + } + + if (table instanceof HMSExternalTable) { + if (((HMSExternalTable) table).isView()) { + throw new AnalysisException("Table " + tableName + " is not a partitioned table"); + } + if (CollectionUtils.isEmpty(((HMSExternalTable) table).getPartitionColumns())) { + throw new AnalysisException("Table " + tableName + " is not a partitioned table"); + } + return; + } + + if (table instanceof MaxComputeExternalTable) { + if (((MaxComputeExternalTable) table).getOdpsTable().getPartitions().isEmpty()) { + throw new AnalysisException("Table " + tableName + " is not a partitioned table"); + } + } + } + + @Override + public TMetadataType getMetadataType() { + return TMetadataType.PARTITIONS; + } + + @Override + public TMetaScanRange getMetaScanRange() { + if (LOG.isDebugEnabled()) { + LOG.debug("getMetaScanRange() start"); + } + TMetaScanRange metaScanRange = new TMetaScanRange(); + metaScanRange.setMetadataType(TMetadataType.PARTITIONS); + TPartitionsMetadataParams partitionParam = new TPartitionsMetadataParams(); + partitionParam.setCatalog(catalogName); + partitionParam.setDatabase(databaseName); + partitionParam.setTable(tableName); + metaScanRange.setPartitionsParams(partitionParam); + if (LOG.isDebugEnabled()) { + LOG.debug("getMetaScanRange() end"); + } + return metaScanRange; + } + + @Override + public String getTableName() { + return "PartitionsTableValuedFunction"; + } + + @Override + public List getTableColumns() throws AnalysisException { + if (InternalCatalog.INTERNAL_CATALOG_NAME.equals(catalogName)) { + return SCHEMA_FOR_OLAP_TABLE; + } else { + return SCHEMA_FOR_EXTERNAL_TABLE; + } + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/TableValuedFunctionIf.java b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/TableValuedFunctionIf.java index 579ae168c4e476..6b6fda088a8a04 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/TableValuedFunctionIf.java +++ b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/TableValuedFunctionIf.java @@ -67,6 +67,8 @@ public static TableValuedFunctionIf getTableFunction(String funcName, Map> res = sql """ select * from partitions('catalog'='internal',"database"="${dbName}","table"="${tableName}"); """ + logger.info("res: " + res.toString()) + + assertEquals(1, res.size()); + // PartitionName + assertEquals("p1", res[0][1]); + // State + assertEquals("NORMAL", res[0][4]); + // PartitionKey + assertEquals("k3", res[0][5]); + // Buckets + assertEquals(2, res[0][8]); + // ReplicationNum + assertEquals(1, res[0][9]); + // StorageMedium + assertEquals("HDD", res[0][10]); + // ReplicaAllocation + assertEquals("tag.location.default: 1", res[0][16]); + // IsMutable + assertEquals(true, res[0][17]); + // SyncWithBaseTables + assertEquals(true, res[0][18]); + + + // test exception + test { + sql """ select * from partitions("catalog"="internal","database"="${dbName}","table"="xxx"); """ + // check exception + exception "xxx" + } + test { + sql """ select * from partitions("database"="${dbName}"); """ + // check exception + exception "Invalid" + } + + sql """drop table if exists `${tableName}`""" +} From 63d95ffe5bcabcf5fb8f57d935063101dbb050a7 Mon Sep 17 00:00:00 2001 From: zhangdong <493738387@qq.com> Date: Fri, 5 Jul 2024 09:57:59 +0800 Subject: [PATCH 2/2] [fix](case)Fix case failure caused by different pipeline configurations (#37201) - force_olap_table_replication_num will force a change in the number of copies of the table, resulting in a case failure - other user login to information_schema db, avoiding unauthorized access --- .../suites/external_table_p0/tvf/test_partitions_tvf.groovy | 4 ++-- .../external_table_p0/tvf/test_s3_tvf_with_resource.groovy | 6 ++++-- 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/regression-test/suites/external_table_p0/tvf/test_partitions_tvf.groovy b/regression-test/suites/external_table_p0/tvf/test_partitions_tvf.groovy index f61ccf6d224b87..0939528c3148ad 100644 --- a/regression-test/suites/external_table_p0/tvf/test_partitions_tvf.groovy +++ b/regression-test/suites/external_table_p0/tvf/test_partitions_tvf.groovy @@ -50,8 +50,8 @@ suite("test_partitions_tvf","p0,external,tvf,external_docker") { assertEquals("k3", res[0][5]); // Buckets assertEquals(2, res[0][8]); - // ReplicationNum - assertEquals(1, res[0][9]); + // ReplicationNum: if force_olap_table_replication_num is set to 3,here will be 3 + // assertEquals(1, res[0][9]); // StorageMedium assertEquals("HDD", res[0][10]); // ReplicaAllocation diff --git a/regression-test/suites/external_table_p0/tvf/test_s3_tvf_with_resource.groovy b/regression-test/suites/external_table_p0/tvf/test_s3_tvf_with_resource.groovy index 2477ee1142340d..7a8912025ff11a 100644 --- a/regression-test/suites/external_table_p0/tvf/test_s3_tvf_with_resource.groovy +++ b/regression-test/suites/external_table_p0/tvf/test_s3_tvf_with_resource.groovy @@ -180,6 +180,8 @@ suite("test_s3_tvf_with_resource", "p0") { } // test auth + def tokens = context.config.jdbcUrl.split('/') + def url=tokens[0] + "//" + tokens[2] + "/" + "information_schema" + "?" String user = 'test_s3_tvf_with_resource_user' String pwd = 'C123_567p' String viewName = "test_s3_tvf_with_resource_view" @@ -198,7 +200,7 @@ suite("test_s3_tvf_with_resource", "p0") { """ // not have usage priv, can not select tvf with resource - connect(user=user, password="${pwd}", url=context.config.jdbcUrl) { + connect(user=user, password="${pwd}", url=url) { test { sql """ SELECT * FROM S3 ( @@ -213,7 +215,7 @@ suite("test_s3_tvf_with_resource", "p0") { } // only have select_priv of view,can select view with resource - connect(user=user, password="${pwd}", url=context.config.jdbcUrl) { + connect(user=user, password="${pwd}", url=url) { sql """SELECT * FROM ${viewName};""" }