Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,6 @@
import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.TableSchemaResolver;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import shade.doris.hive.org.apache.thrift.TException;
Expand All @@ -90,7 +89,6 @@
import java.util.ArrayList;
import java.util.Date;
import java.util.Deque;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
Expand Down Expand Up @@ -896,20 +894,6 @@ public static String showCreateTable(org.apache.hadoop.hive.metastore.api.Table
return output.toString();
}

public static org.apache.iceberg.Table getIcebergTable(HMSExternalTable table) {
String metastoreUri = table.getMetastoreUri();
org.apache.iceberg.hive.HiveCatalog hiveCatalog = new org.apache.iceberg.hive.HiveCatalog();
Configuration conf = getConfiguration(table);
hiveCatalog.setConf(conf);
// initialize hive catalog
Map<String, String> catalogProperties = new HashMap<>();
catalogProperties.put(HMSProperties.HIVE_METASTORE_URIS, metastoreUri);
catalogProperties.put("uri", metastoreUri);
hiveCatalog.initialize("hive", catalogProperties);

return hiveCatalog.loadTable(TableIdentifier.of(table.getDbName(), table.getName()));
}

public static Schema getHudiTableSchema(HMSExternalTable table) {
HoodieTableMetaClient metaClient = getHudiClient(table);
TableSchemaResolver schemaUtil = new TableSchemaResolver(metaClient);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -434,7 +434,7 @@ public long estimatedRowCount() {
}

private List<Column> getIcebergSchema(List<FieldSchema> hmsSchema) {
Table icebergTable = HiveMetaStoreClientHelper.getIcebergTable(this);
Table icebergTable = Env.getCurrentEnv().getExtMetaCacheMgr().getIcebergMetadataCache().getIcebergTable(this);
Schema schema = icebergTable.schema();
List<Column> tmpSchema = Lists.newArrayListWithCapacity(hmsSchema.size());
for (FieldSchema field : hmsSchema) {
Expand Down Expand Up @@ -470,7 +470,8 @@ public Optional<ColumnStatistic> getColumnStatistic(String colName) {
case HIVE:
return getHiveColumnStats(colName);
case ICEBERG:
return StatisticsUtil.getIcebergColumnStats(colName, HiveMetaStoreClientHelper.getIcebergTable(this));
return StatisticsUtil.getIcebergColumnStats(colName,
Env.getCurrentEnv().getExtMetaCacheMgr().getIcebergMetadataCache().getIcebergTable(this));
default:
LOG.warn("get column stats for dlaType {} is not supported.", dlaType);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@
import org.apache.doris.fs.FileSystemCache;
import org.apache.doris.planner.external.hudi.HudiPartitionMgr;
import org.apache.doris.planner.external.hudi.HudiPartitionProcessor;
import org.apache.doris.planner.external.iceberg.IcebergMetadataCache;
import org.apache.doris.planner.external.iceberg.IcebergMetadataCacheMgr;

import com.google.common.collect.Maps;
import org.apache.logging.log4j.LogManager;
Expand All @@ -52,6 +54,7 @@ public class ExternalMetaCacheMgr {
private ExecutorService executor;
// all catalogs could share the same fsCache.
private FileSystemCache fsCache;
private final IcebergMetadataCacheMgr icebergMetadataCacheMgr;

public ExternalMetaCacheMgr() {
executor = ThreadPoolManager.newDaemonFixedThreadPool(
Expand All @@ -60,6 +63,7 @@ public ExternalMetaCacheMgr() {
"ExternalMetaCacheMgr", 120, true);
hudiPartitionMgr = HudiPartitionMgr.get(executor);
fsCache = new FileSystemCache(executor);
icebergMetadataCacheMgr = new IcebergMetadataCacheMgr();
}

public HiveMetaStoreCache getMetaStoreCache(HMSExternalCatalog catalog) {
Expand Down Expand Up @@ -92,6 +96,10 @@ public HudiPartitionProcessor getHudiPartitionProcess(ExternalCatalog catalog) {
return hudiPartitionMgr.getPartitionProcessor(catalog);
}

public IcebergMetadataCache getIcebergMetadataCache() {
return icebergMetadataCacheMgr.getIcebergMetadataCache();
}

public FileSystemCache getFsCache() {
return fsCache;
}
Expand All @@ -104,6 +112,7 @@ public void removeCache(long catalogId) {
LOG.info("remove schema cache for catalog {}", catalogId);
}
hudiPartitionMgr.removePartitionProcessor(catalogId);
icebergMetadataCacheMgr.removeCache(catalogId);
}

public void invalidateTableCache(long catalogId, String dbName, String tblName) {
Expand All @@ -117,6 +126,7 @@ public void invalidateTableCache(long catalogId, String dbName, String tblName)
metaCache.invalidateTableCache(dbName, tblName);
}
hudiPartitionMgr.cleanTablePartitions(catalogId, dbName, tblName);
icebergMetadataCacheMgr.invalidateTableCache(catalogId, dbName, tblName);
LOG.debug("invalid table cache for {}.{} in catalog {}", dbName, tblName, catalogId);
}

Expand All @@ -131,6 +141,7 @@ public void invalidateDbCache(long catalogId, String dbName) {
metaCache.invalidateDbCache(dbName);
}
hudiPartitionMgr.cleanDatabasePartitions(catalogId, dbName);
icebergMetadataCacheMgr.invalidateDbCache(catalogId, dbName);
LOG.debug("invalid db cache for {} in catalog {}", dbName, catalogId);
}

Expand All @@ -144,6 +155,7 @@ public void invalidateCatalogCache(long catalogId) {
metaCache.invalidateAll();
}
hudiPartitionMgr.cleanPartitionProcess(catalogId);
icebergMetadataCacheMgr.invalidateCatalogCache(catalogId);
LOG.debug("invalid catalog cache for {}", catalogId);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.doris.datasource.iceberg;

import org.apache.doris.catalog.Env;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.FeNameFormat;
import org.apache.doris.common.util.Util;
Expand Down Expand Up @@ -48,9 +49,11 @@ public abstract class IcebergExternalCatalog extends ExternalCatalog {
protected String icebergCatalogType;
protected Catalog catalog;
protected SupportsNamespaces nsCatalog;
private final long catalogId;

public IcebergExternalCatalog(long catalogId, String name, String comment) {
super(catalogId, name, InitCatalogLog.Type.ICEBERG, comment);
this.catalogId = catalogId;
}

@Override
Expand Down Expand Up @@ -113,6 +116,9 @@ public List<String> listTableNames(SessionContext ctx, String dbName) {

public org.apache.iceberg.Table getIcebergTable(String dbName, String tblName) {
makeSureInitialized();
return catalog.loadTable(TableIdentifier.of(dbName, tblName));
return Env.getCurrentEnv()
.getExtMetaCacheMgr()
.getIcebergMetadataCache()
.getIcebergTable(catalog, catalogId, dbName, tblName);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.doris.planner.external.iceberg;

import org.apache.doris.analysis.TupleDescriptor;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.catalog.external.IcebergExternalTable;
import org.apache.doris.common.MetaNotFoundException;
Expand Down Expand Up @@ -45,8 +46,13 @@ public class IcebergApiSource implements IcebergSource {
public IcebergApiSource(IcebergExternalTable table, TupleDescriptor desc,
Map<String, ColumnRange> columnNameToRange) {
this.icebergExtTable = table;
this.originTable = ((IcebergExternalCatalog) icebergExtTable.getCatalog())
.getIcebergTable(icebergExtTable.getDbName(), icebergExtTable.getName());

this.originTable = Env.getCurrentEnv().getExtMetaCacheMgr().getIcebergMetadataCache().getIcebergTable(
((IcebergExternalCatalog) icebergExtTable.getCatalog()).getCatalog(),
icebergExtTable.getCatalog().getId(),
icebergExtTable.getDbName(),
icebergExtTable.getName());

this.desc = desc;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
package org.apache.doris.planner.external.iceberg;

import org.apache.doris.analysis.TupleDescriptor;
import org.apache.doris.catalog.HiveMetaStoreClientHelper;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.catalog.external.HMSExternalTable;
import org.apache.doris.common.DdlException;
Expand All @@ -39,12 +39,15 @@ public class IcebergHMSSource implements IcebergSource {
private final HMSExternalTable hmsTable;
private final TupleDescriptor desc;
private final Map<String, ColumnRange> columnNameToRange;
private final org.apache.iceberg.Table icebergTable;

public IcebergHMSSource(HMSExternalTable hmsTable, TupleDescriptor desc,
Map<String, ColumnRange> columnNameToRange) {
this.hmsTable = hmsTable;
this.desc = desc;
this.columnNameToRange = columnNameToRange;
this.icebergTable =
Env.getCurrentEnv().getExtMetaCacheMgr().getIcebergMetadataCache().getIcebergTable(hmsTable);
}

@Override
Expand All @@ -59,7 +62,7 @@ public String getFileFormat() throws DdlException, MetaNotFoundException {
}

public org.apache.iceberg.Table getIcebergTable() throws MetaNotFoundException {
return HiveMetaStoreClientHelper.getIcebergTable(hmsTable);
return icebergTable;
}

@Override
Expand Down
Loading