From 1f39eb794049a9e2478b57c58c014a82c079947e Mon Sep 17 00:00:00 2001
From: wuwenchi
Date: Wed, 18 Sep 2024 14:37:31 +0800
Subject: [PATCH] [bugfix](hive) Fix cache inconsistency issue (#40729)

## Proposed changes

1. Concurrency issue (a standalone sketch of this race follows the first hunks below):
   a. One thread performs a refresh catalog action.
   b. Another thread performs an insert into a table and, after it completes, executes a refresh table action.

   The `partitionCache` was refreshed only if the corresponding table could still be found in the `partitionValuesCache`. However, the `partitionValuesCache` may already have been invalidated by the concurrent refresh catalog, so the table lookup fails and the stale `partitionCache` entries are never invalidated. The same applies to `fileCacheRef`. The fix is to scan all cache keys directly and invalidate every key that matches the table, so no stale entry can be missed.
2. Drop the refreshAfterWriteSec policy on `partitionCache`; it is not needed.
3. Enlarge the refresh thread pool's task queue.
---
 .../datasource/ExternalMetaCacheMgr.java      |  2 +-
 .../datasource/hive/HiveMetaStoreCache.java   | 74 ++++++++-------
 .../hive/HiveMetaStoreCacheTest.java          | 92 +++++++++++++++++++
 3 files changed, 132 insertions(+), 36 deletions(-)
 create mode 100644 fe/fe-core/src/test/java/org/apache/doris/datasource/hive/HiveMetaStoreCacheTest.java

diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalMetaCacheMgr.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalMetaCacheMgr.java
index 8759f9f5ef2f04..cc40ad292ce182 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalMetaCacheMgr.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalMetaCacheMgr.java
@@ -101,7 +101,7 @@ public ExternalMetaCacheMgr() {
         commonRefreshExecutor = ThreadPoolManager.newDaemonFixedThreadPool(
                 Config.max_external_cache_loader_thread_pool_size,
-                Config.max_external_cache_loader_thread_pool_size * 1000,
+                Config.max_external_cache_loader_thread_pool_size * 10000,
                 "CommonRefreshExecutor", 10, true);
         // The queue size should be large enough,
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java
index 4bb87ad5375402..bae5978cffb682 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java
@@ -149,7 +149,7 @@ private void init() {
         CacheFactory partitionCacheFactory = new CacheFactory(
                 OptionalLong.of(28800L),
-                OptionalLong.of(Config.external_cache_expire_time_minutes_after_access * 60L),
+                OptionalLong.empty(),
                 Config.max_hive_partition_cache_num,
                 true,
                 null);
@@ -481,7 +481,8 @@ public List<FileCacheValue> getFilesByPartitions(List<HivePartition> partitions,
         List<FileCacheKey> keys = partitions.stream().map(p -> p.isDummyPartition()
                 ? FileCacheKey.createDummyCacheKey(
                         p.getDbName(), p.getTblName(), p.getPath(), p.getInputFormat(), bindBrokerName)
-                : new FileCacheKey(p.getPath(), p.getInputFormat(), p.getPartitionValues(), bindBrokerName))
+                : new FileCacheKey(p.getDbName(), p.getTblName(), p.getPath(),
+                        p.getInputFormat(), p.getPartitionValues(), bindBrokerName))
                 .collect(Collectors.toList());
 
         List<FileCacheValue> fileLists;
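A minimal, self-contained sketch of the race from point 1 above, using two plain Caffeine caches as stand-ins for `partitionValuesCache` and `partitionCache`; all names here are illustrative, not Doris APIs:

```java
import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;

public class StaleEntryDemo {
    public static void main(String[] args) {
        // Stand-ins for partitionValuesCache and partitionCache.
        Cache<String, String> valuesCache = Caffeine.newBuilder().build();
        Cache<String, String> partCache = Caffeine.newBuilder().build();

        valuesCache.put("db.tb", "p1");
        partCache.put("db.tb#p1", "partition meta");

        // Thread A: refresh catalog drops the partitionValuesCache entry first.
        valuesCache.invalidate("db.tb");

        // Thread B: refresh table with the OLD logic, which can only find the
        // dependent partition keys through the values cache.
        String values = valuesCache.getIfPresent("db.tb");
        if (values != null) { // null now, so the dependent entry is skipped
            partCache.invalidate("db.tb#" + values);
        }
        System.out.println(partCache.getIfPresent("db.tb#p1")); // stale: "partition meta"

        // NEW logic: scan the dependent cache's keys directly, independent of
        // whether the values cache still holds the table.
        partCache.asMap().keySet().removeIf(k -> k.startsWith("db.tb#"));
        System.out.println(partCache.getIfPresent("db.tb#p1")); // null
    }
}
```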
@@ -553,38 +554,19 @@ private List<HivePartition> getAllPartitions(String dbName, String name, List<List<String>>
     public void invalidateTableCache(String dbName, String tblName) {
-        PartitionValueCacheKey key = new PartitionValueCacheKey(dbName, tblName, null);
-        HivePartitionValues partitionValues = partitionValuesCache.getIfPresent(key);
-        if (partitionValues != null) {
-            long start = System.currentTimeMillis();
-            for (List<String> values : partitionValues.partitionValuesMap.values()) {
-                PartitionCacheKey partKey = new PartitionCacheKey(dbName, tblName, values);
-                HivePartition partition = partitionCache.getIfPresent(partKey);
-                if (partition != null) {
-                    fileCacheRef.get().invalidate(new FileCacheKey(partition.getPath(),
-                            null, partition.getPartitionValues(), null));
-                    partitionCache.invalidate(partKey);
-                }
-            }
-            partitionValuesCache.invalidate(key);
-            if (LOG.isDebugEnabled()) {
-                LOG.debug("invalid table cache for {}.{} in catalog {}, cache num: {}, cost: {} ms",
-                        dbName, tblName, catalog.getName(), partitionValues.partitionValuesMap.size(),
-                        (System.currentTimeMillis() - start));
-            }
-        } else {
-            /**
-             * A file cache entry can be created reference to
-             * {@link org.apache.doris.planner.external.HiveSplitter#getSplits},
-             * so we need to invalidate it if this is a non-partitioned table.
-             * We use {@link org.apache.doris.datasource.hive.HiveMetaStoreCache.FileCacheKey#createDummyCacheKey}
-             * to avoid invocation by Hms Client, because this method may be invoked when a slave FE replays
-             * journal logs, and FE will exit if some network problems occur.
-             * */
-            FileCacheKey fileCacheKey = FileCacheKey.createDummyCacheKey(
-                    dbName, tblName, null, null, null);
-            fileCacheRef.get().invalidate(fileCacheKey);
-        }
+        partitionValuesCache.invalidate(new PartitionValueCacheKey(dbName, tblName, null));
+        partitionCache.asMap().keySet().forEach(k -> {
+            if (k.isSameTable(dbName, tblName)) {
+                partitionCache.invalidate(k);
+            }
+        });
+        long id = Util.genIdByName(dbName, tblName);
+        LoadingCache<FileCacheKey, FileCacheValue> fileCache = fileCacheRef.get();
+        fileCache.asMap().keySet().forEach(k -> {
+            if (k.isSameTable(id)) {
+                fileCache.invalidate(k);
+            }
+        });
     }
@@ -596,7 +578,7 @@ public void invalidatePartitionCache(String dbName, String tblName, String partitionName) {
         PartitionCacheKey partKey = new PartitionCacheKey(dbName, tblName, values);
         HivePartition partition = partitionCache.getIfPresent(partKey);
         if (partition != null) {
-            fileCacheRef.get().invalidate(new FileCacheKey(partition.getPath(),
+            fileCacheRef.get().invalidate(new FileCacheKey(dbName, tblName, partition.getPath(),
                     null, partition.getPartitionValues(), null));
             partitionCache.invalidate(partKey);
         }
@@ -740,10 +722,21 @@ public void putPartitionValuesCacheForTest(PartitionValueCacheKey key, HivePartitionValues values) {
      * get fileCache ref
      * @return
      */
+    @VisibleForTesting
     public AtomicReference<LoadingCache<FileCacheKey, FileCacheValue>> getFileCacheRef() {
         return fileCacheRef;
     }
 
+    @VisibleForTesting
+    public LoadingCache<PartitionValueCacheKey, HivePartitionValues> getPartitionValuesCache() {
+        return partitionValuesCache;
+    }
+
+    @VisibleForTesting
+    public LoadingCache<PartitionCacheKey, HivePartition> getPartitionCache() {
+        return partitionCache;
+    }
+
     public List<FileCacheValue> getFilesByTransaction(List<HivePartition> partitions, ValidWriteIdList validWriteIds,
             boolean isFullAcid, long tableId, String bindBrokerName) {
         List<FileCacheValue> fileCacheValues = Lists.newArrayList();
@@ -925,6 +918,10 @@ public boolean equals(Object obj) {
                     && Objects.equals(values, ((PartitionCacheKey) obj).values);
         }
 
+        boolean isSameTable(String dbName, String tblName) {
+            return this.dbName.equals(dbName) && this.tblName.equals(tblName);
+        }
+
         @Override
         public int hashCode() {
             return Objects.hash(dbName, tblName, values);
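The `FileCacheKey` hunk below matches file-cache entries to their table through a 64-bit id computed from the database and table names. The sketch that follows shows the idea in isolation; `genIdByName` here is an illustrative stand-in for `Util.genIdByName`, whose actual algorithm is not part of this patch, and any stable `(dbName, tblName) -> long` mapping would serve the demo:

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;

public class TableIdDemo {
    // Stand-in: derive a stable 64-bit id from the qualified table name.
    static long genIdByName(String dbName, String tblName) {
        return UUID.nameUUIDFromBytes((dbName + "." + tblName)
                .getBytes(StandardCharsets.UTF_8)).getMostSignificantBits();
    }

    // Simplified key: the table identity is fixed once at construction, so
    // invalidation never needs to parse the location or partition values.
    static final class Key {
        final long id;
        final String location;
        Key(String dbName, String tblName, String location) {
            this.id = genIdByName(dbName, tblName);
            this.location = location;
        }
    }

    public static void main(String[] args) {
        List<Key> keys = new ArrayList<>();
        keys.add(new Key("db", "tb", "hdfs://a/p1"));
        keys.add(new Key("db", "tb", "hdfs://a/p2"));
        keys.add(new Key("db", "tb2", "hdfs://b/p1"));

        long target = genIdByName("db", "tb");
        keys.removeIf(k -> k.id == target); // drop every key belonging to db.tb
        System.out.println(keys.size());    // 1 (only db.tb2 remains)
    }
}
```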
@@ -949,18 +946,21 @@ public static class FileCacheKey {
         // e.g for file : hdfs://path/to/table/part1=a/part2=b/datafile
         // partitionValues would be ["part1", "part2"]
         protected List<String> partitionValues;
+        private long id;
 
-        public FileCacheKey(String location, String inputFormat, List<String> partitionValues, String bindBrokerName) {
+        public FileCacheKey(String dbName, String tblName, String location, String inputFormat,
+                List<String> partitionValues, String bindBrokerName) {
             this.location = location;
             this.inputFormat = inputFormat;
             this.partitionValues = partitionValues == null ? Lists.newArrayList() : partitionValues;
             this.bindBrokerName = bindBrokerName;
+            this.id = Util.genIdByName(dbName, tblName);
         }
 
         public static FileCacheKey createDummyCacheKey(String dbName, String tblName, String location,
                 String inputFormat, String bindBrokerName) {
-            FileCacheKey fileCacheKey = new FileCacheKey(location, inputFormat, null, bindBrokerName);
+            FileCacheKey fileCacheKey = new FileCacheKey(dbName, tblName, location, inputFormat, null, bindBrokerName);
             fileCacheKey.dummyKey = dbName + "." + tblName;
             return fileCacheKey;
         }
@@ -980,6 +980,10 @@ public boolean equals(Object obj) {
                     && Objects.equals(partitionValues, ((FileCacheKey) obj).partitionValues);
         }
 
+        boolean isSameTable(long id) {
+            return this.id == id;
+        }
+
         @Override
         public int hashCode() {
             if (dummyKey != null) {
diff --git a/fe/fe-core/src/test/java/org/apache/doris/datasource/hive/HiveMetaStoreCacheTest.java b/fe/fe-core/src/test/java/org/apache/doris/datasource/hive/HiveMetaStoreCacheTest.java
new file mode 100644
index 00000000000000..607fc3b65394be
--- /dev/null
+++ b/fe/fe-core/src/test/java/org/apache/doris/datasource/hive/HiveMetaStoreCacheTest.java
@@ -0,0 +1,92 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource.hive;
+
+import org.apache.doris.common.ThreadPoolManager;
+
+import com.github.benmanes.caffeine.cache.LoadingCache;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.concurrent.ThreadPoolExecutor;
+
+public class HiveMetaStoreCacheTest {
+
+    @Test
+    public void testInvalidateTableCache() {
+        ThreadPoolExecutor executor = ThreadPoolManager.newDaemonFixedThreadPool(
+                1, 1, "refresh", 1, false);
+        ThreadPoolExecutor listExecutor = ThreadPoolManager.newDaemonFixedThreadPool(
+                1, 1, "file", 1, false);
+
+        HiveMetaStoreCache hiveMetaStoreCache = new HiveMetaStoreCache(
+                new HMSExternalCatalog(1L, "catalog", null, new HashMap<>(), null), executor, listExecutor);
+
+        LoadingCache<HiveMetaStoreCache.FileCacheKey, HiveMetaStoreCache.FileCacheValue> fileCache =
+                hiveMetaStoreCache.getFileCacheRef().get();
+        LoadingCache<HiveMetaStoreCache.PartitionCacheKey, HivePartition> partitionCache =
+                hiveMetaStoreCache.getPartitionCache();
+        LoadingCache<HiveMetaStoreCache.PartitionValueCacheKey, HiveMetaStoreCache.HivePartitionValues>
+                partitionValuesCache = hiveMetaStoreCache.getPartitionValuesCache();
+
+        String dbName = "db";
+        String tbName = "tb";
+        String tbName2 = "tb2";
+
+        putCache(fileCache, partitionCache, partitionValuesCache, dbName, tbName);
+        Assertions.assertEquals(2, fileCache.asMap().size());
+        Assertions.assertEquals(1, partitionCache.asMap().size());
+        Assertions.assertEquals(1, partitionValuesCache.asMap().size());
+
+        putCache(fileCache, partitionCache, partitionValuesCache, dbName, tbName2);
+        Assertions.assertEquals(4, fileCache.asMap().size());
+        Assertions.assertEquals(2, partitionCache.asMap().size());
+        Assertions.assertEquals(2, partitionValuesCache.asMap().size());
+
+        hiveMetaStoreCache.invalidateTableCache(dbName, tbName2);
+        Assertions.assertEquals(2, fileCache.asMap().size());
+        Assertions.assertEquals(1, partitionCache.asMap().size());
+        Assertions.assertEquals(1, partitionValuesCache.asMap().size());
+
+        hiveMetaStoreCache.invalidateTableCache(dbName, tbName);
+        Assertions.assertEquals(0, fileCache.asMap().size());
+        Assertions.assertEquals(0, partitionCache.asMap().size());
+        Assertions.assertEquals(0, partitionValuesCache.asMap().size());
+    }
+
+    private void putCache(
+            LoadingCache<HiveMetaStoreCache.FileCacheKey, HiveMetaStoreCache.FileCacheValue> fileCache,
+            LoadingCache<HiveMetaStoreCache.PartitionCacheKey, HivePartition> partitionCache,
+            LoadingCache<HiveMetaStoreCache.PartitionValueCacheKey, HiveMetaStoreCache.HivePartitionValues>
+                    partitionValuesCache,
+            String dbName, String tbName) {
+        HiveMetaStoreCache.FileCacheKey fileCacheKey1 = new HiveMetaStoreCache.FileCacheKey(
+                dbName, tbName, tbName, "", new ArrayList<>(), null);
+        HiveMetaStoreCache.FileCacheKey fileCacheKey2 = HiveMetaStoreCache.FileCacheKey.createDummyCacheKey(
+                dbName, tbName, tbName, "", null);
+        fileCache.put(fileCacheKey1, new HiveMetaStoreCache.FileCacheValue());
+        fileCache.put(fileCacheKey2, new HiveMetaStoreCache.FileCacheValue());
+
+        HiveMetaStoreCache.PartitionCacheKey partitionCacheKey = new HiveMetaStoreCache.PartitionCacheKey(
+                dbName, tbName, new ArrayList<>());
+        partitionCache.put(partitionCacheKey,
+                new HivePartition(dbName, tbName, false, "", "", new ArrayList<>(), new HashMap<>()));
+
+        HiveMetaStoreCache.PartitionValueCacheKey partitionValueCacheKey =
+                new HiveMetaStoreCache.PartitionValueCacheKey(dbName, tbName, new ArrayList<>());
+        partitionValuesCache.put(partitionValueCacheKey, new HiveMetaStoreCache.HivePartitionValues());
+    }
+}
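For point 3 of the proposed changes, a rough sketch of why the refresh executor's queue bound matters, using plain `java.util.concurrent` types rather than Doris's `ThreadPoolManager` (whose rejection policy may differ); all names are illustrative:

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class QueueSizeDemo {
    public static void main(String[] args) {
        // Fixed pool of 1 thread with a small bounded queue of 2 tasks.
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<>(2),
                new ThreadPoolExecutor.AbortPolicy()); // reject when full

        int rejected = 0;
        for (int i = 0; i < 10; i++) {
            try {
                pool.execute(() -> sleep(100)); // slow "refresh" task
            } catch (RejectedExecutionException e) {
                rejected++; // the burst overflows the small queue
            }
        }
        // With queue size 2, most of the burst is rejected; a larger bound
        // (the patch goes from size*1000 to size*10000) absorbs such bursts.
        System.out.println("rejected: " + rejected);
        pool.shutdown();
    }

    static void sleep(long ms) {
        try { Thread.sleep(ms); } catch (InterruptedException ignored) { }
    }
}
```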