diff --git a/fe/fe-core/pom.xml b/fe/fe-core/pom.xml index 83deec7650c750..6fd4846d4640e3 100644 --- a/fe/fe-core/pom.xml +++ b/fe/fe-core/pom.xml @@ -157,6 +157,11 @@ under the License. <groupId>com.google.guava</groupId> <artifactId>guava</artifactId> + <dependency> + <groupId>com.google.guava</groupId> + <artifactId>guava-testlib</artifactId> + <scope>test</scope> + </dependency> <groupId>com.fasterxml.jackson.core</groupId> diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/CacheFactory.java b/fe/fe-core/src/main/java/org/apache/doris/common/CacheFactory.java new file mode 100644 index 00000000000000..fbb004e5c99b02 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/common/CacheFactory.java @@ -0,0 +1,112 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.common; + +import com.github.benmanes.caffeine.cache.AsyncCacheLoader; +import com.github.benmanes.caffeine.cache.AsyncLoadingCache; +import com.github.benmanes.caffeine.cache.CacheLoader; +import com.github.benmanes.caffeine.cache.Caffeine; +import com.github.benmanes.caffeine.cache.LoadingCache; +import com.github.benmanes.caffeine.cache.Ticker; +import org.jetbrains.annotations.NotNull; + +import java.time.Duration; +import java.util.OptionalLong; +import java.util.concurrent.ExecutorService; + +/** + * Factory to create Caffeine cache. + *

+ * This class creates Caffeine caches with the specified parameters, + * both sync and async. + * The cache is configured with the following parameters: + * - expireAfterWriteSec: The duration after which cache entries expire. + * - refreshAfterWriteSec: The duration after which cache entries are refreshed. + * - maxSize: The maximum size of the cache. + * - enableStats: Whether to enable stats for the cache. + * - ticker: The ticker to use for the cache. + * Caches with these parameters are built via the buildCache and buildAsyncCache methods. + *

+ */ +public class CacheFactory { + + private OptionalLong expireAfterWriteSec; + private OptionalLong refreshAfterWriteSec; + private long maxSize; + private boolean enableStats; + // Ticker is used to provide a time source for the cache. + // Only used for test, to provide a fake time source. + // If not provided, the system time is used. + private Ticker ticker; + + public CacheFactory( + OptionalLong expireAfterWriteSec, + OptionalLong refreshAfterWriteSec, + long maxSize, + boolean enableStats, + Ticker ticker) { + this.expireAfterWriteSec = expireAfterWriteSec; + this.refreshAfterWriteSec = refreshAfterWriteSec; + this.maxSize = maxSize; + this.enableStats = enableStats; + this.ticker = ticker; + } + + // Build a loading cache, without executor, it will use fork-join pool for refresh + public LoadingCache buildCache(CacheLoader cacheLoader) { + Caffeine builder = buildWithParams(); + return builder.build(cacheLoader); + } + + // Build a loading cache, with executor, it will use given executor for refresh + public LoadingCache buildCache(CacheLoader cacheLoader, ExecutorService executor) { + Caffeine builder = buildWithParams(); + builder.executor(executor); + return builder.build(cacheLoader); + } + + // Build an async loading cache + public AsyncLoadingCache buildAsyncCache(AsyncCacheLoader cacheLoader, + ExecutorService executor) { + Caffeine builder = buildWithParams(); + builder.executor(executor); + return builder.buildAsync(cacheLoader); + } + + @NotNull + private Caffeine buildWithParams() { + Caffeine builder = Caffeine.newBuilder(); + builder.maximumSize(maxSize); + + if (expireAfterWriteSec.isPresent()) { + builder.expireAfterWrite(Duration.ofSeconds(expireAfterWriteSec.getAsLong())); + } + if (refreshAfterWriteSec.isPresent()) { + builder.refreshAfterWrite(Duration.ofSeconds(refreshAfterWriteSec.getAsLong())); + } + + if (enableStats) { + builder.recordStats(); + } + + if (ticker != null) { + builder.ticker(ticker); + } + return builder; + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/util/CacheBulkLoader.java b/fe/fe-core/src/main/java/org/apache/doris/common/util/CacheBulkLoader.java index 441f22d8b68ace..eee08b872ec3fc 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/util/CacheBulkLoader.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/CacheBulkLoader.java @@ -19,7 +19,7 @@ import org.apache.doris.common.Pair; -import com.google.common.cache.CacheLoader; +import com.github.benmanes.caffeine.cache.CacheLoader; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Maps; import com.google.common.collect.Streams; @@ -31,7 +31,7 @@ import java.util.concurrent.Future; import java.util.stream.Collectors; -public abstract class CacheBulkLoader extends CacheLoader { +public abstract class CacheBulkLoader implements CacheLoader { protected abstract ExecutorService getExecutor(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java index 45d99e02223757..6d5ae985fefb25 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java @@ -104,7 +104,7 @@ public abstract class ExternalCatalog @SerializedName(value = "catalogProperty") protected CatalogProperty catalogProperty; @SerializedName(value = "initialized") - private boolean initialized = false; + protected boolean initialized = false; 
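To make the intent of the new CacheFactory above concrete, here is a minimal usage sketch; the key type, value type, loader body, and pool size are illustrative and not part of this patch:

import com.github.benmanes.caffeine.cache.LoadingCache;
import java.util.OptionalLong;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Expire entries 1 day after write, refresh 10 minutes after write,
// keep at most 1000 entries, no stats, system ticker (null).
CacheFactory factory = new CacheFactory(
        OptionalLong.of(86400L),   // expireAfterWriteSec
        OptionalLong.of(600L),     // refreshAfterWriteSec
        1000,                      // maxSize
        false,                     // enableStats
        null);                     // ticker
ExecutorService refreshPool = Executors.newFixedThreadPool(2);
// The lambda is the Caffeine CacheLoader: a miss loads synchronously in the
// caller thread, while refreshAfterWrite reloads run on refreshPool.
LoadingCache<String, Integer> lengths = factory.buildCache(key -> key.length(), refreshPool);
lengths.get("hello"); // returns 5, loading on first access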
@SerializedName(value = "idToDb") protected Map> idToDb = Maps.newConcurrentMap(); @SerializedName(value = "lastUpdateTime") diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalMetaCacheMgr.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalMetaCacheMgr.java index 87c4ff65e988b5..6198f1bab9a911 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalMetaCacheMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalMetaCacheMgr.java @@ -50,13 +50,35 @@ public class ExternalMetaCacheMgr { private static final Logger LOG = LogManager.getLogger(ExternalMetaCacheMgr.class); + /** + * Executors for loading caches + * 1. rowCountRefreshExecutor + * For row count cache. + * Row count cache is an async loading cache, and we can ignore the result + * if cache missing or thread pool is full. + * So use a separate executor for this cache. + *

+ * 2. commonRefreshExecutor + * For other caches. Other caches are sync loading caches, + * but commonRefreshExecutor will be used for async refresh. + * That is, if a cache entry is missing, the value will be loaded in the caller thread, synchronously; + * if a cache entry needs refresh, it will be reloaded in commonRefreshExecutor. + *

+ * 3. fileListingExecutor + * File listing is a heavy operation, so use a separate executor for it. + * For fileCache, the refresh operation will still use commonRefreshExecutor to trigger refresh, + * and fileListingExecutor will be used to list files. + */ + private ExecutorService rowCountRefreshExecutor; + private ExecutorService commonRefreshExecutor; + private ExecutorService fileListingExecutor; + // catalog id -> HiveMetaStoreCache private final Map<Long, HiveMetaStoreCache> cacheMap = Maps.newConcurrentMap(); // catalog id -> table schema cache private Map<Long, ExternalSchemaCache> schemaCacheMap = Maps.newHashMap(); // hudi partition manager private final HudiPartitionMgr hudiPartitionMgr; - private ExecutorService executor; // all catalogs could share the same fsCache. private FileSystemCache fsCache; // all external table row count cache. @@ -65,24 +87,42 @@ public class ExternalMetaCacheMgr { private final MaxComputeMetadataCacheMgr maxComputeMetadataCacheMgr; public ExternalMetaCacheMgr() { - executor = ThreadPoolManager.newDaemonFixedThreadPool( + rowCountRefreshExecutor = ThreadPoolManager.newDaemonFixedThreadPool( + Config.max_external_cache_loader_thread_pool_size, + Config.max_external_cache_loader_thread_pool_size, + "RowCountRefreshExecutor", 0, true); + + commonRefreshExecutor = ThreadPoolManager.newDaemonFixedThreadPool( Config.max_external_cache_loader_thread_pool_size, Config.max_external_cache_loader_thread_pool_size * 1000, - "ExternalMetaCacheMgr", 120, true); - hudiPartitionMgr = HudiPartitionMgr.get(executor); - fsCache = new FileSystemCache(executor); - rowCountCache = new ExternalRowCountCache(executor, - Config.external_cache_expire_time_minutes_after_access * 60, null); - icebergMetadataCacheMgr = new IcebergMetadataCacheMgr(); + "CommonRefreshExecutor", 10, true); + + // The queue size should be large enough, + // because there may be thousands of partitions being queried at the same time.
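To illustrate the split this constructor sets up: row-count loading is best-effort and may be dropped, while refresh and file-listing work must eventually run, so the pools differ in queue depth and in what happens on saturation. A rough sketch with plain java.util.concurrent primitives follows; the sizes and rejection policies are illustrative assumptions, not ThreadPoolManager's actual behavior:

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Row-count loads are best-effort: if the pool is saturated, silently drop
// the task and let the caller treat the row count as unknown.
ThreadPoolExecutor rowCountPool = new ThreadPoolExecutor(
        8, 8, 0L, TimeUnit.MILLISECONDS,
        new ArrayBlockingQueue<>(8),
        new ThreadPoolExecutor.DiscardPolicy());

// Refresh and file-listing work should not be lost, so queue it deeply
// (default AbortPolicy surfaces overload instead of dropping work).
ThreadPoolExecutor refreshPool = new ThreadPoolExecutor(
        8, 8, 0L, TimeUnit.MILLISECONDS,
        new ArrayBlockingQueue<>(8 * 1000));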
+ fileListingExecutor = ThreadPoolManager.newDaemonFixedThreadPool( + Config.max_external_cache_loader_thread_pool_size, + Config.max_external_cache_loader_thread_pool_size * 1000, + "FileListingExecutor", 10, true); + + fsCache = new FileSystemCache(); + rowCountCache = new ExternalRowCountCache(rowCountRefreshExecutor); + + hudiPartitionMgr = new HudiPartitionMgr(commonRefreshExecutor); + icebergMetadataCacheMgr = new IcebergMetadataCacheMgr(commonRefreshExecutor); maxComputeMetadataCacheMgr = new MaxComputeMetadataCacheMgr(); } + public ExecutorService getFileListingExecutor() { + return fileListingExecutor; + } + public HiveMetaStoreCache getMetaStoreCache(HMSExternalCatalog catalog) { HiveMetaStoreCache cache = cacheMap.get(catalog.getId()); if (cache == null) { synchronized (cacheMap) { if (!cacheMap.containsKey(catalog.getId())) { - cacheMap.put(catalog.getId(), new HiveMetaStoreCache(catalog, executor)); + cacheMap.put(catalog.getId(), + new HiveMetaStoreCache(catalog, commonRefreshExecutor, fileListingExecutor)); } cache = cacheMap.get(catalog.getId()); } @@ -95,7 +135,7 @@ public ExternalSchemaCache getSchemaCache(ExternalCatalog catalog) { if (cache == null) { synchronized (schemaCacheMap) { if (!schemaCacheMap.containsKey(catalog.getId())) { - schemaCacheMap.put(catalog.getId(), new ExternalSchemaCache(catalog)); + schemaCacheMap.put(catalog.getId(), new ExternalSchemaCache(catalog, commonRefreshExecutor)); } cache = schemaCacheMap.get(catalog.getId()); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalRowCountCache.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalRowCountCache.java index 632cde1d5a721e..4f88d534f9261f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalRowCountCache.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalRowCountCache.java @@ -18,18 +18,18 @@ package org.apache.doris.datasource; import org.apache.doris.catalog.TableIf; +import org.apache.doris.common.CacheFactory; import org.apache.doris.common.Config; import org.apache.doris.statistics.BasicAsyncCacheLoader; import org.apache.doris.statistics.util.StatisticsUtil; import com.github.benmanes.caffeine.cache.AsyncLoadingCache; -import com.github.benmanes.caffeine.cache.Caffeine; import lombok.Getter; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -import java.time.Duration; import java.util.Optional; +import java.util.OptionalLong; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; @@ -38,16 +38,16 @@ public class ExternalRowCountCache { private static final Logger LOG = LogManager.getLogger(ExternalRowCountCache.class); private final AsyncLoadingCache> rowCountCache; - public ExternalRowCountCache(ExecutorService executor, long refreshAfterWriteSeconds, - BasicAsyncCacheLoader> loader) { + public ExternalRowCountCache(ExecutorService executor) { // 1. set expireAfterWrite to 1 day, avoid too many entries // 2. set refreshAfterWrite to 10min(default), so that the cache will be refreshed after 10min - rowCountCache = Caffeine.newBuilder() - .maximumSize(Config.max_external_table_row_count_cache_num) - .expireAfterAccess(Duration.ofDays(1)) - .refreshAfterWrite(Duration.ofSeconds(refreshAfterWriteSeconds)) - .executor(executor) - .buildAsync(loader == null ? 
new RowCountCacheLoader() : loader); + CacheFactory rowCountCacheFactory = new CacheFactory( + OptionalLong.of(86400L), + OptionalLong.of(Config.external_cache_expire_time_minutes_after_access * 60), + Config.max_external_table_row_count_cache_num, + false, + null); + rowCountCache = rowCountCacheFactory.buildAsyncCache(new RowCountCacheLoader(), executor); } @Getter diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalSchemaCache.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalSchemaCache.java index d8f2edbbd1c1b6..d39ebf7adc319f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalSchemaCache.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalSchemaCache.java @@ -18,16 +18,14 @@ package org.apache.doris.datasource; import org.apache.doris.catalog.Column; +import org.apache.doris.common.CacheFactory; import org.apache.doris.common.Config; -import org.apache.doris.common.util.Util; import org.apache.doris.metric.GaugeMetric; import org.apache.doris.metric.Metric; import org.apache.doris.metric.MetricLabel; import org.apache.doris.metric.MetricRepo; -import com.google.common.cache.CacheBuilder; -import com.google.common.cache.CacheLoader; -import com.google.common.cache.LoadingCache; +import com.github.benmanes.caffeine.cache.LoadingCache; import com.google.common.collect.ImmutableList; import lombok.Data; import org.apache.logging.log4j.LogManager; @@ -35,9 +33,9 @@ import java.util.List; import java.util.Objects; +import java.util.OptionalLong; import java.util.Set; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.TimeUnit; +import java.util.concurrent.ExecutorService; // The schema cache for external table public class ExternalSchemaCache { @@ -46,21 +44,20 @@ public class ExternalSchemaCache { private LoadingCache> schemaCache; - public ExternalSchemaCache(ExternalCatalog catalog) { + public ExternalSchemaCache(ExternalCatalog catalog, ExecutorService executor) { this.catalog = catalog; - init(); + init(executor); initMetrics(); } - private void init() { - schemaCache = CacheBuilder.newBuilder().maximumSize(Config.max_external_schema_cache_num) - .expireAfterAccess(Config.external_cache_expire_time_minutes_after_access, TimeUnit.MINUTES) - .build(new CacheLoader>() { - @Override - public ImmutableList load(SchemaCacheKey key) { - return loadSchema(key); - } - }); + private void init(ExecutorService executor) { + CacheFactory schemaCacheeFactory = new CacheFactory( + OptionalLong.of(86400L), + OptionalLong.of(Config.external_cache_expire_time_minutes_after_access * 60), + Config.max_external_schema_cache_num, + false, + null); + schemaCache = schemaCacheeFactory.buildCache(key -> loadSchema(key), executor); } private void initMetrics() { @@ -69,7 +66,7 @@ private void initMetrics() { Metric.MetricUnit.NOUNIT, "external schema cache number") { @Override public Long getValue() { - return schemaCache.size(); + return schemaCache.estimatedSize(); } }; schemaCacheGauge.addLabel(new MetricLabel("catalog", catalog.getName())); @@ -86,12 +83,7 @@ private ImmutableList loadSchema(SchemaCacheKey key) { public List getSchema(String dbName, String tblName) { SchemaCacheKey key = new SchemaCacheKey(dbName, tblName); - try { - return schemaCache.get(key); - } catch (ExecutionException e) { - throw new CacheException("failed to get schema for %s in catalog %s. 
err: %s", - e, key, catalog.getName(), Util.getRootCauseMessage(e)); - } + return schemaCache.get(key); } public void addSchemaForTest(String dbName, String tblName, ImmutableList schema) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java index a788d9e57bca5a..05bba50ecb5935 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java @@ -27,6 +27,7 @@ import org.apache.doris.catalog.ScalarType; import org.apache.doris.catalog.Type; import org.apache.doris.common.AnalysisException; +import org.apache.doris.common.Config; import org.apache.doris.datasource.ExternalTable; import org.apache.doris.datasource.hudi.HudiUtils; import org.apache.doris.datasource.hudi.source.COWIncrementalRelation; @@ -46,6 +47,7 @@ import org.apache.doris.nereids.trees.expressions.Slot; import org.apache.doris.nereids.trees.expressions.SlotReference; import org.apache.doris.nereids.trees.expressions.literal.StringLiteral; +import org.apache.doris.qe.GlobalVariable; import org.apache.doris.statistics.AnalysisInfo; import org.apache.doris.statistics.BaseAnalysisTask; import org.apache.doris.statistics.ColumnStatistic; @@ -82,6 +84,8 @@ import java.math.BigDecimal; import java.math.BigInteger; import java.time.LocalDate; +import java.util.ArrayList; +import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; @@ -600,7 +604,7 @@ public long fetchRowCount() { // Only hive table supports estimate row count by listing file. if (rowCount == -1 && dlaType.equals(DLAType.HIVE)) { LOG.debug("Will estimate row count from file list."); - rowCount = StatisticsUtil.getRowCountFromFileList(this); + rowCount = getRowCountFromFileList(); } return rowCount; } @@ -818,9 +822,8 @@ public void gsonPostProcess() throws IOException { @Override public List getChunkSizes() { - HiveMetaStoreCache.HivePartitionValues partitionValues = StatisticsUtil.getPartitionValuesForTable(this); - List filesByPartitions - = StatisticsUtil.getFilesForPartitions(this, partitionValues, 0); + HiveMetaStoreCache.HivePartitionValues partitionValues = getAllPartitionValues(); + List filesByPartitions = getFilesForPartitions(partitionValues, 0); List result = Lists.newArrayList(); for (HiveMetaStoreCache.FileCacheValue files : filesByPartitions) { for (HiveMetaStoreCache.HiveFileStatus file : files.getFiles()) { @@ -957,4 +960,108 @@ public boolean needAutoRefresh() { public boolean isPartitionColumnAllowNull() { return true; } + + /** + * Estimate hive table row count : totalFileSize/estimatedRowSize + */ + private long getRowCountFromFileList() { + if (!GlobalVariable.enable_get_row_count_from_file_list) { + return -1; + } + if (isView()) { + return 0; + } + HiveMetaStoreCache.HivePartitionValues partitionValues = getAllPartitionValues(); + + // Get files for all partitions. + int samplePartitionSize = Config.hive_stats_partition_sample_size; + List filesByPartitions = getFilesForPartitions(partitionValues, + samplePartitionSize); + long totalSize = 0; + // Calculate the total file size. 
+ for (HiveMetaStoreCache.FileCacheValue files : filesByPartitions) { + for (HiveMetaStoreCache.HiveFileStatus file : files.getFiles()) { + totalSize += file.getLength(); + } + } + // Estimate row count: totalSize/estimatedRowSize + long estimatedRowSize = 0; + List partitionColumns = getPartitionColumns(); + for (Column column : getFullSchema()) { + // Partition column shouldn't count to the row size, because it is not in the data file. + if (partitionColumns != null && partitionColumns.contains(column)) { + continue; + } + estimatedRowSize += column.getDataType().getSlotSize(); + } + if (estimatedRowSize == 0) { + return 0; + } + + int totalPartitionSize = partitionValues == null ? 1 : partitionValues.getIdToPartitionItem().size(); + if (samplePartitionSize < totalPartitionSize) { + totalSize = totalSize * totalPartitionSize / samplePartitionSize; + } + return totalSize / estimatedRowSize; + } + + // Get all partition values from cache. + private HiveMetaStoreCache.HivePartitionValues getAllPartitionValues() { + if (isView()) { + return null; + } + HiveMetaStoreCache cache = Env.getCurrentEnv().getExtMetaCacheMgr() + .getMetaStoreCache((HMSExternalCatalog) catalog); + List partitionColumnTypes = getPartitionColumnTypes(); + HiveMetaStoreCache.HivePartitionValues partitionValues = null; + // Get table partitions from cache. + if (!partitionColumnTypes.isEmpty()) { + // It is ok to get partition values from cache, + // no need to worry that this call will invalid or refresh the cache. + // because it has enough space to keep partition info of all tables in cache. + partitionValues = cache.getPartitionValues(dbName, name, partitionColumnTypes); + } + return partitionValues; + } + + // Get all files related to given partition values + // If sampleSize > 0, randomly choose part of partitions of the whole table. + private List getFilesForPartitions( + HiveMetaStoreCache.HivePartitionValues partitionValues, int sampleSize) { + if (isView()) { + return Lists.newArrayList(); + } + HiveMetaStoreCache cache = Env.getCurrentEnv().getExtMetaCacheMgr() + .getMetaStoreCache((HMSExternalCatalog) catalog); + List hivePartitions = Lists.newArrayList(); + if (partitionValues != null) { + Map idToPartitionItem = partitionValues.getIdToPartitionItem(); + int totalPartitionSize = idToPartitionItem.size(); + Collection partitionItems; + List> partitionValuesList; + // If partition number is too large, randomly choose part of them to estimate the whole table. + if (sampleSize > 0 && sampleSize < totalPartitionSize) { + List items = new ArrayList<>(idToPartitionItem.values()); + Collections.shuffle(items); + partitionItems = items.subList(0, sampleSize); + partitionValuesList = Lists.newArrayListWithCapacity(sampleSize); + } else { + partitionItems = idToPartitionItem.values(); + partitionValuesList = Lists.newArrayListWithCapacity(totalPartitionSize); + } + for (PartitionItem item : partitionItems) { + partitionValuesList.add(((ListPartitionItem) item).getItems().get(0).getPartitionValuesAsStringList()); + } + // get partitions without cache, so that it will not invalid the cache when executing + // non query request such as `show table status` + hivePartitions = cache.getAllPartitionsWithoutCache(dbName, name, partitionValuesList); + } else { + hivePartitions.add(new HivePartition(dbName, name, true, + getRemoteTable().getSd().getInputFormat(), + getRemoteTable().getSd().getLocation(), null, Maps.newHashMap())); + } + // Get files for all partitions. 
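The sampling extrapolation in getRowCountFromFileList is easy to sanity-check with made-up numbers; this arithmetic sketch is illustrative only:

// Suppose 30 of 300 partitions were sampled, their files total 3 GiB,
// and the schema's non-partition slot sizes sum to 100 bytes per row.
long totalSize = 3L * 1024 * 1024 * 1024;
int samplePartitionSize = 30;
int totalPartitionSize = 300;
long estimatedRowSize = 100;
// Scale the sampled size up to the whole table, then divide by row size.
totalSize = totalSize * totalPartitionSize / samplePartitionSize; // ~30 GiB
long estimatedRows = totalSize / estimatedRowSize; // 322122547, ~322 million rows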
+ String bindBrokerName = catalog.bindBrokerName(); + return cache.getFilesByPartitionsWithoutCache(hivePartitions, bindBrokerName); + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java index 23c9de3be1f531..2690d82db78d66 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java @@ -27,6 +27,7 @@ import org.apache.doris.catalog.PartitionKey; import org.apache.doris.catalog.Type; import org.apache.doris.common.AnalysisException; +import org.apache.doris.common.CacheFactory; import org.apache.doris.common.Config; import org.apache.doris.common.FeConstants; import org.apache.doris.common.Pair; @@ -50,12 +51,11 @@ import org.apache.doris.planner.ListPartitionPrunerV2; import org.apache.doris.planner.PartitionPrunerV2Base.UniqueId; +import com.github.benmanes.caffeine.cache.CacheLoader; +import com.github.benmanes.caffeine.cache.LoadingCache; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import com.google.common.base.Strings; -import com.google.common.cache.CacheBuilder; -import com.google.common.cache.CacheLoader; -import com.google.common.cache.LoadingCache; import com.google.common.collect.Iterables; import com.google.common.collect.Lists; import com.google.common.collect.Maps; @@ -89,11 +89,11 @@ import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.OptionalLong; import java.util.Set; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; -import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; @@ -111,63 +111,58 @@ public class HiveMetaStoreCache { private final HMSExternalCatalog catalog; private JobConf jobConf; - private final ExecutorService executor; + private final ExecutorService refreshExecutor; + private final ExecutorService fileListingExecutor; // cache from -> private LoadingCache partitionValuesCache; // cache from -> private LoadingCache partitionCache; // the ref of cache from -> + // Other thread may reset this cache, so use AtomicReference to wrap it. private volatile AtomicReference> fileCacheRef = new AtomicReference<>(); - public HiveMetaStoreCache(HMSExternalCatalog catalog, ExecutorService executor) { + public HiveMetaStoreCache(HMSExternalCatalog catalog, + ExecutorService refreshExecutor, ExecutorService fileListingExecutor) { this.catalog = catalog; - this.executor = executor; + this.refreshExecutor = refreshExecutor; + this.fileListingExecutor = fileListingExecutor; init(); initMetrics(); } + /** + * Because the partitionValuesCache|partitionCache|fileCache use the same executor for batch loading, + * we need to be very careful and try to avoid the circular dependency of these tasks + * which will bring out thread deadlock. + **/ private void init() { - /** - * Because the partitionValuesCache|partitionCache|fileCache use the same executor for batch loading, - * we need to be very careful and try to avoid the circular dependency of there tasks - * which will bring out thread deak-locks. 
- * */ - partitionValuesCache = CacheBuilder.newBuilder().maximumSize(Config.max_hive_table_cache_num) - .expireAfterAccess(Config.external_cache_expire_time_minutes_after_access, TimeUnit.MINUTES) - .build(new CacheBulkLoader() { - @Override - protected ExecutorService getExecutor() { - return HiveMetaStoreCache.this.executor; - } - - @Override - public HivePartitionValues load(PartitionValueCacheKey key) { - return loadPartitionValues(key); - } - - }); - - partitionCache = CacheBuilder.newBuilder().maximumSize(Config.max_hive_partition_cache_num) - .expireAfterAccess(Config.external_cache_expire_time_minutes_after_access, TimeUnit.MINUTES) - .build(new CacheBulkLoader() { - @Override - protected ExecutorService getExecutor() { - return HiveMetaStoreCache.this.executor; - } - - @Override - public HivePartition load(PartitionCacheKey key) { - return loadPartition(key); - } - - @Override - public Map loadAll(Iterable keys) { - return loadPartitions(keys); - } + CacheFactory partitionValuesCacheFactory = new CacheFactory( + OptionalLong.of(86400L), + OptionalLong.of(Config.external_cache_expire_time_minutes_after_access * 60L), + Config.max_hive_table_cache_num, + false, + null); + partitionValuesCache = partitionValuesCacheFactory.buildCache(key -> loadPartitionValues(key), refreshExecutor); + + CacheFactory partitionCacheFactory = new CacheFactory( + OptionalLong.of(86400L), + OptionalLong.of(Config.external_cache_expire_time_minutes_after_access * 60L), + Config.max_hive_partition_cache_num, + false, + null); + partitionCache = partitionCacheFactory.buildCache(new CacheLoader() { + @Override + public HivePartition load(PartitionCacheKey key) { + return loadPartition(key); + } - }); + @Override + public Map loadAll(Iterable keys) { + return loadPartitions(keys); + } + }, refreshExecutor); setNewFileCache(); } @@ -183,18 +178,18 @@ public void setNewFileCache() { (catalog.getProperties().get(HMSExternalCatalog.FILE_META_CACHE_TTL_SECOND)), HMSExternalCatalog.FILE_META_CACHE_NO_TTL); - CacheBuilder fileCacheBuilder = CacheBuilder.newBuilder() - .maximumSize(Config.max_external_file_cache_num) - .expireAfterAccess(Config.external_cache_expire_time_minutes_after_access, TimeUnit.MINUTES); - - if (fileMetaCacheTtlSecond >= HMSExternalCatalog.FILE_META_CACHE_TTL_DISABLE_CACHE) { - fileCacheBuilder.expireAfterWrite(fileMetaCacheTtlSecond, TimeUnit.SECONDS); - } + CacheFactory fileCacheFactory = new CacheFactory( + OptionalLong.of(fileMetaCacheTtlSecond >= HMSExternalCatalog.FILE_META_CACHE_TTL_DISABLE_CACHE + ? 
fileMetaCacheTtlSecond : 86400L), + OptionalLong.of(Config.external_cache_expire_time_minutes_after_access * 60L), + Config.max_external_file_cache_num, + false, + null); CacheLoader loader = new CacheBulkLoader() { @Override protected ExecutorService getExecutor() { - return HiveMetaStoreCache.this.executor; + return HiveMetaStoreCache.this.fileListingExecutor; } @Override @@ -203,11 +198,11 @@ public FileCacheValue load(FileCacheKey key) { } }; - LoadingCache preFileCache = fileCacheRef.get(); + LoadingCache oldFileCache = fileCacheRef.get(); - fileCacheRef.set(fileCacheBuilder.build(loader)); - if (Objects.nonNull(preFileCache)) { - preFileCache.invalidateAll(); + fileCacheRef.set(fileCacheFactory.buildCache(loader, this.refreshExecutor)); + if (Objects.nonNull(oldFileCache)) { + oldFileCache.invalidateAll(); } } @@ -217,7 +212,7 @@ private void initMetrics() { Metric.MetricUnit.NOUNIT, "hive partition value cache number") { @Override public Long getValue() { - return partitionValuesCache.size(); + return partitionValuesCache.estimatedSize(); } }; valueCacheGauge.addLabel(new MetricLabel("type", "partition_value")); @@ -228,7 +223,7 @@ public Long getValue() { Metric.MetricUnit.NOUNIT, "hive partition cache number") { @Override public Long getValue() { - return partitionCache.size(); + return partitionCache.estimatedSize(); } }; partitionCacheGauge.addLabel(new MetricLabel("type", "partition")); @@ -239,7 +234,7 @@ public Long getValue() { Metric.MetricUnit.NOUNIT, "hive file cache number") { @Override public Long getValue() { - return fileCacheRef.get().size(); + return fileCacheRef.get().estimatedSize(); } }; fileCacheGauge.addLabel(new MetricLabel("type", "file")); @@ -468,11 +463,7 @@ public HivePartitionValues getPartitionValues(String dbName, String tblName, Lis } public HivePartitionValues getPartitionValues(PartitionValueCacheKey key) { - try { - return partitionValuesCache.get(key); - } catch (ExecutionException e) { - throw new CacheException("failed to get partition values for %s in catalog %s", e, key, catalog.getName()); - } + return partitionValuesCache.get(key); } public List getFilesByPartitionsWithCache(List partitions, @@ -499,10 +490,10 @@ private List getFilesByPartitions(List partitions List fileLists; try { if (withCache) { - fileLists = fileCacheRef.get().getAll(keys).values().asList(); + fileLists = fileCacheRef.get().getAll(keys).values().stream().collect(Collectors.toList()); } else { List>> pList = keys.stream() - .map(key -> Pair.of(key, executor.submit(() -> loadFiles(key)))) + .map(key -> Pair.of(key, fileListingExecutor.submit(() -> loadFiles(key)))) .collect(Collectors.toList()); fileLists = Lists.newArrayListWithExpectedSize(keys.size()); @@ -545,15 +536,11 @@ private List getAllPartitions(String dbName, String name, List

  • partitions; - try { - if (withCache) { - partitions = partitionCache.getAll(keys).values().asList(); - } else { - Map map = loadPartitions(keys); - partitions = map.values().stream().collect(Collectors.toList()); - } - } catch (ExecutionException e) { - throw new CacheException("failed to get partition in catalog %s", e, catalog.getName()); + if (withCache) { + partitions = partitionCache.getAll(keys).values().stream().collect(Collectors.toList()); + } else { + Map map = loadPartitions(keys); + partitions = map.values().stream().collect(Collectors.toList()); } if (LOG.isDebugEnabled()) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetadataOps.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetadataOps.java index c5e74defc1af4c..a4566cd0b7a03c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetadataOps.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetadataOps.java @@ -78,7 +78,7 @@ public HiveMetadataOps(HMSExternalCatalog catalog, HMSCachedClient client) { this.fs = new DFSFileSystem(catalog.getProperties()); } - // for test + @VisibleForTesting public HiveMetadataOps(HMSExternalCatalog catalog, HMSCachedClient client, FileSystem fs) { this.catalog = catalog; this.client = client; diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiCachedPartitionProcessor.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiCachedPartitionProcessor.java index 90c89dbbda2c76..39e68ea7a168ef 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiCachedPartitionProcessor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiCachedPartitionProcessor.java @@ -17,6 +17,7 @@ package org.apache.doris.datasource.hudi.source; +import org.apache.doris.common.CacheFactory; import org.apache.doris.common.Config; import org.apache.doris.datasource.CacheException; import org.apache.doris.datasource.TablePartitionValues; @@ -24,10 +25,8 @@ import org.apache.doris.datasource.hive.HMSExternalCatalog; import org.apache.doris.datasource.hive.HMSExternalTable; +import com.github.benmanes.caffeine.cache.LoadingCache; import com.google.common.base.Preconditions; -import com.google.common.cache.CacheBuilder; -import com.google.common.cache.CacheLoader; -import com.google.common.cache.LoadingCache; import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.table.timeline.HoodieTimeline; @@ -37,8 +36,9 @@ import java.util.Arrays; import java.util.List; +import java.util.OptionalLong; import java.util.concurrent.Executor; -import java.util.concurrent.TimeUnit; +import java.util.concurrent.ExecutorService; import java.util.stream.Collectors; public class HudiCachedPartitionProcessor extends HudiPartitionProcessor { @@ -47,22 +47,16 @@ public class HudiCachedPartitionProcessor extends HudiPartitionProcessor { private final Executor executor; private final LoadingCache partitionCache; - public HudiCachedPartitionProcessor(long catalogId, Executor executor) { + public HudiCachedPartitionProcessor(long catalogId, ExecutorService executor) { this.catalogId = catalogId; this.executor = executor; - this.partitionCache = CacheBuilder.newBuilder().maximumSize(Config.max_hive_table_cache_num) - .expireAfterAccess(Config.external_cache_expire_time_minutes_after_access, TimeUnit.MINUTES) - .build(CacheLoader.asyncReloading( - new CacheLoader() 
{ - @Override - public TablePartitionValues load(TablePartitionKey key) throws Exception { - return new TablePartitionValues(); - } - }, executor)); - } - - public Executor getExecutor() { - return executor; + CacheFactory partitionCacheFactory = new CacheFactory( + OptionalLong.of(86400L), + OptionalLong.of(Config.external_cache_expire_time_minutes_after_access * 60), + Config.max_hive_table_cache_num, + false, + null); + this.partitionCache = partitionCacheFactory.buildCache(key -> new TablePartitionValues(), executor); } @Override diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiPartitionMgr.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiPartitionMgr.java index b1d894e3d7f003..b6bfd0ec499427 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiPartitionMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiPartitionMgr.java @@ -23,15 +23,13 @@ import com.google.common.collect.Maps; import java.util.Map; -import java.util.concurrent.Executor; +import java.util.concurrent.ExecutorService; public class HudiPartitionMgr { - private static volatile HudiPartitionMgr partitionMgr = null; + private final Map partitionProcessors = Maps.newConcurrentMap(); + private final ExecutorService executor; - private static final Map partitionProcessors = Maps.newConcurrentMap(); - private final Executor executor; - - private HudiPartitionMgr(Executor executor) { + public HudiPartitionMgr(ExecutorService executor) { this.executor = executor; } @@ -72,15 +70,4 @@ public void cleanTablePartitions(long catalogId, String dbName, String tblName) processor.cleanTablePartitions(dbName, tblName); } } - - public static HudiPartitionMgr get(Executor executor) { - if (partitionMgr == null) { - synchronized (HudiPartitionMgr.class) { - if (partitionMgr == null) { - partitionMgr = new HudiPartitionMgr(executor); - } - } - } - return partitionMgr; - } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiScanNode.java index 58068c575d7365..eb1d77a322dfc2 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiScanNode.java @@ -324,8 +324,7 @@ public List getSplits() throws UserException { List partitions = HiveMetaStoreClientHelper.ugiDoAs( HiveMetaStoreClientHelper.getConfiguration(hmsTable), () -> getPrunedPartitions(hudiClient, snapshotTimestamp)); - Executor executor = ((HudiCachedPartitionProcessor) Env.getCurrentEnv() - .getExtMetaCacheMgr().getHudiPartitionProcess(hmsTable.getCatalog())).getExecutor(); + Executor executor = Env.getCurrentEnv().getExtMetaCacheMgr().getFileListingExecutor(); List splits = Collections.synchronizedList(new ArrayList<>()); CountDownLatch countDownLatch = new CountDownLatch(partitions.size()); partitions.forEach(partition -> executor.execute(() -> { diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergMetadataCache.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergMetadataCache.java index 10fa205b095133..adc1e1f74f176b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergMetadataCache.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergMetadataCache.java @@ -18,6 +18,7 @@ package org.apache.doris.datasource.iceberg; import 
org.apache.doris.catalog.Env; +import org.apache.doris.common.CacheFactory; import org.apache.doris.common.Config; import org.apache.doris.common.UserException; import org.apache.doris.datasource.CatalogIf; @@ -27,8 +28,7 @@ import org.apache.doris.thrift.TIcebergMetadataParams; import avro.shaded.com.google.common.collect.Lists; -import com.google.common.cache.Cache; -import com.google.common.cache.CacheBuilder; +import com.github.benmanes.caffeine.cache.LoadingCache; import com.google.common.collect.Iterables; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hdfs.HdfsConfiguration; @@ -38,77 +38,85 @@ import org.apache.iceberg.catalog.Catalog; import org.apache.iceberg.catalog.TableIdentifier; import org.apache.iceberg.hive.HiveCatalog; +import org.jetbrains.annotations.NotNull; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Objects; -import java.util.concurrent.TimeUnit; +import java.util.OptionalLong; +import java.util.concurrent.ExecutorService; public class IcebergMetadataCache { - private final Cache> snapshotListCache; - private final Cache tableCache; - - public IcebergMetadataCache() { - this.snapshotListCache = CacheBuilder.newBuilder().maximumSize(Config.max_hive_table_cache_num) - .expireAfterAccess(Config.external_cache_expire_time_minutes_after_access, TimeUnit.MINUTES) - .build(); - - this.tableCache = CacheBuilder.newBuilder().maximumSize(Config.max_hive_table_cache_num) - .expireAfterAccess(Config.external_cache_expire_time_minutes_after_access, TimeUnit.MINUTES) - .build(); + private final LoadingCache> snapshotListCache; + private final LoadingCache tableCache; + + public IcebergMetadataCache(ExecutorService executor) { + CacheFactory snapshotListCacheFactory = new CacheFactory( + OptionalLong.of(86400L), + OptionalLong.of(Config.external_cache_expire_time_minutes_after_access * 60), + Config.max_hive_table_cache_num, + false, + null); + this.snapshotListCache = snapshotListCacheFactory.buildCache(key -> loadSnapshots(key), executor); + + CacheFactory tableCacheFactory = new CacheFactory( + OptionalLong.of(86400L), + OptionalLong.of(Config.external_cache_expire_time_minutes_after_access * 60), + Config.max_hive_table_cache_num, + false, + null); + this.tableCache = tableCacheFactory.buildCache(key -> loadTable(key), executor); } public List getSnapshotList(TIcebergMetadataParams params) throws UserException { CatalogIf catalog = Env.getCurrentEnv().getCatalogMgr().getCatalog(params.getCatalog()); IcebergMetadataCacheKey key = - IcebergMetadataCacheKey.of(catalog.getId(), params.getDatabase(), params.getTable()); - List ifPresent = snapshotListCache.getIfPresent(key); - if (ifPresent != null) { - return ifPresent; - } + IcebergMetadataCacheKey.of(catalog, params.getDatabase(), params.getTable()); + return snapshotListCache.get(key); + } + + public Table getIcebergTable(CatalogIf catalog, String dbName, String tbName) { + IcebergMetadataCacheKey key = IcebergMetadataCacheKey.of(catalog, dbName, tbName); + return tableCache.get(key); + } - Table icebergTable = getIcebergTable(catalog, params.getDatabase(), params.getTable()); + @NotNull + private List loadSnapshots(IcebergMetadataCacheKey key) { + Table icebergTable = getIcebergTable(key.catalog, key.dbName, key.tableName); List snaps = Lists.newArrayList(); Iterables.addAll(snaps, icebergTable.snapshots()); - snapshotListCache.put(key, snaps); return snaps; } - public Table getIcebergTable(CatalogIf catalog, String dbName, String tbName) { - 
IcebergMetadataCacheKey key = IcebergMetadataCacheKey.of(catalog.getId(), dbName, tbName); - Table cacheTable = tableCache.getIfPresent(key); - if (cacheTable != null) { - return cacheTable; - } - + @NotNull + private Table loadTable(IcebergMetadataCacheKey key) { Catalog icebergCatalog; - if (catalog instanceof HMSExternalCatalog) { - HMSExternalCatalog ctg = (HMSExternalCatalog) catalog; + if (key.catalog instanceof HMSExternalCatalog) { + HMSExternalCatalog ctg = (HMSExternalCatalog) key.catalog; icebergCatalog = createIcebergHiveCatalog( ctg.getHiveMetastoreUris(), ctg.getCatalogProperty().getHadoopProperties(), ctg.getProperties()); - } else if (catalog instanceof IcebergExternalCatalog) { - icebergCatalog = ((IcebergExternalCatalog) catalog).getCatalog(); + } else if (key.catalog instanceof IcebergExternalCatalog) { + icebergCatalog = ((IcebergExternalCatalog) key.catalog).getCatalog(); } else { throw new RuntimeException("Only support 'hms' and 'iceberg' type for iceberg table"); } - Table icebergTable = HiveMetaStoreClientHelper.ugiDoAs(catalog.getId(), - () -> icebergCatalog.loadTable(TableIdentifier.of(dbName, tbName))); - initIcebergTableFileIO(icebergTable, catalog.getProperties()); - tableCache.put(key, icebergTable); + Table icebergTable = HiveMetaStoreClientHelper.ugiDoAs(key.catalog.getId(), + () -> icebergCatalog.loadTable(TableIdentifier.of(key.dbName, key.tableName))); + initIcebergTableFileIO(icebergTable, key.catalog.getProperties()); return icebergTable; } public void invalidateCatalogCache(long catalogId) { snapshotListCache.asMap().keySet().stream() - .filter(key -> key.catalogId == catalogId) + .filter(key -> key.catalog.getId() == catalogId) .forEach(snapshotListCache::invalidate); tableCache.asMap().entrySet().stream() - .filter(entry -> entry.getKey().catalogId == catalogId) + .filter(entry -> entry.getKey().catalog.getId() == catalogId) .forEach(entry -> { ManifestFiles.dropCache(entry.getValue().io()); tableCache.invalidate(entry.getKey()); @@ -117,13 +125,15 @@ public void invalidateCatalogCache(long catalogId) { public void invalidateTableCache(long catalogId, String dbName, String tblName) { snapshotListCache.asMap().keySet().stream() - .filter(key -> key.catalogId == catalogId && key.dbName.equals(dbName) && key.tableName.equals(tblName)) + .filter(key -> key.catalog.getId() == catalogId && key.dbName.equals(dbName) && key.tableName.equals( + tblName)) .forEach(snapshotListCache::invalidate); tableCache.asMap().entrySet().stream() .filter(entry -> { IcebergMetadataCacheKey key = entry.getKey(); - return key.catalogId == catalogId && key.dbName.equals(dbName) && key.tableName.equals(tblName); + return key.catalog.getId() == catalogId && key.dbName.equals(dbName) && key.tableName.equals( + tblName); }) .forEach(entry -> { ManifestFiles.dropCache(entry.getValue().io()); @@ -133,13 +143,13 @@ public void invalidateTableCache(long catalogId, String dbName, String tblName) public void invalidateDbCache(long catalogId, String dbName) { snapshotListCache.asMap().keySet().stream() - .filter(key -> key.catalogId == catalogId && key.dbName.equals(dbName)) + .filter(key -> key.catalog.getId() == catalogId && key.dbName.equals(dbName)) .forEach(snapshotListCache::invalidate); tableCache.asMap().entrySet().stream() .filter(entry -> { IcebergMetadataCacheKey key = entry.getKey(); - return key.catalogId == catalogId && key.dbName.equals(dbName); + return key.catalog.getId() == catalogId && key.dbName.equals(dbName); }) .forEach(entry -> { 
ManifestFiles.dropCache(entry.getValue().io()); @@ -184,22 +194,18 @@ private static void initIcebergTableFileIO(Table table, Map prop } static class IcebergMetadataCacheKey { - long catalogId; + CatalogIf catalog; String dbName; String tableName; - public IcebergMetadataCacheKey(long catalogId, String dbName, String tableName) { - this.catalogId = catalogId; + public IcebergMetadataCacheKey(CatalogIf catalog, String dbName, String tableName) { + this.catalog = catalog; this.dbName = dbName; this.tableName = tableName; } - static IcebergMetadataCacheKey of(long catalogId, String dbName, String tableName) { - return new IcebergMetadataCacheKey( - catalogId, - dbName, - tableName - ); + static IcebergMetadataCacheKey of(CatalogIf catalog, String dbName, String tableName) { + return new IcebergMetadataCacheKey(catalog, dbName, tableName); } @Override @@ -211,14 +217,14 @@ public boolean equals(Object o) { return false; } IcebergMetadataCacheKey that = (IcebergMetadataCacheKey) o; - return catalogId == that.catalogId + return catalog.getId() == that.catalog.getId() && Objects.equals(dbName, that.dbName) && Objects.equals(tableName, that.tableName); } @Override public int hashCode() { - return Objects.hash(catalogId, dbName, tableName); + return Objects.hash(catalog.getId(), dbName, tableName); } } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergMetadataCacheMgr.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergMetadataCacheMgr.java index 012af41879cc68..69a29cc2885e83 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergMetadataCacheMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergMetadataCacheMgr.java @@ -17,11 +17,14 @@ package org.apache.doris.datasource.iceberg; +import java.util.concurrent.ExecutorService; + public class IcebergMetadataCacheMgr { - private final IcebergMetadataCache icebergMetadataCache = new IcebergMetadataCache(); + private IcebergMetadataCache icebergMetadataCache; - public IcebergMetadataCacheMgr() { + public IcebergMetadataCacheMgr(ExecutorService executor) { + this.icebergMetadataCache = new IcebergMetadataCache(executor); } public IcebergMetadataCache getIcebergMetadataCache() { diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/maxcompute/MaxComputeExternalTable.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/maxcompute/MaxComputeExternalTable.java index 474b4e6cb8e7f8..363b7ce689dab1 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/maxcompute/MaxComputeExternalTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/maxcompute/MaxComputeExternalTable.java @@ -82,9 +82,19 @@ public long getTotalRows() throws TunnelException { MaxComputeMetadataCache metadataCache = Env.getCurrentEnv().getExtMetaCacheMgr() .getMaxComputeMetadataCache(catalog.getId()); MaxComputeExternalCatalog mcCatalog = ((MaxComputeExternalCatalog) catalog); - return metadataCache.getCachedRowCount(dbName, name, null, () -> mcCatalog.getTableTunnel() - .getDownloadSession(dbName, name, null) - .getRecordCount()); + return metadataCache.getCachedRowCount(dbName, name, null, key -> { + try { + return loadRowCount(mcCatalog, key); + } catch (TunnelException e) { + throw new RuntimeException(e); + } + }); + } + + private long loadRowCount(MaxComputeExternalCatalog catalog, MaxComputeCacheKey key) throws TunnelException { + return catalog.getTableTunnel() + .getDownloadSession(key.getDbName(), key.getTblName(), null) + 
.getRecordCount(); } @Override @@ -106,21 +116,23 @@ public TablePartitionValues getPartitionValues() { MaxComputeMetadataCache metadataCache = Env.getCurrentEnv().getExtMetaCacheMgr() .getMaxComputeMetadataCache(catalog.getId()); return metadataCache.getCachedPartitionValues( - new MaxComputeCacheKey(projectName, tableName), - () -> { - TablePartitionValues partitionValues = new TablePartitionValues(); - partitionValues.addPartitions(partitionSpecs, - partitionSpecs.stream() - .map(p -> parsePartitionValues(new ArrayList<>(getPartitionNames()), p)) - .collect(Collectors.toList()), - partitionTypes); - return partitionValues; - }); + new MaxComputeCacheKey(projectName, tableName), key -> loadPartitionValues(key)); + } + + private TablePartitionValues loadPartitionValues(MaxComputeCacheKey key) { + TablePartitionValues partitionValues = new TablePartitionValues(); + partitionValues.addPartitions(partitionSpecs, + partitionSpecs.stream() + .map(p -> parsePartitionValues(new ArrayList<>(getPartitionNames()), p)) + .collect(Collectors.toList()), + partitionTypes); + return partitionValues; } /** * parse all values from partitionPath to a single list. - * @param partitionColumns partitionColumns can contain the part1,part2,part3... + * + * @param partitionColumns partitionColumns can contain the part1,part2,part3... * @param partitionPath partitionPath format is like the 'part1=123/part2=abc/part3=1bc' * @return all values of partitionPath */ diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/maxcompute/MaxComputeMetadataCache.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/maxcompute/MaxComputeMetadataCache.java index b26ad8364e34f4..cce570a11746aa 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/maxcompute/MaxComputeMetadataCache.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/maxcompute/MaxComputeMetadataCache.java @@ -20,14 +20,12 @@ import org.apache.doris.common.Config; import org.apache.doris.datasource.TablePartitionValues; -import com.aliyun.odps.tunnel.TunnelException; -import com.google.common.cache.Cache; -import com.google.common.cache.CacheBuilder; +import com.github.benmanes.caffeine.cache.Cache; +import com.github.benmanes.caffeine.cache.Caffeine; import java.util.List; -import java.util.concurrent.Callable; -import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; +import java.util.function.Function; import java.util.stream.Collectors; public class MaxComputeMetadataCache { @@ -35,32 +33,23 @@ public class MaxComputeMetadataCache { private final Cache tableRowCountCache; public MaxComputeMetadataCache() { - partitionValuesCache = CacheBuilder.newBuilder().maximumSize(Config.max_hive_partition_cache_num) + partitionValuesCache = Caffeine.newBuilder().maximumSize(Config.max_hive_partition_cache_num) .expireAfterAccess(Config.external_cache_expire_time_minutes_after_access, TimeUnit.MINUTES) .build(); - tableRowCountCache = CacheBuilder.newBuilder().maximumSize(10000) + tableRowCountCache = Caffeine.newBuilder().maximumSize(10000) .expireAfterAccess(Config.external_cache_expire_time_minutes_after_access, TimeUnit.MINUTES) .build(); } public Long getCachedRowCount(String dbName, String tblName, String partitionSpec, - Callable loader) throws TunnelException { - try { - MaxComputeCacheKey tablePartitionKey = new MaxComputeCacheKey(dbName, tblName, partitionSpec); - return tableRowCountCache.get(tablePartitionKey, loader); - } catch (ExecutionException e) { - throw new 
TunnelException(e.getMessage(), e); - } + Function loader) { + MaxComputeCacheKey tablePartitionKey = new MaxComputeCacheKey(dbName, tblName, partitionSpec); + return tableRowCountCache.get(tablePartitionKey, loader); } public TablePartitionValues getCachedPartitionValues(MaxComputeCacheKey tablePartitionKey, - Callable loader) { - try { - return partitionValuesCache.get(tablePartitionKey, loader); - } catch (ExecutionException e) { - throw new RuntimeException("Fail to load partition values for table:" - + " '" + tablePartitionKey.getDbName() + "." + tablePartitionKey.getTblName() + "'"); - } + Function loader) { + return partitionValuesCache.get(tablePartitionKey, loader); } public void cleanUp() { diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/maxcompute/MaxComputeMetadataCacheMgr.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/maxcompute/MaxComputeMetadataCacheMgr.java index 6ee75eae26d747..270c47f0df6f6c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/maxcompute/MaxComputeMetadataCacheMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/maxcompute/MaxComputeMetadataCacheMgr.java @@ -23,7 +23,10 @@ public class MaxComputeMetadataCacheMgr { - private static final Map maxComputeMetadataCaches = Maps.newConcurrentMap(); + private final Map maxComputeMetadataCaches = Maps.newConcurrentMap(); + + public MaxComputeMetadataCacheMgr() { + } public MaxComputeMetadataCache getMaxComputeMetadataCache(long catalogId) { MaxComputeMetadataCache cache = maxComputeMetadataCaches.get(catalogId); diff --git a/fe/fe-core/src/main/java/org/apache/doris/fs/FileSystemCache.java b/fe/fe-core/src/main/java/org/apache/doris/fs/FileSystemCache.java index 7946dd5e8a7cea..149bbe2d378817 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/fs/FileSystemCache.java +++ b/fe/fe-core/src/main/java/org/apache/doris/fs/FileSystemCache.java @@ -17,39 +17,30 @@ package org.apache.doris.fs; +import org.apache.doris.common.CacheFactory; import org.apache.doris.common.Config; import org.apache.doris.common.Pair; -import org.apache.doris.common.util.CacheBulkLoader; -import org.apache.doris.datasource.CacheException; import org.apache.doris.fs.remote.RemoteFileSystem; -import com.google.common.cache.CacheBuilder; -import com.google.common.cache.LoadingCache; +import com.github.benmanes.caffeine.cache.LoadingCache; import org.apache.hadoop.mapred.JobConf; import java.util.Objects; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.TimeUnit; +import java.util.OptionalLong; public class FileSystemCache { private LoadingCache fileSystemCache; - public FileSystemCache(ExecutorService executor) { - fileSystemCache = CacheBuilder.newBuilder().maximumSize(Config.max_remote_file_system_cache_num) - .expireAfterAccess(Config.external_cache_expire_time_minutes_after_access, TimeUnit.MINUTES) - .build(new CacheBulkLoader() { - @Override - protected ExecutorService getExecutor() { - return executor; - } - - @Override - public RemoteFileSystem load(FileSystemCacheKey key) { - return loadFileSystem(key); - } - }); + public FileSystemCache() { + // no need to set refreshAfterWrite, because the FileSystem is created once and never changed + CacheFactory fsCacheFactory = new CacheFactory( + OptionalLong.of(86400L), + OptionalLong.empty(), + Config.max_remote_file_system_cache_num, + false, + null); + fileSystemCache = fsCacheFactory.buildCache(key -> loadFileSystem(key)); } private RemoteFileSystem 
loadFileSystem(FileSystemCacheKey key) { @@ -57,11 +48,7 @@ private RemoteFileSystem loadFileSystem(FileSystemCacheKey key) { } public RemoteFileSystem getRemoteFileSystem(FileSystemCacheKey key) { - try { - return fileSystemCache.get(key); - } catch (ExecutionException e) { - throw new CacheException("failed to get remote filesystem for type[%s]", e, key.type); - } + return fileSystemCache.get(key); } public static class FileSystemCacheKey { diff --git a/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/ShowAction.java b/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/ShowAction.java index 0aa5dd54d78b46..8156710aa1ec5b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/ShowAction.java +++ b/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/ShowAction.java @@ -324,7 +324,7 @@ public long getDataSizeOfDatabase(DatabaseIf db) { return totalSize; } - public Map getDataSizeOfTables(DatabaseIf db, String tableName, boolean singleReplica) { + private Map getDataSizeOfTables(DatabaseIf db, String tableName, boolean singleReplica) { Map oneEntry = Maps.newHashMap(); db.readLock(); try { @@ -348,7 +348,7 @@ public Map getDataSizeOfTables(DatabaseIf db, String tableName, bo return oneEntry; } - public Map getDataSizeOfTable(Table table, boolean singleReplica) { + private Map getDataSizeOfTable(Table table, boolean singleReplica) { Map oneEntry = Maps.newHashMap(); if (table.getType() == TableType.VIEW || table.getType() == TableType.ODBC) { oneEntry.put(table.getName(), 0L); diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/GlobalVariable.java b/fe/fe-core/src/main/java/org/apache/doris/qe/GlobalVariable.java index a8374a7b4406c0..30f91c6f8c1ac4 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/GlobalVariable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/GlobalVariable.java @@ -56,6 +56,8 @@ public final class GlobalVariable { public static final String AUDIT_PLUGIN_MAX_BATCH_INTERVAL_SEC = "audit_plugin_max_batch_interval_sec"; public static final String AUDIT_PLUGIN_MAX_SQL_LENGTH = "audit_plugin_max_sql_length"; + public static final String ENABLE_GET_ROW_COUNT_FROM_FILE_LIST = "enable_get_row_count_from_file_list"; + @VariableMgr.VarAttr(name = VERSION_COMMENT, flag = VariableMgr.READ_ONLY) public static String versionComment = "Doris version " + Version.DORIS_BUILD_VERSION + "-" + Version.DORIS_BUILD_SHORT_HASH; @@ -125,6 +127,16 @@ public final class GlobalVariable { @VariableMgr.VarAttr(name = AUDIT_PLUGIN_MAX_SQL_LENGTH, flag = VariableMgr.GLOBAL) public static int auditPluginMaxSqlLength = 4096; + @VariableMgr.VarAttr(name = ENABLE_GET_ROW_COUNT_FROM_FILE_LIST, flag = VariableMgr.GLOBAL, + description = { + "针对外表,是否允许根据文件列表估算表行数。获取文件列表可能是一个耗时的操作," + + "如果不需要估算表行数或者对性能有影响,可以关闭该功能。", + "For external tables, whether to enable getting row count from file list. " + + "Getting file list may be a time-consuming operation. " + + "If you don't need to estimate the number of rows in the table " + + "or it affects performance, you can disable this feature."}) + public static boolean enable_get_row_count_from_file_list = true; + // Don't allow creating instance. 
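A side note on the MaxComputeMetadataCache change above: the loaders switched from Guava's Callable to java.util.function.Function because Caffeine's Cache.get(key, mappingFunction) computes and caches the value in one call and propagates unchecked exceptions directly, removing the ExecutionException try/catch at every call site. A minimal sketch, with a hypothetical key, value, and expensiveLoad helper:

import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;

Cache<String, Long> cache = Caffeine.newBuilder().maximumSize(100).build();
// Computed once per key; subsequent calls return the cached value.
Long rows = cache.get("db.tbl", key -> expensiveLoad(key));

static Long expensiveLoad(String key) { // hypothetical loader
    return 42L;
}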
diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/BasicAsyncCacheLoader.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/BasicAsyncCacheLoader.java
index ac5896bb06c93d..59e1ad390d34b8 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/statistics/BasicAsyncCacheLoader.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/BasicAsyncCacheLoader.java
@@ -39,7 +39,9 @@ public abstract class BasicAsyncCacheLoader implements AsyncCacheLoader
-        List<HiveMetaStoreCache.FileCacheValue> filesByPartitions
-                = getFilesForPartitions(table, partitionValues, samplePartitionSize);
-        long totalSize = 0;
-        // Calculate the total file size.
-        for (HiveMetaStoreCache.FileCacheValue files : filesByPartitions) {
-            for (HiveMetaStoreCache.HiveFileStatus file : files.getFiles()) {
-                totalSize += file.getLength();
-            }
-        }
-        // Estimate row count: totalSize/estimatedRowSize
-        long estimatedRowSize = 0;
-        List<Column> partitionColumns = table.getPartitionColumns();
-        for (Column column : table.getFullSchema()) {
-            // Partition column shouldn't count to the row size, because it is not in the data file.
-            if (partitionColumns != null && partitionColumns.contains(column)) {
-                continue;
-            }
-            estimatedRowSize += column.getDataType().getSlotSize();
-        }
-        if (estimatedRowSize == 0) {
-            return 0;
-        }
-        if (samplePartitionSize < totalPartitionSize) {
-            totalSize = totalSize * totalPartitionSize / samplePartitionSize;
-        }
-        return totalSize / estimatedRowSize;
-    }
-
-    public static HiveMetaStoreCache.HivePartitionValues getPartitionValuesForTable(HMSExternalTable table) {
-        if (table.isView()) {
-            return null;
-        }
-        HiveMetaStoreCache cache = Env.getCurrentEnv().getExtMetaCacheMgr()
-                .getMetaStoreCache((HMSExternalCatalog) table.getCatalog());
-        List<Type> partitionColumnTypes = table.getPartitionColumnTypes();
-        HiveMetaStoreCache.HivePartitionValues partitionValues = null;
-        // Get table partitions from cache.
-        if (!partitionColumnTypes.isEmpty()) {
-            // It is ok to get partition values from cache,
-            // no need to worry that this call will invalid or refresh the cache.
-            // because it has enough space to keep partition info of all tables in cache.
-            partitionValues = cache.getPartitionValues(table.getDbName(), table.getName(), partitionColumnTypes);
-        }
-        return partitionValues;
-    }
-
-    public static List<HiveMetaStoreCache.FileCacheValue> getFilesForPartitions(
-            HMSExternalTable table, HiveMetaStoreCache.HivePartitionValues partitionValues, int sampleSize) {
-        if (table.isView()) {
-            return Lists.newArrayList();
-        }
-        HiveMetaStoreCache cache = Env.getCurrentEnv().getExtMetaCacheMgr()
-                .getMetaStoreCache((HMSExternalCatalog) table.getCatalog());
-        List<HivePartition> hivePartitions = Lists.newArrayList();
-        if (partitionValues != null) {
-            Map<Long, PartitionItem> idToPartitionItem = partitionValues.getIdToPartitionItem();
-            int totalPartitionSize = idToPartitionItem.size();
-            Collection<PartitionItem> partitionItems;
-            List<List<String>> partitionValuesList;
-            // If partition number is too large, randomly choose part of them to estimate the whole table.
-            if (sampleSize > 0 && sampleSize < totalPartitionSize) {
-                List<PartitionItem> items = new ArrayList<>(idToPartitionItem.values());
-                Collections.shuffle(items);
-                partitionItems = items.subList(0, sampleSize);
-                partitionValuesList = Lists.newArrayListWithCapacity(sampleSize);
-            } else {
-                partitionItems = idToPartitionItem.values();
-                partitionValuesList = Lists.newArrayListWithCapacity(totalPartitionSize);
-            }
-            for (PartitionItem item : partitionItems) {
-                partitionValuesList.add(((ListPartitionItem) item).getItems().get(0).getPartitionValuesAsStringList());
-            }
-            // get partitions without cache, so that it will not invalid the cache when executing
-            // non query request such as `show table status`
-            hivePartitions = cache.getAllPartitionsWithoutCache(table.getDbName(), table.getName(),
-                    partitionValuesList);
-        } else {
-            hivePartitions.add(new HivePartition(table.getDbName(), table.getName(), true,
-                    table.getRemoteTable().getSd().getInputFormat(),
-                    table.getRemoteTable().getSd().getLocation(), null, Maps.newHashMap()));
-        }
-        // Get files for all partitions.
-        String bindBrokerName = table.getCatalog().bindBrokerName();
-        return cache.getFilesByPartitionsWithoutCache(hivePartitions, bindBrokerName);
-    }
-
     /**
      * Get Iceberg column statistics.
      *
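The deleted block above was the entire file-list estimator: sample up to sampleSize partitions, sum their file lengths, scale the sampled bytes back up to the full partition count, and divide by a per-row width derived from the slot sizes of the non-partition columns. The arithmetic in isolation, as a sketch with assumed parameter shapes:

    // Sketch of the math performed by the deleted getRowCountFromFileList.
    static long estimateRows(long sampledBytes, int sampledPartitions,
            int totalPartitions, long bytesPerRow) {
        if (bytesPerRow == 0) {
            return 0; // no countable columns: give up rather than divide by zero
        }
        long totalBytes = sampledBytes;
        if (sampledPartitions < totalPartitions) {
            // Scale the sample up to the whole table.
            totalBytes = sampledBytes * totalPartitions / sampledPartitions;
        }
        return totalBytes / bytesPerRow;
    }

For example, 10 sampled partitions totaling 1 GB in a 100-partition table with a 100-byte estimated row width give (1 GB * 100 / 10) / 100 B, roughly 100 million rows.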
diff --git a/fe/fe-core/src/test/java/org/apache/doris/common/CacheBulkLoaderTest.java b/fe/fe-core/src/test/java/org/apache/doris/common/CacheBulkLoaderTest.java
index 0cc54817b346c4..c89eafda408685 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/common/CacheBulkLoaderTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/common/CacheBulkLoaderTest.java
@@ -19,15 +19,14 @@
 
 import org.apache.doris.common.util.CacheBulkLoader;
 
-import com.google.common.cache.CacheBuilder;
-import com.google.common.cache.LoadingCache;
+import com.github.benmanes.caffeine.cache.Caffeine;
+import com.github.benmanes.caffeine.cache.LoadingCache;
 import org.apache.commons.collections.MapUtils;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
 
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
@@ -41,7 +40,7 @@ public void test() {
         ThreadPoolExecutor executor = ThreadPoolManager.newDaemonFixedThreadPool(
                 10, 10, "TestThreadPool", 120, true);
 
-        LoadingCache<String, String> testCache = CacheBuilder.newBuilder().maximumSize(100)
+        LoadingCache<String, String> testCache = Caffeine.newBuilder().maximumSize(100)
                 .expireAfterAccess(1, TimeUnit.MINUTES)
                 .build(new CacheBulkLoader<String, String>() {
                     @Override
@@ -63,15 +62,10 @@ public String load(String key) {
         List<String> testKeys = IntStream.range(1, 101).boxed()
                 .map(i -> String.format("k%d", i)).collect(Collectors.toList());
 
-        try {
-            Map<String, String> vMap = testCache.getAll(testKeys);
-            Assertions.assertTrue(MapUtils.isNotEmpty(vMap) && vMap.size() == testKeys.size());
-            for (String key : vMap.keySet()) {
-                Assertions.assertTrue(key.replace("k", "v").equals(vMap.get(key)));
-            }
-        } catch (ExecutionException e) {
-            e.printStackTrace();
-            Assertions.fail();
+        Map<String, String> vMap = testCache.getAll(testKeys);
+        Assertions.assertTrue(MapUtils.isNotEmpty(vMap) && vMap.size() == testKeys.size());
+        for (String key : vMap.keySet()) {
+            Assertions.assertTrue(key.replace("k", "v").equals(vMap.get(key)));
         }
 
         try {
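After the port, CacheBulkLoader implements Caffeine's CacheLoader rather than extending Guava's, so getAll no longer throws the checked ExecutionException that the deleted try/catch handled. A condensed sketch of the same usage, reusing the test's imports; the pool parameters and key set are illustrative:

    ExecutorService pool = ThreadPoolManager.newDaemonFixedThreadPool(
            4, 4, "BulkLoaderDemo", 60, true);
    LoadingCache<String, String> cache = Caffeine.newBuilder()
            .maximumSize(100)
            .build(new CacheBulkLoader<String, String>() {
                @Override
                protected ExecutorService getExecutor() {
                    return pool; // bulk loads fan out on this pool
                }

                @Override
                public String load(String key) {
                    return key.replace("k", "v");
                }
            });
    // getAll loads all missing keys through the loader, in parallel on the executor.
    Map<String, String> vMap = cache.getAll(Lists.newArrayList("k1", "k2", "k3"));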
diff --git a/fe/fe-core/src/test/java/org/apache/doris/common/CacheFactoryTest.java b/fe/fe-core/src/test/java/org/apache/doris/common/CacheFactoryTest.java
new file mode 100644
index 00000000000000..60b73d6d5c8d4f
--- /dev/null
+++ b/fe/fe-core/src/test/java/org/apache/doris/common/CacheFactoryTest.java
@@ -0,0 +1,291 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.common;
+
+import org.apache.doris.statistics.BasicAsyncCacheLoader;
+
+import com.github.benmanes.caffeine.cache.AsyncLoadingCache;
+import com.github.benmanes.caffeine.cache.LoadingCache;
+import com.google.common.collect.Lists;
+import com.google.common.testing.FakeTicker;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.util.List;
+import java.util.Optional;
+import java.util.OptionalLong;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+public class CacheFactoryTest {
+
+    private ExecutorService executor;
+
+    public static class CacheValue {
+        private final String value;
+
+        public CacheValue(String value) {
+            this.value = value;
+        }
+
+        public String getValue() {
+            return value;
+        }
+
+        public static CacheValue createValue(String val, AtomicLong counter) {
+            try {
+                System.out.println("before create value: " + val + ", Thread: " + Thread.currentThread().getName());
+                Thread.sleep(2000);
+            } catch (InterruptedException e) {
+                throw new RuntimeException(e);
+            }
+            System.out.println("create value: " + val);
+            counter.incrementAndGet();
+            return new CacheValue(val);
+        }
+    }
+
+    public static class CacheValueLoader extends BasicAsyncCacheLoader<Integer, Optional<CacheValue>> {
+
+        private AtomicLong counter;
+
+        public CacheValueLoader(AtomicLong counter) {
+            this.counter = counter;
+        }
+
+        @Override
+        protected Optional<CacheValue> doLoad(Integer key) {
+            return Optional.of(CacheValue.createValue("value" + key, counter));
+        }
+    }
+
+    public static class LoaderRunner implements Runnable {
+        private final AsyncLoadingCache<Integer, Optional<CacheValue>> cache;
+        private final int key;
+        private final AtomicLong counter;
+
+        public LoaderRunner(AsyncLoadingCache<Integer, Optional<CacheValue>> cache, int key, AtomicLong counter) {
+            this.cache = cache;
+            this.key = key;
+            this.counter = counter;
+        }
+
+        @Override
+        public void run() {
+            try {
+                CompletableFuture<Optional<CacheValue>> cacheValue = cache.get(key);
+                System.out.println("key: " + key + ", value: " + cacheValue.get().get().getValue());
+            } catch (RejectedExecutionException e) {
+                counter.incrementAndGet();
+                throw e;
+            } catch (InterruptedException e) {
+                throw new RuntimeException(e);
+            } catch (ExecutionException e) {
+                throw new RuntimeException(e);
+            }
+        }
+    }
+
+    @BeforeEach
+    public void setUp() {
+        executor = ThreadPoolManager.newDaemonFixedThreadPool(
+                10, 1,
+                "testCacheFactory", 0, false);
+    }
+
+    @Test
+    public void testLoadingCacheWithoutExpireAfterWrite() throws InterruptedException {
+        FakeTicker ticker = new FakeTicker();
+        AtomicLong counter = new AtomicLong(0);
+        CacheFactory cacheFactory = new CacheFactory(
+                OptionalLong.empty(),
+                OptionalLong.of(10),
+                1000,
+                false,
+                ticker::read);
+        LoadingCache<Integer, CacheValue> loadingCache = cacheFactory.buildCache(
+                key -> CacheValue.createValue("value" + key, counter), executor);
+        CacheValue value = loadingCache.get(1);
+        Assertions.assertEquals("value1", value.getValue());
+        Assertions.assertEquals(1, counter.get());
+        value = loadingCache.get(1);
+        Assertions.assertEquals("value1", value.getValue());
+        Assertions.assertEquals(1, counter.get());
+        // advance 11 seconds to pass the refreshAfterWrite
+        ticker.advance(11, TimeUnit.SECONDS);
+        // trigger refresh
+        value = loadingCache.get(1);
+        // refresh in background, so still get value1
+        Assertions.assertEquals("value1", value.getValue());
+        // sleep longer to wait for refresh
+        Thread.sleep(2500);
+        value = loadingCache.get(1);
+        Assertions.assertEquals("value1", value.getValue());
+        Assertions.assertEquals(2, counter.get());
+    }
+
+    @Test
+    public void testLoadingCacheWithExpireAfterWrite() throws InterruptedException {
+        FakeTicker ticker = new FakeTicker();
+        AtomicLong counter = new AtomicLong(0);
+        CacheFactory cacheFactory = new CacheFactory(
+                OptionalLong.of(60L),
+                OptionalLong.of(10),
+                1000,
+                false,
+                ticker::read);
+        LoadingCache<Integer, CacheValue> loadingCache = cacheFactory.buildCache(
+                key -> CacheValue.createValue("value" + key, counter), executor);
+        CacheValue value = loadingCache.get(1);
+        Assertions.assertEquals("value1", value.getValue());
+        Assertions.assertEquals(1, counter.get());
+        value = loadingCache.get(1);
+        Assertions.assertEquals("value1", value.getValue());
+        Assertions.assertEquals(1, counter.get());
+        // advance 11 seconds to pass the refreshAfterWrite
+        ticker.advance(11, TimeUnit.SECONDS);
+        // trigger refresh
+        value = loadingCache.get(1);
+        // refresh in background, so still get value1
+        Assertions.assertEquals("value1", value.getValue());
+        // sleep longer to wait for refresh
+        Thread.sleep(2500);
+        value = loadingCache.get(1);
+        Assertions.assertEquals("value1", value.getValue());
+        // refreshed, so counter +1
+        Assertions.assertEquals(2, counter.get());
+        // advance 61 seconds to pass the expireAfterWrite
+        ticker.advance(61, TimeUnit.SECONDS);
+        value = loadingCache.get(1);
+        Assertions.assertEquals("value1", value.getValue());
+        // expired, so counter +1
+        Assertions.assertEquals(3, counter.get());
+    }
+
+    @Test
+    public void testLoadingCacheWithoutRefreshAfterWrite() throws InterruptedException {
+        FakeTicker ticker = new FakeTicker();
+        AtomicLong counter = new AtomicLong(0);
+        CacheFactory cacheFactory = new CacheFactory(
+                OptionalLong.of(60L),
+                OptionalLong.empty(),
+                1000,
+                false,
+                ticker::read);
+        LoadingCache<Integer, CacheValue> loadingCache = cacheFactory.buildCache(
+                key -> CacheValue.createValue("value" + key, counter), executor);
+        CacheValue value = loadingCache.get(1);
+        Assertions.assertEquals("value1", value.getValue());
+        Assertions.assertEquals(1, counter.get());
+        // advance 30 seconds, key still not expired
+        ticker.advance(30, TimeUnit.SECONDS);
+        value = loadingCache.get(1);
+        Assertions.assertEquals("value1", value.getValue());
+        Assertions.assertEquals(1, counter.get());
+        // advance 31 seconds to pass the expireAfterWrite
+        ticker.advance(31, TimeUnit.SECONDS);
+        value = loadingCache.get(1);
+        Assertions.assertEquals("value1", value.getValue());
+        // expired, so counter +1
+        Assertions.assertEquals(2, counter.get());
+    }
+
+    @Test
+    public void testAsyncLoadingCacheWithExpireAfterWrite() throws InterruptedException, ExecutionException {
+        FakeTicker ticker = new FakeTicker();
+        AtomicLong counter = new AtomicLong(0);
+        CacheFactory cacheFactory = new CacheFactory(
+                OptionalLong.of(60L),
+                OptionalLong.of(10),
+                1000,
+                false,
+                ticker::read);
+        CacheValueLoader loader = new CacheValueLoader(counter);
+        AsyncLoadingCache<Integer, Optional<CacheValue>> loadingCache = cacheFactory.buildAsyncCache(loader, executor);
+        CompletableFuture<Optional<CacheValue>> futureValue = loadingCache.get(1);
+        Assertions.assertFalse(futureValue.isDone());
+        Assertions.assertEquals("value1", futureValue.get().get().getValue());
+        Assertions.assertEquals(1, counter.get());
+        futureValue = loadingCache.get(1);
+        Assertions.assertTrue(futureValue.isDone());
+        Assertions.assertEquals("value1", futureValue.get().get().getValue());
+        Assertions.assertEquals(1, counter.get());
+        // advance 11 seconds to pass the refreshAfterWrite
+        ticker.advance(11, TimeUnit.SECONDS);
+        // trigger refresh
+        futureValue = loadingCache.get(1);
+        // refresh in background, so still get value1
+        Assertions.assertTrue(futureValue.isDone());
+        Assertions.assertEquals("value1", futureValue.get().get().getValue());
+        // sleep longer to wait for refresh
+        Thread.sleep(2500);
+        futureValue = loadingCache.get(1);
+        Assertions.assertEquals("value1", futureValue.get().get().getValue());
+        // refreshed, so counter +1
+        Assertions.assertEquals(2, counter.get());
+        // advance 61 seconds to pass the expireAfterWrite
+        ticker.advance(61, TimeUnit.SECONDS);
+        futureValue = loadingCache.get(1);
+        Assertions.assertFalse(futureValue.isDone());
+        Assertions.assertEquals("value1", futureValue.get().get().getValue());
+        // expired, so counter +1
+        Assertions.assertEquals(3, counter.get());
+    }
+
+    @Test
+    public void testTooManyGetRequests() {
+        ExecutorService executor = ThreadPoolManager.newDaemonFixedThreadPool(
+                10, 1,
+                "testCacheFactory", 0, false);
+        FakeTicker ticker = new FakeTicker();
+        AtomicLong counter = new AtomicLong(0);
+        CacheFactory cacheFactory = new CacheFactory(
+                OptionalLong.of(60L),
+                OptionalLong.of(10),
+                1000,
+                false,
+                ticker::read);
+        CacheValueLoader loader = new CacheValueLoader(counter);
+        AsyncLoadingCache<Integer, Optional<CacheValue>> loadingCache = cacheFactory.buildAsyncCache(loader, executor);
+
+        AtomicLong rejectCounter = new AtomicLong(0);
+        List<Thread> threads = Lists.newArrayList();
+        for (int i = 0; i < 12; i++) {
+            Thread thread = new Thread(new LoaderRunner(loadingCache, i, rejectCounter));
+            threads.add(thread);
+            thread.start();
+        }
+
+        for (Thread thread : threads) {
+            try {
+                thread.join();
+            } catch (InterruptedException e) {
+                e.printStackTrace();
+            }
+        }
+        // The thread pool size is 10, add 1 queue size.
+        // So there will be 11 threads executed, and 1 thread will be rejected.
+        Assertions.assertTrue(counter.get() < 12);
+        Assertions.assertTrue(rejectCounter.get() >= 1);
+    }
+}
diff --git a/fe/fe-core/src/test/java/org/apache/doris/datasource/CatalogMgrTest.java b/fe/fe-core/src/test/java/org/apache/doris/datasource/CatalogMgrTest.java
index 2184e2ab37fda1..5faa9952b9502f 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/datasource/CatalogMgrTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/datasource/CatalogMgrTest.java
@@ -50,6 +50,7 @@
 import org.apache.doris.datasource.hive.HMSExternalTable;
 import org.apache.doris.datasource.hive.HiveMetaStoreCache;
 import org.apache.doris.datasource.hive.HiveMetaStoreCache.FileCacheKey;
+import org.apache.doris.datasource.hive.HiveMetaStoreCache.FileCacheValue;
 import org.apache.doris.datasource.hive.HiveMetaStoreCache.HivePartitionValues;
 import org.apache.doris.datasource.hive.HiveMetaStoreCache.PartitionValueCacheKey;
 import org.apache.doris.mysql.privilege.Auth;
@@ -60,8 +61,8 @@
 import org.apache.doris.qe.ShowResultSet;
 import org.apache.doris.utframe.TestWithFeService;
 
+import com.github.benmanes.caffeine.cache.LoadingCache;
 import com.google.common.base.Preconditions;
-import com.google.common.cache.LoadingCache;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Range;
@@ -742,7 +743,7 @@ public void testAlterFileCache() throws Exception {
         HMSExternalCatalog hiveCatalog = (HMSExternalCatalog) mgr.getCatalog(catalogName);
         HiveMetaStoreCache metaStoreCache = externalMetaCacheMgr.getMetaStoreCache(hiveCatalog);
 
-        LoadingCache preFileCache = metaStoreCache.getFileCacheRef().get();
+        LoadingCache preFileCache = metaStoreCache.getFileCacheRef().get();
 
         // 1. properties contains `file.meta.cache.ttl-second`, it should not be equal
@@ -759,8 +760,6 @@
                 + " (\"type\" = \"hms\", \"hive.metastore.uris\" = \"thrift://172.16.5.9:9083\");";
         mgr.alterCatalogProps((AlterCatalogPropertyStmt) parseAndAnalyzeStmt(alterCatalogProp));
         Assertions.assertEquals(preFileCache, metaStoreCache.getFileCacheRef().get());
-
-
     }
 
     @Test
diff --git a/fe/fe-core/src/test/java/org/apache/doris/datasource/ExternalRowCountCacheTest.java b/fe/fe-core/src/test/java/org/apache/doris/datasource/ExternalRowCountCacheTest.java
deleted file mode 100644
index e8622f6b59a239..00000000000000
--- a/fe/fe-core/src/test/java/org/apache/doris/datasource/ExternalRowCountCacheTest.java
+++ /dev/null
@@ -1,100 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package org.apache.doris.datasource;
-
-import org.apache.doris.datasource.ExternalRowCountCache.RowCountKey;
-import org.apache.doris.statistics.BasicAsyncCacheLoader;
-
-import org.junit.jupiter.api.Assertions;
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Test;
-
-import java.util.Optional;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.atomic.AtomicLong;
-
-public class ExternalRowCountCacheTest {
-    private ExternalRowCountCache cache;
-    private ExecutorService executorService;
-
-    public static class TestLoader extends BasicAsyncCacheLoader<RowCountKey, Optional<Long>> {
-
-        private AtomicLong incr = new AtomicLong(333);
-
-        @Override
-        protected Optional<Long> doLoad(RowCountKey rowCountKey) {
-            if (rowCountKey.getTableId() == 1) {
-                return Optional.of(111L);
-            } else if (rowCountKey.getTableId() == 2) {
-                return Optional.of(222L);
-            } else if (rowCountKey.getTableId() == 3) {
-                try {
-                    Thread.sleep(1000);
-                } catch (InterruptedException e) {
-                    throw new RuntimeException(e);
-                }
-                System.out.println("load: " + incr.get());
-                return Optional.of(incr.incrementAndGet());
-            }
-            return Optional.empty();
-        }
-    }
-
-    @BeforeEach
-    public void setUp() {
-        executorService = Executors.newFixedThreadPool(2);
-        cache = new ExternalRowCountCache(executorService, 2, new TestLoader());
-    }
-
-    @Test
-    public void test() throws Exception {
-        // table 1
-        long rowCount = cache.getCachedRowCount(1, 1, 1);
-        Assertions.assertEquals(0, rowCount);
-        Thread.sleep(1000);
-        rowCount = cache.getCachedRowCount(1, 1, 1);
-        Assertions.assertEquals(111, rowCount);
-
-        // table 2
-        rowCount = cache.getCachedRowCount(1, 1, 2);
-        Assertions.assertEquals(0, rowCount);
-        Thread.sleep(1000);
-        rowCount = cache.getCachedRowCount(1, 1, 2);
-        Assertions.assertEquals(222, rowCount);
-
-        // table 3
-        rowCount = cache.getCachedRowCount(1, 1, 3);
-        // first get, it should be 0 because the loader is async
-        Assertions.assertEquals(0, rowCount);
-        // After sleep 2 sec and then get, it should be 1
-        Thread.sleep(2000);
-        rowCount = cache.getCachedRowCount(1, 1, 3);
-        Assertions.assertEquals(334, rowCount);
-        // sleep 3 sec to trigger refresh
-        Thread.sleep(3000);
-        rowCount = cache.getCachedRowCount(1, 1, 3);
-        // the refresh will be triggered only when query it, so it should still be 1
-        Assertions.assertEquals(334, rowCount);
-        // sleep 2 sec to wait for the doLoad
-        Thread.sleep(2000);
-        rowCount = cache.getCachedRowCount(1, 1, 3);
-        // refresh done, value should be 2
-        Assertions.assertEquals(335, rowCount);
-    }
-}
diff --git a/fe/fe-core/src/test/java/org/apache/doris/datasource/HMSCachedClientTest.java b/fe/fe-core/src/test/java/org/apache/doris/datasource/TestHMSCachedClient.java
similarity index 99%
rename from fe/fe-core/src/test/java/org/apache/doris/datasource/HMSCachedClientTest.java
rename to fe/fe-core/src/test/java/org/apache/doris/datasource/TestHMSCachedClient.java
index 126df780dbf16a..3927c0db52460f 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/datasource/HMSCachedClientTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/datasource/TestHMSCachedClient.java
@@ -46,7 +46,7 @@
 import java.util.function.Function;
 import java.util.stream.Collectors;
 
-public class HMSCachedClientTest implements HMSCachedClient {
+public class TestHMSCachedClient implements HMSCachedClient {
 
     public Map> partitions = new ConcurrentHashMap<>();
     public Map> tables = new HashMap<>();
diff --git a/fe/fe-core/src/test/java/org/apache/doris/datasource/hive/HmsCommitTest.java b/fe/fe-core/src/test/java/org/apache/doris/datasource/hive/HmsCommitTest.java
index 54bbf5eca3f12b..dedc7738c8601c 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/datasource/hive/HmsCommitTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/datasource/hive/HmsCommitTest.java
@@ -19,7 +19,7 @@
 
 import org.apache.doris.catalog.Column;
 import org.apache.doris.catalog.PrimitiveType;
-import org.apache.doris.datasource.HMSCachedClientTest;
+import org.apache.doris.datasource.TestHMSCachedClient;
 import org.apache.doris.fs.LocalDfsFileSystem;
 import org.apache.doris.qe.ConnectContext;
 import org.apache.doris.thrift.THiveLocationParams;
@@ -90,7 +90,7 @@ public static void createTestHiveCatalog() throws IOException {
             entries.set("hive.metastore.uris", uri);
             hmsClient = new ThriftHMSCachedClient(entries, 2);
         } else {
-            hmsClient = new HMSCachedClientTest();
+            hmsClient = new TestHMSCachedClient();
         }
         hmsOps = new HiveMetadataOps(null, hmsClient, fs);
     }
diff --git a/fe/pom.xml b/fe/pom.xml
index a15b2709a1b4b8..e30fdc2f6cb6a4 100644
--- a/fe/pom.xml
+++ b/fe/pom.xml
@@ -680,6 +680,11 @@ under the License.
                 <artifactId>guava</artifactId>
                 <version>${guava.version}</version>
             </dependency>
+            <dependency>
+                <groupId>com.google.guava</groupId>
+                <artifactId>guava-testlib</artifactId>
+                <version>${guava.version}</version>
+            </dependency>
             <dependency>
                 <groupId>com.fasterxml.jackson.core</groupId>