From 8c986568c7b47ee71a04aac9a44ac68046c45f37 Mon Sep 17 00:00:00 2001 From: wuwenchi Date: Fri, 28 Jul 2023 11:53:12 +0800 Subject: [PATCH 1/2] Add iceberg metadata cache, include : snapshotList/tableRef/manifestContent --- .../catalog/HiveMetaStoreClientHelper.java | 16 -- .../catalog/external/HMSExternalTable.java | 5 +- .../datasource/ExternalMetaCacheMgr.java | 12 + .../iceberg/IcebergExternalCatalog.java | 8 +- .../external/iceberg/IcebergApiSource.java | 10 +- .../external/iceberg/IcebergHMSSource.java | 7 +- .../iceberg/IcebergMetadataCache.java | 265 ++++++++++++++++++ .../iceberg/IcebergMetadataCacheMgr.java | 57 ++++ .../external/iceberg/IcebergScanNode.java | 58 ++-- .../doris/statistics/util/StatisticsUtil.java | 6 +- .../tablefunction/MetadataGenerator.java | 25 +- 11 files changed, 411 insertions(+), 58 deletions(-) create mode 100644 fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergMetadataCache.java create mode 100644 fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergMetadataCacheMgr.java diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/HiveMetaStoreClientHelper.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/HiveMetaStoreClientHelper.java index fbb3c71b29bb66..586bdd73c4f730 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/HiveMetaStoreClientHelper.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/HiveMetaStoreClientHelper.java @@ -76,7 +76,6 @@ import org.apache.hudi.avro.HoodieAvroUtils; import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.TableSchemaResolver; -import org.apache.iceberg.catalog.TableIdentifier; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import shade.doris.hive.org.apache.thrift.TException; @@ -90,7 +89,6 @@ import java.util.ArrayList; import java.util.Date; import java.util.Deque; -import java.util.HashMap; import java.util.Iterator; import java.util.LinkedList; import java.util.List; @@ -896,20 +894,6 @@ public static String showCreateTable(org.apache.hadoop.hive.metastore.api.Table return output.toString(); } - public static org.apache.iceberg.Table getIcebergTable(HMSExternalTable table) { - String metastoreUri = table.getMetastoreUri(); - org.apache.iceberg.hive.HiveCatalog hiveCatalog = new org.apache.iceberg.hive.HiveCatalog(); - Configuration conf = getConfiguration(table); - hiveCatalog.setConf(conf); - // initialize hive catalog - Map catalogProperties = new HashMap<>(); - catalogProperties.put(HMSProperties.HIVE_METASTORE_URIS, metastoreUri); - catalogProperties.put("uri", metastoreUri); - hiveCatalog.initialize("hive", catalogProperties); - - return hiveCatalog.loadTable(TableIdentifier.of(table.getDbName(), table.getName())); - } - public static Schema getHudiTableSchema(HMSExternalTable table) { HoodieTableMetaClient metaClient = getHudiClient(table); TableSchemaResolver schemaUtil = new TableSchemaResolver(metaClient); diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/HMSExternalTable.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/HMSExternalTable.java index b985e4cddd7d72..beb1917b05c2fb 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/HMSExternalTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/HMSExternalTable.java @@ -434,7 +434,7 @@ public long estimatedRowCount() { } private List getIcebergSchema(List hmsSchema) { - Table icebergTable = HiveMetaStoreClientHelper.getIcebergTable(this); + Table icebergTable = Env.getCurrentEnv().getExtMetaCacheMgr().getIcebergMetadataCache().getIcebergTable(this); Schema schema = icebergTable.schema(); List tmpSchema = Lists.newArrayListWithCapacity(hmsSchema.size()); for (FieldSchema field : hmsSchema) { @@ -470,7 +470,8 @@ public Optional getColumnStatistic(String colName) { case HIVE: return getHiveColumnStats(colName); case ICEBERG: - return StatisticsUtil.getIcebergColumnStats(colName, HiveMetaStoreClientHelper.getIcebergTable(this)); + return StatisticsUtil.getIcebergColumnStats(colName, + Env.getCurrentEnv().getExtMetaCacheMgr().getIcebergMetadataCache().getIcebergTable(this)); default: LOG.warn("get column stats for dlaType {} is not supported.", dlaType); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalMetaCacheMgr.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalMetaCacheMgr.java index 57f473f1ee53bd..68e77b1a2e5ed4 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalMetaCacheMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalMetaCacheMgr.java @@ -26,6 +26,8 @@ import org.apache.doris.fs.FileSystemCache; import org.apache.doris.planner.external.hudi.HudiPartitionMgr; import org.apache.doris.planner.external.hudi.HudiPartitionProcessor; +import org.apache.doris.planner.external.iceberg.IcebergMetadataCache; +import org.apache.doris.planner.external.iceberg.IcebergMetadataCacheMgr; import com.google.common.collect.Maps; import org.apache.logging.log4j.LogManager; @@ -52,6 +54,7 @@ public class ExternalMetaCacheMgr { private ExecutorService executor; // all catalogs could share the same fsCache. private FileSystemCache fsCache; + private final IcebergMetadataCacheMgr icebergMetadataCacheMgr; public ExternalMetaCacheMgr() { executor = ThreadPoolManager.newDaemonFixedThreadPool( @@ -60,6 +63,7 @@ public ExternalMetaCacheMgr() { "ExternalMetaCacheMgr", 120, true); hudiPartitionMgr = HudiPartitionMgr.get(executor); fsCache = new FileSystemCache(executor); + icebergMetadataCacheMgr = IcebergMetadataCacheMgr.get(); } public HiveMetaStoreCache getMetaStoreCache(HMSExternalCatalog catalog) { @@ -92,6 +96,10 @@ public HudiPartitionProcessor getHudiPartitionProcess(ExternalCatalog catalog) { return hudiPartitionMgr.getPartitionProcessor(catalog); } + public IcebergMetadataCache getIcebergMetadataCache() { + return icebergMetadataCacheMgr.getIcebergMetadataCache(); + } + public FileSystemCache getFsCache() { return fsCache; } @@ -104,6 +112,7 @@ public void removeCache(long catalogId) { LOG.info("remove schema cache for catalog {}", catalogId); } hudiPartitionMgr.removePartitionProcessor(catalogId); + icebergMetadataCacheMgr.removeCache(catalogId); } public void invalidateTableCache(long catalogId, String dbName, String tblName) { @@ -117,6 +126,7 @@ public void invalidateTableCache(long catalogId, String dbName, String tblName) metaCache.invalidateTableCache(dbName, tblName); } hudiPartitionMgr.cleanTablePartitions(catalogId, dbName, tblName); + icebergMetadataCacheMgr.invalidateTableCache(catalogId, dbName, tblName); LOG.debug("invalid table cache for {}.{} in catalog {}", dbName, tblName, catalogId); } @@ -131,6 +141,7 @@ public void invalidateDbCache(long catalogId, String dbName) { metaCache.invalidateDbCache(dbName); } hudiPartitionMgr.cleanDatabasePartitions(catalogId, dbName); + icebergMetadataCacheMgr.invalidateDbCache(catalogId, dbName); LOG.debug("invalid db cache for {} in catalog {}", dbName, catalogId); } @@ -144,6 +155,7 @@ public void invalidateCatalogCache(long catalogId) { metaCache.invalidateAll(); } hudiPartitionMgr.cleanPartitionProcess(catalogId); + icebergMetadataCacheMgr.invalidateCatalogCache(catalogId); LOG.debug("invalid catalog cache for {}", catalogId); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalCatalog.java index a5b7f4d7afff08..50816b77a097fa 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalCatalog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalCatalog.java @@ -17,6 +17,7 @@ package org.apache.doris.datasource.iceberg; +import org.apache.doris.catalog.Env; import org.apache.doris.common.AnalysisException; import org.apache.doris.common.FeNameFormat; import org.apache.doris.common.util.Util; @@ -48,9 +49,11 @@ public abstract class IcebergExternalCatalog extends ExternalCatalog { protected String icebergCatalogType; protected Catalog catalog; protected SupportsNamespaces nsCatalog; + private final long catalogId; public IcebergExternalCatalog(long catalogId, String name, String comment) { super(catalogId, name, InitCatalogLog.Type.ICEBERG, comment); + this.catalogId = catalogId; } @Override @@ -113,6 +116,9 @@ public List listTableNames(SessionContext ctx, String dbName) { public org.apache.iceberg.Table getIcebergTable(String dbName, String tblName) { makeSureInitialized(); - return catalog.loadTable(TableIdentifier.of(dbName, tblName)); + return Env.getCurrentEnv() + .getExtMetaCacheMgr() + .getIcebergMetadataCache() + .getIcebergTable(catalog, catalogId, dbName, tblName); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergApiSource.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergApiSource.java index 4e4bf30c50042b..0e20747dfe232b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergApiSource.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergApiSource.java @@ -18,6 +18,7 @@ package org.apache.doris.planner.external.iceberg; import org.apache.doris.analysis.TupleDescriptor; +import org.apache.doris.catalog.Env; import org.apache.doris.catalog.TableIf; import org.apache.doris.catalog.external.IcebergExternalTable; import org.apache.doris.common.MetaNotFoundException; @@ -45,8 +46,13 @@ public class IcebergApiSource implements IcebergSource { public IcebergApiSource(IcebergExternalTable table, TupleDescriptor desc, Map columnNameToRange) { this.icebergExtTable = table; - this.originTable = ((IcebergExternalCatalog) icebergExtTable.getCatalog()) - .getIcebergTable(icebergExtTable.getDbName(), icebergExtTable.getName()); + + this.originTable = Env.getCurrentEnv().getExtMetaCacheMgr().getIcebergMetadataCache().getIcebergTable( + ((IcebergExternalCatalog) icebergExtTable.getCatalog()).getCatalog(), + icebergExtTable.getCatalog().getId(), + icebergExtTable.getDbName(), + icebergExtTable.getName()); + this.desc = desc; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergHMSSource.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergHMSSource.java index 0fc0f39dffc47f..478e78c0d0e96e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergHMSSource.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergHMSSource.java @@ -18,7 +18,7 @@ package org.apache.doris.planner.external.iceberg; import org.apache.doris.analysis.TupleDescriptor; -import org.apache.doris.catalog.HiveMetaStoreClientHelper; +import org.apache.doris.catalog.Env; import org.apache.doris.catalog.TableIf; import org.apache.doris.catalog.external.HMSExternalTable; import org.apache.doris.common.DdlException; @@ -39,12 +39,15 @@ public class IcebergHMSSource implements IcebergSource { private final HMSExternalTable hmsTable; private final TupleDescriptor desc; private final Map columnNameToRange; + private final org.apache.iceberg.Table icebergTable; public IcebergHMSSource(HMSExternalTable hmsTable, TupleDescriptor desc, Map columnNameToRange) { this.hmsTable = hmsTable; this.desc = desc; this.columnNameToRange = columnNameToRange; + this.icebergTable = + Env.getCurrentEnv().getExtMetaCacheMgr().getIcebergMetadataCache().getIcebergTable(hmsTable); } @Override @@ -59,7 +62,7 @@ public String getFileFormat() throws DdlException, MetaNotFoundException { } public org.apache.iceberg.Table getIcebergTable() throws MetaNotFoundException { - return HiveMetaStoreClientHelper.getIcebergTable(hmsTable); + return icebergTable; } @Override diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergMetadataCache.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergMetadataCache.java new file mode 100644 index 00000000000000..564c1b4955691b --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergMetadataCache.java @@ -0,0 +1,265 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.planner.external.iceberg; + +import org.apache.doris.catalog.Env; +import org.apache.doris.catalog.external.HMSExternalTable; +import org.apache.doris.common.Config; +import org.apache.doris.common.MetaNotFoundException; +import org.apache.doris.common.UserException; +import org.apache.doris.datasource.CatalogIf; +import org.apache.doris.datasource.HMSExternalCatalog; +import org.apache.doris.datasource.iceberg.IcebergExternalCatalog; +import org.apache.doris.datasource.property.constants.HMSProperties; +import org.apache.doris.thrift.TIcebergMetadataParams; + +import avro.shaded.com.google.common.collect.Lists; +import com.google.common.cache.Cache; +import com.google.common.cache.CacheBuilder; +import com.google.common.collect.Iterables; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.HdfsConfiguration; +import org.apache.iceberg.ManifestFiles; +import org.apache.iceberg.Snapshot; +import org.apache.iceberg.Table; +import org.apache.iceberg.catalog.Catalog; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.hive.HiveCatalog; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.concurrent.TimeUnit; + +public class IcebergMetadataCache { + + private final Cache> snapshotListCache; + private final Cache tableCache; + + public IcebergMetadataCache() { + this.snapshotListCache = CacheBuilder.newBuilder().maximumSize(Config.max_hive_table_cache_num) + .expireAfterAccess(Config.external_cache_expire_time_minutes_after_access, TimeUnit.MINUTES) + .build(); + + this.tableCache = CacheBuilder.newBuilder().maximumSize(Config.max_hive_table_cache_num) + .expireAfterAccess(Config.external_cache_expire_time_minutes_after_access, TimeUnit.MINUTES) + .build(); + } + + public List getSnapshotList(TIcebergMetadataParams params) throws UserException { + CatalogIf catalog = Env.getCurrentEnv().getCatalogMgr().getCatalog(params.getCatalog()); + IcebergMetadataCacheKey key = + IcebergMetadataCacheKey.of(catalog.getId(), params.getDatabase(), params.getTable()); + List ifPresent = snapshotListCache.getIfPresent(key); + if (ifPresent != null) { + return ifPresent; + } + + Table icebergTable = getIcebergTable(key, catalog, params.getDatabase(), params.getTable()); + List snaps = Lists.newArrayList(); + Iterables.addAll(snaps, icebergTable.snapshots()); + snapshotListCache.put(key, snaps); + return snaps; + } + + public Table getIcebergTable(IcebergMetadataCacheKey key, CatalogIf catalog, String dbName, String tbName) + throws UserException { + Table cacheTable = tableCache.getIfPresent(key); + if (cacheTable != null) { + return cacheTable; + } + + Table icebergTable; + if (catalog instanceof HMSExternalCatalog) { + HMSExternalCatalog ctg = (HMSExternalCatalog) catalog; + icebergTable = createIcebergTable( + ctg.getHiveMetastoreUris(), + ctg.getCatalogProperty().getHadoopProperties(), + dbName, + tbName); + } else if (catalog instanceof IcebergExternalCatalog) { + IcebergExternalCatalog icebergExternalCatalog = (IcebergExternalCatalog) catalog; + icebergTable = getIcebergTable( + icebergExternalCatalog.getCatalog(), icebergExternalCatalog.getId(), dbName, tbName); + } else { + throw new UserException("Only support 'hms' and 'iceberg' type for iceberg table"); + } + tableCache.put(key, icebergTable); + return icebergTable; + } + + public Table getIcebergTable(IcebergSource icebergSource) throws MetaNotFoundException { + return icebergSource.getIcebergTable(); + } + + public Table getIcebergTable(HMSExternalTable hmsTable) { + IcebergMetadataCacheKey key = IcebergMetadataCacheKey.of( + hmsTable.getCatalog().getId(), + hmsTable.getDbName(), + hmsTable.getName()); + Table table = tableCache.getIfPresent(key); + if (table != null) { + return table; + } + Table icebergTable = createIcebergTable(hmsTable); + tableCache.put(key, icebergTable); + + return icebergTable; + } + + public Table getIcebergTable(Catalog catalog, long catalogId, String dbName, String tbName) { + IcebergMetadataCacheKey key = IcebergMetadataCacheKey.of( + catalogId, + dbName, + tbName); + Table cacheTable = tableCache.getIfPresent(key); + if (cacheTable != null) { + return cacheTable; + } + Table table = catalog.loadTable(TableIdentifier.of(dbName, tbName)); + initIcebergTableFileIO(table); + + tableCache.put(key, table); + + return table; + } + + public void invalidateCatalogCache(long catalogId) { + snapshotListCache.asMap().keySet().stream() + .filter(key -> key.catalogId == catalogId) + .forEach(snapshotListCache::invalidate); + + tableCache.asMap().entrySet().stream() + .filter(entry -> entry.getKey().catalogId == catalogId) + .forEach(entry -> { + ManifestFiles.dropCache(entry.getValue().io()); + tableCache.invalidate(entry.getKey()); + }); + } + + public void invalidateTableCache(long catalogId, String dbName, String tblName) { + snapshotListCache.asMap().keySet().stream() + .filter(key -> key.catalogId == catalogId && key.dbName.equals(dbName) && key.tableName.equals(tblName)) + .forEach(snapshotListCache::invalidate); + + tableCache.asMap().entrySet().stream() + .filter(entry -> { + IcebergMetadataCacheKey key = entry.getKey(); + return key.catalogId == catalogId && key.dbName.equals(dbName) && key.tableName.equals(tblName); + }) + .forEach(entry -> { + ManifestFiles.dropCache(entry.getValue().io()); + tableCache.invalidate(entry.getKey()); + }); + } + + public void invalidateDbCache(long catalogId, String dbName) { + snapshotListCache.asMap().keySet().stream() + .filter(key -> key.catalogId == catalogId && key.dbName.equals(dbName)) + .forEach(snapshotListCache::invalidate); + + tableCache.asMap().entrySet().stream() + .filter(entry -> { + IcebergMetadataCacheKey key = entry.getKey(); + return key.catalogId == catalogId && key.dbName.equals(dbName); + }) + .forEach(entry -> { + ManifestFiles.dropCache(entry.getValue().io()); + tableCache.invalidate(entry.getKey()); + }); + } + + private Table createIcebergTable(String uri, Map hdfsConf, String db, String tbl) { + // set hdfs configure + Configuration conf = new HdfsConfiguration(); + for (Map.Entry entry : hdfsConf.entrySet()) { + conf.set(entry.getKey(), entry.getValue()); + } + + HiveCatalog hiveCatalog = new HiveCatalog(); + hiveCatalog.setConf(conf); + + Map catalogProperties = new HashMap<>(); + catalogProperties.put(HMSProperties.HIVE_METASTORE_URIS, uri); + catalogProperties.put("uri", uri); + hiveCatalog.initialize("hive", catalogProperties); + + Table table = hiveCatalog.loadTable(TableIdentifier.of(db, tbl)); + + initIcebergTableFileIO(table); + + return table; + } + + private Table createIcebergTable(HMSExternalTable hmsTable) { + return createIcebergTable(hmsTable.getMetastoreUri(), + hmsTable.getHadoopProperties(), + hmsTable.getDbName(), + hmsTable.getName()); + } + + private void initIcebergTableFileIO(Table table) { + Map ioConf = new HashMap<>(); + table.properties().forEach((key, value) -> { + if (key.startsWith("io.")) { + ioConf.put(key, value); + } + }); + table.io().initialize(ioConf); + } + + static class IcebergMetadataCacheKey { + long catalogId; + String dbName; + String tableName; + + public IcebergMetadataCacheKey(long catalogId, String dbName, String tableName) { + this.catalogId = catalogId; + this.dbName = dbName; + this.tableName = tableName; + } + + static IcebergMetadataCacheKey of(long catalogId, String dbName, String tableName) { + return new IcebergMetadataCacheKey( + catalogId, + dbName, + tableName + ); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + IcebergMetadataCacheKey that = (IcebergMetadataCacheKey) o; + return catalogId == that.catalogId + && Objects.equals(dbName, that.dbName) + && Objects.equals(tableName, that.tableName); + } + + @Override + public int hashCode() { + return Objects.hash(catalogId, dbName, tableName); + } + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergMetadataCacheMgr.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergMetadataCacheMgr.java new file mode 100644 index 00000000000000..1018e318f2c228 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergMetadataCacheMgr.java @@ -0,0 +1,57 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.planner.external.iceberg; + +public class IcebergMetadataCacheMgr { + + private static volatile IcebergMetadataCacheMgr INSTANCE; + private final IcebergMetadataCache icebergMetadataCache = new IcebergMetadataCache(); + + public IcebergMetadataCacheMgr() { } + + public IcebergMetadataCache getIcebergMetadataCache() { + return icebergMetadataCache; + } + + public static IcebergMetadataCacheMgr get() { + if (INSTANCE == null) { + synchronized (IcebergMetadataCacheMgr.class) { + if (INSTANCE == null) { + INSTANCE = new IcebergMetadataCacheMgr(); + } + } + } + return INSTANCE; + } + + public void removeCache(long catalogId) { + icebergMetadataCache.invalidateCatalogCache(catalogId); + } + + public void invalidateCatalogCache(long catalogId) { + icebergMetadataCache.invalidateCatalogCache(catalogId); + } + + public void invalidateTableCache(long catalogId, String dbName, String tblName) { + icebergMetadataCache.invalidateTableCache(catalogId, dbName, tblName); + } + + public void invalidateDbCache(long catalogId, String dbName) { + icebergMetadataCache.invalidateDbCache(catalogId, dbName); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergScanNode.java index 3d3634fb669c25..3f13a16b91e886 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergScanNode.java @@ -20,6 +20,7 @@ import org.apache.doris.analysis.Expr; import org.apache.doris.analysis.TableSnapshot; import org.apache.doris.analysis.TupleDescriptor; +import org.apache.doris.catalog.Env; import org.apache.doris.catalog.TableIf; import org.apache.doris.catalog.external.ExternalTable; import org.apache.doris.catalog.external.HMSExternalTable; @@ -77,6 +78,7 @@ public class IcebergScanNode extends FileQueryScanNode { public static final int MIN_DELETE_FILE_SUPPORT_VERSION = 2; private IcebergSource source; + private Table icebergTable; /** * External file scan node for Query iceberg table @@ -109,8 +111,8 @@ public IcebergScanNode(PlanNodeId id, TupleDescriptor desc, boolean needCheckCol @Override protected void doInitialize() throws UserException { + icebergTable = Env.getCurrentEnv().getExtMetaCacheMgr().getIcebergMetadataCache().getIcebergTable(source); super.doInitialize(); - } public static void setIcebergParams(TFileRangeDesc rangeDesc, IcebergSplit icebergSplit) { @@ -153,34 +155,30 @@ public static void setIcebergParams(TFileRangeDesc rangeDesc, IcebergSplit icebe @Override public List getSplits() throws UserException { + + TableScan scan = icebergTable.newScan(); + + // set snapshot + Long snapshotId = getSpecifiedSnapshot(); + if (snapshotId != null) { + scan = scan.useSnapshot(snapshotId); + } + + // set filter List expressions = new ArrayList<>(); - org.apache.iceberg.Table table = source.getIcebergTable(); for (Expr conjunct : conjuncts) { - Expression expression = IcebergUtils.convertToIcebergExpr(conjunct, table.schema()); + Expression expression = IcebergUtils.convertToIcebergExpr(conjunct, icebergTable.schema()); if (expression != null) { expressions.add(expression); } } - TableScan scan = table.newScan(); - TableSnapshot tableSnapshot = source.getDesc().getRef().getTableSnapshot(); - if (tableSnapshot != null) { - TableSnapshot.VersionType type = tableSnapshot.getType(); - try { - if (type == TableSnapshot.VersionType.VERSION) { - scan = scan.useSnapshot(tableSnapshot.getVersion()); - } else { - long snapshotId = TimeUtils.timeStringToLong(tableSnapshot.getTime(), TimeUtils.getTimeZone()); - scan = scan.useSnapshot(getSnapshotIdAsOfTime(table.history(), snapshotId)); - } - } catch (IllegalArgumentException e) { - throw new UserException(e); - } - } for (Expression predicate : expressions) { scan = scan.filter(predicate); } + + // get splits List splits = new ArrayList<>(); - int formatVersion = ((BaseTable) table).operations().current().formatVersion(); + int formatVersion = ((BaseTable) icebergTable).operations().current().formatVersion(); // Min split size is DEFAULT_SPLIT_SIZE(128MB). long splitSize = Math.max(ConnectContext.get().getSessionVariable().getFileSplitSize(), DEFAULT_SPLIT_SIZE); CloseableIterable fileScanTasks = TableScanUtil.splitFiles(scan.planFiles(), splitSize); @@ -209,6 +207,24 @@ public List getSplits() throws UserException { return splits; } + public Long getSpecifiedSnapshot() throws UserException { + TableSnapshot tableSnapshot = source.getDesc().getRef().getTableSnapshot(); + if (tableSnapshot != null) { + TableSnapshot.VersionType type = tableSnapshot.getType(); + try { + if (type == TableSnapshot.VersionType.VERSION) { + return tableSnapshot.getVersion(); + } else { + long snapshotId = TimeUtils.timeStringToLong(tableSnapshot.getTime(), TimeUtils.getTimeZone()); + return getSnapshotIdAsOfTime(icebergTable.history(), snapshotId); + } + } catch (IllegalArgumentException e) { + throw new UserException(e); + } + } + return null; + } + private long getSnapshotIdAsOfTime(List historyEntries, long asOfTimestamp) { // find history at or before asOfTimestamp HistoryEntry latestHistory = null; @@ -254,14 +270,12 @@ private List getDeleteFileFilters(FileScanTask spitTask @Override public TFileType getLocationType() throws UserException { - Table icebergTable = source.getIcebergTable(); String location = icebergTable.location(); return getLocationType(location); } @Override public TFileType getLocationType(String location) throws UserException { - Table icebergTable = source.getIcebergTable(); return getTFileType(location).orElseThrow(() -> new DdlException("Unknown file location " + location + " for iceberg table " + icebergTable.name())); } @@ -287,7 +301,7 @@ public TFileAttributes getFileAttributes() throws UserException { @Override public List getPathPartitionKeys() throws UserException { - return source.getIcebergTable().spec().fields().stream().map(PartitionField::name).map(String::toLowerCase) + return icebergTable.spec().fields().stream().map(PartitionField::name).map(String::toLowerCase) .collect(Collectors.toList()); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/util/StatisticsUtil.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/util/StatisticsUtil.java index b118a7d02fdcf6..1551367665374d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/util/StatisticsUtil.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/util/StatisticsUtil.java @@ -32,7 +32,6 @@ import org.apache.doris.catalog.Column; import org.apache.doris.catalog.DatabaseIf; import org.apache.doris.catalog.Env; -import org.apache.doris.catalog.HiveMetaStoreClientHelper; import org.apache.doris.catalog.ListPartitionItem; import org.apache.doris.catalog.OlapTable; import org.apache.doris.catalog.Partition; @@ -523,7 +522,10 @@ public static long getHiveRowCount(HMSExternalTable table) { public static long getIcebergRowCount(HMSExternalTable table) { long rowCount = 0; try { - Table icebergTable = HiveMetaStoreClientHelper.getIcebergTable(table); + Table icebergTable = Env.getCurrentEnv() + .getExtMetaCacheMgr() + .getIcebergMetadataCache() + .getIcebergTable(table); TableScan tableScan = icebergTable.newScan().includeColumnStats(); for (FileScanTask task : tableScan.planFiles()) { rowCount += task.file().recordCount(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataGenerator.java b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataGenerator.java index 1dcc8418d0999a..00bcf168cdc763 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataGenerator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataGenerator.java @@ -20,11 +20,13 @@ import org.apache.doris.catalog.Env; import org.apache.doris.common.AnalysisException; import org.apache.doris.common.MetaNotFoundException; +import org.apache.doris.common.UserException; import org.apache.doris.common.proc.FrontendsProcNode; import org.apache.doris.common.util.TimeUtils; import org.apache.doris.datasource.CatalogIf; import org.apache.doris.datasource.HMSExternalCatalog; import org.apache.doris.datasource.property.constants.HMSProperties; +import org.apache.doris.planner.external.iceberg.IcebergMetadataCache; import org.apache.doris.system.Backend; import org.apache.doris.system.SystemInfoService; import org.apache.doris.thrift.TBackendsMetadataParams; @@ -108,21 +110,22 @@ private static TFetchSchemaTableDataResult icebergMetadataResult(TMetadataTableR if (!params.isSetIcebergMetadataParams()) { return errorResult("Iceberg metadata params is not set."); } + TIcebergMetadataParams icebergMetadataParams = params.getIcebergMetadataParams(); - HMSExternalCatalog catalog = (HMSExternalCatalog) Env.getCurrentEnv().getCatalogMgr() - .getCatalog(icebergMetadataParams.getCatalog()); - org.apache.iceberg.Table table; - try { - table = getIcebergTable(catalog, icebergMetadataParams.getDatabase(), icebergMetadataParams.getTable()); - } catch (MetaNotFoundException e) { - return errorResult(e.getMessage()); - } - TFetchSchemaTableDataResult result = new TFetchSchemaTableDataResult(); - List dataBatch = Lists.newArrayList(); TIcebergQueryType icebergQueryType = icebergMetadataParams.getIcebergQueryType(); + IcebergMetadataCache icebergMetadataCache = Env.getCurrentEnv().getExtMetaCacheMgr().getIcebergMetadataCache(); + List dataBatch = Lists.newArrayList(); + TFetchSchemaTableDataResult result = new TFetchSchemaTableDataResult(); + switch (icebergQueryType) { case SNAPSHOTS: - for (Snapshot snapshot : table.snapshots()) { + List snapshotList; + try { + snapshotList = icebergMetadataCache.getSnapshotList(icebergMetadataParams); + } catch (UserException e) { + return errorResult(e.getMessage()); + } + for (Snapshot snapshot : snapshotList) { TRow trow = new TRow(); LocalDateTime committedAt = LocalDateTime.ofInstant(Instant.ofEpochMilli( snapshot.timestampMillis()), TimeUtils.getTimeZone().toZoneId()); From b8e3eb6de4da05261bbf8ecad6f607dacef84242 Mon Sep 17 00:00:00 2001 From: wuwenchi Date: Sat, 29 Jul 2023 10:01:52 +0800 Subject: [PATCH 2/2] There is no need to use the singleton pattern --- .../doris/datasource/ExternalMetaCacheMgr.java | 2 +- .../external/iceberg/IcebergMetadataCacheMgr.java | 15 ++------------- 2 files changed, 3 insertions(+), 14 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalMetaCacheMgr.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalMetaCacheMgr.java index 68e77b1a2e5ed4..77254fd633e473 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalMetaCacheMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalMetaCacheMgr.java @@ -63,7 +63,7 @@ public ExternalMetaCacheMgr() { "ExternalMetaCacheMgr", 120, true); hudiPartitionMgr = HudiPartitionMgr.get(executor); fsCache = new FileSystemCache(executor); - icebergMetadataCacheMgr = IcebergMetadataCacheMgr.get(); + icebergMetadataCacheMgr = new IcebergMetadataCacheMgr(); } public HiveMetaStoreCache getMetaStoreCache(HMSExternalCatalog catalog) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergMetadataCacheMgr.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergMetadataCacheMgr.java index 1018e318f2c228..5708e54213ac2a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergMetadataCacheMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergMetadataCacheMgr.java @@ -19,26 +19,15 @@ public class IcebergMetadataCacheMgr { - private static volatile IcebergMetadataCacheMgr INSTANCE; private final IcebergMetadataCache icebergMetadataCache = new IcebergMetadataCache(); - public IcebergMetadataCacheMgr() { } + public IcebergMetadataCacheMgr() { + } public IcebergMetadataCache getIcebergMetadataCache() { return icebergMetadataCache; } - public static IcebergMetadataCacheMgr get() { - if (INSTANCE == null) { - synchronized (IcebergMetadataCacheMgr.class) { - if (INSTANCE == null) { - INSTANCE = new IcebergMetadataCacheMgr(); - } - } - } - return INSTANCE; - } - public void removeCache(long catalogId) { icebergMetadataCache.invalidateCatalogCache(catalogId); }