From 31fe50efd69fcd44d2d9a37b6b840c8a3c851119 Mon Sep 17 00:00:00 2001 From: slothever Date: Fri, 17 Nov 2023 09:55:28 +0800 Subject: [PATCH 1/7] [fix](multi-catalog)add max compute partition prune --- be/src/runtime/descriptors.cpp | 1 - be/src/runtime/descriptors.h | 2 - .../format/table/max_compute_jni_reader.cpp | 9 +- .../format/table/max_compute_jni_reader.h | 2 + be/src/vec/exec/scan/vfile_scanner.cpp | 7 +- .../doris/analysis/ShowPartitionsStmt.java | 14 +- .../external/MaxComputeExternalTable.java | 164 ++++++++++-------- .../datasource/MaxComputeExternalCatalog.java | 36 +++- .../planner/external/FileQueryScanNode.java | 2 + .../planner/external/MaxComputeScanNode.java | 116 ++++++++++--- .../planner/external/MaxComputeSplit.java | 40 +++++ .../planner/external/TableFormatType.java | 1 + .../hudi/HudiCachedPartitionProcessor.java | 8 +- .../org/apache/doris/qe/ShowExecutor.java | 15 ++ gensrc/thrift/Descriptors.thrift | 1 - gensrc/thrift/PlanNodes.thrift | 4 + 16 files changed, 299 insertions(+), 123 deletions(-) create mode 100644 fe/fe-core/src/main/java/org/apache/doris/planner/external/MaxComputeSplit.java diff --git a/be/src/runtime/descriptors.cpp b/be/src/runtime/descriptors.cpp index b5c23a2afd0a06..721950abbee741 100644 --- a/be/src/runtime/descriptors.cpp +++ b/be/src/runtime/descriptors.cpp @@ -195,7 +195,6 @@ MaxComputeTableDescriptor::MaxComputeTableDescriptor(const TTableDescriptor& tde _table(tdesc.mcTable.table), _access_key(tdesc.mcTable.access_key), _secret_key(tdesc.mcTable.secret_key), - _partition_spec(tdesc.mcTable.partition_spec), _public_access(tdesc.mcTable.public_access) {} MaxComputeTableDescriptor::~MaxComputeTableDescriptor() = default; diff --git a/be/src/runtime/descriptors.h b/be/src/runtime/descriptors.h index fbd233a18d1d7b..03673d29c57bce 100644 --- a/be/src/runtime/descriptors.h +++ b/be/src/runtime/descriptors.h @@ -240,7 +240,6 @@ class MaxComputeTableDescriptor : public TableDescriptor { const std::string table() const { return _table; } const std::string access_key() const { return _access_key; } const std::string secret_key() const { return _secret_key; } - const std::string partition_spec() const { return _partition_spec; } const std::string public_access() const { return _public_access; } private: @@ -249,7 +248,6 @@ class MaxComputeTableDescriptor : public TableDescriptor { std::string _table; std::string _access_key; std::string _secret_key; - std::string _partition_spec; std::string _public_access; }; diff --git a/be/src/vec/exec/format/table/max_compute_jni_reader.cpp b/be/src/vec/exec/format/table/max_compute_jni_reader.cpp index e762bbbbf83abe..2884c413103f1b 100644 --- a/be/src/vec/exec/format/table/max_compute_jni_reader.cpp +++ b/be/src/vec/exec/format/table/max_compute_jni_reader.cpp @@ -38,10 +38,15 @@ class Block; namespace doris::vectorized { MaxComputeJniReader::MaxComputeJniReader(const MaxComputeTableDescriptor* mc_desc, + const TMaxComputeFileDesc& max_compute_params, const std::vector& file_slot_descs, const TFileRangeDesc& range, RuntimeState* state, RuntimeProfile* profile) - : _file_slot_descs(file_slot_descs), _range(range), _state(state), _profile(profile) { + : _max_compute_params(max_compute_params), + _file_slot_descs(file_slot_descs), + _range(range), + _state(state), + _profile(profile) { _table_desc = mc_desc; std::ostringstream required_fields; std::ostringstream columns_types; @@ -64,7 +69,7 @@ MaxComputeJniReader::MaxComputeJniReader(const MaxComputeTableDescriptor* mc_des {"access_key", _table_desc->access_key()}, {"secret_key", _table_desc->secret_key()}, {"project", _table_desc->project()}, - {"partition_spec", _table_desc->partition_spec()}, + {"partition_spec", _max_compute_params.partition_spec}, {"table", _table_desc->table()}, {"public_access", _table_desc->public_access()}, {"start_offset", std::to_string(_range.start_offset)}, diff --git a/be/src/vec/exec/format/table/max_compute_jni_reader.h b/be/src/vec/exec/format/table/max_compute_jni_reader.h index 0b3c809c50243f..403892ef090306 100644 --- a/be/src/vec/exec/format/table/max_compute_jni_reader.h +++ b/be/src/vec/exec/format/table/max_compute_jni_reader.h @@ -54,6 +54,7 @@ class MaxComputeJniReader : public GenericReader { public: MaxComputeJniReader(const MaxComputeTableDescriptor* mc_desc, + const TMaxComputeFileDesc& max_compute_params, const std::vector& file_slot_descs, const TFileRangeDesc& range, RuntimeState* state, RuntimeProfile* profile); @@ -69,6 +70,7 @@ class MaxComputeJniReader : public GenericReader { private: const MaxComputeTableDescriptor* _table_desc; + const TMaxComputeFileDesc& _max_compute_params; const std::vector& _file_slot_descs; const TFileRangeDesc& _range; RuntimeState* _state; diff --git a/be/src/vec/exec/scan/vfile_scanner.cpp b/be/src/vec/exec/scan/vfile_scanner.cpp index 7eb93b5984597f..7985abb5d39be6 100644 --- a/be/src/vec/exec/scan/vfile_scanner.cpp +++ b/be/src/vec/exec/scan/vfile_scanner.cpp @@ -749,13 +749,14 @@ Status VFileScanner::_get_next_reader() { bool need_to_get_parsed_schema = false; switch (format_type) { case TFileFormatType::FORMAT_JNI: { - if (_real_tuple_desc->table_desc()->table_type() == - ::doris::TTableType::type::MAX_COMPUTE_TABLE) { + if (range.__isset.table_format_params && + range.table_format_params.table_format_type == "max_compute") { const MaxComputeTableDescriptor* mc_desc = static_cast( _real_tuple_desc->table_desc()); std::unique_ptr mc_reader = MaxComputeJniReader::create_unique( - mc_desc, _file_slot_descs, range, _state, _profile); + mc_desc, range.table_format_params.max_compute_params, _file_slot_descs, + range, _state, _profile); init_status = mc_reader->init_reader(_colname_to_value_range); _cur_reader = std::move(mc_reader); } else if (range.__isset.table_format_params && diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowPartitionsStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowPartitionsStmt.java index f6e9b06e0b1eea..dc1d360f290ad7 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowPartitionsStmt.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowPartitionsStmt.java @@ -26,6 +26,7 @@ import org.apache.doris.catalog.TableIf.TableType; import org.apache.doris.catalog.Type; import org.apache.doris.catalog.external.HMSExternalTable; +import org.apache.doris.catalog.external.MaxComputeExternalTable; import org.apache.doris.common.AnalysisException; import org.apache.doris.common.ErrorCode; import org.apache.doris.common.ErrorReport; @@ -37,6 +38,7 @@ import org.apache.doris.common.util.OrderByPair; import org.apache.doris.datasource.CatalogIf; import org.apache.doris.datasource.HMSExternalCatalog; +import org.apache.doris.datasource.MaxComputeExternalCatalog; import org.apache.doris.mysql.privilege.PrivPredicate; import org.apache.doris.qe.ConnectContext; import org.apache.doris.qe.ShowResultSetMetaData; @@ -126,7 +128,7 @@ public void analyze(Analyzer analyzer) throws UserException { DatabaseIf db = catalog.getDbOrAnalysisException(dbName); TableIf table = db.getTableOrMetaException(tblName, Table.TableType.OLAP, TableType.MATERIALIZED_VIEW, - TableType.HMS_EXTERNAL_TABLE); + TableType.HMS_EXTERNAL_TABLE, TableType.MAX_COMPUTE_EXTERNAL_TABLE); if (table instanceof HMSExternalTable) { if (((HMSExternalTable) table).isView()) { @@ -138,6 +140,13 @@ public void analyze(Analyzer analyzer) throws UserException { return; } + if (table instanceof MaxComputeExternalTable) { + if (((MaxComputeExternalTable) table).getOdpsTable().getPartitions().isEmpty()) { + throw new AnalysisException("Table " + tblName + " is not a partitioned table"); + } + return; + } + table.readLock(); try { // build proc path @@ -170,7 +179,8 @@ public void analyzeImpl(Analyzer analyzer) throws UserException { } // disallow unsupported catalog - if (!(catalog.isInternalCatalog() || catalog instanceof HMSExternalCatalog)) { + if (!(catalog.isInternalCatalog() || catalog instanceof HMSExternalCatalog + || catalog instanceof MaxComputeExternalCatalog)) { throw new AnalysisException(String.format("Catalog of type '%s' is not allowed in ShowPartitionsStmt", catalog.getType())); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/MaxComputeExternalTable.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/MaxComputeExternalTable.java index 3c2f3bada03574..3e887d04ac1ac6 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/MaxComputeExternalTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/MaxComputeExternalTable.java @@ -17,11 +17,6 @@ package org.apache.doris.catalog.external; -import org.apache.doris.analysis.BinaryPredicate; -import org.apache.doris.analysis.Expr; -import org.apache.doris.analysis.InPredicate; -import org.apache.doris.analysis.Predicate; -import org.apache.doris.analysis.SlotRef; import org.apache.doris.catalog.ArrayType; import org.apache.doris.catalog.Column; import org.apache.doris.catalog.MapType; @@ -29,7 +24,10 @@ import org.apache.doris.catalog.StructField; import org.apache.doris.catalog.StructType; import org.apache.doris.catalog.Type; +import org.apache.doris.common.Config; import org.apache.doris.datasource.MaxComputeExternalCatalog; +import org.apache.doris.planner.external.TablePartitionValues; +import org.apache.doris.planner.external.TablePartitionValues.TablePartitionKey; import org.apache.doris.thrift.TMCTable; import org.apache.doris.thrift.TTableDescriptor; import org.apache.doris.thrift.TTableType; @@ -43,26 +41,36 @@ import com.aliyun.odps.type.StructTypeInfo; import com.aliyun.odps.type.TypeInfo; import com.aliyun.odps.type.VarcharTypeInfo; +import com.google.common.cache.Cache; +import com.google.common.cache.CacheBuilder; +import com.google.common.collect.ImmutableList; import com.google.common.collect.Lists; -import com.google.common.collect.Sets; import java.util.ArrayList; import java.util.HashMap; -import java.util.HashSet; import java.util.List; import java.util.Map; -import java.util.Optional; import java.util.Set; -import java.util.StringJoiner; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; /** * MaxCompute external table. */ public class MaxComputeExternalTable extends ExternalTable { + private static final Cache partitionValuesCache; private Table odpsTable; - private Set partitionKeys; - private String partitionSpec; + private List partitionSpecs; + private Map partitionNameToColumns; + private List partitionTypes; + + static { + partitionValuesCache = CacheBuilder.newBuilder().maximumSize(Config.max_hive_partition_cache_num) + .expireAfterAccess(Config.external_cache_expire_time_minutes_after_access, TimeUnit.MINUTES) + .build(); + } public MaxComputeExternalTable(long id, String name, String dbName, MaxComputeExternalCatalog catalog) { super(id, name, catalog, dbName, TableType.MAX_COMPUTE_EXTERNAL_TABLE); @@ -77,8 +85,59 @@ protected synchronized void makeSureInitialized() { } } + @Override + public Set getPartitionNames() { + makeSureInitialized(); + return partitionNameToColumns.keySet(); + } + + public List getPartitionColumns() { + makeSureInitialized(); + return new ArrayList<>(partitionNameToColumns.values()); + } + + public TablePartitionValues getPartitionValues() { + makeSureInitialized(); + // Make sure to call it after initSchema() completes + String projectName = odpsTable.getProject(); + String tableName = odpsTable.getName(); + TablePartitionKey tablePartitionKey = new TablePartitionKey(projectName, tableName, partitionTypes); + try { + return partitionValuesCache.get(tablePartitionKey, () -> { + TablePartitionValues partitionValues = new TablePartitionValues(); + partitionValues.addPartitions(partitionSpecs, + partitionSpecs.stream() + .map(p -> parsePartitionValues(new ArrayList<>(getPartitionNames()), p)) + .collect(Collectors.toList()), + partitionTypes); + return partitionValues; + }); + } catch (ExecutionException e) { + throw new RuntimeException("Fail to load partition values for table:" + + " '" + projectName + "." + tableName + "'"); + } + } + + private static List parsePartitionValues(List partitionColumns, String partitionPath) { + String[] partitionFragments = partitionPath.split("/"); + if (partitionFragments.length != partitionColumns.size()) { + throw new RuntimeException("Failed to parse partition values of path: " + partitionPath); + } + List partitionValues = new ArrayList<>(partitionFragments.length); + for (int i = 0; i < partitionFragments.length; i++) { + String prefix = partitionColumns.get(i) + "="; + if (partitionFragments[i].startsWith(prefix)) { + partitionValues.add(partitionFragments[i].substring(prefix.length())); + } else { + partitionValues.add(partitionFragments[i]); + } + } + return partitionValues; + } + @Override public List initSchema() { + // this method will be called at semantic parsing. makeSureInitialized(); List columns = odpsTable.getSchema().getColumns(); List result = Lists.newArrayListWithCapacity(columns.size()); @@ -86,72 +145,31 @@ public List initSchema() { result.add(new Column(field.getName(), mcTypeToDorisType(field.getTypeInfo()), true, null, true, field.getComment(), true, -1)); } - List partitionColumns = odpsTable.getSchema().getPartitionColumns(); - partitionKeys = new HashSet<>(); - for (com.aliyun.odps.Column partColumn : partitionColumns) { - result.add(new Column(partColumn.getName(), mcTypeToDorisType(partColumn.getTypeInfo()), true, null, - true, partColumn.getComment(), true, -1)); - partitionKeys.add(partColumn.getName()); - } + initTablePartitions(); + result.addAll(partitionNameToColumns.values()); return result; } - public Optional getPartitionSpec(List conjuncts) { - if (!partitionKeys.isEmpty()) { - if (conjuncts.isEmpty()) { - throw new IllegalArgumentException("Max Compute partition table need partition predicate."); - } - // recreate partitionSpec when conjuncts is changed. - List partitionConjuncts = parsePartitionConjuncts(conjuncts, partitionKeys); - StringJoiner partitionSpec = new StringJoiner(","); - partitionConjuncts.forEach(partitionSpec::add); - this.partitionSpec = partitionSpec.toString(); - return Optional.of(this.partitionSpec); - } - return Optional.empty(); - } - - private static List parsePartitionConjuncts(List conjuncts, Set partitionKeys) { - List partitionConjuncts = new ArrayList<>(); - Set predicates = Sets.newHashSet(); - for (Expr conjunct : conjuncts) { - // collect depart predicate - conjunct.collect(BinaryPredicate.class, predicates); - conjunct.collect(InPredicate.class, predicates); - } - Map slotToConjuncts = new HashMap<>(); - for (Predicate predicate : predicates) { - List slotRefs = new ArrayList<>(); - if (predicate instanceof BinaryPredicate) { - if (((BinaryPredicate) predicate).getOp() != BinaryPredicate.Operator.EQ) { - // max compute only support the EQ operator: pt='pt-value' - continue; - } - // BinaryPredicate has one left slotRef, and partition value not slotRef - predicate.collect(SlotRef.class, slotRefs); - slotToConjuncts.put(slotRefs.get(0).getColumnName(), predicate); - } else if (predicate instanceof InPredicate) { - predicate.collect(SlotRef.class, slotRefs); - slotToConjuncts.put(slotRefs.get(0).getColumnName(), predicate); - } + private void initTablePartitions() { + List partitionColumns = odpsTable.getSchema().getPartitionColumns(); + if (!partitionColumns.isEmpty()) { + partitionSpecs = odpsTable.getPartitions().stream() + .map(e -> e.getPartitionSpec().toString(false, true)) + .collect(Collectors.toList()); + } else { + partitionSpecs = ImmutableList.of(); } - for (String partitionKey : partitionKeys) { - Predicate partitionPredicate = slotToConjuncts.get(partitionKey); - if (partitionPredicate == null) { - continue; - } - if (partitionPredicate instanceof InPredicate) { - List inList = ((InPredicate) partitionPredicate).getListChildren(); - for (Expr expr : inList) { - String partitionConjunct = partitionKey + "=" + expr.toSql(); - partitionConjuncts.add(partitionConjunct.replace("`", "")); - } - } else { - String partitionConjunct = partitionPredicate.toSql(); - partitionConjuncts.add(partitionConjunct.replace("`", "")); - } + partitionNameToColumns = new HashMap<>(); + for (com.aliyun.odps.Column partColumn : partitionColumns) { + Column dorisCol = new Column(partColumn.getName(), + mcTypeToDorisType(partColumn.getTypeInfo()), true, null, + true, partColumn.getComment(), true, -1); + partitionNameToColumns.put(dorisCol.getName(), dorisCol); } - return partitionConjuncts; + partitionTypes = partitionNameToColumns.values() + .stream() + .map(Column::getType) + .collect(Collectors.toList()); } private Type mcTypeToDorisType(TypeInfo typeInfo) { @@ -245,7 +263,6 @@ public TTableDescriptor toThrift() { tMcTable.setRegion(mcCatalog.getRegion()); tMcTable.setAccessKey(mcCatalog.getAccessKey()); tMcTable.setSecretKey(mcCatalog.getSecretKey()); - tMcTable.setPartitionSpec(this.partitionSpec); tMcTable.setPublicAccess(String.valueOf(mcCatalog.enablePublicAccess())); // use mc project as dbName tMcTable.setProject(dbName); @@ -264,6 +281,5 @@ public Table getOdpsTable() { public String getMysqlType() { return "BASE TABLE"; } - } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/MaxComputeExternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/MaxComputeExternalCatalog.java index 0cd99678baded3..2b34f939e61c60 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/MaxComputeExternalCatalog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/MaxComputeExternalCatalog.java @@ -36,7 +36,7 @@ import java.util.ArrayList; import java.util.List; import java.util.Map; -import java.util.Optional; +import java.util.stream.Collectors; public class MaxComputeExternalCatalog extends ExternalCatalog { private Odps odps; @@ -95,21 +95,25 @@ protected void initLocalObjectsImpl() { odps.setDefaultProject(defaultProject); } - public long getTotalRows(String project, String table, Optional partitionSpec) throws TunnelException { + public long getTotalRows(String project, String table) throws TunnelException { + return getTableTunnel().getDownloadSession(project, table, null).getRecordCount(); + } + + public long getTotalRows(String project, String table, String partitionSpec) throws TunnelException { + return getTableTunnel() + .getDownloadSession(project, table, new PartitionSpec(partitionSpec), null) + .getRecordCount(); + } + + private TableTunnel getTableTunnel() { makeSureInitialized(); TableTunnel tunnel = new TableTunnel(odps); String tunnelUrl = tunnelUrlTemplate.replace("{}", region); if (enablePublicAccess) { tunnelUrl = tunnelUrl.replace("-inc", ""); } - TableTunnel.DownloadSession downloadSession; tunnel.setEndpoint(tunnelUrl); - if (!partitionSpec.isPresent()) { - downloadSession = tunnel.getDownloadSession(project, table, null); - } else { - downloadSession = tunnel.getDownloadSession(project, table, new PartitionSpec(partitionSpec.get()), null); - } - return downloadSession.getRecordCount(); + return tunnel; } public Odps getClient() { @@ -139,6 +143,20 @@ public boolean tableExist(SessionContext ctx, String dbName, String tblName) { } } + public List listPartitionNames(String dbName, String tbl) { + try { + if (getClient().projects().exists(dbName)) { + return getClient().tables().get(tbl).getPartitions().stream() + .map(p -> p.getPartitionSpec().toString(false, true)) + .collect(Collectors.toList()); + } else { + throw new OdpsException("Max compute project: " + dbName + " not exists."); + } + } catch (OdpsException e) { + throw new RuntimeException(e); + } + } + @Override public List listTableNames(SessionContext ctx, String dbName) { makeSureInitialized(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileQueryScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileQueryScanNode.java index 7f7be8f51b60ff..a7ca42dbf7723c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileQueryScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileQueryScanNode.java @@ -364,6 +364,8 @@ public void createScanRangeLocations() throws UserException { PaimonScanNode.setPaimonParams(rangeDesc, (PaimonSplit) fileSplit); } else if (fileSplit instanceof HudiSplit) { HudiScanNode.setHudiParams(rangeDesc, (HudiSplit) fileSplit); + } else if (fileSplit instanceof MaxComputeSplit) { + MaxComputeScanNode.setScanParams(rangeDesc, (MaxComputeSplit) fileSplit); } curLocations.getScanRange().getExtScanRange().getFileScanRange().addToRanges(rangeDesc); diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/MaxComputeScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/MaxComputeScanNode.java index d7f8d599a61555..800f01df71f9e1 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/MaxComputeScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/MaxComputeScanNode.java @@ -18,21 +18,29 @@ package org.apache.doris.planner.external; import org.apache.doris.analysis.TupleDescriptor; +import org.apache.doris.catalog.PartitionItem; import org.apache.doris.catalog.TableIf; import org.apache.doris.catalog.external.MaxComputeExternalTable; +import org.apache.doris.common.AnalysisException; import org.apache.doris.common.Pair; import org.apache.doris.common.UserException; import org.apache.doris.datasource.MaxComputeExternalCatalog; +import org.apache.doris.planner.ListPartitionPrunerV2; import org.apache.doris.planner.PlanNodeId; import org.apache.doris.spi.Split; import org.apache.doris.statistics.StatisticalType; import org.apache.doris.thrift.TFileFormatType; +import org.apache.doris.thrift.TFileRangeDesc; import org.apache.doris.thrift.TFileType; +import org.apache.doris.thrift.TMaxComputeFileDesc; +import org.apache.doris.thrift.TTableFormatFileDesc; +import com.aliyun.odps.Table; import com.aliyun.odps.tunnel.TunnelException; import org.apache.hadoop.fs.Path; import java.util.ArrayList; +import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.List; @@ -56,6 +64,17 @@ public MaxComputeScanNode(PlanNodeId id, TupleDescriptor desc, String planNodeNa catalog = (MaxComputeExternalCatalog) table.getCatalog(); } + public static void setScanParams(TFileRangeDesc rangeDesc, MaxComputeSplit maxComputeSplit) { + TTableFormatFileDesc tableFormatFileDesc = new TTableFormatFileDesc(); + tableFormatFileDesc.setTableFormatType(TableFormatType.MAX_COMPUTE.value()); + TMaxComputeFileDesc fileDesc = new TMaxComputeFileDesc(); + if (maxComputeSplit.getPartitionSpec().isPresent()) { + fileDesc.setPartitionSpec(maxComputeSplit.getPartitionSpec().get()); + } + tableFormatFileDesc.setMaxComputeParams(fileDesc); + rangeDesc.setTableFormatParams(tableFormatFileDesc); + } + @Override protected TFileType getLocationType() throws UserException { return getLocationType(null); @@ -89,38 +108,23 @@ protected Map getLocationProperties() throws UserException { @Override protected List getSplits() throws UserException { List result = new ArrayList<>(); - // String splitPath = catalog.getTunnelUrl(); - // TODO: use single max compute scan node rather than file scan node com.aliyun.odps.Table odpsTable = table.getOdpsTable(); if (desc.getSlots().isEmpty() || odpsTable.getFileNum() <= 0) { return result; } try { - List> sliceRange = new ArrayList<>(); - Optional partitionSpec = table.getPartitionSpec(conjuncts); - long totalRows = catalog.getTotalRows(table.getDbName(), table.getName(), partitionSpec); - long fileNum = odpsTable.getFileNum(); - long start = 0; - long splitSize = (long) Math.ceil((double) totalRows / fileNum); - if (splitSize <= 0 || totalRows < MIN_SPLIT_SIZE) { - // use whole split - sliceRange.add(Pair.of(start, totalRows)); - } else { - for (int i = 0; i < fileNum; i++) { - if (start > totalRows) { - break; - } - sliceRange.add(Pair.of(start, splitSize)); - start += splitSize; + if (!table.getPartitionNames().isEmpty()) { + if (conjuncts.isEmpty()) { + throw new IllegalArgumentException("Max Compute partition table need partition predicate."); } - } - long modificationTime = odpsTable.getLastDataModifiedTime().getTime(); - if (!sliceRange.isEmpty()) { - for (int i = 0; i < sliceRange.size(); i++) { - Pair range = sliceRange.get(i); - result.add(new FileSplit(new Path("/virtual_slice_" + i), range.first, range.second, - totalRows, modificationTime, null, Collections.emptyList())); + List partitionSpecs = getPartitionSpecs(); + for (String partitionSpec : partitionSpecs) { + long totalRows = catalog.getTotalRows(table.getDbName(), table.getName(), partitionSpec); + addBatchSplits(result, odpsTable, Optional.of(partitionSpec), totalRows); } + } else { + long totalRows = catalog.getTotalRows(table.getDbName(), table.getName()); + addBatchSplits(result, odpsTable, Optional.empty(), totalRows); } } catch (TunnelException e) { throw new UserException("Max Compute tunnel SDK exception.", e); @@ -128,4 +132,66 @@ protected List getSplits() throws UserException { } return result; } + + private static void addBatchSplits(List result, Table odpsTable, + Optional partitionSpec, long totalRows) { + List> sliceRange = new ArrayList<>(); + long fileNum = odpsTable.getFileNum(); + long start = 0; + long splitSize = (long) Math.ceil((double) totalRows / fileNum); + if (splitSize <= 0 || totalRows < MIN_SPLIT_SIZE) { + // use whole split + sliceRange.add(Pair.of(start, totalRows)); + } else { + for (int i = 0; i < fileNum; i++) { + if (start > totalRows) { + break; + } + sliceRange.add(Pair.of(start, splitSize)); + start += splitSize; + } + } + long modificationTime = odpsTable.getLastDataModifiedTime().getTime(); + if (!sliceRange.isEmpty()) { + for (int i = 0; i < sliceRange.size(); i++) { + Pair range = sliceRange.get(i); + FileSplit rangeSplit = new FileSplit(new Path("/virtual_slice_" + i), + range.first, range.second, totalRows, modificationTime, null, Collections.emptyList()); + if (partitionSpec.isPresent()) { + result.add(new MaxComputeSplit(partitionSpec.get(), rangeSplit)); + } else { + result.add(new MaxComputeSplit(rangeSplit)); + } + } + } + } + + private List getPartitionSpecs() throws AnalysisException { + return getPrunedPartitionSpecs(); + } + + private List getPrunedPartitionSpecs() throws AnalysisException { + List result = new ArrayList<>(); + TablePartitionValues partitionValues = table.getPartitionValues(); + // prune partitions by expr + partitionValues.readLock().lock(); + try { + Map idToPartitionItem = partitionValues.getIdToPartitionItem(); + this.totalPartitionNum = idToPartitionItem.size(); + ListPartitionPrunerV2 pruner = new ListPartitionPrunerV2(idToPartitionItem, + table.getPartitionColumns(), columnNameToRange, + partitionValues.getUidToPartitionRange(), + partitionValues.getRangeToId(), + partitionValues.getSingleColumnRangeMap(), + false); + Collection filteredPartitionIds = pruner.prune(); + this.readPartitionNum = filteredPartitionIds.size(); + // get partitions from cache + Map partitionIdToNameMap = partitionValues.getPartitionIdToNameMap(); + filteredPartitionIds.forEach(id -> result.add(partitionIdToNameMap.get(id))); + return result; + } finally { + partitionValues.readLock().unlock(); + } + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/MaxComputeSplit.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/MaxComputeSplit.java new file mode 100644 index 00000000000000..a14e5fe22a6375 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/MaxComputeSplit.java @@ -0,0 +1,40 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.planner.external; + +import java.util.Optional; + +public class MaxComputeSplit extends FileSplit { + private final Optional partitionSpec; + + public MaxComputeSplit(FileSplit rangeSplit) { + super(rangeSplit.path, rangeSplit.start, rangeSplit.length, rangeSplit.fileLength, + rangeSplit.hosts, rangeSplit.partitionValues); + this.partitionSpec = Optional.empty(); + } + + public MaxComputeSplit(String partitionSpec, FileSplit rangeSplit) { + super(rangeSplit.path, rangeSplit.start, rangeSplit.length, rangeSplit.fileLength, + rangeSplit.hosts, rangeSplit.partitionValues); + this.partitionSpec = Optional.of(partitionSpec); + } + + public Optional getPartitionSpec() { + return partitionSpec; + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/TableFormatType.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/TableFormatType.java index 891e138db6b17d..b5f41f97ba4d43 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/TableFormatType.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/TableFormatType.java @@ -22,6 +22,7 @@ public enum TableFormatType { ICEBERG("iceberg"), HUDI("hudi"), PAIMON("paimon"), + MAX_COMPUTE("max_compute"), TRANSACTIONAL_HIVE("transactional_hive"); private final String tableFormatType; diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/hudi/HudiCachedPartitionProcessor.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/hudi/HudiCachedPartitionProcessor.java index 37225c2339cd71..ba793ecf407772 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/hudi/HudiCachedPartitionProcessor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/hudi/HudiCachedPartitionProcessor.java @@ -96,11 +96,11 @@ public TablePartitionValues getSnapshotPartitionValues(HMSExternalTable table, if (Long.parseLong(timestamp) == lastTimestamp) { return getPartitionValues(table, tableMetaClient); } - List partitionNames = getPartitionNamesBeforeOrEquals(timeline, timestamp); - List partitionColumnsList = Arrays.asList(partitionColumns.get()); + List partitionNameAndValues = getPartitionNamesBeforeOrEquals(timeline, timestamp); + List partitionNames = Arrays.asList(partitionColumns.get()); TablePartitionValues partitionValues = new TablePartitionValues(); - partitionValues.addPartitions(partitionNames, - partitionNames.stream().map(p -> parsePartitionValues(partitionColumnsList, p)) + partitionValues.addPartitions(partitionNameAndValues, + partitionNameAndValues.stream().map(p -> parsePartitionValues(partitionNames, p)) .collect(Collectors.toList()), table.getPartitionColumnTypes()); return partitionValues; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ShowExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/ShowExecutor.java index 2aadab78852349..2d407e20a6a024 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/ShowExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ShowExecutor.java @@ -179,6 +179,7 @@ import org.apache.doris.common.util.Util; import org.apache.doris.datasource.CatalogIf; import org.apache.doris.datasource.HMSExternalCatalog; +import org.apache.doris.datasource.MaxComputeExternalCatalog; import org.apache.doris.external.iceberg.IcebergTableCreationRecord; import org.apache.doris.load.DeleteHandler; import org.apache.doris.load.ExportJobState; @@ -1670,6 +1671,20 @@ private void handleShowPartitions() throws AnalysisException { List> rows = ((PartitionsProcDir) procNodeI).fetchResultByFilter(showStmt.getFilterMap(), showStmt.getOrderByPairs(), showStmt.getLimitElement()).getRows(); resultSet = new ShowResultSet(showStmt.getMetaData(), rows); + } else if (showStmt.getCatalog() instanceof MaxComputeExternalCatalog) { + MaxComputeExternalCatalog catalog = (MaxComputeExternalCatalog) (showStmt.getCatalog()); + List> rows = new ArrayList<>(); + String dbName = ClusterNamespace.getNameFromFullName(showStmt.getTableName().getDb()); + List partitionNames = catalog.listPartitionNames(dbName, + showStmt.getTableName().getTbl()); + for (String partition : partitionNames) { + List list = new ArrayList<>(); + list.add(partition); + rows.add(list); + } + // sort by partition name + rows.sort(Comparator.comparing(x -> x.get(0))); + resultSet = new ShowResultSet(showStmt.getMetaData(), rows); } else { handleShowHMSTablePartitions(showStmt); } diff --git a/gensrc/thrift/Descriptors.thrift b/gensrc/thrift/Descriptors.thrift index 21bea8cac59dae..d59784ebc967d8 100644 --- a/gensrc/thrift/Descriptors.thrift +++ b/gensrc/thrift/Descriptors.thrift @@ -333,7 +333,6 @@ struct TMCTable { 4: optional string access_key 5: optional string secret_key 6: optional string public_access - 7: optional string partition_spec } // "Union" of all table types. diff --git a/gensrc/thrift/PlanNodes.thrift b/gensrc/thrift/PlanNodes.thrift index cb656d26defed7..c30c106b4118bc 100644 --- a/gensrc/thrift/PlanNodes.thrift +++ b/gensrc/thrift/PlanNodes.thrift @@ -312,6 +312,9 @@ struct TPaimonFileDesc { 10: optional i64 last_update_time } +struct TMaxComputeFileDesc { + 1: optional string partition_spec +} struct THudiFileDesc { 1: optional string instant_time; @@ -342,6 +345,7 @@ struct TTableFormatFileDesc { 3: optional THudiFileDesc hudi_params 4: optional TPaimonFileDesc paimon_params 5: optional TTransactionalHiveDesc transactional_hive_params + 6: optional TMaxComputeFileDesc max_compute_params } enum TTextSerdeType { From 87daa756542758c421f0afe7fcdaffc72582ea1b Mon Sep 17 00:00:00 2001 From: slothever Date: Tue, 21 Nov 2023 15:44:21 +0800 Subject: [PATCH 2/7] 2 --- be/src/vec/exec/scan/vfile_scanner.cpp | 5 +- .../external/MaxComputeExternalTable.java | 5 +- .../datasource/MaxComputeExternalCatalog.java | 28 ++++++++++- .../hive/PooledHiveMetaStoreClient.java | 7 ++- .../org/apache/doris/qe/ShowExecutor.java | 48 +++++++++++++------ .../test_external_catalog_maxcompute.out | 39 +++++++++++++++ .../test_external_catalog_maxcompute.groovy | 14 ++++++ 7 files changed, 122 insertions(+), 24 deletions(-) diff --git a/be/src/vec/exec/scan/vfile_scanner.cpp b/be/src/vec/exec/scan/vfile_scanner.cpp index 3334d56313fef8..149ec75fa4be41 100644 --- a/be/src/vec/exec/scan/vfile_scanner.cpp +++ b/be/src/vec/exec/scan/vfile_scanner.cpp @@ -747,9 +747,8 @@ Status VFileScanner::_get_next_reader() { case TFileFormatType::FORMAT_JNI: { if (range.__isset.table_format_params && range.table_format_params.table_format_type == "max_compute") { - const MaxComputeTableDescriptor* mc_desc = - static_cast( - _real_tuple_desc->table_desc()); + const auto* mc_desc = static_cast( + _real_tuple_desc->table_desc()); std::unique_ptr mc_reader = MaxComputeJniReader::create_unique( mc_desc, range.table_format_params.max_compute_params, _file_slot_descs, range, _state, _profile); diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/MaxComputeExternalTable.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/MaxComputeExternalTable.java index 3e887d04ac1ac6..c4f4ae76fab8a4 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/MaxComputeExternalTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/MaxComputeExternalTable.java @@ -47,7 +47,7 @@ import com.google.common.collect.Lists; import java.util.ArrayList; -import java.util.HashMap; +import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.Set; @@ -159,7 +159,8 @@ private void initTablePartitions() { } else { partitionSpecs = ImmutableList.of(); } - partitionNameToColumns = new HashMap<>(); + // sort partition columns to align partitionTypes and partitionName. + partitionNameToColumns = new LinkedHashMap<>(); for (com.aliyun.odps.Column partColumn : partitionColumns) { Column dorisCol = new Column(partColumn.getName(), mcTypeToDorisType(partColumn.getTypeInfo()), true, null, diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/MaxComputeExternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/MaxComputeExternalCatalog.java index 2b34f939e61c60..d0f6458fa30da0 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/MaxComputeExternalCatalog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/MaxComputeExternalCatalog.java @@ -24,6 +24,7 @@ import com.aliyun.odps.Odps; import com.aliyun.odps.OdpsException; +import com.aliyun.odps.Partition; import com.aliyun.odps.PartitionSpec; import com.aliyun.odps.account.Account; import com.aliyun.odps.account.AliyunAccount; @@ -34,6 +35,7 @@ import com.google.gson.annotations.SerializedName; import java.util.ArrayList; +import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.stream.Collectors; @@ -144,10 +146,32 @@ public boolean tableExist(SessionContext ctx, String dbName, String tblName) { } public List listPartitionNames(String dbName, String tbl) { + return listPartitionNames(dbName, tbl, 0, -1); + } + + public List listPartitionNames(String dbName, String tbl, long skip, long limit) { try { if (getClient().projects().exists(dbName)) { - return getClient().tables().get(tbl).getPartitions().stream() - .map(p -> p.getPartitionSpec().toString(false, true)) + List parts; + if (limit < 0) { + parts = getClient().tables().get(tbl).getPartitions(); + } else { + skip = skip < 0 ? 0 : skip; + parts = new ArrayList<>(); + Iterator it = getClient().tables().get(tbl).getPartitionIterator(); + int count = 0; + while (it.hasNext()) { + if (count < skip) { + count++; + it.next(); + } else if (parts.size() >= limit) { + break; + } else { + parts.add(it.next()); + } + } + } + return parts.stream().map(p -> p.getPartitionSpec().toString(false, true)) .collect(Collectors.toList()); } else { throw new OdpsException("Max compute project: " + dbName + " not exists."); diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/PooledHiveMetaStoreClient.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/PooledHiveMetaStoreClient.java index f3c2557a1debc1..3b2afe94cb5fa3 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/PooledHiveMetaStoreClient.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/PooledHiveMetaStoreClient.java @@ -27,7 +27,6 @@ import com.amazonaws.glue.catalog.metastore.AWSCatalogMetastoreClient; import com.google.common.base.Preconditions; import com.google.common.collect.Lists; -import org.apache.hadoop.hive.common.ValidReadTxnList; import org.apache.hadoop.hive.common.ValidReaderWriteIdList; import org.apache.hadoop.hive.common.ValidTxnList; import org.apache.hadoop.hive.common.ValidTxnWriteIdList; @@ -126,9 +125,13 @@ public boolean tableExists(String dbName, String tblName) { } public List listPartitionNames(String dbName, String tblName) { + return listPartitionNames(dbName, tblName, MAX_LIST_PARTITION_NUM); + } + + public List listPartitionNames(String dbName, String tblName, short max) { try (CachedClient client = getClient()) { try { - return client.client.listPartitionNames(dbName, tblName, MAX_LIST_PARTITION_NUM); + return client.client.listPartitionNames(dbName, tblName, max); } catch (Exception e) { client.setThrowable(e); throw e; diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ShowExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/ShowExecutor.java index 18c3bd963b8094..14af2ed7ce64e2 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/ShowExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ShowExecutor.java @@ -25,6 +25,7 @@ import org.apache.doris.analysis.AdminShowTabletStorageFormatStmt; import org.apache.doris.analysis.DescribeStmt; import org.apache.doris.analysis.HelpStmt; +import org.apache.doris.analysis.LimitElement; import org.apache.doris.analysis.PartitionNames; import org.apache.doris.analysis.ShowAlterStmt; import org.apache.doris.analysis.ShowAnalyzeStmt; @@ -1659,31 +1660,48 @@ private void handleShowPartitions() throws AnalysisException { showStmt.getOrderByPairs(), showStmt.getLimitElement()).getRows(); resultSet = new ShowResultSet(showStmt.getMetaData(), rows); } else if (showStmt.getCatalog() instanceof MaxComputeExternalCatalog) { - MaxComputeExternalCatalog catalog = (MaxComputeExternalCatalog) (showStmt.getCatalog()); - List> rows = new ArrayList<>(); - String dbName = ClusterNamespace.getNameFromFullName(showStmt.getTableName().getDb()); - List partitionNames = catalog.listPartitionNames(dbName, - showStmt.getTableName().getTbl()); - for (String partition : partitionNames) { - List list = new ArrayList<>(); - list.add(partition); - rows.add(list); - } - // sort by partition name - rows.sort(Comparator.comparing(x -> x.get(0))); - resultSet = new ShowResultSet(showStmt.getMetaData(), rows); + handleShowMaxComputeTablePartitions(showStmt); } else { handleShowHMSTablePartitions(showStmt); } } + private void handleShowMaxComputeTablePartitions(ShowPartitionsStmt showStmt) { + MaxComputeExternalCatalog catalog = (MaxComputeExternalCatalog) (showStmt.getCatalog()); + List> rows = new ArrayList<>(); + String dbName = ClusterNamespace.getNameFromFullName(showStmt.getTableName().getDb()); + List partitionNames; + LimitElement limit = showStmt.getLimitElement(); + if (limit != null && limit.hasLimit()) { + partitionNames = catalog.listPartitionNames(dbName, + showStmt.getTableName().getTbl(), limit.getOffset(), limit.getLimit()); + } else { + partitionNames = catalog.listPartitionNames(dbName, showStmt.getTableName().getTbl()); + } + for (String partition : partitionNames) { + List list = new ArrayList<>(); + list.add(partition); + rows.add(list); + } + // sort by partition name + rows.sort(Comparator.comparing(x -> x.get(0))); + resultSet = new ShowResultSet(showStmt.getMetaData(), rows); + } + private void handleShowHMSTablePartitions(ShowPartitionsStmt showStmt) { HMSExternalCatalog catalog = (HMSExternalCatalog) (showStmt.getCatalog()); List> rows = new ArrayList<>(); String dbName = ClusterNamespace.getNameFromFullName(showStmt.getTableName().getDb()); - List partitionNames = catalog.getClient().listPartitionNames(dbName, - showStmt.getTableName().getTbl()); + List partitionNames; + LimitElement limit = showStmt.getLimitElement(); + if (limit != null && limit.hasLimit()) { + // only short limit is valid on Hive + short limited = (short) limit.getLimit(); + partitionNames = catalog.getClient().listPartitionNames(dbName, showStmt.getTableName().getTbl(), limited); + } else { + partitionNames = catalog.getClient().listPartitionNames(dbName, showStmt.getTableName().getTbl()); + } for (String partition : partitionNames) { List list = new ArrayList<>(); list.add(partition); diff --git a/regression-test/data/external_table_p2/maxcompute/test_external_catalog_maxcompute.out b/regression-test/data/external_table_p2/maxcompute/test_external_catalog_maxcompute.out index 6cd91cf2ee30e1..b1d36ad56a01e6 100644 --- a/regression-test/data/external_table_p2/maxcompute/test_external_catalog_maxcompute.out +++ b/regression-test/data/external_table_p2/maxcompute/test_external_catalog_maxcompute.out @@ -25,3 +25,42 @@ true 77 8920 182239402452 -- !replay_q6 -- 9601 qewtoll 2020-09-21 + +-- !multi_partition_q1 -- +pt=11/yy=2021/mm=12/dd=21 +pt=11/yy=2021/mm=12/dd=22 +pt=12/yy=2021/mm=12/dd=21 +pt=12/yy=2021/mm=12/dd=22 +pt=13/yy=2021/mm=12/dd=21 +pt=13/yy=2021/mm=12/dd=22 +pt=14/yy=2021/mm=12/dd=21 +pt=14/yy=2021/mm=12/dd=22 +pt=14/yy=2021/mm=12/dd=23 +pt=14/yy=2022/mm=01/dd=01 + +-- !multi_partition_q2 -- +17 2022-04-23 19:12:30 2021 12 22 +17 2022-04-23 19:12:30 2021 12 21 +16 2022-04-23 19:12:30 2021 12 22 + +-- !multi_partition_q3 -- +14 2022-04-23 19:12:30 2022 01 01 +14 2022-04-23 19:12:30 2022 01 02 +98 2022-04-23 19:12:30 2021 12 21 + +-- !multi_partition_q4 -- +17 + +-- !multi_partition_q5 -- +2022-04-23 19:12:30 2021 12 21 +2022-04-23 19:12:30 2021 12 21 +2022-04-23 19:12:30 2021 12 21 + +-- !multi_partition_q6 -- +17 2021 12 + +-- !multi_partition_q7 -- +15 + +-- !multi_partition_q8 -- +11 diff --git a/regression-test/suites/external_table_p2/maxcompute/test_external_catalog_maxcompute.groovy b/regression-test/suites/external_table_p2/maxcompute/test_external_catalog_maxcompute.groovy index 23d0c0b252de32..6e72bf63a2f856 100644 --- a/regression-test/suites/external_table_p2/maxcompute/test_external_catalog_maxcompute.groovy +++ b/regression-test/suites/external_table_p2/maxcompute/test_external_catalog_maxcompute.groovy @@ -73,5 +73,19 @@ suite("test_external_catalog_maxcompute", "p2,external,maxcompute,external_remot sql """ switch `${mc_catalog_name}`; """ sql """ use `${mc_db}`; """ qt_replay_q6 """ select * from mc_parts where dt = '2020-09-21' and mc_bigint > 6223 """ + + // test multi partitions prune + sql """ refresh catalog ${mc_catalog_name} """ + sql """ switch `${mc_catalog_name}`; """ + sql """ use `${mc_db}`; """ + qt_multi_partition_q1 """ show partitions from multi_partitions limit 10; """ + qt_multi_partition_q2 """ select pt, create_time, yy, mm, dd from multi_partitions where pt>-1 and yy > '' and mm > '' and dd >'' order by pt desc limit 3; """ + qt_multi_partition_q3 """ select sum(pt), create_time, yy, mm, dd from multi_partitions where yy > '' and mm > '' and dd >'' group by create_time, yy, mm, dd order by dd limit 3; """ + qt_multi_partition_q4 """ select count(*) from multi_partitions where pt>-1 and yy > '' and mm > '' and dd <= '30'; """ + qt_multi_partition_q5 """ select create_time, yy, mm, dd from multi_partitions where yy = '2021' and mm='12' and dd='21' order by pt limit 3; """ + qt_multi_partition_q6 """ select max(pt), yy, mm from multi_partitions where yy = '2021' and mm='12' group by yy, mm order by yy, mm; """ + qt_multi_partition_q7 """ select count(*) from multi_partitions where yy < '2022'; """ + qt_multi_partition_q8 """ select count(*) from multi_partitions where pt>=14; """ + } } From 50122c88f370f67bf1f94dd9bfccad6e6ef65534 Mon Sep 17 00:00:00 2001 From: slothever Date: Wed, 22 Nov 2023 10:37:42 +0800 Subject: [PATCH 3/7] 3 --- .../doris/datasource/hive/PooledHiveMetaStoreClient.java | 6 ++++-- .../src/main/java/org/apache/doris/qe/ShowExecutor.java | 6 +++--- .../maxcompute/test_external_catalog_maxcompute.out | 7 ------- .../maxcompute/test_external_catalog_maxcompute.groovy | 2 +- 4 files changed, 8 insertions(+), 13 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/PooledHiveMetaStoreClient.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/PooledHiveMetaStoreClient.java index 3b2afe94cb5fa3..c699be330a101a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/PooledHiveMetaStoreClient.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/PooledHiveMetaStoreClient.java @@ -128,10 +128,12 @@ public List listPartitionNames(String dbName, String tblName) { return listPartitionNames(dbName, tblName, MAX_LIST_PARTITION_NUM); } - public List listPartitionNames(String dbName, String tblName, short max) { + public List listPartitionNames(String dbName, String tblName, long max) { + // list all parts when the limit is greater than the short maximum + short limited = max <= Short.MAX_VALUE ? (short) max : MAX_LIST_PARTITION_NUM; try (CachedClient client = getClient()) { try { - return client.client.listPartitionNames(dbName, tblName, max); + return client.client.listPartitionNames(dbName, tblName, limited); } catch (Exception e) { client.setThrowable(e); throw e; diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ShowExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/ShowExecutor.java index 14af2ed7ce64e2..cf2039f3fcac6c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/ShowExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ShowExecutor.java @@ -1696,9 +1696,9 @@ private void handleShowHMSTablePartitions(ShowPartitionsStmt showStmt) { List partitionNames; LimitElement limit = showStmt.getLimitElement(); if (limit != null && limit.hasLimit()) { - // only short limit is valid on Hive - short limited = (short) limit.getLimit(); - partitionNames = catalog.getClient().listPartitionNames(dbName, showStmt.getTableName().getTbl(), limited); + // only limit is valid on Hive + partitionNames = catalog.getClient() + .listPartitionNames(dbName, showStmt.getTableName().getTbl(), limit.getLimit()); } else { partitionNames = catalog.getClient().listPartitionNames(dbName, showStmt.getTableName().getTbl()); } diff --git a/regression-test/data/external_table_p2/maxcompute/test_external_catalog_maxcompute.out b/regression-test/data/external_table_p2/maxcompute/test_external_catalog_maxcompute.out index b1d36ad56a01e6..8fff6e13f23ca4 100644 --- a/regression-test/data/external_table_p2/maxcompute/test_external_catalog_maxcompute.out +++ b/regression-test/data/external_table_p2/maxcompute/test_external_catalog_maxcompute.out @@ -27,16 +27,9 @@ true 77 8920 182239402452 9601 qewtoll 2020-09-21 -- !multi_partition_q1 -- -pt=11/yy=2021/mm=12/dd=21 -pt=11/yy=2021/mm=12/dd=22 -pt=12/yy=2021/mm=12/dd=21 -pt=12/yy=2021/mm=12/dd=22 -pt=13/yy=2021/mm=12/dd=21 pt=13/yy=2021/mm=12/dd=22 pt=14/yy=2021/mm=12/dd=21 pt=14/yy=2021/mm=12/dd=22 -pt=14/yy=2021/mm=12/dd=23 -pt=14/yy=2022/mm=01/dd=01 -- !multi_partition_q2 -- 17 2022-04-23 19:12:30 2021 12 22 diff --git a/regression-test/suites/external_table_p2/maxcompute/test_external_catalog_maxcompute.groovy b/regression-test/suites/external_table_p2/maxcompute/test_external_catalog_maxcompute.groovy index 6e72bf63a2f856..183309cb76bb59 100644 --- a/regression-test/suites/external_table_p2/maxcompute/test_external_catalog_maxcompute.groovy +++ b/regression-test/suites/external_table_p2/maxcompute/test_external_catalog_maxcompute.groovy @@ -78,7 +78,7 @@ suite("test_external_catalog_maxcompute", "p2,external,maxcompute,external_remot sql """ refresh catalog ${mc_catalog_name} """ sql """ switch `${mc_catalog_name}`; """ sql """ use `${mc_db}`; """ - qt_multi_partition_q1 """ show partitions from multi_partitions limit 10; """ + qt_multi_partition_q1 """ show partitions from multi_partitions limit 5,3; """ qt_multi_partition_q2 """ select pt, create_time, yy, mm, dd from multi_partitions where pt>-1 and yy > '' and mm > '' and dd >'' order by pt desc limit 3; """ qt_multi_partition_q3 """ select sum(pt), create_time, yy, mm, dd from multi_partitions where yy > '' and mm > '' and dd >'' group by create_time, yy, mm, dd order by dd limit 3; """ qt_multi_partition_q4 """ select count(*) from multi_partitions where pt>-1 and yy > '' and mm > '' and dd <= '30'; """ From cb08227cc7f375e43d079f1080be64d9365e9b30 Mon Sep 17 00:00:00 2001 From: slothever Date: Wed, 22 Nov 2023 15:27:03 +0800 Subject: [PATCH 4/7] optimize cache --- .../maxcompute/MaxComputeJniScanner.java | 26 +++++- .../external/MaxComputeExternalTable.java | 63 ++++++------- .../datasource/ExternalMetaCacheMgr.java | 10 +++ .../doris/datasource/MaxComputeCacheKey.java | 65 ++++++++++++++ .../datasource/MaxComputeExternalCatalog.java | 23 ++--- .../datasource/MaxComputeMetadataCache.java | 90 +++++++++++++++++++ .../MaxComputeMetadataCacheMgr.java | 64 +++++++++++++ .../planner/external/MaxComputeScanNode.java | 27 +++--- .../external/TablePartitionValues.java | 9 +- .../test_external_catalog_maxcompute.out | 36 +++++--- .../test_external_catalog_maxcompute.groovy | 7 +- 11 files changed, 337 insertions(+), 83 deletions(-) create mode 100644 fe/fe-core/src/main/java/org/apache/doris/datasource/MaxComputeCacheKey.java create mode 100644 fe/fe-core/src/main/java/org/apache/doris/datasource/MaxComputeMetadataCache.java create mode 100644 fe/fe-core/src/main/java/org/apache/doris/datasource/MaxComputeMetadataCacheMgr.java diff --git a/fe/be-java-extensions/max-compute-scanner/src/main/java/org/apache/doris/maxcompute/MaxComputeJniScanner.java b/fe/be-java-extensions/max-compute-scanner/src/main/java/org/apache/doris/maxcompute/MaxComputeJniScanner.java index 35a9bcc24745ef..502110e9b0245d 100644 --- a/fe/be-java-extensions/max-compute-scanner/src/main/java/org/apache/doris/maxcompute/MaxComputeJniScanner.java +++ b/fe/be-java-extensions/max-compute-scanner/src/main/java/org/apache/doris/maxcompute/MaxComputeJniScanner.java @@ -61,11 +61,11 @@ public class MaxComputeJniScanner extends JniScanner { private static final String START_OFFSET = "start_offset"; private static final String SPLIT_SIZE = "split_size"; private static final String PUBLIC_ACCESS = "public_access"; - private final RootAllocator arrowAllocator = new RootAllocator(Integer.MAX_VALUE); private final Map tableScans = new ConcurrentHashMap<>(); private final String region; private final String project; private final String table; + private RootAllocator arrowAllocator; private PartitionSpec partitionSpec; private Set partitionColumns; private MaxComputeTableScan curTableScan; @@ -173,6 +173,11 @@ public void open() throws IOException { .collect(Collectors.toSet()); List maxComputeColumns = new ArrayList<>(readColumns); maxComputeColumns.removeIf(e -> partitionColumns.contains(e.getName())); + if (maxComputeColumns.isEmpty() && !partitionColumns.isEmpty()) { + // query columns required non-null, when query partition table + maxComputeColumns.add(session.getSchema().getColumn(0)); + } + arrowAllocator = new RootAllocator(Integer.MAX_VALUE); curReader = session.openArrowRecordReader(start, totalRows, maxComputeColumns, arrowAllocator); remainBatchRows = totalRows; } catch (TunnelException e) { @@ -256,7 +261,8 @@ public void close() throws IOException { startOffset = -1; splitSize = -1; if (curReader != null) { - arrowAllocator.releaseBytes(arrowAllocator.getAllocatedMemory()); + arrowAllocator.close(); + arrowAllocator = null; curReader.close(); curReader = null; } @@ -281,15 +287,25 @@ protected int getNext() throws IOException { private int readVectors(int expectedRows) throws IOException { VectorSchemaRoot batch; int curReadRows = 0; - while (curReadRows < expectedRows && (batch = curReader.read()) != null) { + while (curReadRows < expectedRows) { + batch = curReader.read(); + if (batch == null) { + break; + } try { List fieldVectors = batch.getFieldVectors(); int batchRows = 0; for (FieldVector column : fieldVectors) { + Integer readColumnId = readColumnsToId.get(column.getName()); + if (readColumnId == null) { + // use for partition if no column need to read. + batchRows = column.getValueCount(); + continue; + } columnValue.reset(column); batchRows = column.getValueCount(); for (int j = 0; j < batchRows; j++) { - appendData(readColumnsToId.get(column.getName()), columnValue); + appendData(readColumnId, columnValue); } } if (partitionSpec != null) { @@ -305,6 +321,8 @@ private int readVectors(int expectedRows) throws IOException { } } curReadRows += batchRows; + } catch (Exception e) { + throw new RuntimeException("Fail to read arrow data, reason: " + e.getMessage(), e); } finally { batch.close(); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/MaxComputeExternalTable.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/MaxComputeExternalTable.java index c4f4ae76fab8a4..0816d550c69f59 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/MaxComputeExternalTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/MaxComputeExternalTable.java @@ -19,21 +19,23 @@ import org.apache.doris.catalog.ArrayType; import org.apache.doris.catalog.Column; +import org.apache.doris.catalog.Env; import org.apache.doris.catalog.MapType; import org.apache.doris.catalog.ScalarType; import org.apache.doris.catalog.StructField; import org.apache.doris.catalog.StructType; import org.apache.doris.catalog.Type; -import org.apache.doris.common.Config; +import org.apache.doris.datasource.MaxComputeCacheKey; import org.apache.doris.datasource.MaxComputeExternalCatalog; +import org.apache.doris.datasource.MaxComputeMetadataCache; import org.apache.doris.planner.external.TablePartitionValues; -import org.apache.doris.planner.external.TablePartitionValues.TablePartitionKey; import org.apache.doris.thrift.TMCTable; import org.apache.doris.thrift.TTableDescriptor; import org.apache.doris.thrift.TTableType; import com.aliyun.odps.OdpsType; import com.aliyun.odps.Table; +import com.aliyun.odps.tunnel.TunnelException; import com.aliyun.odps.type.ArrayTypeInfo; import com.aliyun.odps.type.CharTypeInfo; import com.aliyun.odps.type.DecimalTypeInfo; @@ -41,8 +43,6 @@ import com.aliyun.odps.type.StructTypeInfo; import com.aliyun.odps.type.TypeInfo; import com.aliyun.odps.type.VarcharTypeInfo; -import com.google.common.cache.Cache; -import com.google.common.cache.CacheBuilder; import com.google.common.collect.ImmutableList; import com.google.common.collect.Lists; @@ -51,8 +51,6 @@ import java.util.List; import java.util.Map; import java.util.Set; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; /** @@ -60,31 +58,37 @@ */ public class MaxComputeExternalTable extends ExternalTable { - private static final Cache partitionValuesCache; + private final MaxComputeExternalCatalog mcCatalog; private Table odpsTable; private List partitionSpecs; private Map partitionNameToColumns; private List partitionTypes; - static { - partitionValuesCache = CacheBuilder.newBuilder().maximumSize(Config.max_hive_partition_cache_num) - .expireAfterAccess(Config.external_cache_expire_time_minutes_after_access, TimeUnit.MINUTES) - .build(); - } - public MaxComputeExternalTable(long id, String name, String dbName, MaxComputeExternalCatalog catalog) { super(id, name, catalog, dbName, TableType.MAX_COMPUTE_EXTERNAL_TABLE); + mcCatalog = catalog; } @Override protected synchronized void makeSureInitialized() { super.makeSureInitialized(); if (!objectCreated) { - odpsTable = ((MaxComputeExternalCatalog) catalog).getClient().tables().get(name); + odpsTable = mcCatalog.getClient().tables().get(name); + initTablePartitions(); objectCreated = true; } } + public long getTotalRows() throws TunnelException { + // use for non-partitioned table + makeSureInitialized(); + MaxComputeMetadataCache metadataCache = Env.getCurrentEnv().getExtMetaCacheMgr() + .getMaxComputeMetadataCache(catalog.getId()); + return metadataCache.getCachedRowCount(dbName, name, null, () -> mcCatalog.getTableTunnel() + .getDownloadSession(dbName, name, null) + .getRecordCount()); + } + @Override public Set getPartitionNames() { makeSureInitialized(); @@ -101,21 +105,19 @@ public TablePartitionValues getPartitionValues() { // Make sure to call it after initSchema() completes String projectName = odpsTable.getProject(); String tableName = odpsTable.getName(); - TablePartitionKey tablePartitionKey = new TablePartitionKey(projectName, tableName, partitionTypes); - try { - return partitionValuesCache.get(tablePartitionKey, () -> { - TablePartitionValues partitionValues = new TablePartitionValues(); - partitionValues.addPartitions(partitionSpecs, - partitionSpecs.stream() - .map(p -> parsePartitionValues(new ArrayList<>(getPartitionNames()), p)) - .collect(Collectors.toList()), - partitionTypes); - return partitionValues; - }); - } catch (ExecutionException e) { - throw new RuntimeException("Fail to load partition values for table:" - + " '" + projectName + "." + tableName + "'"); - } + MaxComputeMetadataCache metadataCache = Env.getCurrentEnv().getExtMetaCacheMgr() + .getMaxComputeMetadataCache(catalog.getId()); + return metadataCache.getCachedPartitionValues( + new MaxComputeCacheKey(projectName, tableName), + () -> { + TablePartitionValues partitionValues = new TablePartitionValues(); + partitionValues.addPartitions(partitionSpecs, + partitionSpecs.stream() + .map(p -> parsePartitionValues(new ArrayList<>(getPartitionNames()), p)) + .collect(Collectors.toList()), + partitionTypes); + return partitionValues; + }); } private static List parsePartitionValues(List partitionColumns, String partitionPath) { @@ -145,7 +147,6 @@ public List initSchema() { result.add(new Column(field.getName(), mcTypeToDorisType(field.getTypeInfo()), true, null, true, field.getComment(), true, -1)); } - initTablePartitions(); result.addAll(partitionNameToColumns.values()); return result; } @@ -260,7 +261,6 @@ private Type mcTypeToDorisType(TypeInfo typeInfo) { public TTableDescriptor toThrift() { List schema = getFullSchema(); TMCTable tMcTable = new TMCTable(); - MaxComputeExternalCatalog mcCatalog = (MaxComputeExternalCatalog) catalog; tMcTable.setRegion(mcCatalog.getRegion()); tMcTable.setAccessKey(mcCatalog.getAccessKey()); tMcTable.setSecretKey(mcCatalog.getSecretKey()); @@ -275,6 +275,7 @@ public TTableDescriptor toThrift() { } public Table getOdpsTable() { + makeSureInitialized(); return odpsTable; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalMetaCacheMgr.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalMetaCacheMgr.java index 03a46c625e892c..ef62f498695a66 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalMetaCacheMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalMetaCacheMgr.java @@ -54,6 +54,7 @@ public class ExternalMetaCacheMgr { // all catalogs could share the same fsCache. private FileSystemCache fsCache; private final IcebergMetadataCacheMgr icebergMetadataCacheMgr; + private final MaxComputeMetadataCacheMgr maxComputeMetadataCacheMgr; public ExternalMetaCacheMgr() { executor = ThreadPoolManager.newDaemonFixedThreadPool( @@ -63,6 +64,7 @@ public ExternalMetaCacheMgr() { hudiPartitionMgr = HudiPartitionMgr.get(executor); fsCache = new FileSystemCache(executor); icebergMetadataCacheMgr = new IcebergMetadataCacheMgr(); + maxComputeMetadataCacheMgr = new MaxComputeMetadataCacheMgr(); } public HiveMetaStoreCache getMetaStoreCache(HMSExternalCatalog catalog) { @@ -99,6 +101,10 @@ public IcebergMetadataCache getIcebergMetadataCache() { return icebergMetadataCacheMgr.getIcebergMetadataCache(); } + public MaxComputeMetadataCache getMaxComputeMetadataCache(long catalogId) { + return maxComputeMetadataCacheMgr.getMaxComputeMetadataCache(catalogId); + } + public FileSystemCache getFsCache() { return fsCache; } @@ -112,6 +118,7 @@ public void removeCache(long catalogId) { } hudiPartitionMgr.removePartitionProcessor(catalogId); icebergMetadataCacheMgr.removeCache(catalogId); + maxComputeMetadataCacheMgr.removeCache(catalogId); } public void invalidateTableCache(long catalogId, String dbName, String tblName) { @@ -126,6 +133,7 @@ public void invalidateTableCache(long catalogId, String dbName, String tblName) } hudiPartitionMgr.cleanTablePartitions(catalogId, dbName, tblName); icebergMetadataCacheMgr.invalidateTableCache(catalogId, dbName, tblName); + maxComputeMetadataCacheMgr.invalidateTableCache(catalogId, dbName, tblName); LOG.debug("invalid table cache for {}.{} in catalog {}", dbName, tblName, catalogId); } @@ -141,6 +149,7 @@ public void invalidateDbCache(long catalogId, String dbName) { } hudiPartitionMgr.cleanDatabasePartitions(catalogId, dbName); icebergMetadataCacheMgr.invalidateDbCache(catalogId, dbName); + maxComputeMetadataCacheMgr.invalidateDbCache(catalogId, dbName); LOG.debug("invalid db cache for {} in catalog {}", dbName, catalogId); } @@ -155,6 +164,7 @@ public void invalidateCatalogCache(long catalogId) { } hudiPartitionMgr.cleanPartitionProcess(catalogId); icebergMetadataCacheMgr.invalidateCatalogCache(catalogId); + maxComputeMetadataCacheMgr.invalidateCatalogCache(catalogId); LOG.debug("invalid catalog cache for {}", catalogId); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/MaxComputeCacheKey.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/MaxComputeCacheKey.java new file mode 100644 index 00000000000000..441c2e84474aa4 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/MaxComputeCacheKey.java @@ -0,0 +1,65 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.datasource; + +import lombok.Data; + +import java.util.Objects; + +@Data +public class MaxComputeCacheKey { + private final String dbName; + private final String tblName; + private String partitionSpec; // optional + + public MaxComputeCacheKey(String dbName, String tblName) { + this(dbName, tblName, null); + } + + public MaxComputeCacheKey(String dbName, String tblName, String partitionSpec) { + this.dbName = dbName; + this.tblName = tblName; + this.partitionSpec = partitionSpec; + } + + @Override + public boolean equals(Object obj) { + if (this == obj) { + return true; + } + if (!(obj instanceof MaxComputeCacheKey)) { + return false; + } + boolean partitionEquals = true; + if (partitionSpec != null) { + partitionEquals = partitionSpec.equals(((MaxComputeCacheKey) obj).partitionSpec); + } + return partitionEquals && dbName.equals(((MaxComputeCacheKey) obj).dbName) + && tblName.equals(((MaxComputeCacheKey) obj).tblName); + } + + @Override + public int hashCode() { + return Objects.hash(dbName, tblName); + } + + @Override + public String toString() { + return "TablePartitionKey{" + "dbName='" + dbName + '\'' + ", tblName='" + tblName + '\'' + '}'; + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/MaxComputeExternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/MaxComputeExternalCatalog.java index d0f6458fa30da0..b361d0c8144fa2 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/MaxComputeExternalCatalog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/MaxComputeExternalCatalog.java @@ -25,11 +25,9 @@ import com.aliyun.odps.Odps; import com.aliyun.odps.OdpsException; import com.aliyun.odps.Partition; -import com.aliyun.odps.PartitionSpec; import com.aliyun.odps.account.Account; import com.aliyun.odps.account.AliyunAccount; import com.aliyun.odps.tunnel.TableTunnel; -import com.aliyun.odps.tunnel.TunnelException; import com.google.common.base.Strings; import com.google.common.collect.ImmutableList; import com.google.gson.annotations.SerializedName; @@ -42,6 +40,7 @@ public class MaxComputeExternalCatalog extends ExternalCatalog { private Odps odps; + private TableTunnel tunnel; @SerializedName(value = "region") private String region; @SerializedName(value = "accessKey") @@ -95,26 +94,16 @@ protected void initLocalObjectsImpl() { } odps.setEndpoint(odpsUrl); odps.setDefaultProject(defaultProject); - } - - public long getTotalRows(String project, String table) throws TunnelException { - return getTableTunnel().getDownloadSession(project, table, null).getRecordCount(); - } - - public long getTotalRows(String project, String table, String partitionSpec) throws TunnelException { - return getTableTunnel() - .getDownloadSession(project, table, new PartitionSpec(partitionSpec), null) - .getRecordCount(); - } - - private TableTunnel getTableTunnel() { - makeSureInitialized(); - TableTunnel tunnel = new TableTunnel(odps); + tunnel = new TableTunnel(odps); String tunnelUrl = tunnelUrlTemplate.replace("{}", region); if (enablePublicAccess) { tunnelUrl = tunnelUrl.replace("-inc", ""); } tunnel.setEndpoint(tunnelUrl); + } + + public TableTunnel getTableTunnel() { + makeSureInitialized(); return tunnel; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/MaxComputeMetadataCache.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/MaxComputeMetadataCache.java new file mode 100644 index 00000000000000..98b835813d9d8d --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/MaxComputeMetadataCache.java @@ -0,0 +1,90 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.datasource; + +import org.apache.doris.common.Config; +import org.apache.doris.planner.external.TablePartitionValues; + +import com.aliyun.odps.tunnel.TunnelException; +import com.google.common.cache.Cache; +import com.google.common.cache.CacheBuilder; + +import java.util.List; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; + +public class MaxComputeMetadataCache { + private final Cache partitionValuesCache; + private final Cache tableRowCountCache; + + public MaxComputeMetadataCache() { + partitionValuesCache = CacheBuilder.newBuilder().maximumSize(Config.max_hive_partition_cache_num) + .expireAfterAccess(Config.external_cache_expire_time_minutes_after_access, TimeUnit.MINUTES) + .build(); + tableRowCountCache = CacheBuilder.newBuilder().maximumSize(10000) + .expireAfterAccess(Config.external_cache_expire_time_minutes_after_access, TimeUnit.MINUTES) + .build(); + } + + public Long getCachedRowCount(String dbName, String tblName, String partitionSpec, + Callable loader) throws TunnelException { + try { + MaxComputeCacheKey tablePartitionKey = new MaxComputeCacheKey(dbName, tblName, partitionSpec); + return tableRowCountCache.get(tablePartitionKey, loader); + } catch (ExecutionException e) { + throw new TunnelException(e.getMessage(), e); + } + } + + public TablePartitionValues getCachedPartitionValues(MaxComputeCacheKey tablePartitionKey, + Callable loader) { + try { + return partitionValuesCache.get(tablePartitionKey, loader); + } catch (ExecutionException e) { + throw new RuntimeException("Fail to load partition values for table:" + + " '" + tablePartitionKey.getDbName() + "." + tablePartitionKey.getTblName() + "'"); + } + } + + public void cleanUp() { + partitionValuesCache.invalidateAll(); + tableRowCountCache.invalidateAll(); + } + + public void cleanDatabaseCache(String dbName) { + List removeCacheList = partitionValuesCache.asMap().keySet() + .stream() + .filter(k -> k.getDbName().equalsIgnoreCase(dbName)) + .collect(Collectors.toList()); + partitionValuesCache.invalidateAll(removeCacheList); + + List removeCacheRowCountList = tableRowCountCache.asMap().keySet() + .stream() + .filter(k -> k.getDbName().equalsIgnoreCase(dbName)) + .collect(Collectors.toList()); + tableRowCountCache.invalidateAll(removeCacheRowCountList); + } + + public void cleanTableCache(String dbName, String tblName) { + MaxComputeCacheKey cacheKey = new MaxComputeCacheKey(dbName, tblName); + partitionValuesCache.invalidate(cacheKey); + tableRowCountCache.invalidate(cacheKey); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/MaxComputeMetadataCacheMgr.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/MaxComputeMetadataCacheMgr.java new file mode 100644 index 00000000000000..72449b61949cd2 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/MaxComputeMetadataCacheMgr.java @@ -0,0 +1,64 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.datasource; + +import com.google.common.collect.Maps; + +import java.util.Map; + +public class MaxComputeMetadataCacheMgr { + + private static final Map maxComputeMetadataCaches = Maps.newConcurrentMap(); + + public MaxComputeMetadataCache getMaxComputeMetadataCache(long catalogId) { + MaxComputeMetadataCache cache = maxComputeMetadataCaches.get(catalogId); + if (cache == null) { + cache = new MaxComputeMetadataCache(); + maxComputeMetadataCaches.put(catalogId, cache); + } + return cache; + } + + public void removeCache(long catalogId) { + MaxComputeMetadataCache cache = maxComputeMetadataCaches.remove(catalogId); + if (cache != null) { + cache.cleanUp(); + } + } + + public void invalidateCatalogCache(long catalogId) { + MaxComputeMetadataCache cache = maxComputeMetadataCaches.get(catalogId); + if (cache != null) { + cache.cleanUp(); + } + } + + public void invalidateDbCache(long catalogId, String dbName) { + MaxComputeMetadataCache cache = maxComputeMetadataCaches.get(catalogId); + if (cache != null) { + cache.cleanDatabaseCache(dbName); + } + } + + public void invalidateTableCache(long catalogId, String dbName, String tblName) { + MaxComputeMetadataCache cache = maxComputeMetadataCaches.get(catalogId); + if (cache != null) { + cache.cleanTableCache(dbName, tblName); + } + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/MaxComputeScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/MaxComputeScanNode.java index 800f01df71f9e1..ae0b424ad815af 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/MaxComputeScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/MaxComputeScanNode.java @@ -45,7 +45,6 @@ import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.Optional; public class MaxComputeScanNode extends FileQueryScanNode { @@ -119,22 +118,28 @@ protected List getSplits() throws UserException { } List partitionSpecs = getPartitionSpecs(); for (String partitionSpec : partitionSpecs) { - long totalRows = catalog.getTotalRows(table.getDbName(), table.getName(), partitionSpec); - addBatchSplits(result, odpsTable, Optional.of(partitionSpec), totalRows); + addPartitionSplits(result, odpsTable, partitionSpec); } } else { - long totalRows = catalog.getTotalRows(table.getDbName(), table.getName()); - addBatchSplits(result, odpsTable, Optional.empty(), totalRows); + addBatchSplits(result, odpsTable, table.getTotalRows()); } } catch (TunnelException e) { - throw new UserException("Max Compute tunnel SDK exception.", e); + throw new UserException("Max Compute tunnel SDK exception: " + e.getMessage(), e); } return result; } - private static void addBatchSplits(List result, Table odpsTable, - Optional partitionSpec, long totalRows) { + private static void addPartitionSplits(List result, Table odpsTable, String partitionSpec) { + long modificationTime = odpsTable.getLastDataModifiedTime().getTime(); + // use '-1' to read whole partition, avoid expending too much time on calling table.getTotalRows() + Pair range = Pair.of(0L, -1L); + FileSplit rangeSplit = new FileSplit(new Path("/virtual_slice_part"), + range.first, range.second, -1, modificationTime, null, Collections.emptyList()); + result.add(new MaxComputeSplit(partitionSpec, rangeSplit)); + } + + private static void addBatchSplits(List result, Table odpsTable, long totalRows) { List> sliceRange = new ArrayList<>(); long fileNum = odpsTable.getFileNum(); long start = 0; @@ -157,11 +162,7 @@ private static void addBatchSplits(List result, Table odpsTable, Pair range = sliceRange.get(i); FileSplit rangeSplit = new FileSplit(new Path("/virtual_slice_" + i), range.first, range.second, totalRows, modificationTime, null, Collections.emptyList()); - if (partitionSpec.isPresent()) { - result.add(new MaxComputeSplit(partitionSpec.get(), rangeSplit)); - } else { - result.add(new MaxComputeSplit(rangeSplit)); - } + result.add(new MaxComputeSplit(rangeSplit)); } } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/TablePartitionValues.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/TablePartitionValues.java index a207f5f082a180..acd44a50900a78 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/TablePartitionValues.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/TablePartitionValues.java @@ -218,11 +218,16 @@ private List getHivePartitionValues(String partitionName) { @Data public static class TablePartitionKey { - private String dbName; - private String tblName; + private final String dbName; + private final String tblName; // not in key private List types; + public TablePartitionKey(String dbName, String tblName) { + this.dbName = dbName; + this.tblName = tblName; + } + public TablePartitionKey(String dbName, String tblName, List types) { this.dbName = dbName; this.tblName = tblName; diff --git a/regression-test/data/external_table_p2/maxcompute/test_external_catalog_maxcompute.out b/regression-test/data/external_table_p2/maxcompute/test_external_catalog_maxcompute.out index 8fff6e13f23ca4..e75e12c137b7a2 100644 --- a/regression-test/data/external_table_p2/maxcompute/test_external_catalog_maxcompute.out +++ b/regression-test/data/external_table_p2/maxcompute/test_external_catalog_maxcompute.out @@ -20,8 +20,9 @@ true 77 8920 182239402452 9601 qewtoll 2020-09-21 -- !q7 -- -6223 maxam 2020-09-21 -9601 qewtoll 2020-09-21 +1633 siwtow 2021-08-21 +1633 siwtow 20210821 +1633 siwtow 20210921 -- !replay_q6 -- 9601 qewtoll 2020-09-21 @@ -32,28 +33,37 @@ pt=14/yy=2021/mm=12/dd=21 pt=14/yy=2021/mm=12/dd=22 -- !multi_partition_q2 -- -17 2022-04-23 19:12:30 2021 12 22 -17 2022-04-23 19:12:30 2021 12 21 -16 2022-04-23 19:12:30 2021 12 22 +17 2022-04-23T11:12:30 2021 12 22 +17 2022-04-23T11:12:30 2021 12 21 +16 2022-04-23T11:12:30 2021 12 22 -- !multi_partition_q3 -- -14 2022-04-23 19:12:30 2022 01 01 -14 2022-04-23 19:12:30 2022 01 02 -98 2022-04-23 19:12:30 2021 12 21 +14 2022-04-23T11:12:30 2022 01 01 +14 2022-04-23T11:12:30 2022 01 02 +98 2022-04-23T11:12:30 2021 12 21 -- !multi_partition_q4 -- -17 +22 -- !multi_partition_q5 -- -2022-04-23 19:12:30 2021 12 21 -2022-04-23 19:12:30 2021 12 21 -2022-04-23 19:12:30 2021 12 21 +2022-04-23T11:12:30 2021 12 21 +2022-04-23T11:12:30 2021 12 21 +2022-04-23T11:12:30 2021 12 21 -- !multi_partition_q6 -- 17 2021 12 -- !multi_partition_q7 -- -15 +20 -- !multi_partition_q8 -- 11 + +-- !multi_partition_q9 -- +lweu 8920 true 2023-11-23T12:03:54.952 0.123 2022-04-23 2022-04-23T11:12:30 12 2021 12 22 +wert 8920 true 2023-11-23T12:05:01.693 0.123 2022-04-23 2022-04-23T11:12:30 12 2021 12 22 + +-- !multi_partition_q10 -- +12 2021 12 21 +12 2021 12 22 +12 2021 12 22 diff --git a/regression-test/suites/external_table_p2/maxcompute/test_external_catalog_maxcompute.groovy b/regression-test/suites/external_table_p2/maxcompute/test_external_catalog_maxcompute.groovy index 183309cb76bb59..c016f8b91f2e97 100644 --- a/regression-test/suites/external_table_p2/maxcompute/test_external_catalog_maxcompute.groovy +++ b/regression-test/suites/external_table_p2/maxcompute/test_external_catalog_maxcompute.groovy @@ -49,7 +49,7 @@ suite("test_external_catalog_maxcompute", "p2,external,maxcompute,external_remot qt_q4 """ select * from mc_parts where dt = '2020-09-21' """ qt_q5 """ select * from mc_parts where dt = '2021-08-21' """ qt_q6 """ select * from mc_parts where dt = '2020-09-21' and mc_bigint > 6223 """ - qt_q7 """ select * from mc_parts where dt = '2020-09-21' or mc_bigint > 0 """ + qt_q7 """ select * from mc_parts where dt = '2020-09-21' or (mc_bigint > 0 and dt > '2020-09-20') order by mc_bigint, dt limit 3; """ } sql """ switch `${mc_catalog_name}`; """ @@ -79,13 +79,14 @@ suite("test_external_catalog_maxcompute", "p2,external,maxcompute,external_remot sql """ switch `${mc_catalog_name}`; """ sql """ use `${mc_db}`; """ qt_multi_partition_q1 """ show partitions from multi_partitions limit 5,3; """ - qt_multi_partition_q2 """ select pt, create_time, yy, mm, dd from multi_partitions where pt>-1 and yy > '' and mm > '' and dd >'' order by pt desc limit 3; """ + qt_multi_partition_q2 """ select pt, create_time, yy, mm, dd from multi_partitions where pt>-1 and yy > '' and mm > '' and dd >'' order by pt desc, dd desc limit 3; """ qt_multi_partition_q3 """ select sum(pt), create_time, yy, mm, dd from multi_partitions where yy > '' and mm > '' and dd >'' group by create_time, yy, mm, dd order by dd limit 3; """ qt_multi_partition_q4 """ select count(*) from multi_partitions where pt>-1 and yy > '' and mm > '' and dd <= '30'; """ qt_multi_partition_q5 """ select create_time, yy, mm, dd from multi_partitions where yy = '2021' and mm='12' and dd='21' order by pt limit 3; """ qt_multi_partition_q6 """ select max(pt), yy, mm from multi_partitions where yy = '2021' and mm='12' group by yy, mm order by yy, mm; """ qt_multi_partition_q7 """ select count(*) from multi_partitions where yy < '2022'; """ qt_multi_partition_q8 """ select count(*) from multi_partitions where pt>=14; """ - + qt_multi_partition_q9 """ select city,mnt,gender,finished_time,order_rate,cut_date,create_time,pt, yy, mm, dd from multi_partitions where pt >= 12 and pt < 14 and finished_time is not null; """ + qt_multi_partition_q10 """ select pt, yy, mm, dd from multi_partitions where pt >= 12 and create_time > '2022-04-23 11:11:00' order by pt, yy, mm, dd limit 3; """ } } From 67cffa398be801d80a4436a7de236cb809499e94 Mon Sep 17 00:00:00 2001 From: slothever Date: Wed, 22 Nov 2023 15:27:03 +0800 Subject: [PATCH 5/7] optimize cache --- .../doris/catalog/external/MaxComputeExternalTable.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/MaxComputeExternalTable.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/MaxComputeExternalTable.java index 0816d550c69f59..f4af5c4883648e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/MaxComputeExternalTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/MaxComputeExternalTable.java @@ -58,7 +58,6 @@ */ public class MaxComputeExternalTable extends ExternalTable { - private final MaxComputeExternalCatalog mcCatalog; private Table odpsTable; private List partitionSpecs; private Map partitionNameToColumns; @@ -66,14 +65,13 @@ public class MaxComputeExternalTable extends ExternalTable { public MaxComputeExternalTable(long id, String name, String dbName, MaxComputeExternalCatalog catalog) { super(id, name, catalog, dbName, TableType.MAX_COMPUTE_EXTERNAL_TABLE); - mcCatalog = catalog; } @Override protected synchronized void makeSureInitialized() { super.makeSureInitialized(); if (!objectCreated) { - odpsTable = mcCatalog.getClient().tables().get(name); + odpsTable = ((MaxComputeExternalCatalog) catalog).getClient().tables().get(name); initTablePartitions(); objectCreated = true; } @@ -84,6 +82,7 @@ public long getTotalRows() throws TunnelException { makeSureInitialized(); MaxComputeMetadataCache metadataCache = Env.getCurrentEnv().getExtMetaCacheMgr() .getMaxComputeMetadataCache(catalog.getId()); + MaxComputeExternalCatalog mcCatalog = ((MaxComputeExternalCatalog) catalog); return metadataCache.getCachedRowCount(dbName, name, null, () -> mcCatalog.getTableTunnel() .getDownloadSession(dbName, name, null) .getRecordCount()); @@ -261,6 +260,7 @@ private Type mcTypeToDorisType(TypeInfo typeInfo) { public TTableDescriptor toThrift() { List schema = getFullSchema(); TMCTable tMcTable = new TMCTable(); + MaxComputeExternalCatalog mcCatalog = ((MaxComputeExternalCatalog) catalog); tMcTable.setRegion(mcCatalog.getRegion()); tMcTable.setAccessKey(mcCatalog.getAccessKey()); tMcTable.setSecretKey(mcCatalog.getSecretKey()); From 638817f0615d5798b51270262d37dba7af661a90 Mon Sep 17 00:00:00 2001 From: slothever Date: Mon, 27 Nov 2023 10:20:24 +0800 Subject: [PATCH 6/7] 4 --- .../apache/doris/maxcompute/MaxComputeJniScanner.java | 10 +++++----- .../catalog/external/MaxComputeExternalTable.java | 7 +++++++ 2 files changed, 12 insertions(+), 5 deletions(-) diff --git a/fe/be-java-extensions/max-compute-scanner/src/main/java/org/apache/doris/maxcompute/MaxComputeJniScanner.java b/fe/be-java-extensions/max-compute-scanner/src/main/java/org/apache/doris/maxcompute/MaxComputeJniScanner.java index 502110e9b0245d..2003286e1f7676 100644 --- a/fe/be-java-extensions/max-compute-scanner/src/main/java/org/apache/doris/maxcompute/MaxComputeJniScanner.java +++ b/fe/be-java-extensions/max-compute-scanner/src/main/java/org/apache/doris/maxcompute/MaxComputeJniScanner.java @@ -171,14 +171,14 @@ public void open() throws IOException { partitionColumns = session.getSchema().getPartitionColumns().stream() .map(Column::getName) .collect(Collectors.toSet()); - List maxComputeColumns = new ArrayList<>(readColumns); - maxComputeColumns.removeIf(e -> partitionColumns.contains(e.getName())); - if (maxComputeColumns.isEmpty() && !partitionColumns.isEmpty()) { + List pushDownColumns = new ArrayList<>(readColumns); + pushDownColumns.removeIf(e -> partitionColumns.contains(e.getName())); + if (pushDownColumns.isEmpty() && !partitionColumns.isEmpty()) { // query columns required non-null, when query partition table - maxComputeColumns.add(session.getSchema().getColumn(0)); + pushDownColumns.add(session.getSchema().getColumn(0)); } arrowAllocator = new RootAllocator(Integer.MAX_VALUE); - curReader = session.openArrowRecordReader(start, totalRows, maxComputeColumns, arrowAllocator); + curReader = session.openArrowRecordReader(start, totalRows, pushDownColumns, arrowAllocator); remainBatchRows = totalRows; } catch (TunnelException e) { if (retryCount > 0 && e.getErrorMsg().contains("TableModified")) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/MaxComputeExternalTable.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/MaxComputeExternalTable.java index f4af5c4883648e..5c25cf6cce0e04 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/MaxComputeExternalTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/MaxComputeExternalTable.java @@ -79,6 +79,7 @@ protected synchronized void makeSureInitialized() { public long getTotalRows() throws TunnelException { // use for non-partitioned table + // partition table will read the entire partition on FE so get total rows is unnecessary. makeSureInitialized(); MaxComputeMetadataCache metadataCache = Env.getCurrentEnv().getExtMetaCacheMgr() .getMaxComputeMetadataCache(catalog.getId()); @@ -119,6 +120,12 @@ public TablePartitionValues getPartitionValues() { }); } + /** + * parse all values from partitionPath to a single list. + * @param partitionColumns partitionColumns can contain the part1,part2,part3... + * @param partitionPath partitionPath format is like the 'part1=123/part2=abc/part3=1bc' + * @return all values of partitionPath + */ private static List parsePartitionValues(List partitionColumns, String partitionPath) { String[] partitionFragments = partitionPath.split("/"); if (partitionFragments.length != partitionColumns.size()) { From 1ff31c7c54cc9e5516eb2d75edc27176063f441a Mon Sep 17 00:00:00 2001 From: slothever Date: Wed, 29 Nov 2023 14:32:06 +0800 Subject: [PATCH 7/7] fix ut --- fe/fe-core/src/main/java/org/apache/doris/qe/ShowExecutor.java | 1 - 1 file changed, 1 deletion(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ShowExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/ShowExecutor.java index 9ed22989d89d53..430f790ecd3768 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/ShowExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ShowExecutor.java @@ -181,7 +181,6 @@ import org.apache.doris.datasource.CatalogIf; import org.apache.doris.datasource.HMSExternalCatalog; import org.apache.doris.datasource.MaxComputeExternalCatalog; -import org.apache.doris.external.iceberg.IcebergTableCreationRecord; import org.apache.doris.job.task.AbstractTask; import org.apache.doris.load.DeleteHandler; import org.apache.doris.load.ExportJobState;