Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import org.apache.doris.analysis.DropDbStmt;
import org.apache.doris.analysis.DropTableStmt;
import org.apache.doris.analysis.TableName;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.DatabaseIf;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.InfoSchemaDb;
Expand Down Expand Up @@ -386,7 +385,7 @@ public void onRefresh(boolean invalidCache) {
}
}

public final List<Column> getSchema(String dbName, String tblName) {
public final Optional<SchemaCacheValue> getSchema(String dbName, String tblName) {
makeSureInitialized();
Optional<ExternalDatabase<? extends ExternalTable>> db = getDb(dbName);
if (db.isPresent()) {
Expand All @@ -395,9 +394,7 @@ public final List<Column> getSchema(String dbName, String tblName) {
return table.get().initSchemaAndUpdateTime();
}
}
// return one column with unsupported type.
// not return empty to avoid some unexpected issue.
return Lists.newArrayList(Column.UNSUPPORTED_COLUMN);
return Optional.empty();
}

@Override
Expand Down Expand Up @@ -507,7 +504,7 @@ public ExternalDatabase<? extends ExternalTable> getDbNullable(long dbId) {
}

if (useMetaCache.get()) {
return metaCache.getMetaObjById(dbId).get();
return metaCache.getMetaObjById(dbId).orElse(null);
} else {
return idToDb.get(dbId);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -370,7 +370,7 @@ public Set<String> getTableNamesWithLock() {
public T getTableNullable(String tableName) {
makeSureInitialized();
if (extCatalog.getUseMetaCache().get()) {
return metaCache.getMetaObj(tableName).get();
return metaCache.getMetaObj(tableName).orElse(null);
} else {
if (!tableNameToId.containsKey(tableName)) {
return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,8 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.OptionalLong;
import java.util.Set;
import java.util.concurrent.ExecutorService;
Expand All @@ -42,7 +42,7 @@ public class ExternalSchemaCache {
private static final Logger LOG = LogManager.getLogger(ExternalSchemaCache.class);
private final ExternalCatalog catalog;

private LoadingCache<SchemaCacheKey, ImmutableList<Column>> schemaCache;
private LoadingCache<SchemaCacheKey, Optional<SchemaCacheValue>> schemaCache;

public ExternalSchemaCache(ExternalCatalog catalog, ExecutorService executor) {
this.catalog = catalog;
Expand Down Expand Up @@ -73,22 +73,22 @@ public Long getValue() {
MetricRepo.DORIS_METRIC_REGISTER.addMetrics(schemaCacheGauge);
}

private ImmutableList<Column> loadSchema(SchemaCacheKey key) {
ImmutableList<Column> schema = ImmutableList.copyOf(catalog.getSchema(key.dbName, key.tblName));
private Optional<SchemaCacheValue> loadSchema(SchemaCacheKey key) {
Optional<SchemaCacheValue> schema = catalog.getSchema(key.dbName, key.tblName);
if (LOG.isDebugEnabled()) {
LOG.debug("load schema for {} in catalog {}", key, catalog.getName());
}
return schema;
}

public List<Column> getSchema(String dbName, String tblName) {
public Optional<SchemaCacheValue> getSchemaValue(String dbName, String tblName) {
SchemaCacheKey key = new SchemaCacheKey(dbName, tblName);
return schemaCache.get(key);
}

public void addSchemaForTest(String dbName, String tblName, ImmutableList<Column> schema) {
SchemaCacheKey key = new SchemaCacheKey(dbName, tblName);
schemaCache.put(key, schema);
schemaCache.put(key, Optional.of(new SchemaCacheValue(schema)));
}

public void invalidateTableCache(String dbName, String tblName) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,8 @@ public TableType getType() {
@Override
public List<Column> getFullSchema() {
ExternalSchemaCache cache = Env.getCurrentEnv().getExtMetaCacheMgr().getSchemaCache(catalog);
return cache.getSchema(dbName, name);
Optional<SchemaCacheValue> schemaCacheValue = cache.getSchemaValue(dbName, name);
return schemaCacheValue.map(SchemaCacheValue::getSchema).orElse(null);
}

@Override
Expand All @@ -161,7 +162,6 @@ public List<Column> getBaseSchema(boolean full) {
return getFullSchema();
}


/**
 * No-op implementation: the supplied schema is not stored or used here.
 *
 * @param newSchema ignored by this implementation
 */
@Override
public void setNewFullSchema(List<Column> newSchema) {
    // Nothing to do.
}
Expand Down Expand Up @@ -301,12 +301,12 @@ public Optional<ColumnStatistic> getColumnStatistic(String colName) {
*
* @return
*/
public List<Column> initSchemaAndUpdateTime() {
public Optional<SchemaCacheValue> initSchemaAndUpdateTime() {
schemaUpdateTime = System.currentTimeMillis();
return initSchema();
}

public List<Column> initSchema() {
public Optional<SchemaCacheValue> initSchema() {
throw new NotImplementedException("implement in sub class");
}

Expand Down Expand Up @@ -365,4 +365,9 @@ public List<Pair<String, String>> getColumnIndexPairs(Set<String> columns) {
/**
 * Base implementation that always throws.
 * NOTE(review): presumably subclasses that can report chunk sizes override this — confirm.
 *
 * @return never returns normally from this implementation
 * @throws NotImplementedException always
 */
public List<Long> getChunkSizes() {
    // The message names this method so the failing call is easy to locate in logs.
    throw new NotImplementedException("getChunkSizes not implemented");
}

/**
 * Looks up the cached schema value for this table.
 * The lookup goes through the schema cache that the current Env's external
 * meta cache manager holds for {@code catalog}, keyed by {@code dbName} and {@code name}.
 *
 * @return whatever {@code ExternalSchemaCache.getSchemaValue} returns for this table
 */
protected Optional<SchemaCacheValue> getSchemaCacheValue() {
    return Env.getCurrentEnv()
            .getExtMetaCacheMgr()
            .getSchemaCache(catalog)
            .getSchemaValue(dbName, name);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.datasource;

import org.apache.doris.catalog.Column;

import java.util.List;

/**
 * The value type stored in ExternalSchemaCache.
 * Different external table types have different schema cache values;
 * for example, a Hive table has HMSSchemaCacheValue and a Paimon table has PaimonSchemaCacheValue.
 * Every object that should be refreshed along with the schema should be put in this class.
 */
public class SchemaCacheValue {
    /** Column list of the table; accessible to subclasses. */
    protected List<Column> schema;

    /**
     * Creates a cache value wrapping the given column list.
     * The list reference is stored as-is, without copying.
     *
     * @param schema the table's columns
     */
    public SchemaCacheValue(List<Column> schema) {
        this.schema = schema;
    }

    /**
     * @return the column list held by this cache value (the stored reference, not a copy)
     */
    public List<Column> getSchema() {
        return schema;
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,13 @@
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.EsTable;
import org.apache.doris.datasource.ExternalTable;
import org.apache.doris.datasource.SchemaCacheValue;
import org.apache.doris.thrift.TEsTable;
import org.apache.doris.thrift.TTableDescriptor;
import org.apache.doris.thrift.TTableType;

import java.util.List;
import java.util.Optional;

/**
* Elasticsearch external table.
Expand Down Expand Up @@ -69,9 +71,11 @@ public TTableDescriptor toThrift() {
}

@Override
public List<Column> initSchema() {
public Optional<SchemaCacheValue> initSchema() {
EsRestClient restClient = ((EsExternalCatalog) catalog).getEsRestClient();
return EsUtil.genColumnsFromEs(restClient, name, null, ((EsExternalCatalog) catalog).enableMappingEsId());
return Optional.of(new SchemaCacheValue(
EsUtil.genColumnsFromEs(restClient, name, null,
((EsExternalCatalog) catalog).enableMappingEsId())));
}

private EsTable toEsTable() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.Config;
import org.apache.doris.datasource.ExternalTable;
import org.apache.doris.datasource.SchemaCacheValue;
import org.apache.doris.datasource.hudi.HudiUtils;
import org.apache.doris.datasource.hudi.source.COWIncrementalRelation;
import org.apache.doris.datasource.hudi.source.IncrementalRelation;
Expand Down Expand Up @@ -159,7 +160,6 @@ public class HMSExternalTable extends ExternalTable implements MTMVRelatedTableI
}

private volatile org.apache.hadoop.hive.metastore.api.Table remoteTable = null;
private List<Column> partitionColumns;

private DLAType dlaType = DLAType.UNKNOWN;

Expand Down Expand Up @@ -296,15 +296,17 @@ public org.apache.hadoop.hive.metastore.api.Table getRemoteTable() {

public List<Type> getPartitionColumnTypes() {
makeSureInitialized();
getFullSchema();
return partitionColumns.stream().map(c -> c.getType()).collect(Collectors.toList());
Optional<SchemaCacheValue> schemaCacheValue = getSchemaCacheValue();
return schemaCacheValue.map(value -> ((HMSSchemaCacheValue) value).getPartitionColTypes())
.orElse(Collections.emptyList());
}

@Override
public List<Column> getPartitionColumns() {
makeSureInitialized();
getFullSchema();
return partitionColumns;
Optional<SchemaCacheValue> schemaCacheValue = getSchemaCacheValue();
return schemaCacheValue.map(value -> ((HMSSchemaCacheValue) value).getPartitionColumns())
.orElse(Collections.emptyList());
}

public TableScanParams getScanParams() {
Expand Down Expand Up @@ -532,7 +534,7 @@ public Set<String> getPartitionNames() {
}

@Override
public List<Column> initSchemaAndUpdateTime() {
public Optional<SchemaCacheValue> initSchemaAndUpdateTime() {
org.apache.hadoop.hive.metastore.api.Table table = ((HMSExternalCatalog) catalog).getClient()
.getTable(dbName, name);
// try to use transient_lastDdlTime from hms client
Expand All @@ -554,7 +556,7 @@ public long getLastDdlTime() {
}

@Override
public List<Column> initSchema() {
public Optional<SchemaCacheValue> initSchema() {
makeSureInitialized();
List<Column> columns;
if (dlaType.equals(DLAType.ICEBERG)) {
Expand All @@ -564,8 +566,8 @@ public List<Column> initSchema() {
} else {
columns = getHiveSchema();
}
initPartitionColumns(columns);
return columns;
List<Column> partitionColumns = initPartitionColumns(columns);
return Optional.of(new HMSSchemaCacheValue(columns, partitionColumns));
}

private List<Column> getIcebergSchema() {
Expand All @@ -585,18 +587,16 @@ private List<Column> getHudiSchema() {

private List<Column> getHiveSchema() {
HMSCachedClient client = ((HMSExternalCatalog) catalog).getClient();
List<Column> columns;
List<FieldSchema> schema = client.getSchema(dbName, name);
Map<String, String> colDefaultValues = client.getDefaultColumnValues(dbName, name);
List<Column> tmpSchema = Lists.newArrayListWithCapacity(schema.size());
List<Column> columns = Lists.newArrayListWithCapacity(schema.size());
for (FieldSchema field : schema) {
String fieldName = field.getName().toLowerCase(Locale.ROOT);
String defaultValue = colDefaultValues.getOrDefault(fieldName, null);
tmpSchema.add(new Column(fieldName,
columns.add(new Column(fieldName,
HiveMetaStoreClientHelper.hiveTypeToDorisType(field.getType()), true, null,
true, defaultValue, field.getComment(), true, -1));
}
columns = tmpSchema;
return columns;
}

Expand All @@ -613,10 +613,10 @@ public long fetchRowCount() {
return rowCount;
}

private void initPartitionColumns(List<Column> schema) {
private List<Column> initPartitionColumns(List<Column> schema) {
List<String> partitionKeys = remoteTable.getPartitionKeys().stream().map(FieldSchema::getName)
.collect(Collectors.toList());
partitionColumns = Lists.newArrayListWithCapacity(partitionKeys.size());
List<Column> partitionColumns = Lists.newArrayListWithCapacity(partitionKeys.size());
for (String partitionKey : partitionKeys) {
// Do not use "getColumn()", which will cause dead loop
for (Column column : schema) {
Expand All @@ -636,6 +636,7 @@ private void initPartitionColumns(List<Column> schema) {
if (LOG.isDebugEnabled()) {
LOG.debug("get {} partition columns for table: {}", partitionColumns.size(), name);
}
return partitionColumns;
}

public boolean hasColumnStatistics(String colName) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.datasource.hive;

import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.Type;
import org.apache.doris.datasource.SchemaCacheValue;

import java.util.List;
import java.util.stream.Collectors;

/**
 * Schema cache value that carries, in addition to the full column list held by
 * {@link SchemaCacheValue}, the list of partition columns.
 */
public class HMSSchemaCacheValue extends SchemaCacheValue {

    // Assigned exactly once in the constructor. Declared final so the reference
    // cannot be swapped after construction and is safely published if the cached
    // value is read from other threads.
    private final List<Column> partitionColumns;

    /**
     * Creates a cache value from the table's columns and its partition columns.
     * Both list references are stored as-is, without copying.
     *
     * @param schema all columns of the table
     * @param partitionColumns the partition columns
     */
    public HMSSchemaCacheValue(List<Column> schema, List<Column> partitionColumns) {
        super(schema);
        this.partitionColumns = partitionColumns;
    }

    /**
     * @return the partition column list passed at construction (the stored reference, not a copy)
     */
    public List<Column> getPartitionColumns() {
        return partitionColumns;
    }

    /**
     * Maps each partition column to its type, in list order.
     * A new list is built on every call.
     *
     * @return the types of the partition columns
     */
    public List<Type> getPartitionColTypes() {
        return partitionColumns.stream().map(Column::getType).collect(Collectors.toList());
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import org.apache.doris.catalog.Column;
import org.apache.doris.datasource.ExternalTable;
import org.apache.doris.datasource.SchemaCacheValue;
import org.apache.doris.statistics.AnalysisInfo;
import org.apache.doris.statistics.BaseAnalysisTask;
import org.apache.doris.statistics.ExternalAnalysisTask;
Expand All @@ -29,6 +30,7 @@

import java.util.HashMap;
import java.util.List;
import java.util.Optional;

public class IcebergExternalTable extends ExternalTable {

Expand All @@ -48,8 +50,8 @@ protected synchronized void makeSureInitialized() {
}

@Override
public List<Column> initSchema() {
return IcebergUtils.getSchema(catalog, dbName, name);
public Optional<SchemaCacheValue> initSchema() {
return Optional.of(new SchemaCacheValue(IcebergUtils.getSchema(catalog, dbName, name)));
}

@Override
Expand Down
Loading