diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalMetaCacheMgr.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalMetaCacheMgr.java
index 8759f9f5ef2f04..cc40ad292ce182 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalMetaCacheMgr.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalMetaCacheMgr.java
@@ -101,7 +101,7 @@ public ExternalMetaCacheMgr() {
         commonRefreshExecutor = ThreadPoolManager.newDaemonFixedThreadPool(
                 Config.max_external_cache_loader_thread_pool_size,
-                Config.max_external_cache_loader_thread_pool_size * 1000,
+                Config.max_external_cache_loader_thread_pool_size * 10000,
                 "CommonRefreshExecutor", 10, true);
         // The queue size should be large enough,
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java
index 4bb87ad5375402..bae5978cffb682 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java
@@ -149,7 +149,7 @@ private void init() {
         CacheFactory partitionCacheFactory = new CacheFactory(
                 OptionalLong.of(28800L),
-                OptionalLong.of(Config.external_cache_expire_time_minutes_after_access * 60L),
+                OptionalLong.empty(),
                 Config.max_hive_partition_cache_num,
                 true,
                 null);
@@ -481,7 +481,8 @@ public List<FileCacheValue> getFilesByPartitions(List<HivePartition> partitions,
         List<FileCacheKey> keys = partitions.stream().map(p -> p.isDummyPartition()
                 ? FileCacheKey.createDummyCacheKey(
                         p.getDbName(), p.getTblName(), p.getPath(), p.getInputFormat(), bindBrokerName)
-                : new FileCacheKey(p.getPath(), p.getInputFormat(), p.getPartitionValues(), bindBrokerName))
+                : new FileCacheKey(p.getDbName(), p.getTblName(), p.getPath(),
+                        p.getInputFormat(), p.getPartitionValues(), bindBrokerName))
                 .collect(Collectors.toList());

         List<FileCacheValue> fileLists;
@@ -553,38 +554,19 @@ private List<HivePartition> getAllPartitions(String dbName, String name, List
     public void invalidateTableCache(String dbName, String tblName) {
-        PartitionValueCacheKey key = new PartitionValueCacheKey(dbName, tblName, null);
-        HivePartitionValues partitionValues = partitionValuesCache.getIfPresent(key);
-        if (partitionValues != null) {
-            long start = System.currentTimeMillis();
-            for (List<String> values : partitionValues.partitionValuesMap.values()) {
-                PartitionCacheKey partKey = new PartitionCacheKey(dbName, tblName, values);
-                HivePartition partition = partitionCache.getIfPresent(partKey);
-                if (partition != null) {
-                    fileCacheRef.get().invalidate(new FileCacheKey(partition.getPath(),
-                            null, partition.getPartitionValues(), null));
-                    partitionCache.invalidate(partKey);
-                }
+        partitionValuesCache.invalidate(new PartitionValueCacheKey(dbName, tblName, null));
+        partitionCache.asMap().keySet().forEach(k -> {
+            if (k.isSameTable(dbName, tblName)) {
+                partitionCache.invalidate(k);
             }
-            partitionValuesCache.invalidate(key);
-            if (LOG.isDebugEnabled()) {
-                LOG.debug("invalid table cache for {}.{} in catalog {}, cache num: {}, cost: {} ms",
-                        dbName, tblName, catalog.getName(), partitionValues.partitionValuesMap.size(),
-                        (System.currentTimeMillis() - start));
+        });
+        long id = Util.genIdByName(dbName, tblName);
+        LoadingCache<FileCacheKey, FileCacheValue> fileCache = fileCacheRef.get();
+        fileCache.asMap().keySet().forEach(k -> {
+            if (k.isSameTable(id)) {
+                fileCache.invalidate(k);
             }
-        } else {
-            /**
-             * A file cache entry can be created reference to
-             * {@link org.apache.doris.planner.external.HiveSplitter#getSplits},
-             * so we need to invalidate it if this is a non-partitioned table.
-             * We use {@link org.apache.doris.datasource.hive.HiveMetaStoreCache.FileCacheKey#createDummyCacheKey}
-             * to avoid invocation by Hms Client, because this method may be invoked when salve FE replay journal logs,
-             * and FE will exit if some network problems occur.
-             * */
-            FileCacheKey fileCacheKey = FileCacheKey.createDummyCacheKey(
-                    dbName, tblName, null, null, null);
-            fileCacheRef.get().invalidate(fileCacheKey);
-        }
+        });
     }

     public void invalidatePartitionCache(String dbName, String tblName, String partitionName) {
@@ -596,7 +578,7 @@ public void invalidatePartitionCache(String dbName, String tblName, String parti
             PartitionCacheKey partKey = new PartitionCacheKey(dbName, tblName, values);
             HivePartition partition = partitionCache.getIfPresent(partKey);
             if (partition != null) {
-                fileCacheRef.get().invalidate(new FileCacheKey(partition.getPath(),
+                fileCacheRef.get().invalidate(new FileCacheKey(dbName, tblName, partition.getPath(),
                         null, partition.getPartitionValues(), null));
                 partitionCache.invalidate(partKey);
             }
@@ -740,10 +722,21 @@ public void putPartitionValuesCacheForTest(PartitionValueCacheKey key, HiveParti
      * get fileCache ref
      * @return
      */
+    @VisibleForTesting
     public AtomicReference<LoadingCache<FileCacheKey, FileCacheValue>> getFileCacheRef() {
         return fileCacheRef;
     }

+    @VisibleForTesting
+    public LoadingCache<PartitionValueCacheKey, HivePartitionValues> getPartitionValuesCache() {
+        return partitionValuesCache;
+    }
+
+    @VisibleForTesting
+    public LoadingCache<PartitionCacheKey, HivePartition> getPartitionCache() {
+        return partitionCache;
+    }
+
     public List<FileCacheValue> getFilesByTransaction(List<HivePartition> partitions, ValidWriteIdList validWriteIds,
             boolean isFullAcid, long tableId, String bindBrokerName) {
         List<FileCacheValue> fileCacheValues = Lists.newArrayList();
@@ -925,6 +918,10 @@ public boolean equals(Object obj) {
                     && Objects.equals(values, ((PartitionCacheKey) obj).values);
         }

+        boolean isSameTable(String dbName, String tblName) {
+            return this.dbName.equals(dbName) && this.tblName.equals(tblName);
+        }
+
         @Override
         public int hashCode() {
             return Objects.hash(dbName, tblName, values);
@@ -949,18 +946,21 @@ public static class FileCacheKey {
         // e.g for file : hdfs://path/to/table/part1=a/part2=b/datafile
         // partitionValues would be ["part1", "part2"]
         protected List<String> partitionValues;
+        private long id;

-        public FileCacheKey(String location, String inputFormat, List<String> partitionValues, String bindBrokerName) {
+        public FileCacheKey(String dbName, String tblName, String location, String inputFormat,
+                List<String> partitionValues, String bindBrokerName) {
             this.location = location;
             this.inputFormat = inputFormat;
             this.partitionValues = partitionValues == null ? Lists.newArrayList() : partitionValues;
             this.bindBrokerName = bindBrokerName;
+            this.id = Util.genIdByName(dbName, tblName);
         }

         public static FileCacheKey createDummyCacheKey(String dbName, String tblName, String location,
                 String inputFormat, String bindBrokerName) {
-            FileCacheKey fileCacheKey = new FileCacheKey(location, inputFormat, null, bindBrokerName);
+            FileCacheKey fileCacheKey = new FileCacheKey(dbName, tblName, location, inputFormat, null, bindBrokerName);
             fileCacheKey.dummyKey = dbName + "." + tblName;
             return fileCacheKey;
         }
@@ -980,6 +980,10 @@ public boolean equals(Object obj) {
                     && Objects.equals(partitionValues, ((FileCacheKey) obj).partitionValues);
         }

+        boolean isSameTable(long id) {
+            return this.id == id;
+        }
+
         @Override
         public int hashCode() {
             if (dummyKey != null) {
diff --git a/fe/fe-core/src/test/java/org/apache/doris/datasource/hive/HiveMetaStoreCacheTest.java b/fe/fe-core/src/test/java/org/apache/doris/datasource/hive/HiveMetaStoreCacheTest.java
new file mode 100644
index 00000000000000..607fc3b65394be
--- /dev/null
+++ b/fe/fe-core/src/test/java/org/apache/doris/datasource/hive/HiveMetaStoreCacheTest.java
@@ -0,0 +1,92 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource.hive;
+
+import org.apache.doris.common.ThreadPoolManager;
+
+import com.github.benmanes.caffeine.cache.LoadingCache;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.concurrent.ThreadPoolExecutor;
+
+public class HiveMetaStoreCacheTest {
+
+    @Test
+    public void testInvalidateTableCache() {
+        ThreadPoolExecutor executor = ThreadPoolManager.newDaemonFixedThreadPool(
+                1, 1, "refresh", 1, false);
+        ThreadPoolExecutor listExecutor = ThreadPoolManager.newDaemonFixedThreadPool(
+                1, 1, "file", 1, false);
+
+        HiveMetaStoreCache hiveMetaStoreCache = new HiveMetaStoreCache(
+                new HMSExternalCatalog(1L, "catalog", null, new HashMap<>(), null), executor, listExecutor);
+
+        LoadingCache<HiveMetaStoreCache.FileCacheKey, HiveMetaStoreCache.FileCacheValue> fileCache = hiveMetaStoreCache.getFileCacheRef().get();
+        LoadingCache<HiveMetaStoreCache.PartitionCacheKey, HivePartition> partitionCache = hiveMetaStoreCache.getPartitionCache();
+        LoadingCache<HiveMetaStoreCache.PartitionValueCacheKey, HiveMetaStoreCache.HivePartitionValues> partitionValuesCache = hiveMetaStoreCache.getPartitionValuesCache();
+
+        String dbName = "db";
+        String tbName = "tb";
+        String tbName2 = "tb2";
+
+        putCache(fileCache, partitionCache, partitionValuesCache, dbName, tbName);
+        Assertions.assertEquals(2, fileCache.asMap().size());
+        Assertions.assertEquals(1, partitionCache.asMap().size());
+        Assertions.assertEquals(1, partitionValuesCache.asMap().size());
+
+        putCache(fileCache, partitionCache, partitionValuesCache, dbName, tbName2);
+        Assertions.assertEquals(4, fileCache.asMap().size());
+        Assertions.assertEquals(2, partitionCache.asMap().size());
+        Assertions.assertEquals(2, partitionValuesCache.asMap().size());
+
+        hiveMetaStoreCache.invalidateTableCache(dbName, tbName2);
+        Assertions.assertEquals(2, fileCache.asMap().size());
+        Assertions.assertEquals(1, partitionCache.asMap().size());
+        Assertions.assertEquals(1, partitionValuesCache.asMap().size());
+
+        hiveMetaStoreCache.invalidateTableCache(dbName, tbName);
+        Assertions.assertEquals(0, fileCache.asMap().size());
+        Assertions.assertEquals(0, partitionCache.asMap().size());
+        Assertions.assertEquals(0, partitionValuesCache.asMap().size());
+    }
+
+    private void putCache(
+            LoadingCache<HiveMetaStoreCache.FileCacheKey, HiveMetaStoreCache.FileCacheValue> fileCache,
+            LoadingCache<HiveMetaStoreCache.PartitionCacheKey, HivePartition> partitionCache,
+            LoadingCache<HiveMetaStoreCache.PartitionValueCacheKey, HiveMetaStoreCache.HivePartitionValues> partitionValuesCache,
+            String dbName, String tbName) {
+        HiveMetaStoreCache.FileCacheKey fileCacheKey1 = new HiveMetaStoreCache.FileCacheKey(dbName, tbName, tbName, "", new ArrayList<>(), null);
+        HiveMetaStoreCache.FileCacheKey fileCacheKey2 = HiveMetaStoreCache.FileCacheKey.createDummyCacheKey(dbName, tbName, tbName, "", null);
+        fileCache.put(fileCacheKey1, new HiveMetaStoreCache.FileCacheValue());
+        fileCache.put(fileCacheKey2, new HiveMetaStoreCache.FileCacheValue());
+
+        HiveMetaStoreCache.PartitionCacheKey partitionCacheKey = new HiveMetaStoreCache.PartitionCacheKey(
+                dbName,
+                tbName,
+                new ArrayList<>()
+        );
+        partitionCache.put(partitionCacheKey, new HivePartition(dbName, tbName, false, "", "", new ArrayList<>(), new HashMap<>()));
+
+        HiveMetaStoreCache.PartitionValueCacheKey partitionValueCacheKey = new HiveMetaStoreCache.PartitionValueCacheKey(dbName, tbName, new ArrayList<>());
+        partitionValuesCache.put(partitionValueCacheKey, new HiveMetaStoreCache.HivePartitionValues());
+
+    }
+}
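
The core of the change: `invalidateTableCache` no longer reconstructs `FileCacheKey` objects from whatever happens to be in the partition-values cache; it scans each cache's key set and drops every key belonging to the table, with `FileCacheKey` carrying an id built from `Util.genIdByName(dbName, tblName)`. Below is a minimal standalone sketch of that key-scan pattern over a Caffeine cache; the class and field names (`FileKey`, `tableId`) are illustrative only and not taken from the Doris sources.

```java
import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;

import java.util.Objects;

public class TableScopedInvalidationSketch {

    // Illustrative key type: carries the owning table's id next to the file location,
    // similar in spirit to FileCacheKey keeping an id derived from db and table name.
    static final class FileKey {
        final long tableId;
        final String location;

        FileKey(long tableId, String location) {
            this.tableId = tableId;
            this.location = location;
        }

        boolean isSameTable(long tableId) {
            return this.tableId == tableId;
        }

        @Override
        public boolean equals(Object o) {
            if (!(o instanceof FileKey)) {
                return false;
            }
            FileKey other = (FileKey) o;
            return tableId == other.tableId && Objects.equals(location, other.location);
        }

        @Override
        public int hashCode() {
            return Objects.hash(tableId, location);
        }
    }

    public static void main(String[] args) {
        Cache<FileKey, String> fileCache = Caffeine.newBuilder().maximumSize(1000).build();
        fileCache.put(new FileKey(1L, "hdfs://nn/warehouse/t1/f1"), "files-of-t1-f1");
        fileCache.put(new FileKey(1L, "hdfs://nn/warehouse/t1/f2"), "files-of-t1-f2");
        fileCache.put(new FileKey(2L, "hdfs://nn/warehouse/t2/f1"), "files-of-t2-f1");

        long tableToDrop = 1L;
        // Key-set scan: every entry of the table is removed, regardless of whether a
        // matching partition-values entry still exists to rebuild the keys from.
        fileCache.asMap().keySet().forEach(k -> {
            if (k.isSameTable(tableToDrop)) {
                fileCache.invalidate(k);
            }
        });

        System.out.println(fileCache.asMap().size()); // prints 1
    }
}
```

The trade-off is a full scan of the cache's keys per table invalidation, in exchange for not missing file-cache entries when the corresponding partition-values entry has expired or was never populated.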