diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java index 72eacbb1de5401..435722e06b6687 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java @@ -22,7 +22,6 @@ import org.apache.doris.analysis.DropDbStmt; import org.apache.doris.analysis.DropTableStmt; import org.apache.doris.analysis.TableName; -import org.apache.doris.catalog.Column; import org.apache.doris.catalog.DatabaseIf; import org.apache.doris.catalog.Env; import org.apache.doris.catalog.InfoSchemaDb; @@ -386,7 +385,7 @@ public void onRefresh(boolean invalidCache) { } } - public final List getSchema(String dbName, String tblName) { + public final Optional getSchema(String dbName, String tblName) { makeSureInitialized(); Optional> db = getDb(dbName); if (db.isPresent()) { @@ -395,9 +394,7 @@ public final List getSchema(String dbName, String tblName) { return table.get().initSchemaAndUpdateTime(); } } - // return one column with unsupported type. - // not return empty to avoid some unexpected issue. - return Lists.newArrayList(Column.UNSUPPORTED_COLUMN); + return Optional.empty(); } @Override @@ -507,7 +504,7 @@ public ExternalDatabase getDbNullable(long dbId) { } if (useMetaCache.get()) { - return metaCache.getMetaObjById(dbId).get(); + return metaCache.getMetaObjById(dbId).orElse(null); } else { return idToDb.get(dbId); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalDatabase.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalDatabase.java index 43c24b5ebd5d3f..6ab3421abc3dfa 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalDatabase.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalDatabase.java @@ -370,7 +370,7 @@ public Set getTableNamesWithLock() { public T getTableNullable(String tableName) { makeSureInitialized(); if (extCatalog.getUseMetaCache().get()) { - return metaCache.getMetaObj(tableName).get(); + return metaCache.getMetaObj(tableName).orElse(null); } else { if (!tableNameToId.containsKey(tableName)) { return null; diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalSchemaCache.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalSchemaCache.java index 9d0ddcfad2f204..ad1c1306e343d7 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalSchemaCache.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalSchemaCache.java @@ -31,8 +31,8 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -import java.util.List; import java.util.Objects; +import java.util.Optional; import java.util.OptionalLong; import java.util.Set; import java.util.concurrent.ExecutorService; @@ -42,7 +42,7 @@ public class ExternalSchemaCache { private static final Logger LOG = LogManager.getLogger(ExternalSchemaCache.class); private final ExternalCatalog catalog; - private LoadingCache> schemaCache; + private LoadingCache> schemaCache; public ExternalSchemaCache(ExternalCatalog catalog, ExecutorService executor) { this.catalog = catalog; @@ -73,22 +73,22 @@ public Long getValue() { MetricRepo.DORIS_METRIC_REGISTER.addMetrics(schemaCacheGauge); } - private ImmutableList loadSchema(SchemaCacheKey key) { - ImmutableList schema = ImmutableList.copyOf(catalog.getSchema(key.dbName, key.tblName)); + private Optional loadSchema(SchemaCacheKey key) { + Optional schema = catalog.getSchema(key.dbName, key.tblName); if (LOG.isDebugEnabled()) { LOG.debug("load schema for {} in catalog {}", key, catalog.getName()); } return schema; } - public List getSchema(String dbName, String tblName) { + public Optional getSchemaValue(String dbName, String tblName) { SchemaCacheKey key = new SchemaCacheKey(dbName, tblName); return schemaCache.get(key); } public void addSchemaForTest(String dbName, String tblName, ImmutableList schema) { SchemaCacheKey key = new SchemaCacheKey(dbName, tblName); - schemaCache.put(key, schema); + schemaCache.put(key, Optional.of(new SchemaCacheValue(schema))); } public void invalidateTableCache(String dbName, String tblName) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalTable.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalTable.java index 952b5c64cf8fd5..b394b85054aaab 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalTable.java @@ -143,7 +143,8 @@ public TableType getType() { @Override public List getFullSchema() { ExternalSchemaCache cache = Env.getCurrentEnv().getExtMetaCacheMgr().getSchemaCache(catalog); - return cache.getSchema(dbName, name); + Optional schemaCacheValue = cache.getSchemaValue(dbName, name); + return schemaCacheValue.map(SchemaCacheValue::getSchema).orElse(null); } @Override @@ -161,7 +162,6 @@ public List getBaseSchema(boolean full) { return getFullSchema(); } - @Override public void setNewFullSchema(List newSchema) { } @@ -301,12 +301,12 @@ public Optional getColumnStatistic(String colName) { * * @return */ - public List initSchemaAndUpdateTime() { + public Optional initSchemaAndUpdateTime() { schemaUpdateTime = System.currentTimeMillis(); return initSchema(); } - public List initSchema() { + public Optional initSchema() { throw new NotImplementedException("implement in sub class"); } @@ -365,4 +365,9 @@ public List> getColumnIndexPairs(Set columns) { public List getChunkSizes() { throw new NotImplementedException("getChunkSized not implemented"); } + + protected Optional getSchemaCacheValue() { + ExternalSchemaCache cache = Env.getCurrentEnv().getExtMetaCacheMgr().getSchemaCache(catalog); + return cache.getSchemaValue(dbName, name); + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/SchemaCacheValue.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/SchemaCacheValue.java new file mode 100644 index 00000000000000..b02b8bda840c3c --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/SchemaCacheValue.java @@ -0,0 +1,40 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.datasource; + +import org.apache.doris.catalog.Column; + +import java.util.List; + +/** + * The cache value of ExternalSchemaCache. + * Different external table type has different schema cache value. + * For example, Hive table has HMSSchemaCacheValue, Paimon table has PaimonSchemaCacheValue. + * All objects that should be refreshed along with schema should be put in this class. + */ +public class SchemaCacheValue { + protected List schema; + + public SchemaCacheValue(List schema) { + this.schema = schema; + } + + public List getSchema() { + return schema; + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/es/EsExternalTable.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/es/EsExternalTable.java index 6399f89da55e1f..cfde5e794a3aa4 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/es/EsExternalTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/es/EsExternalTable.java @@ -20,11 +20,13 @@ import org.apache.doris.catalog.Column; import org.apache.doris.catalog.EsTable; import org.apache.doris.datasource.ExternalTable; +import org.apache.doris.datasource.SchemaCacheValue; import org.apache.doris.thrift.TEsTable; import org.apache.doris.thrift.TTableDescriptor; import org.apache.doris.thrift.TTableType; import java.util.List; +import java.util.Optional; /** * Elasticsearch external table. @@ -69,9 +71,11 @@ public TTableDescriptor toThrift() { } @Override - public List initSchema() { + public Optional initSchema() { EsRestClient restClient = ((EsExternalCatalog) catalog).getEsRestClient(); - return EsUtil.genColumnsFromEs(restClient, name, null, ((EsExternalCatalog) catalog).enableMappingEsId()); + return Optional.of(new SchemaCacheValue( + EsUtil.genColumnsFromEs(restClient, name, null, + ((EsExternalCatalog) catalog).enableMappingEsId()))); } private EsTable toEsTable() { diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java index e29bafd5dc7a0a..c2099a1acc865f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java @@ -29,6 +29,7 @@ import org.apache.doris.common.AnalysisException; import org.apache.doris.common.Config; import org.apache.doris.datasource.ExternalTable; +import org.apache.doris.datasource.SchemaCacheValue; import org.apache.doris.datasource.hudi.HudiUtils; import org.apache.doris.datasource.hudi.source.COWIncrementalRelation; import org.apache.doris.datasource.hudi.source.IncrementalRelation; @@ -159,7 +160,6 @@ public class HMSExternalTable extends ExternalTable implements MTMVRelatedTableI } private volatile org.apache.hadoop.hive.metastore.api.Table remoteTable = null; - private List partitionColumns; private DLAType dlaType = DLAType.UNKNOWN; @@ -296,15 +296,17 @@ public org.apache.hadoop.hive.metastore.api.Table getRemoteTable() { public List getPartitionColumnTypes() { makeSureInitialized(); - getFullSchema(); - return partitionColumns.stream().map(c -> c.getType()).collect(Collectors.toList()); + Optional schemaCacheValue = getSchemaCacheValue(); + return schemaCacheValue.map(value -> ((HMSSchemaCacheValue) value).getPartitionColTypes()) + .orElse(Collections.emptyList()); } @Override public List getPartitionColumns() { makeSureInitialized(); - getFullSchema(); - return partitionColumns; + Optional schemaCacheValue = getSchemaCacheValue(); + return schemaCacheValue.map(value -> ((HMSSchemaCacheValue) value).getPartitionColumns()) + .orElse(Collections.emptyList()); } public TableScanParams getScanParams() { @@ -532,7 +534,7 @@ public Set getPartitionNames() { } @Override - public List initSchemaAndUpdateTime() { + public Optional initSchemaAndUpdateTime() { org.apache.hadoop.hive.metastore.api.Table table = ((HMSExternalCatalog) catalog).getClient() .getTable(dbName, name); // try to use transient_lastDdlTime from hms client @@ -554,7 +556,7 @@ public long getLastDdlTime() { } @Override - public List initSchema() { + public Optional initSchema() { makeSureInitialized(); List columns; if (dlaType.equals(DLAType.ICEBERG)) { @@ -564,8 +566,8 @@ public List initSchema() { } else { columns = getHiveSchema(); } - initPartitionColumns(columns); - return columns; + List partitionColumns = initPartitionColumns(columns); + return Optional.of(new HMSSchemaCacheValue(columns, partitionColumns)); } private List getIcebergSchema() { @@ -585,18 +587,16 @@ private List getHudiSchema() { private List getHiveSchema() { HMSCachedClient client = ((HMSExternalCatalog) catalog).getClient(); - List columns; List schema = client.getSchema(dbName, name); Map colDefaultValues = client.getDefaultColumnValues(dbName, name); - List tmpSchema = Lists.newArrayListWithCapacity(schema.size()); + List columns = Lists.newArrayListWithCapacity(schema.size()); for (FieldSchema field : schema) { String fieldName = field.getName().toLowerCase(Locale.ROOT); String defaultValue = colDefaultValues.getOrDefault(fieldName, null); - tmpSchema.add(new Column(fieldName, + columns.add(new Column(fieldName, HiveMetaStoreClientHelper.hiveTypeToDorisType(field.getType()), true, null, true, defaultValue, field.getComment(), true, -1)); } - columns = tmpSchema; return columns; } @@ -613,10 +613,10 @@ public long fetchRowCount() { return rowCount; } - private void initPartitionColumns(List schema) { + private List initPartitionColumns(List schema) { List partitionKeys = remoteTable.getPartitionKeys().stream().map(FieldSchema::getName) .collect(Collectors.toList()); - partitionColumns = Lists.newArrayListWithCapacity(partitionKeys.size()); + List partitionColumns = Lists.newArrayListWithCapacity(partitionKeys.size()); for (String partitionKey : partitionKeys) { // Do not use "getColumn()", which will cause dead loop for (Column column : schema) { @@ -636,6 +636,7 @@ private void initPartitionColumns(List schema) { if (LOG.isDebugEnabled()) { LOG.debug("get {} partition columns for table: {}", partitionColumns.size(), name); } + return partitionColumns; } public boolean hasColumnStatistics(String colName) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSSchemaCacheValue.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSSchemaCacheValue.java new file mode 100644 index 00000000000000..79631e90db0989 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSSchemaCacheValue.java @@ -0,0 +1,43 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.datasource.hive; + +import org.apache.doris.catalog.Column; +import org.apache.doris.catalog.Type; +import org.apache.doris.datasource.SchemaCacheValue; + +import java.util.List; +import java.util.stream.Collectors; + +public class HMSSchemaCacheValue extends SchemaCacheValue { + + private List partitionColumns; + + public HMSSchemaCacheValue(List schema, List partitionColumns) { + super(schema); + this.partitionColumns = partitionColumns; + } + + public List getPartitionColumns() { + return partitionColumns; + } + + public List getPartitionColTypes() { + return partitionColumns.stream().map(Column::getType).collect(Collectors.toList()); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalTable.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalTable.java index 5266d8745dee66..de9c3814fd68b9 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalTable.java @@ -19,6 +19,7 @@ import org.apache.doris.catalog.Column; import org.apache.doris.datasource.ExternalTable; +import org.apache.doris.datasource.SchemaCacheValue; import org.apache.doris.statistics.AnalysisInfo; import org.apache.doris.statistics.BaseAnalysisTask; import org.apache.doris.statistics.ExternalAnalysisTask; @@ -29,6 +30,7 @@ import java.util.HashMap; import java.util.List; +import java.util.Optional; public class IcebergExternalTable extends ExternalTable { @@ -48,8 +50,8 @@ protected synchronized void makeSureInitialized() { } @Override - public List initSchema() { - return IcebergUtils.getSchema(catalog, dbName, name); + public Optional initSchema() { + return Optional.of(new SchemaCacheValue(IcebergUtils.getSchema(catalog, dbName, name))); } @Override diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/infoschema/ExternalInfoSchemaTable.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/infoschema/ExternalInfoSchemaTable.java index 6faf965752b8c8..9d1336396128fd 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/infoschema/ExternalInfoSchemaTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/infoschema/ExternalInfoSchemaTable.java @@ -18,16 +18,16 @@ package org.apache.doris.datasource.infoschema; import org.apache.doris.analysis.SchemaTableType; -import org.apache.doris.catalog.Column; import org.apache.doris.catalog.InfoSchemaDb; import org.apache.doris.catalog.SchemaTable; import org.apache.doris.datasource.ExternalCatalog; import org.apache.doris.datasource.ExternalTable; +import org.apache.doris.datasource.SchemaCacheValue; import org.apache.doris.thrift.TSchemaTable; import org.apache.doris.thrift.TTableDescriptor; import org.apache.doris.thrift.TTableType; -import java.util.List; +import java.util.Optional; public class ExternalInfoSchemaTable extends ExternalTable { @@ -36,10 +36,9 @@ public ExternalInfoSchemaTable(long id, String name, ExternalCatalog catalog) { } @Override - public List initSchema() { + public Optional initSchema() { makeSureInitialized(); - List columns = SchemaTable.TABLE_MAP.get(name).getFullSchema(); - return columns; + return Optional.of(new SchemaCacheValue(SchemaTable.TABLE_MAP.get(name).getFullSchema())); } @Override diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/infoschema/ExternalMysqlTable.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/infoschema/ExternalMysqlTable.java index fc64bc053a479e..6f277a5690619b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/infoschema/ExternalMysqlTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/infoschema/ExternalMysqlTable.java @@ -18,16 +18,16 @@ package org.apache.doris.datasource.infoschema; import org.apache.doris.analysis.SchemaTableType; -import org.apache.doris.catalog.Column; import org.apache.doris.catalog.MysqlDBTable; import org.apache.doris.catalog.MysqlDb; import org.apache.doris.datasource.ExternalCatalog; import org.apache.doris.datasource.ExternalTable; +import org.apache.doris.datasource.SchemaCacheValue; import org.apache.doris.thrift.TSchemaTable; import org.apache.doris.thrift.TTableDescriptor; import org.apache.doris.thrift.TTableType; -import java.util.List; +import java.util.Optional; public class ExternalMysqlTable extends ExternalTable { public ExternalMysqlTable(long id, String name, ExternalCatalog catalog) { @@ -35,9 +35,9 @@ public ExternalMysqlTable(long id, String name, ExternalCatalog catalog) { } @Override - public List initSchema() { + public Optional initSchema() { makeSureInitialized(); - return MysqlDBTable.TABLE_MAP.get(name).getFullSchema(); + return Optional.of(new SchemaCacheValue(MysqlDBTable.TABLE_MAP.get(name).getFullSchema())); } @Override diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/JdbcExternalTable.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/JdbcExternalTable.java index 64fd25525e5b21..242b973b87ee27 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/JdbcExternalTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/JdbcExternalTable.java @@ -20,6 +20,7 @@ import org.apache.doris.catalog.Column; import org.apache.doris.catalog.JdbcTable; import org.apache.doris.datasource.ExternalTable; +import org.apache.doris.datasource.SchemaCacheValue; import org.apache.doris.statistics.AnalysisInfo; import org.apache.doris.statistics.BaseAnalysisTask; import org.apache.doris.statistics.JdbcAnalysisTask; @@ -29,6 +30,7 @@ import org.apache.logging.log4j.Logger; import java.util.List; +import java.util.Optional; /** * Elasticsearch external table. @@ -71,8 +73,9 @@ public TTableDescriptor toThrift() { } @Override - public List initSchema() { - return ((JdbcExternalCatalog) catalog).getJdbcClient().getColumnsFromJdbc(dbName, name); + public Optional initSchema() { + return Optional.of(new SchemaCacheValue(((JdbcExternalCatalog) catalog).getJdbcClient() + .getColumnsFromJdbc(dbName, name))); } private JdbcTable toJdbcTable() { diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/maxcompute/MaxComputeExternalTable.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/maxcompute/MaxComputeExternalTable.java index 363b7ce689dab1..297a4c0fa0950f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/maxcompute/MaxComputeExternalTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/maxcompute/MaxComputeExternalTable.java @@ -26,6 +26,7 @@ import org.apache.doris.catalog.StructType; import org.apache.doris.catalog.Type; import org.apache.doris.datasource.ExternalTable; +import org.apache.doris.datasource.SchemaCacheValue; import org.apache.doris.datasource.TablePartitionValues; import org.apache.doris.thrift.TMCTable; import org.apache.doris.thrift.TTableDescriptor; @@ -43,11 +44,13 @@ import com.aliyun.odps.type.VarcharTypeInfo; import com.google.common.collect.ImmutableList; import com.google.common.collect.Lists; +import com.google.common.collect.Maps; import java.util.ArrayList; -import java.util.LinkedHashMap; +import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.Set; import java.util.stream.Collectors; @@ -55,12 +58,6 @@ * MaxCompute external table. */ public class MaxComputeExternalTable extends ExternalTable { - - private Table odpsTable; - private List partitionSpecs; - private Map partitionNameToColumns; - private List partitionTypes; - public MaxComputeExternalTable(long id, String name, String dbName, MaxComputeExternalCatalog catalog) { super(id, name, catalog, dbName, TableType.MAX_COMPUTE_EXTERNAL_TABLE); } @@ -69,8 +66,6 @@ public MaxComputeExternalTable(long id, String name, String dbName, MaxComputeEx protected synchronized void makeSureInitialized() { super.makeSureInitialized(); if (!objectCreated) { - odpsTable = ((MaxComputeExternalCatalog) catalog).getClient().tables().get(name); - initTablePartitions(); objectCreated = true; } } @@ -100,26 +95,37 @@ private long loadRowCount(MaxComputeExternalCatalog catalog, MaxComputeCacheKey @Override public Set getPartitionNames() { makeSureInitialized(); - return partitionNameToColumns.keySet(); + Optional schemaCacheValue = getSchemaCacheValue(); + return schemaCacheValue.map(value -> ((MaxComputeSchemaCacheValue) value).getPartitionColNames()) + .orElse(Collections.emptySet()); } public List getPartitionColumns() { makeSureInitialized(); - return new ArrayList<>(partitionNameToColumns.values()); + Optional schemaCacheValue = getSchemaCacheValue(); + return schemaCacheValue.map(value -> ((MaxComputeSchemaCacheValue) value).getPartitionColumns()) + .orElse(Collections.emptyList()); } public TablePartitionValues getPartitionValues() { makeSureInitialized(); - // Make sure to call it after initSchema() completes + Optional schemaCacheValue = getSchemaCacheValue(); + if (!schemaCacheValue.isPresent()) { + return new TablePartitionValues(); + } + Table odpsTable = ((MaxComputeSchemaCacheValue) schemaCacheValue.get()).getOdpsTable(); String projectName = odpsTable.getProject(); String tableName = odpsTable.getName(); MaxComputeMetadataCache metadataCache = Env.getCurrentEnv().getExtMetaCacheMgr() .getMaxComputeMetadataCache(catalog.getId()); return metadataCache.getCachedPartitionValues( - new MaxComputeCacheKey(projectName, tableName), key -> loadPartitionValues(key)); + new MaxComputeCacheKey(projectName, tableName), + key -> loadPartitionValues((MaxComputeSchemaCacheValue) schemaCacheValue.get())); } - private TablePartitionValues loadPartitionValues(MaxComputeCacheKey key) { + private TablePartitionValues loadPartitionValues(MaxComputeSchemaCacheValue schemaCacheValue) { + List partitionSpecs = schemaCacheValue.getPartitionSpecs(); + List partitionTypes = schemaCacheValue.getPartitionTypes(); TablePartitionValues partitionValues = new TablePartitionValues(); partitionValues.addPartitions(partitionSpecs, partitionSpecs.stream() @@ -154,21 +160,19 @@ private static List parsePartitionValues(List partitionColumns, } @Override - public List initSchema() { + public Optional initSchema() { // this method will be called at semantic parsing. makeSureInitialized(); + Table odpsTable = ((MaxComputeExternalCatalog) catalog).getClient().tables().get(name); List columns = odpsTable.getSchema().getColumns(); - List result = Lists.newArrayListWithCapacity(columns.size()); + List schema = Lists.newArrayListWithCapacity(columns.size()); for (com.aliyun.odps.Column field : columns) { - result.add(new Column(field.getName(), mcTypeToDorisType(field.getTypeInfo()), true, null, + schema.add(new Column(field.getName(), mcTypeToDorisType(field.getTypeInfo()), true, null, true, field.getComment(), true, -1)); } - result.addAll(partitionNameToColumns.values()); - return result; - } - private void initTablePartitions() { List partitionColumns = odpsTable.getSchema().getPartitionColumns(); + List partitionSpecs; if (!partitionColumns.isEmpty()) { partitionSpecs = odpsTable.getPartitions().stream() .map(e -> e.getPartitionSpec().toString(false, true)) @@ -177,17 +181,21 @@ private void initTablePartitions() { partitionSpecs = ImmutableList.of(); } // sort partition columns to align partitionTypes and partitionName. - partitionNameToColumns = new LinkedHashMap<>(); + Map partitionNameToColumns = Maps.newHashMap(); for (com.aliyun.odps.Column partColumn : partitionColumns) { Column dorisCol = new Column(partColumn.getName(), mcTypeToDorisType(partColumn.getTypeInfo()), true, null, true, partColumn.getComment(), true, -1); partitionNameToColumns.put(dorisCol.getName(), dorisCol); } - partitionTypes = partitionNameToColumns.values() + List partitionTypes = partitionNameToColumns.values() .stream() .map(Column::getType) .collect(Collectors.toList()); + + schema.addAll(partitionNameToColumns.values()); + return Optional.of(new MaxComputeSchemaCacheValue(schema, odpsTable, partitionSpecs, partitionNameToColumns, + partitionTypes)); } private Type mcTypeToDorisType(TypeInfo typeInfo) { @@ -295,6 +303,8 @@ public TTableDescriptor toThrift() { public Table getOdpsTable() { makeSureInitialized(); - return odpsTable; + Optional schemaCacheValue = getSchemaCacheValue(); + return schemaCacheValue.map(value -> ((MaxComputeSchemaCacheValue) value).getOdpsTable()) + .orElse(null); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/maxcompute/MaxComputeSchemaCacheValue.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/maxcompute/MaxComputeSchemaCacheValue.java new file mode 100644 index 00000000000000..b8337d96120a5f --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/maxcompute/MaxComputeSchemaCacheValue.java @@ -0,0 +1,57 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.datasource.maxcompute; + +import org.apache.doris.catalog.Column; +import org.apache.doris.catalog.Type; +import org.apache.doris.datasource.SchemaCacheValue; + +import com.aliyun.odps.Table; +import com.google.common.collect.Lists; +import lombok.Getter; +import lombok.Setter; + +import java.util.List; +import java.util.Map; +import java.util.Set; + +@Getter +@Setter +public class MaxComputeSchemaCacheValue extends SchemaCacheValue { + private Table odpsTable; + private List partitionSpecs; + private Map partitionNameToColumns; + private List partitionTypes; + + public MaxComputeSchemaCacheValue(List schema, Table odpsTable, List partitionSpecs, + Map partitionNameToColumns, List partitionTypes) { + super(schema); + this.odpsTable = odpsTable; + this.partitionSpecs = partitionSpecs; + this.partitionNameToColumns = partitionNameToColumns; + this.partitionTypes = partitionTypes; + } + + public Set getPartitionColNames() { + return partitionNameToColumns.keySet(); + } + + public List getPartitionColumns() { + return Lists.newArrayList(partitionNameToColumns.values()); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/metacache/MetaCache.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/metacache/MetaCache.java index 023396670d828d..da8f068dfd4bb1 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/metacache/MetaCache.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/metacache/MetaCache.java @@ -49,14 +49,26 @@ public MetaCache(String name, RemovalListener> removalListener) { this.name = name; - CacheFactory cacheFactory = new CacheFactory( + // ATTN: + // The refreshAfterWriteSec is only used for metaObjCache, not for namesCache. + // Because namesCache need to be refreshed at interval so that user can get the latest meta list. + // But metaObjCache does not need to be refreshed at interval, because the object is actually not + // from remote datasource, it is just a local generated object to represent the meta info. + // So it only need to be expired after specified duration. + CacheFactory namesCacheFactory = new CacheFactory( expireAfterWriteSec, refreshAfterWriteSec, maxSize, true, null); - namesCache = cacheFactory.buildCache(namesCacheLoader, null, executor); - metaObjCache = cacheFactory.buildCache(metaObjCacheLoader, removalListener, executor); + CacheFactory objCacheFactory = new CacheFactory( + expireAfterWriteSec, + OptionalLong.empty(), + maxSize, + true, + null); + namesCache = namesCacheFactory.buildCache(namesCacheLoader, null, executor); + metaObjCache = objCacheFactory.buildCache(metaObjCacheLoader, removalListener, executor); } public List listNames() { diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalTable.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalTable.java index 7d870f36059c85..d9e43bdd6cfacf 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalTable.java @@ -21,6 +21,7 @@ import org.apache.doris.catalog.ScalarType; import org.apache.doris.catalog.Type; import org.apache.doris.datasource.ExternalTable; +import org.apache.doris.datasource.SchemaCacheValue; import org.apache.doris.statistics.AnalysisInfo; import org.apache.doris.statistics.BaseAnalysisTask; import org.apache.doris.statistics.ExternalAnalysisTask; @@ -44,14 +45,13 @@ import java.util.ArrayList; import java.util.HashMap; import java.util.List; +import java.util.Optional; import java.util.stream.Collectors; public class PaimonExternalTable extends ExternalTable { private static final Logger LOG = LogManager.getLogger(PaimonExternalTable.class); - private Table originTable = null; - public PaimonExternalTable(long id, String name, String dbName, PaimonExternalCatalog catalog) { super(id, name, catalog, dbName, TableType.PAIMON_EXTERNAL_TABLE); } @@ -63,23 +63,20 @@ public String getPaimonCatalogType() { protected synchronized void makeSureInitialized() { super.makeSureInitialized(); if (!objectCreated) { - originTable = ((PaimonExternalCatalog) catalog).getPaimonTable(dbName, name); - schemaUpdateTime = System.currentTimeMillis(); objectCreated = true; } } - public Table getOriginTable() { + public Table getPaimonTable() { makeSureInitialized(); - return originTable; + Optional schemaCacheValue = getSchemaCacheValue(); + return schemaCacheValue.map(value -> ((PaimonSchemaCacheValue) value).getPaimonTable()).orElse(null); } @Override - public List initSchema() { - //init schema need update lastUpdateTime and get latest schema - objectCreated = false; - Table table = getOriginTable(); - TableSchema schema = ((FileStoreTable) table).schema(); + public Optional initSchema() { + Table paimonTable = ((PaimonExternalCatalog) catalog).getPaimonTable(dbName, name); + TableSchema schema = ((FileStoreTable) paimonTable).schema(); List columns = schema.fields(); List tmpSchema = Lists.newArrayListWithCapacity(columns.size()); for (DataField field : columns) { @@ -87,7 +84,7 @@ public List initSchema() { paimonTypeToDorisType(field.type()), true, null, true, field.description(), true, field.id())); } - return tmpSchema; + return Optional.of(new PaimonSchemaCacheValue(tmpSchema, paimonTable)); } private Type paimonPrimitiveTypeToDorisType(org.apache.paimon.types.DataType dataType) { @@ -180,7 +177,13 @@ public long fetchRowCount() { makeSureInitialized(); try { long rowCount = 0; - List splits = originTable.newReadBuilder().newScan().plan().splits(); + Optional schemaCacheValue = getSchemaCacheValue(); + Table paimonTable = schemaCacheValue.map(value -> ((PaimonSchemaCacheValue) value).getPaimonTable()) + .orElse(null); + if (paimonTable == null) { + return -1; + } + List splits = paimonTable.newReadBuilder().newScan().plan().splits(); for (Split split : splits) { rowCount += split.rowCount(); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonSchemaCacheValue.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonSchemaCacheValue.java new file mode 100644 index 00000000000000..aaaefe7f32db2b --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonSchemaCacheValue.java @@ -0,0 +1,39 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.datasource.paimon; + +import org.apache.doris.catalog.Column; +import org.apache.doris.datasource.SchemaCacheValue; + +import org.apache.paimon.table.Table; + +import java.util.List; + +public class PaimonSchemaCacheValue extends SchemaCacheValue { + + private Table paimonTable; + + public PaimonSchemaCacheValue(List schema, Table paimonTable) { + super(schema); + this.paimonTable = paimonTable; + } + + public Table getPaimonTable() { + return paimonTable; + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonSource.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonSource.java index a405aa92ea65bc..9ac44537e8aeec 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonSource.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonSource.java @@ -39,7 +39,7 @@ public class PaimonSource { public PaimonSource(PaimonExternalTable table, TupleDescriptor desc, Map columnNameToRange) { this.paimonExtTable = table; - this.originTable = paimonExtTable.getOriginTable(); + this.originTable = paimonExtTable.getPaimonTable(); this.desc = desc; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/test/TestExternalTable.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/test/TestExternalTable.java index 99f0238c170641..6da0981b97ef54 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/test/TestExternalTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/test/TestExternalTable.java @@ -17,15 +17,15 @@ package org.apache.doris.datasource.test; -import org.apache.doris.catalog.Column; import org.apache.doris.datasource.ExternalTable; +import org.apache.doris.datasource.SchemaCacheValue; import org.apache.doris.thrift.TTableDescriptor; import org.apache.doris.thrift.TTableType; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -import java.util.List; +import java.util.Optional; /** * TestExternalTable is a table for unit test. @@ -53,7 +53,7 @@ public TTableDescriptor toThrift() { } @Override - public List initSchema() { - return ((TestExternalCatalog) catalog).mockedSchema(dbName, name); + public Optional initSchema() { + return Optional.of(new SchemaCacheValue(((TestExternalCatalog) catalog).mockedSchema(dbName, name))); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/mysql/AcceptListener.java b/fe/fe-core/src/main/java/org/apache/doris/mysql/AcceptListener.java index c3cdf8a955aaa4..3d783f28cb3857 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/mysql/AcceptListener.java +++ b/fe/fe-core/src/main/java/org/apache/doris/mysql/AcceptListener.java @@ -101,8 +101,8 @@ public void handleEvent(AcceptingChannel channel) { } context.setStartTime(); int userQueryTimeout = context.getEnv().getAuth().getQueryTimeout(context.getQualifiedUser()); - if (userQueryTimeout <= 0) { - LOG.warn("Connection set query timeout to {}", + if (userQueryTimeout <= 0 && LOG.isDebugEnabled()) { + LOG.debug("Connection set query timeout to {}", context.getSessionVariable().getQueryTimeoutS()); } context.setUserQueryTimeout(userQueryTimeout);