From f40a9b5935f1605659557b38b594f125a0c2eaf3 Mon Sep 17 00:00:00 2001
From: Mingyu Chen
Date: Wed, 17 Apr 2024 09:52:48 +0800
Subject: [PATCH] [opt](meta-cache) refine the meta cache (#33449)

1. Use `caffeine` instead of `guava cache` to get better performance.
2. Add a new class `CacheFactory`.
   All (Async)LoadingCaches should be built from `CacheFactory`.
3. Use separate executors for different caches:
   1. rowCountRefreshExecutor
      For the row count cache. The row count cache is an async loading cache,
      and we can ignore the result on a cache miss or when the thread pool is
      full, so this cache uses a separate executor.
   2. commonRefreshExecutor
      For the other caches, which are sync loading caches. But
      commonRefreshExecutor will be used for async refresh. That is, if a cache
      entry is missing, the value is loaded in the caller thread, synchronously;
      if a cache entry needs refresh, it is reloaded in commonRefreshExecutor.
   3. fileListingExecutor
      File listing is a heavy operation, so it uses a separate executor. For the
      fileCache, the refresh operation will still use commonRefreshExecutor to
      trigger refresh, while fileListingExecutor does the actual file listing.
4. Change the refresh and expire logic of the caches.
   For most caches, set a `refreshAfterWrite` strategy, so that even if a cache
   entry has expired, the old entry can still be served while the new one is
   being loaded.
5. Add a new global variable `enable_get_row_count_from_file_list`.
   Defaults to true; if false, getting the row count from the file list is
   disabled.
---
 fe/fe-core/pom.xml | 5 +
 .../org/apache/doris/common/CacheFactory.java | 112 +++++++
 .../doris/common/util/CacheBulkLoader.java | 4 +-
 .../doris/datasource/ExternalCatalog.java | 2 +-
 .../datasource/ExternalMetaCacheMgr.java | 60 +++-
 .../datasource/ExternalRowCountCache.java | 20 +-
 .../doris/datasource/ExternalSchemaCache.java | 40 +--
 .../datasource/hive/HMSExternalTable.java | 115 ++++++-
 .../datasource/hive/HiveMetaStoreCache.java | 139 ++++-----
 .../datasource/hive/HiveMetadataOps.java | 2 +-
 .../source/HudiCachedPartitionProcessor.java | 30 +-
 .../hudi/source/HudiPartitionMgr.java | 21 +-
 .../datasource/hudi/source/HudiScanNode.java | 3 +-
 .../iceberg/IcebergMetadataCache.java | 112 +++---
 .../iceberg/IcebergMetadataCacheMgr.java | 7 +-
 .../maxcompute/MaxComputeExternalTable.java | 40 ++-
 .../maxcompute/MaxComputeMetadataCache.java | 31 +-
 .../MaxComputeMetadataCacheMgr.java | 5 +-
 .../org/apache/doris/fs/FileSystemCache.java | 39 +--
 .../apache/doris/httpv2/rest/ShowAction.java | 4 +-
 .../org/apache/doris/qe/GlobalVariable.java | 12 +
 .../statistics/BasicAsyncCacheLoader.java | 4 +-
 .../doris/statistics/util/StatisticsUtil.java | 108 -------
 .../doris/common/CacheBulkLoaderTest.java | 20 +-
 .../apache/doris/common/CacheFactoryTest.java | 291 ++++++++++++++++++
 .../doris/datasource/CatalogMgrTest.java | 7 +-
 .../datasource/ExternalRowCountCacheTest.java | 100 ------
 ...ientTest.java => TestHMSCachedClient.java} | 2 +-
 .../doris/datasource/hive/HmsCommitTest.java | 4 +-
 fe/pom.xml | 5 +
 30 files changed, 831 insertions(+), 513 deletions(-)
 create mode 100644 fe/fe-core/src/main/java/org/apache/doris/common/CacheFactory.java
 create mode 100644 fe/fe-core/src/test/java/org/apache/doris/common/CacheFactoryTest.java
 delete mode 100644 fe/fe-core/src/test/java/org/apache/doris/datasource/ExternalRowCountCacheTest.java
 rename fe/fe-core/src/test/java/org/apache/doris/datasource/{HMSCachedClientTest.java => TestHMSCachedClient.java} (99%)

diff --git a/fe/fe-core/pom.xml b/fe/fe-core/pom.xml
index
83deec7650c750..6fd4846d4640e3 100644 --- a/fe/fe-core/pom.xml +++ b/fe/fe-core/pom.xml @@ -157,6 +157,11 @@ under the License. com.google.guava guava + + com.google.guava + guava-testlib + test + com.fasterxml.jackson.core diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/CacheFactory.java b/fe/fe-core/src/main/java/org/apache/doris/common/CacheFactory.java new file mode 100644 index 00000000000000..fbb004e5c99b02 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/common/CacheFactory.java @@ -0,0 +1,112 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.common; + +import com.github.benmanes.caffeine.cache.AsyncCacheLoader; +import com.github.benmanes.caffeine.cache.AsyncLoadingCache; +import com.github.benmanes.caffeine.cache.CacheLoader; +import com.github.benmanes.caffeine.cache.Caffeine; +import com.github.benmanes.caffeine.cache.LoadingCache; +import com.github.benmanes.caffeine.cache.Ticker; +import org.jetbrains.annotations.NotNull; + +import java.time.Duration; +import java.util.OptionalLong; +import java.util.concurrent.ExecutorService; + +/** + * Factory to create Caffeine cache. + *
+ * This class creates Caffeine caches, both sync and async, with the specified parameters. + * A cache is configured with the following parameters: + * - expireAfterWriteSec: The duration after which cache entries expire. + * - refreshAfterWriteSec: The duration after which cache entries are refreshed. + * - maxSize: The maximum number of entries in the cache. + * - enableStats: Whether to record stats for the cache. + * - ticker: The time source used by the cache. + * Caches are built from these parameters via the buildCache and buildAsyncCache methods. + *
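+ * <p>
+ * Illustrative usage (a sketch; the loader lambda and {@code refreshPool} are hypothetical):
+ * <pre>{@code
+ * CacheFactory factory = new CacheFactory(
+ *         OptionalLong.of(86400L),  // expire entries 1 day after write
+ *         OptionalLong.of(600L),    // refresh entries 10 minutes after write
+ *         10000,                    // keep at most 10000 entries
+ *         false,                    // do not record stats
+ *         null);                    // use the system ticker
+ * LoadingCache<String, Long> cache = factory.buildCache(key -> loadValue(key), refreshPool);
+ * }</pre>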
+ */ +public class CacheFactory { + + private OptionalLong expireAfterWriteSec; + private OptionalLong refreshAfterWriteSec; + private long maxSize; + private boolean enableStats; + // Ticker provides the time source for the cache. + // Only used in tests, to supply a fake time source. + // If not provided, the system time is used. + private Ticker ticker; + + public CacheFactory( + OptionalLong expireAfterWriteSec, + OptionalLong refreshAfterWriteSec, + long maxSize, + boolean enableStats, + Ticker ticker) { + this.expireAfterWriteSec = expireAfterWriteSec; + this.refreshAfterWriteSec = refreshAfterWriteSec; + this.maxSize = maxSize; + this.enableStats = enableStats; + this.ticker = ticker; + } + + // Build a loading cache without an executor; refresh work runs on the common fork-join pool + public LoadingCache buildCache(CacheLoader cacheLoader) { + Caffeine builder = buildWithParams(); + return builder.build(cacheLoader); + } + + // Build a loading cache; refresh work runs on the given executor + public LoadingCache buildCache(CacheLoader cacheLoader, ExecutorService executor) { + Caffeine builder = buildWithParams(); + builder.executor(executor); + return builder.build(cacheLoader); + } + + // Build an async loading cache + public AsyncLoadingCache buildAsyncCache(AsyncCacheLoader cacheLoader, + ExecutorService executor) { + Caffeine builder = buildWithParams(); + builder.executor(executor); + return builder.buildAsync(cacheLoader); + } + + @NotNull + private Caffeine buildWithParams() { + Caffeine builder = Caffeine.newBuilder(); + builder.maximumSize(maxSize); + + if (expireAfterWriteSec.isPresent()) { + builder.expireAfterWrite(Duration.ofSeconds(expireAfterWriteSec.getAsLong())); + } + if (refreshAfterWriteSec.isPresent()) { + builder.refreshAfterWrite(Duration.ofSeconds(refreshAfterWriteSec.getAsLong())); + } + + if (enableStats) { + builder.recordStats(); + } + + if (ticker != null) { + builder.ticker(ticker); + } + return builder; + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/util/CacheBulkLoader.java b/fe/fe-core/src/main/java/org/apache/doris/common/util/CacheBulkLoader.java index 441f22d8b68ace..eee08b872ec3fc 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/util/CacheBulkLoader.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/CacheBulkLoader.java @@ -19,7 +19,7 @@ import org.apache.doris.common.Pair; -import com.google.common.cache.CacheLoader; +import com.github.benmanes.caffeine.cache.CacheLoader; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Maps; import com.google.common.collect.Streams; @@ -31,7 +31,7 @@ import java.util.concurrent.Future; import java.util.stream.Collectors; -public abstract class CacheBulkLoader extends CacheLoader { +public abstract class CacheBulkLoader implements CacheLoader { protected abstract ExecutorService getExecutor(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java index 45d99e02223757..6d5ae985fefb25 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java @@ -104,7 +104,7 @@ public abstract class ExternalCatalog @SerializedName(value = "catalogProperty") protected CatalogProperty catalogProperty; @SerializedName(value = "initialized") - private boolean initialized = false; + protected boolean initialized = false;
@SerializedName(value = "idToDb") protected Map> idToDb = Maps.newConcurrentMap(); @SerializedName(value = "lastUpdateTime") diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalMetaCacheMgr.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalMetaCacheMgr.java index 87c4ff65e988b5..6198f1bab9a911 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalMetaCacheMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalMetaCacheMgr.java @@ -50,13 +50,35 @@ public class ExternalMetaCacheMgr { private static final Logger LOG = LogManager.getLogger(ExternalMetaCacheMgr.class); + /** + * Executors for loading caches + * 1. rowCountRefreshExecutor + * For the row count cache. + * The row count cache is an async loading cache, and we can ignore the result + * on a cache miss or when the thread pool is full. + * So use a separate executor for this cache. + *
+ * 2. commonRefreshExecutor + * For the other caches, which are sync loading caches. + * But commonRefreshExecutor will be used for async refresh. + * That is, if a cache entry is missing, the value will be loaded in the caller thread, synchronously; + * if a cache entry needs refresh, it will be reloaded in commonRefreshExecutor. + *
+ * 3. fileListingExecutor + * File listing is a heavy operation, so use a separate executor for it. + * For fileCache, the refresh operation will still use commonRefreshExecutor to trigger refresh. + * And fileListingExecutor will be used to list files. + */ + private ExecutorService rowCountRefreshExecutor; + private ExecutorService commonRefreshExecutor; + private ExecutorService fileListingExecutor; + // catalog id -> HiveMetaStoreCache private final Map cacheMap = Maps.newConcurrentMap(); // catalog id -> table schema cache private Map schemaCacheMap = Maps.newHashMap(); // hudi partition manager private final HudiPartitionMgr hudiPartitionMgr; - private ExecutorService executor; // all catalogs could share the same fsCache. private FileSystemCache fsCache; // all external table row count cache. @@ -65,24 +87,42 @@ public class ExternalMetaCacheMgr { private final MaxComputeMetadataCacheMgr maxComputeMetadataCacheMgr; public ExternalMetaCacheMgr() { - executor = ThreadPoolManager.newDaemonFixedThreadPool( + rowCountRefreshExecutor = ThreadPoolManager.newDaemonFixedThreadPool( + Config.max_external_cache_loader_thread_pool_size, + Config.max_external_cache_loader_thread_pool_size, + "RowCountRefreshExecutor", 0, true); + + commonRefreshExecutor = ThreadPoolManager.newDaemonFixedThreadPool( Config.max_external_cache_loader_thread_pool_size, Config.max_external_cache_loader_thread_pool_size * 1000, - "ExternalMetaCacheMgr", 120, true); - hudiPartitionMgr = HudiPartitionMgr.get(executor); - fsCache = new FileSystemCache(executor); - rowCountCache = new ExternalRowCountCache(executor, - Config.external_cache_expire_time_minutes_after_access * 60, null); - icebergMetadataCacheMgr = new IcebergMetadataCacheMgr(); + "CommonRefreshExecutor", 10, true); + + // The queue size should be large enough, + // because there may be thousands of partitions being queried at the same time.
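+ // Hence the queue below is sized at pool size * 1000, the same sizing used for commonRefreshExecutor.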
+ fileListingExecutor = ThreadPoolManager.newDaemonFixedThreadPool( + Config.max_external_cache_loader_thread_pool_size, + Config.max_external_cache_loader_thread_pool_size * 1000, + "FileListingExecutor", 10, true); + + fsCache = new FileSystemCache(); + rowCountCache = new ExternalRowCountCache(rowCountRefreshExecutor); + + hudiPartitionMgr = new HudiPartitionMgr(commonRefreshExecutor); + icebergMetadataCacheMgr = new IcebergMetadataCacheMgr(commonRefreshExecutor); maxComputeMetadataCacheMgr = new MaxComputeMetadataCacheMgr(); } + public ExecutorService getFileListingExecutor() { + return fileListingExecutor; + } + public HiveMetaStoreCache getMetaStoreCache(HMSExternalCatalog catalog) { HiveMetaStoreCache cache = cacheMap.get(catalog.getId()); if (cache == null) { synchronized (cacheMap) { if (!cacheMap.containsKey(catalog.getId())) { - cacheMap.put(catalog.getId(), new HiveMetaStoreCache(catalog, executor)); + cacheMap.put(catalog.getId(), + new HiveMetaStoreCache(catalog, commonRefreshExecutor, fileListingExecutor)); } cache = cacheMap.get(catalog.getId()); } @@ -95,7 +135,7 @@ public ExternalSchemaCache getSchemaCache(ExternalCatalog catalog) { if (cache == null) { synchronized (schemaCacheMap) { if (!schemaCacheMap.containsKey(catalog.getId())) { - schemaCacheMap.put(catalog.getId(), new ExternalSchemaCache(catalog)); + schemaCacheMap.put(catalog.getId(), new ExternalSchemaCache(catalog, commonRefreshExecutor)); } cache = schemaCacheMap.get(catalog.getId()); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalRowCountCache.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalRowCountCache.java index 632cde1d5a721e..4f88d534f9261f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalRowCountCache.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalRowCountCache.java @@ -18,18 +18,18 @@ package org.apache.doris.datasource; import org.apache.doris.catalog.TableIf; +import org.apache.doris.common.CacheFactory; import org.apache.doris.common.Config; import org.apache.doris.statistics.BasicAsyncCacheLoader; import org.apache.doris.statistics.util.StatisticsUtil; import com.github.benmanes.caffeine.cache.AsyncLoadingCache; -import com.github.benmanes.caffeine.cache.Caffeine; import lombok.Getter; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -import java.time.Duration; import java.util.Optional; +import java.util.OptionalLong; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; @@ -38,16 +38,16 @@ public class ExternalRowCountCache { private static final Logger LOG = LogManager.getLogger(ExternalRowCountCache.class); private final AsyncLoadingCache> rowCountCache; - public ExternalRowCountCache(ExecutorService executor, long refreshAfterWriteSeconds, - BasicAsyncCacheLoader> loader) { + public ExternalRowCountCache(ExecutorService executor) { // 1. set expireAfterWrite to 1 day, avoid too many entries // 2. set refreshAfterWrite to 10min(default), so that the cache will be refreshed after 10min - rowCountCache = Caffeine.newBuilder() - .maximumSize(Config.max_external_table_row_count_cache_num) - .expireAfterAccess(Duration.ofDays(1)) - .refreshAfterWrite(Duration.ofSeconds(refreshAfterWriteSeconds)) - .executor(executor) - .buildAsync(loader == null ? 
new RowCountCacheLoader() : loader); + CacheFactory rowCountCacheFactory = new CacheFactory( + OptionalLong.of(86400L), + OptionalLong.of(Config.external_cache_expire_time_minutes_after_access * 60), + Config.max_external_table_row_count_cache_num, + false, + null); + rowCountCache = rowCountCacheFactory.buildAsyncCache(new RowCountCacheLoader(), executor); } @Getter diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalSchemaCache.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalSchemaCache.java index d8f2edbbd1c1b6..d39ebf7adc319f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalSchemaCache.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalSchemaCache.java @@ -18,16 +18,14 @@ package org.apache.doris.datasource; import org.apache.doris.catalog.Column; +import org.apache.doris.common.CacheFactory; import org.apache.doris.common.Config; -import org.apache.doris.common.util.Util; import org.apache.doris.metric.GaugeMetric; import org.apache.doris.metric.Metric; import org.apache.doris.metric.MetricLabel; import org.apache.doris.metric.MetricRepo; -import com.google.common.cache.CacheBuilder; -import com.google.common.cache.CacheLoader; -import com.google.common.cache.LoadingCache; +import com.github.benmanes.caffeine.cache.LoadingCache; import com.google.common.collect.ImmutableList; import lombok.Data; import org.apache.logging.log4j.LogManager; @@ -35,9 +33,9 @@ import java.util.List; import java.util.Objects; +import java.util.OptionalLong; import java.util.Set; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.TimeUnit; +import java.util.concurrent.ExecutorService; // The schema cache for external table public class ExternalSchemaCache { @@ -46,21 +44,20 @@ public class ExternalSchemaCache { private LoadingCache> schemaCache; - public ExternalSchemaCache(ExternalCatalog catalog) { + public ExternalSchemaCache(ExternalCatalog catalog, ExecutorService executor) { this.catalog = catalog; - init(); + init(executor); initMetrics(); } - private void init() { - schemaCache = CacheBuilder.newBuilder().maximumSize(Config.max_external_schema_cache_num) - .expireAfterAccess(Config.external_cache_expire_time_minutes_after_access, TimeUnit.MINUTES) - .build(new CacheLoader>() { - @Override - public ImmutableList load(SchemaCacheKey key) { - return loadSchema(key); - } - }); + private void init(ExecutorService executor) { + CacheFactory schemaCacheFactory = new CacheFactory( + OptionalLong.of(86400L), + OptionalLong.of(Config.external_cache_expire_time_minutes_after_access * 60), + Config.max_external_schema_cache_num, + false, + null); + schemaCache = schemaCacheFactory.buildCache(key -> loadSchema(key), executor); } private void initMetrics() { @@ -69,7 +66,7 @@ private void initMetrics() { Metric.MetricUnit.NOUNIT, "external schema cache number") { @Override public Long getValue() { - return schemaCache.size(); + return schemaCache.estimatedSize(); } }; schemaCacheGauge.addLabel(new MetricLabel("catalog", catalog.getName())); @@ -86,12 +83,7 @@ private ImmutableList loadSchema(SchemaCacheKey key) { public List getSchema(String dbName, String tblName) { SchemaCacheKey key = new SchemaCacheKey(dbName, tblName); - try { - return schemaCache.get(key); - } catch (ExecutionException e) { - throw new CacheException("failed to get schema for %s in catalog %s. 
err: %s", - e, key, catalog.getName(), Util.getRootCauseMessage(e)); - } + return schemaCache.get(key); } public void addSchemaForTest(String dbName, String tblName, ImmutableList schema) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java index a788d9e57bca5a..05bba50ecb5935 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java @@ -27,6 +27,7 @@ import org.apache.doris.catalog.ScalarType; import org.apache.doris.catalog.Type; import org.apache.doris.common.AnalysisException; +import org.apache.doris.common.Config; import org.apache.doris.datasource.ExternalTable; import org.apache.doris.datasource.hudi.HudiUtils; import org.apache.doris.datasource.hudi.source.COWIncrementalRelation; @@ -46,6 +47,7 @@ import org.apache.doris.nereids.trees.expressions.Slot; import org.apache.doris.nereids.trees.expressions.SlotReference; import org.apache.doris.nereids.trees.expressions.literal.StringLiteral; +import org.apache.doris.qe.GlobalVariable; import org.apache.doris.statistics.AnalysisInfo; import org.apache.doris.statistics.BaseAnalysisTask; import org.apache.doris.statistics.ColumnStatistic; @@ -82,6 +84,8 @@ import java.math.BigDecimal; import java.math.BigInteger; import java.time.LocalDate; +import java.util.ArrayList; +import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; @@ -600,7 +604,7 @@ public long fetchRowCount() { // Only hive table supports estimate row count by listing file. if (rowCount == -1 && dlaType.equals(DLAType.HIVE)) { LOG.debug("Will estimate row count from file list."); - rowCount = StatisticsUtil.getRowCountFromFileList(this); + rowCount = getRowCountFromFileList(); } return rowCount; } @@ -818,9 +822,8 @@ public void gsonPostProcess() throws IOException { @Override public List getChunkSizes() { - HiveMetaStoreCache.HivePartitionValues partitionValues = StatisticsUtil.getPartitionValuesForTable(this); - List filesByPartitions - = StatisticsUtil.getFilesForPartitions(this, partitionValues, 0); + HiveMetaStoreCache.HivePartitionValues partitionValues = getAllPartitionValues(); + List filesByPartitions = getFilesForPartitions(partitionValues, 0); List result = Lists.newArrayList(); for (HiveMetaStoreCache.FileCacheValue files : filesByPartitions) { for (HiveMetaStoreCache.HiveFileStatus file : files.getFiles()) { @@ -957,4 +960,108 @@ public boolean needAutoRefresh() { public boolean isPartitionColumnAllowNull() { return true; } + + /** + * Estimate hive table row count : totalFileSize/estimatedRowSize + */ + private long getRowCountFromFileList() { + if (!GlobalVariable.enable_get_row_count_from_file_list) { + return -1; + } + if (isView()) { + return 0; + } + HiveMetaStoreCache.HivePartitionValues partitionValues = getAllPartitionValues(); + + // Get files for all partitions. + int samplePartitionSize = Config.hive_stats_partition_sample_size; + List filesByPartitions = getFilesForPartitions(partitionValues, + samplePartitionSize); + long totalSize = 0; + // Calculate the total file size. 
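+ // Illustrative arithmetic (hypothetical numbers): if 100 of 10000 partitions are sampled
+ // and their files total 1 GiB, the extrapolation below scales that to ~100 GiB; with an
+ // estimated row size of 100 bytes, the estimate is roughly 1e9 rows.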
+ for (HiveMetaStoreCache.FileCacheValue files : filesByPartitions) { + for (HiveMetaStoreCache.HiveFileStatus file : files.getFiles()) { + totalSize += file.getLength(); + } + } + // Estimate row count: totalSize/estimatedRowSize + long estimatedRowSize = 0; + List partitionColumns = getPartitionColumns(); + for (Column column : getFullSchema()) { + // Partition column shouldn't count to the row size, because it is not in the data file. + if (partitionColumns != null && partitionColumns.contains(column)) { + continue; + } + estimatedRowSize += column.getDataType().getSlotSize(); + } + if (estimatedRowSize == 0) { + return 0; + } + + int totalPartitionSize = partitionValues == null ? 1 : partitionValues.getIdToPartitionItem().size(); + if (samplePartitionSize < totalPartitionSize) { + totalSize = totalSize * totalPartitionSize / samplePartitionSize; + } + return totalSize / estimatedRowSize; + } + + // Get all partition values from cache. + private HiveMetaStoreCache.HivePartitionValues getAllPartitionValues() { + if (isView()) { + return null; + } + HiveMetaStoreCache cache = Env.getCurrentEnv().getExtMetaCacheMgr() + .getMetaStoreCache((HMSExternalCatalog) catalog); + List partitionColumnTypes = getPartitionColumnTypes(); + HiveMetaStoreCache.HivePartitionValues partitionValues = null; + // Get table partitions from cache. + if (!partitionColumnTypes.isEmpty()) { + // It is ok to get partition values from cache, + // no need to worry that this call will invalid or refresh the cache. + // because it has enough space to keep partition info of all tables in cache. + partitionValues = cache.getPartitionValues(dbName, name, partitionColumnTypes); + } + return partitionValues; + } + + // Get all files related to given partition values + // If sampleSize > 0, randomly choose part of partitions of the whole table. + private List getFilesForPartitions( + HiveMetaStoreCache.HivePartitionValues partitionValues, int sampleSize) { + if (isView()) { + return Lists.newArrayList(); + } + HiveMetaStoreCache cache = Env.getCurrentEnv().getExtMetaCacheMgr() + .getMetaStoreCache((HMSExternalCatalog) catalog); + List hivePartitions = Lists.newArrayList(); + if (partitionValues != null) { + Map idToPartitionItem = partitionValues.getIdToPartitionItem(); + int totalPartitionSize = idToPartitionItem.size(); + Collection partitionItems; + List> partitionValuesList; + // If partition number is too large, randomly choose part of them to estimate the whole table. + if (sampleSize > 0 && sampleSize < totalPartitionSize) { + List items = new ArrayList<>(idToPartitionItem.values()); + Collections.shuffle(items); + partitionItems = items.subList(0, sampleSize); + partitionValuesList = Lists.newArrayListWithCapacity(sampleSize); + } else { + partitionItems = idToPartitionItem.values(); + partitionValuesList = Lists.newArrayListWithCapacity(totalPartitionSize); + } + for (PartitionItem item : partitionItems) { + partitionValuesList.add(((ListPartitionItem) item).getItems().get(0).getPartitionValuesAsStringList()); + } + // get partitions without cache, so that it will not invalid the cache when executing + // non query request such as `show table status` + hivePartitions = cache.getAllPartitionsWithoutCache(dbName, name, partitionValuesList); + } else { + hivePartitions.add(new HivePartition(dbName, name, true, + getRemoteTable().getSd().getInputFormat(), + getRemoteTable().getSd().getLocation(), null, Maps.newHashMap())); + } + // Get files for all partitions. 
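+ // Note: this uses the cache-bypassing variant (getFilesByPartitionsWithoutCache),
+ // so stats-only requests such as `show table status` neither populate nor evict the file cache.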
+ String bindBrokerName = catalog.bindBrokerName(); + return cache.getFilesByPartitionsWithoutCache(hivePartitions, bindBrokerName); + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java index 23c9de3be1f531..2690d82db78d66 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java @@ -27,6 +27,7 @@ import org.apache.doris.catalog.PartitionKey; import org.apache.doris.catalog.Type; import org.apache.doris.common.AnalysisException; +import org.apache.doris.common.CacheFactory; import org.apache.doris.common.Config; import org.apache.doris.common.FeConstants; import org.apache.doris.common.Pair; @@ -50,12 +51,11 @@ import org.apache.doris.planner.ListPartitionPrunerV2; import org.apache.doris.planner.PartitionPrunerV2Base.UniqueId; +import com.github.benmanes.caffeine.cache.CacheLoader; +import com.github.benmanes.caffeine.cache.LoadingCache; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import com.google.common.base.Strings; -import com.google.common.cache.CacheBuilder; -import com.google.common.cache.CacheLoader; -import com.google.common.cache.LoadingCache; import com.google.common.collect.Iterables; import com.google.common.collect.Lists; import com.google.common.collect.Maps; @@ -89,11 +89,11 @@ import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.OptionalLong; import java.util.Set; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; -import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; @@ -111,63 +111,58 @@ public class HiveMetaStoreCache { private final HMSExternalCatalog catalog; private JobConf jobConf; - private final ExecutorService executor; + private final ExecutorService refreshExecutor; + private final ExecutorService fileListingExecutor; // cache from -> private LoadingCache partitionValuesCache; // cache from -> private LoadingCache partitionCache; // the ref of cache from -> + // Other thread may reset this cache, so use AtomicReference to wrap it. private volatile AtomicReference> fileCacheRef = new AtomicReference<>(); - public HiveMetaStoreCache(HMSExternalCatalog catalog, ExecutorService executor) { + public HiveMetaStoreCache(HMSExternalCatalog catalog, + ExecutorService refreshExecutor, ExecutorService fileListingExecutor) { this.catalog = catalog; - this.executor = executor; + this.refreshExecutor = refreshExecutor; + this.fileListingExecutor = fileListingExecutor; init(); initMetrics(); } + /** + * Because the partitionValuesCache|partitionCache|fileCache use the same executor for batch loading, + * we need to be very careful and try to avoid the circular dependency of these tasks + * which will bring out thread deadlock. + **/ private void init() { - /** - * Because the partitionValuesCache|partitionCache|fileCache use the same executor for batch loading, - * we need to be very careful and try to avoid the circular dependency of there tasks - * which will bring out thread deak-locks. 
- * */ - partitionValuesCache = CacheBuilder.newBuilder().maximumSize(Config.max_hive_table_cache_num) - .expireAfterAccess(Config.external_cache_expire_time_minutes_after_access, TimeUnit.MINUTES) - .build(new CacheBulkLoader() { - @Override - protected ExecutorService getExecutor() { - return HiveMetaStoreCache.this.executor; - } - - @Override - public HivePartitionValues load(PartitionValueCacheKey key) { - return loadPartitionValues(key); - } - - }); - - partitionCache = CacheBuilder.newBuilder().maximumSize(Config.max_hive_partition_cache_num) - .expireAfterAccess(Config.external_cache_expire_time_minutes_after_access, TimeUnit.MINUTES) - .build(new CacheBulkLoader() { - @Override - protected ExecutorService getExecutor() { - return HiveMetaStoreCache.this.executor; - } - - @Override - public HivePartition load(PartitionCacheKey key) { - return loadPartition(key); - } - - @Override - public Map loadAll(Iterable keys) { - return loadPartitions(keys); - } + CacheFactory partitionValuesCacheFactory = new CacheFactory( + OptionalLong.of(86400L), + OptionalLong.of(Config.external_cache_expire_time_minutes_after_access * 60L), + Config.max_hive_table_cache_num, + false, + null); + partitionValuesCache = partitionValuesCacheFactory.buildCache(key -> loadPartitionValues(key), refreshExecutor); + + CacheFactory partitionCacheFactory = new CacheFactory( + OptionalLong.of(86400L), + OptionalLong.of(Config.external_cache_expire_time_minutes_after_access * 60L), + Config.max_hive_partition_cache_num, + false, + null); + partitionCache = partitionCacheFactory.buildCache(new CacheLoader() { + @Override + public HivePartition load(PartitionCacheKey key) { + return loadPartition(key); + } - }); + @Override + public Map loadAll(Iterable keys) { + return loadPartitions(keys); + } + }, refreshExecutor); setNewFileCache(); } @@ -183,18 +178,18 @@ public void setNewFileCache() { (catalog.getProperties().get(HMSExternalCatalog.FILE_META_CACHE_TTL_SECOND)), HMSExternalCatalog.FILE_META_CACHE_NO_TTL); - CacheBuilder fileCacheBuilder = CacheBuilder.newBuilder() - .maximumSize(Config.max_external_file_cache_num) - .expireAfterAccess(Config.external_cache_expire_time_minutes_after_access, TimeUnit.MINUTES); - - if (fileMetaCacheTtlSecond >= HMSExternalCatalog.FILE_META_CACHE_TTL_DISABLE_CACHE) { - fileCacheBuilder.expireAfterWrite(fileMetaCacheTtlSecond, TimeUnit.SECONDS); - } + CacheFactory fileCacheFactory = new CacheFactory( + OptionalLong.of(fileMetaCacheTtlSecond >= HMSExternalCatalog.FILE_META_CACHE_TTL_DISABLE_CACHE + ? 
fileMetaCacheTtlSecond : 86400L), + OptionalLong.of(Config.external_cache_expire_time_minutes_after_access * 60L), + Config.max_external_file_cache_num, + false, + null); CacheLoader loader = new CacheBulkLoader() { @Override protected ExecutorService getExecutor() { - return HiveMetaStoreCache.this.executor; + return HiveMetaStoreCache.this.fileListingExecutor; } @Override @@ -203,11 +198,11 @@ public FileCacheValue load(FileCacheKey key) { } }; - LoadingCache preFileCache = fileCacheRef.get(); + LoadingCache oldFileCache = fileCacheRef.get(); - fileCacheRef.set(fileCacheBuilder.build(loader)); - if (Objects.nonNull(preFileCache)) { - preFileCache.invalidateAll(); + fileCacheRef.set(fileCacheFactory.buildCache(loader, this.refreshExecutor)); + if (Objects.nonNull(oldFileCache)) { + oldFileCache.invalidateAll(); } } @@ -217,7 +212,7 @@ private void initMetrics() { Metric.MetricUnit.NOUNIT, "hive partition value cache number") { @Override public Long getValue() { - return partitionValuesCache.size(); + return partitionValuesCache.estimatedSize(); } }; valueCacheGauge.addLabel(new MetricLabel("type", "partition_value")); @@ -228,7 +223,7 @@ public Long getValue() { Metric.MetricUnit.NOUNIT, "hive partition cache number") { @Override public Long getValue() { - return partitionCache.size(); + return partitionCache.estimatedSize(); } }; partitionCacheGauge.addLabel(new MetricLabel("type", "partition")); @@ -239,7 +234,7 @@ public Long getValue() { Metric.MetricUnit.NOUNIT, "hive file cache number") { @Override public Long getValue() { - return fileCacheRef.get().size(); + return fileCacheRef.get().estimatedSize(); } }; fileCacheGauge.addLabel(new MetricLabel("type", "file")); @@ -468,11 +463,7 @@ public HivePartitionValues getPartitionValues(String dbName, String tblName, Lis } public HivePartitionValues getPartitionValues(PartitionValueCacheKey key) { - try { - return partitionValuesCache.get(key); - } catch (ExecutionException e) { - throw new CacheException("failed to get partition values for %s in catalog %s", e, key, catalog.getName()); - } + return partitionValuesCache.get(key); } public List getFilesByPartitionsWithCache(List partitions, @@ -499,10 +490,10 @@ private List getFilesByPartitions(List partitions List fileLists; try { if (withCache) { - fileLists = fileCacheRef.get().getAll(keys).values().asList(); + fileLists = fileCacheRef.get().getAll(keys).values().stream().collect(Collectors.toList()); } else { List>> pList = keys.stream() - .map(key -> Pair.of(key, executor.submit(() -> loadFiles(key)))) + .map(key -> Pair.of(key, fileListingExecutor.submit(() -> loadFiles(key)))) .collect(Collectors.toList()); fileLists = Lists.newArrayListWithExpectedSize(keys.size()); @@ -545,15 +536,11 @@ private List getAllPartitions(String dbName, String name, List
partitions; - try { - if (withCache) { - partitions = partitionCache.getAll(keys).values().asList(); - } else { - Map map = loadPartitions(keys); - partitions = map.values().stream().collect(Collectors.toList()); - } - } catch (ExecutionException e) { - throw new CacheException("failed to get partition in catalog %s", e, catalog.getName()); + if (withCache) { + partitions = partitionCache.getAll(keys).values().stream().collect(Collectors.toList()); + } else { + Map map = loadPartitions(keys); + partitions = map.values().stream().collect(Collectors.toList()); + } if (LOG.isDebugEnabled()) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetadataOps.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetadataOps.java index c5e74defc1af4c..a4566cd0b7a03c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetadataOps.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetadataOps.java @@ -78,7 +78,7 @@ public HiveMetadataOps(HMSExternalCatalog catalog, HMSCachedClient client) { this.fs = new DFSFileSystem(catalog.getProperties()); } - // for test + @VisibleForTesting public HiveMetadataOps(HMSExternalCatalog catalog, HMSCachedClient client, FileSystem fs) { this.catalog = catalog; this.client = client; diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiCachedPartitionProcessor.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiCachedPartitionProcessor.java index 90c89dbbda2c76..39e68ea7a168ef 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiCachedPartitionProcessor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiCachedPartitionProcessor.java @@ -17,6 +17,7 @@ package org.apache.doris.datasource.hudi.source; +import org.apache.doris.common.CacheFactory; import org.apache.doris.common.Config; import org.apache.doris.datasource.CacheException; import org.apache.doris.datasource.TablePartitionValues; @@ -24,10 +25,8 @@ import org.apache.doris.datasource.hive.HMSExternalCatalog; import org.apache.doris.datasource.hive.HMSExternalTable; +import com.github.benmanes.caffeine.cache.LoadingCache; import com.google.common.base.Preconditions; -import com.google.common.cache.CacheBuilder; -import com.google.common.cache.CacheLoader; -import com.google.common.cache.LoadingCache; import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.table.timeline.HoodieTimeline; @@ -37,8 +36,9 @@ import java.util.Arrays; import java.util.List; +import java.util.OptionalLong; import java.util.concurrent.Executor; -import java.util.concurrent.TimeUnit; +import java.util.concurrent.ExecutorService; import java.util.stream.Collectors; public class HudiCachedPartitionProcessor extends HudiPartitionProcessor { @@ -47,22 +47,16 @@ public class HudiCachedPartitionProcessor extends HudiPartitionProcessor { private final Executor executor; private final LoadingCache partitionCache; - public HudiCachedPartitionProcessor(long catalogId, Executor executor) { + public HudiCachedPartitionProcessor(long catalogId, ExecutorService executor) { this.catalogId = catalogId; this.executor = executor; - this.partitionCache = CacheBuilder.newBuilder().maximumSize(Config.max_hive_table_cache_num) - .expireAfterAccess(Config.external_cache_expire_time_minutes_after_access, TimeUnit.MINUTES) - .build(CacheLoader.asyncReloading( - new CacheLoader() 
{ - @Override - public TablePartitionValues load(TablePartitionKey key) throws Exception { - return new TablePartitionValues(); - } - }, executor)); - } - - public Executor getExecutor() { - return executor; + CacheFactory partitionCacheFactory = new CacheFactory( + OptionalLong.of(86400L), + OptionalLong.of(Config.external_cache_expire_time_minutes_after_access * 60), + Config.max_hive_table_cache_num, + false, + null); + this.partitionCache = partitionCacheFactory.buildCache(key -> new TablePartitionValues(), executor); } @Override diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiPartitionMgr.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiPartitionMgr.java index b1d894e3d7f003..b6bfd0ec499427 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiPartitionMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiPartitionMgr.java @@ -23,15 +23,13 @@ import com.google.common.collect.Maps; import java.util.Map; -import java.util.concurrent.Executor; +import java.util.concurrent.ExecutorService; public class HudiPartitionMgr { - private static volatile HudiPartitionMgr partitionMgr = null; + private final Map partitionProcessors = Maps.newConcurrentMap(); + private final ExecutorService executor; - private static final Map partitionProcessors = Maps.newConcurrentMap(); - private final Executor executor; - - private HudiPartitionMgr(Executor executor) { + public HudiPartitionMgr(ExecutorService executor) { this.executor = executor; } @@ -72,15 +70,4 @@ public void cleanTablePartitions(long catalogId, String dbName, String tblName) processor.cleanTablePartitions(dbName, tblName); } } - - public static HudiPartitionMgr get(Executor executor) { - if (partitionMgr == null) { - synchronized (HudiPartitionMgr.class) { - if (partitionMgr == null) { - partitionMgr = new HudiPartitionMgr(executor); - } - } - } - return partitionMgr; - } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiScanNode.java index 58068c575d7365..eb1d77a322dfc2 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiScanNode.java @@ -324,8 +324,7 @@ public List getSplits() throws UserException { List partitions = HiveMetaStoreClientHelper.ugiDoAs( HiveMetaStoreClientHelper.getConfiguration(hmsTable), () -> getPrunedPartitions(hudiClient, snapshotTimestamp)); - Executor executor = ((HudiCachedPartitionProcessor) Env.getCurrentEnv() - .getExtMetaCacheMgr().getHudiPartitionProcess(hmsTable.getCatalog())).getExecutor(); + Executor executor = Env.getCurrentEnv().getExtMetaCacheMgr().getFileListingExecutor(); List splits = Collections.synchronizedList(new ArrayList<>()); CountDownLatch countDownLatch = new CountDownLatch(partitions.size()); partitions.forEach(partition -> executor.execute(() -> { diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergMetadataCache.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergMetadataCache.java index 10fa205b095133..adc1e1f74f176b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergMetadataCache.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergMetadataCache.java @@ -18,6 +18,7 @@ package org.apache.doris.datasource.iceberg; import 
org.apache.doris.catalog.Env; +import org.apache.doris.common.CacheFactory; import org.apache.doris.common.Config; import org.apache.doris.common.UserException; import org.apache.doris.datasource.CatalogIf; @@ -27,8 +28,7 @@ import org.apache.doris.thrift.TIcebergMetadataParams; import avro.shaded.com.google.common.collect.Lists; -import com.google.common.cache.Cache; -import com.google.common.cache.CacheBuilder; +import com.github.benmanes.caffeine.cache.LoadingCache; import com.google.common.collect.Iterables; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hdfs.HdfsConfiguration; @@ -38,77 +38,85 @@ import org.apache.iceberg.catalog.Catalog; import org.apache.iceberg.catalog.TableIdentifier; import org.apache.iceberg.hive.HiveCatalog; +import org.jetbrains.annotations.NotNull; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Objects; -import java.util.concurrent.TimeUnit; +import java.util.OptionalLong; +import java.util.concurrent.ExecutorService; public class IcebergMetadataCache { - private final Cache> snapshotListCache; - private final Cache tableCache; - - public IcebergMetadataCache() { - this.snapshotListCache = CacheBuilder.newBuilder().maximumSize(Config.max_hive_table_cache_num) - .expireAfterAccess(Config.external_cache_expire_time_minutes_after_access, TimeUnit.MINUTES) - .build(); - - this.tableCache = CacheBuilder.newBuilder().maximumSize(Config.max_hive_table_cache_num) - .expireAfterAccess(Config.external_cache_expire_time_minutes_after_access, TimeUnit.MINUTES) - .build(); + private final LoadingCache> snapshotListCache; + private final LoadingCache tableCache; + + public IcebergMetadataCache(ExecutorService executor) { + CacheFactory snapshotListCacheFactory = new CacheFactory( + OptionalLong.of(86400L), + OptionalLong.of(Config.external_cache_expire_time_minutes_after_access * 60), + Config.max_hive_table_cache_num, + false, + null); + this.snapshotListCache = snapshotListCacheFactory.buildCache(key -> loadSnapshots(key), executor); + + CacheFactory tableCacheFactory = new CacheFactory( + OptionalLong.of(86400L), + OptionalLong.of(Config.external_cache_expire_time_minutes_after_access * 60), + Config.max_hive_table_cache_num, + false, + null); + this.tableCache = tableCacheFactory.buildCache(key -> loadTable(key), executor); } public List getSnapshotList(TIcebergMetadataParams params) throws UserException { CatalogIf catalog = Env.getCurrentEnv().getCatalogMgr().getCatalog(params.getCatalog()); IcebergMetadataCacheKey key = - IcebergMetadataCacheKey.of(catalog.getId(), params.getDatabase(), params.getTable()); - List ifPresent = snapshotListCache.getIfPresent(key); - if (ifPresent != null) { - return ifPresent; - } + IcebergMetadataCacheKey.of(catalog, params.getDatabase(), params.getTable()); + return snapshotListCache.get(key); + } + + public Table getIcebergTable(CatalogIf catalog, String dbName, String tbName) { + IcebergMetadataCacheKey key = IcebergMetadataCacheKey.of(catalog, dbName, tbName); + return tableCache.get(key); + } - Table icebergTable = getIcebergTable(catalog, params.getDatabase(), params.getTable()); + @NotNull + private List loadSnapshots(IcebergMetadataCacheKey key) { + Table icebergTable = getIcebergTable(key.catalog, key.dbName, key.tableName); List snaps = Lists.newArrayList(); Iterables.addAll(snaps, icebergTable.snapshots()); - snapshotListCache.put(key, snaps); return snaps; } - public Table getIcebergTable(CatalogIf catalog, String dbName, String tbName) { - 
IcebergMetadataCacheKey key = IcebergMetadataCacheKey.of(catalog.getId(), dbName, tbName); - Table cacheTable = tableCache.getIfPresent(key); - if (cacheTable != null) { - return cacheTable; - } - + @NotNull + private Table loadTable(IcebergMetadataCacheKey key) { Catalog icebergCatalog; - if (catalog instanceof HMSExternalCatalog) { - HMSExternalCatalog ctg = (HMSExternalCatalog) catalog; + if (key.catalog instanceof HMSExternalCatalog) { + HMSExternalCatalog ctg = (HMSExternalCatalog) key.catalog; icebergCatalog = createIcebergHiveCatalog( ctg.getHiveMetastoreUris(), ctg.getCatalogProperty().getHadoopProperties(), ctg.getProperties()); - } else if (catalog instanceof IcebergExternalCatalog) { - icebergCatalog = ((IcebergExternalCatalog) catalog).getCatalog(); + } else if (key.catalog instanceof IcebergExternalCatalog) { + icebergCatalog = ((IcebergExternalCatalog) key.catalog).getCatalog(); } else { throw new RuntimeException("Only support 'hms' and 'iceberg' type for iceberg table"); } - Table icebergTable = HiveMetaStoreClientHelper.ugiDoAs(catalog.getId(), - () -> icebergCatalog.loadTable(TableIdentifier.of(dbName, tbName))); - initIcebergTableFileIO(icebergTable, catalog.getProperties()); - tableCache.put(key, icebergTable); + Table icebergTable = HiveMetaStoreClientHelper.ugiDoAs(key.catalog.getId(), + () -> icebergCatalog.loadTable(TableIdentifier.of(key.dbName, key.tableName))); + initIcebergTableFileIO(icebergTable, key.catalog.getProperties()); return icebergTable; } public void invalidateCatalogCache(long catalogId) { snapshotListCache.asMap().keySet().stream() - .filter(key -> key.catalogId == catalogId) + .filter(key -> key.catalog.getId() == catalogId) .forEach(snapshotListCache::invalidate); tableCache.asMap().entrySet().stream() - .filter(entry -> entry.getKey().catalogId == catalogId) + .filter(entry -> entry.getKey().catalog.getId() == catalogId) .forEach(entry -> { ManifestFiles.dropCache(entry.getValue().io()); tableCache.invalidate(entry.getKey()); @@ -117,13 +125,15 @@ public void invalidateCatalogCache(long catalogId) { public void invalidateTableCache(long catalogId, String dbName, String tblName) { snapshotListCache.asMap().keySet().stream() - .filter(key -> key.catalogId == catalogId && key.dbName.equals(dbName) && key.tableName.equals(tblName)) + .filter(key -> key.catalog.getId() == catalogId && key.dbName.equals(dbName) && key.tableName.equals( + tblName)) .forEach(snapshotListCache::invalidate); tableCache.asMap().entrySet().stream() .filter(entry -> { IcebergMetadataCacheKey key = entry.getKey(); - return key.catalogId == catalogId && key.dbName.equals(dbName) && key.tableName.equals(tblName); + return key.catalog.getId() == catalogId && key.dbName.equals(dbName) && key.tableName.equals( + tblName); }) .forEach(entry -> { ManifestFiles.dropCache(entry.getValue().io()); @@ -133,13 +143,13 @@ public void invalidateTableCache(long catalogId, String dbName, String tblName) public void invalidateDbCache(long catalogId, String dbName) { snapshotListCache.asMap().keySet().stream() - .filter(key -> key.catalogId == catalogId && key.dbName.equals(dbName)) + .filter(key -> key.catalog.getId() == catalogId && key.dbName.equals(dbName)) .forEach(snapshotListCache::invalidate); tableCache.asMap().entrySet().stream() .filter(entry -> { IcebergMetadataCacheKey key = entry.getKey(); - return key.catalogId == catalogId && key.dbName.equals(dbName); + return key.catalog.getId() == catalogId && key.dbName.equals(dbName); }) .forEach(entry -> { 
ManifestFiles.dropCache(entry.getValue().io()); @@ -184,22 +194,18 @@ private static void initIcebergTableFileIO(Table table, Map prop } static class IcebergMetadataCacheKey { - long catalogId; + CatalogIf catalog; String dbName; String tableName; - public IcebergMetadataCacheKey(long catalogId, String dbName, String tableName) { - this.catalogId = catalogId; + public IcebergMetadataCacheKey(CatalogIf catalog, String dbName, String tableName) { + this.catalog = catalog; this.dbName = dbName; this.tableName = tableName; } - static IcebergMetadataCacheKey of(long catalogId, String dbName, String tableName) { - return new IcebergMetadataCacheKey( - catalogId, - dbName, - tableName - ); + static IcebergMetadataCacheKey of(CatalogIf catalog, String dbName, String tableName) { + return new IcebergMetadataCacheKey(catalog, dbName, tableName); } @Override @@ -211,14 +217,14 @@ public boolean equals(Object o) { return false; } IcebergMetadataCacheKey that = (IcebergMetadataCacheKey) o; - return catalogId == that.catalogId + return catalog.getId() == that.catalog.getId() && Objects.equals(dbName, that.dbName) && Objects.equals(tableName, that.tableName); } @Override public int hashCode() { - return Objects.hash(catalogId, dbName, tableName); + return Objects.hash(catalog.getId(), dbName, tableName); } } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergMetadataCacheMgr.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergMetadataCacheMgr.java index 012af41879cc68..69a29cc2885e83 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergMetadataCacheMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergMetadataCacheMgr.java @@ -17,11 +17,14 @@ package org.apache.doris.datasource.iceberg; +import java.util.concurrent.ExecutorService; + public class IcebergMetadataCacheMgr { - private final IcebergMetadataCache icebergMetadataCache = new IcebergMetadataCache(); + private IcebergMetadataCache icebergMetadataCache; - public IcebergMetadataCacheMgr() { + public IcebergMetadataCacheMgr(ExecutorService executor) { + this.icebergMetadataCache = new IcebergMetadataCache(executor); } public IcebergMetadataCache getIcebergMetadataCache() { diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/maxcompute/MaxComputeExternalTable.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/maxcompute/MaxComputeExternalTable.java index 474b4e6cb8e7f8..363b7ce689dab1 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/maxcompute/MaxComputeExternalTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/maxcompute/MaxComputeExternalTable.java @@ -82,9 +82,19 @@ public long getTotalRows() throws TunnelException { MaxComputeMetadataCache metadataCache = Env.getCurrentEnv().getExtMetaCacheMgr() .getMaxComputeMetadataCache(catalog.getId()); MaxComputeExternalCatalog mcCatalog = ((MaxComputeExternalCatalog) catalog); - return metadataCache.getCachedRowCount(dbName, name, null, () -> mcCatalog.getTableTunnel() - .getDownloadSession(dbName, name, null) - .getRecordCount()); + return metadataCache.getCachedRowCount(dbName, name, null, key -> { + try { + return loadRowCount(mcCatalog, key); + } catch (TunnelException e) { + throw new RuntimeException(e); + } + }); + } + + private long loadRowCount(MaxComputeExternalCatalog catalog, MaxComputeCacheKey key) throws TunnelException { + return catalog.getTableTunnel() + .getDownloadSession(key.getDbName(), key.getTblName(), null) + 
.getRecordCount(); } @Override @@ -106,21 +116,23 @@ public TablePartitionValues getPartitionValues() { MaxComputeMetadataCache metadataCache = Env.getCurrentEnv().getExtMetaCacheMgr() .getMaxComputeMetadataCache(catalog.getId()); return metadataCache.getCachedPartitionValues( - new MaxComputeCacheKey(projectName, tableName), - () -> { - TablePartitionValues partitionValues = new TablePartitionValues(); - partitionValues.addPartitions(partitionSpecs, - partitionSpecs.stream() - .map(p -> parsePartitionValues(new ArrayList<>(getPartitionNames()), p)) - .collect(Collectors.toList()), - partitionTypes); - return partitionValues; - }); + new MaxComputeCacheKey(projectName, tableName), key -> loadPartitionValues(key)); + } + + private TablePartitionValues loadPartitionValues(MaxComputeCacheKey key) { + TablePartitionValues partitionValues = new TablePartitionValues(); + partitionValues.addPartitions(partitionSpecs, + partitionSpecs.stream() + .map(p -> parsePartitionValues(new ArrayList<>(getPartitionNames()), p)) + .collect(Collectors.toList()), + partitionTypes); + return partitionValues; } /** * parse all values from partitionPath to a single list. - * @param partitionColumns partitionColumns can contain the part1,part2,part3... + * + * @param partitionColumns partitionColumns can contain the part1,part2,part3... * @param partitionPath partitionPath format is like the 'part1=123/part2=abc/part3=1bc' * @return all values of partitionPath */ diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/maxcompute/MaxComputeMetadataCache.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/maxcompute/MaxComputeMetadataCache.java index b26ad8364e34f4..cce570a11746aa 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/maxcompute/MaxComputeMetadataCache.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/maxcompute/MaxComputeMetadataCache.java @@ -20,14 +20,12 @@ import org.apache.doris.common.Config; import org.apache.doris.datasource.TablePartitionValues; -import com.aliyun.odps.tunnel.TunnelException; -import com.google.common.cache.Cache; -import com.google.common.cache.CacheBuilder; +import com.github.benmanes.caffeine.cache.Cache; +import com.github.benmanes.caffeine.cache.Caffeine; import java.util.List; -import java.util.concurrent.Callable; -import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; +import java.util.function.Function; import java.util.stream.Collectors; public class MaxComputeMetadataCache { @@ -35,32 +33,23 @@ public class MaxComputeMetadataCache { private final Cache tableRowCountCache; public MaxComputeMetadataCache() { - partitionValuesCache = CacheBuilder.newBuilder().maximumSize(Config.max_hive_partition_cache_num) + partitionValuesCache = Caffeine.newBuilder().maximumSize(Config.max_hive_partition_cache_num) .expireAfterAccess(Config.external_cache_expire_time_minutes_after_access, TimeUnit.MINUTES) .build(); - tableRowCountCache = CacheBuilder.newBuilder().maximumSize(10000) + tableRowCountCache = Caffeine.newBuilder().maximumSize(10000) .expireAfterAccess(Config.external_cache_expire_time_minutes_after_access, TimeUnit.MINUTES) .build(); } public Long getCachedRowCount(String dbName, String tblName, String partitionSpec, - Callable loader) throws TunnelException { - try { - MaxComputeCacheKey tablePartitionKey = new MaxComputeCacheKey(dbName, tblName, partitionSpec); - return tableRowCountCache.get(tablePartitionKey, loader); - } catch (ExecutionException e) { - throw new 
TunnelException(e.getMessage(), e); - } + Function loader) { + MaxComputeCacheKey tablePartitionKey = new MaxComputeCacheKey(dbName, tblName, partitionSpec); + return tableRowCountCache.get(tablePartitionKey, loader); } public TablePartitionValues getCachedPartitionValues(MaxComputeCacheKey tablePartitionKey, - Callable loader) { - try { - return partitionValuesCache.get(tablePartitionKey, loader); - } catch (ExecutionException e) { - throw new RuntimeException("Fail to load partition values for table:" - + " '" + tablePartitionKey.getDbName() + "." + tablePartitionKey.getTblName() + "'"); - } + Function loader) { + return partitionValuesCache.get(tablePartitionKey, loader); } public void cleanUp() { diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/maxcompute/MaxComputeMetadataCacheMgr.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/maxcompute/MaxComputeMetadataCacheMgr.java index 6ee75eae26d747..270c47f0df6f6c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/maxcompute/MaxComputeMetadataCacheMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/maxcompute/MaxComputeMetadataCacheMgr.java @@ -23,7 +23,10 @@ public class MaxComputeMetadataCacheMgr { - private static final Map maxComputeMetadataCaches = Maps.newConcurrentMap(); + private final Map maxComputeMetadataCaches = Maps.newConcurrentMap(); + + public MaxComputeMetadataCacheMgr() { + } public MaxComputeMetadataCache getMaxComputeMetadataCache(long catalogId) { MaxComputeMetadataCache cache = maxComputeMetadataCaches.get(catalogId); diff --git a/fe/fe-core/src/main/java/org/apache/doris/fs/FileSystemCache.java b/fe/fe-core/src/main/java/org/apache/doris/fs/FileSystemCache.java index 7946dd5e8a7cea..149bbe2d378817 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/fs/FileSystemCache.java +++ b/fe/fe-core/src/main/java/org/apache/doris/fs/FileSystemCache.java @@ -17,39 +17,30 @@ package org.apache.doris.fs; +import org.apache.doris.common.CacheFactory; import org.apache.doris.common.Config; import org.apache.doris.common.Pair; -import org.apache.doris.common.util.CacheBulkLoader; -import org.apache.doris.datasource.CacheException; import org.apache.doris.fs.remote.RemoteFileSystem; -import com.google.common.cache.CacheBuilder; -import com.google.common.cache.LoadingCache; +import com.github.benmanes.caffeine.cache.LoadingCache; import org.apache.hadoop.mapred.JobConf; import java.util.Objects; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.TimeUnit; +import java.util.OptionalLong; public class FileSystemCache { private LoadingCache fileSystemCache; - public FileSystemCache(ExecutorService executor) { - fileSystemCache = CacheBuilder.newBuilder().maximumSize(Config.max_remote_file_system_cache_num) - .expireAfterAccess(Config.external_cache_expire_time_minutes_after_access, TimeUnit.MINUTES) - .build(new CacheBulkLoader() { - @Override - protected ExecutorService getExecutor() { - return executor; - } - - @Override - public RemoteFileSystem load(FileSystemCacheKey key) { - return loadFileSystem(key); - } - }); + public FileSystemCache() { + // no need to set refreshAfterWrite, because the FileSystem is created once and never changed + CacheFactory fsCacheFactory = new CacheFactory( + OptionalLong.of(86400L), + OptionalLong.empty(), + Config.max_remote_file_system_cache_num, + false, + null); + fileSystemCache = fsCacheFactory.buildCache(key -> loadFileSystem(key)); } private RemoteFileSystem 
loadFileSystem(FileSystemCacheKey key) { @@ -57,11 +48,7 @@ private RemoteFileSystem loadFileSystem(FileSystemCacheKey key) { } public RemoteFileSystem getRemoteFileSystem(FileSystemCacheKey key) { - try { - return fileSystemCache.get(key); - } catch (ExecutionException e) { - throw new CacheException("failed to get remote filesystem for type[%s]", e, key.type); - } + return fileSystemCache.get(key); } public static class FileSystemCacheKey { diff --git a/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/ShowAction.java b/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/ShowAction.java index 0aa5dd54d78b46..8156710aa1ec5b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/ShowAction.java +++ b/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/ShowAction.java @@ -324,7 +324,7 @@ public long getDataSizeOfDatabase(DatabaseIf db) { return totalSize; } - public Map getDataSizeOfTables(DatabaseIf db, String tableName, boolean singleReplica) { + private Map getDataSizeOfTables(DatabaseIf db, String tableName, boolean singleReplica) { Map oneEntry = Maps.newHashMap(); db.readLock(); try { @@ -348,7 +348,7 @@ public Map getDataSizeOfTables(DatabaseIf db, String tableName, bo return oneEntry; } - public Map getDataSizeOfTable(Table table, boolean singleReplica) { + private Map getDataSizeOfTable(Table table, boolean singleReplica) { Map oneEntry = Maps.newHashMap(); if (table.getType() == TableType.VIEW || table.getType() == TableType.ODBC) { oneEntry.put(table.getName(), 0L); diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/GlobalVariable.java b/fe/fe-core/src/main/java/org/apache/doris/qe/GlobalVariable.java index a8374a7b4406c0..30f91c6f8c1ac4 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/GlobalVariable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/GlobalVariable.java @@ -56,6 +56,8 @@ public final class GlobalVariable { public static final String AUDIT_PLUGIN_MAX_BATCH_INTERVAL_SEC = "audit_plugin_max_batch_interval_sec"; public static final String AUDIT_PLUGIN_MAX_SQL_LENGTH = "audit_plugin_max_sql_length"; + public static final String ENABLE_GET_ROW_COUNT_FROM_FILE_LIST = "enable_get_row_count_from_file_list"; + @VariableMgr.VarAttr(name = VERSION_COMMENT, flag = VariableMgr.READ_ONLY) public static String versionComment = "Doris version " + Version.DORIS_BUILD_VERSION + "-" + Version.DORIS_BUILD_SHORT_HASH; @@ -125,6 +127,16 @@ public final class GlobalVariable { @VariableMgr.VarAttr(name = AUDIT_PLUGIN_MAX_SQL_LENGTH, flag = VariableMgr.GLOBAL) public static int auditPluginMaxSqlLength = 4096; + @VariableMgr.VarAttr(name = ENABLE_GET_ROW_COUNT_FROM_FILE_LIST, flag = VariableMgr.GLOBAL, + description = { + "针对外表,是否允许根据文件列表估算表行数。获取文件列表可能是一个耗时的操作," + + "如果不需要估算表行数或者对性能有影响,可以关闭该功能。", + "For external tables, whether to enable getting row count from file list. " + + "Getting file list may be a time-consuming operation. " + + "If you don't need to estimate the number of rows in the table " + + "or it affects performance, you can disable this feature."}) + public static boolean enable_get_row_count_from_file_list = true; + // Don't allow creating instance. 
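// A note on the flag above (illustration only, not part of this patch):
// row-count estimation is expected to consult it before listing files, e.g.
//
//     if (!GlobalVariable.enable_get_row_count_from_file_list) {
//         return -1; // hypothetical "unknown" sentinel: skip the listing
//     }
//
// so the expensive file-listing path can be switched off globally without
// touching the cache layer itself.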
private GlobalVariable() { } diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/BasicAsyncCacheLoader.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/BasicAsyncCacheLoader.java index ac5896bb06c93d..59e1ad390d34b8 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/BasicAsyncCacheLoader.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/BasicAsyncCacheLoader.java @@ -39,7 +39,9 @@ public abstract class BasicAsyncCacheLoader implements AsyncCacheLoader filesByPartitions - = getFilesForPartitions(table, partitionValues, samplePartitionSize); - long totalSize = 0; - // Calculate the total file size. - for (HiveMetaStoreCache.FileCacheValue files : filesByPartitions) { - for (HiveMetaStoreCache.HiveFileStatus file : files.getFiles()) { - totalSize += file.getLength(); - } - } - // Estimate row count: totalSize/estimatedRowSize - long estimatedRowSize = 0; - List partitionColumns = table.getPartitionColumns(); - for (Column column : table.getFullSchema()) { - // Partition column shouldn't count to the row size, because it is not in the data file. - if (partitionColumns != null && partitionColumns.contains(column)) { - continue; - } - estimatedRowSize += column.getDataType().getSlotSize(); - } - if (estimatedRowSize == 0) { - return 0; - } - if (samplePartitionSize < totalPartitionSize) { - totalSize = totalSize * totalPartitionSize / samplePartitionSize; - } - return totalSize / estimatedRowSize; - } - - public static HiveMetaStoreCache.HivePartitionValues getPartitionValuesForTable(HMSExternalTable table) { - if (table.isView()) { - return null; - } - HiveMetaStoreCache cache = Env.getCurrentEnv().getExtMetaCacheMgr() - .getMetaStoreCache((HMSExternalCatalog) table.getCatalog()); - List partitionColumnTypes = table.getPartitionColumnTypes(); - HiveMetaStoreCache.HivePartitionValues partitionValues = null; - // Get table partitions from cache. - if (!partitionColumnTypes.isEmpty()) { - // It is ok to get partition values from cache, - // no need to worry that this call will invalid or refresh the cache. - // because it has enough space to keep partition info of all tables in cache. - partitionValues = cache.getPartitionValues(table.getDbName(), table.getName(), partitionColumnTypes); - } - return partitionValues; - } - - public static List getFilesForPartitions( - HMSExternalTable table, HiveMetaStoreCache.HivePartitionValues partitionValues, int sampleSize) { - if (table.isView()) { - return Lists.newArrayList(); - } - HiveMetaStoreCache cache = Env.getCurrentEnv().getExtMetaCacheMgr() - .getMetaStoreCache((HMSExternalCatalog) table.getCatalog()); - List hivePartitions = Lists.newArrayList(); - if (partitionValues != null) { - Map idToPartitionItem = partitionValues.getIdToPartitionItem(); - int totalPartitionSize = idToPartitionItem.size(); - Collection partitionItems; - List> partitionValuesList; - // If partition number is too large, randomly choose part of them to estimate the whole table. 
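The arithmetic that pairs with this sampling, pulled out as a standalone sketch (names are hypothetical; the logic mirrors the removed estimate above):

// Sketch only: scale the sampled file bytes up to the whole table, then
// divide by the estimated per-row width. Partition columns are excluded
// from the width because they are not stored in the data files.
static long estimateRowCount(long sampledBytes, int sampledPartitions,
        int totalPartitions, long estimatedRowSizeBytes) {
    // Row width of zero means there are no non-partition columns to size against.
    if (estimatedRowSizeBytes == 0) {
        return 0;
    }
    long totalBytes = sampledBytes;
    if (sampledPartitions < totalPartitions) {
        // Extrapolate the sampled bytes up to the whole table.
        totalBytes = sampledBytes * totalPartitions / sampledPartitions;
    }
    // Estimated rows = total data bytes / estimated bytes per row.
    return totalBytes / estimatedRowSizeBytes;
}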
- if (sampleSize > 0 && sampleSize < totalPartitionSize) { - List items = new ArrayList<>(idToPartitionItem.values()); - Collections.shuffle(items); - partitionItems = items.subList(0, sampleSize); - partitionValuesList = Lists.newArrayListWithCapacity(sampleSize); - } else { - partitionItems = idToPartitionItem.values(); - partitionValuesList = Lists.newArrayListWithCapacity(totalPartitionSize); - } - for (PartitionItem item : partitionItems) { - partitionValuesList.add(((ListPartitionItem) item).getItems().get(0).getPartitionValuesAsStringList()); - } - // get partitions without cache, so that it will not invalid the cache when executing - // non query request such as `show table status` - hivePartitions = cache.getAllPartitionsWithoutCache(table.getDbName(), table.getName(), - partitionValuesList); - } else { - hivePartitions.add(new HivePartition(table.getDbName(), table.getName(), true, - table.getRemoteTable().getSd().getInputFormat(), - table.getRemoteTable().getSd().getLocation(), null, Maps.newHashMap())); - } - // Get files for all partitions. - String bindBrokerName = table.getCatalog().bindBrokerName(); - return cache.getFilesByPartitionsWithoutCache(hivePartitions, bindBrokerName); - } - /** * Get Iceberg column statistics. * diff --git a/fe/fe-core/src/test/java/org/apache/doris/common/CacheBulkLoaderTest.java b/fe/fe-core/src/test/java/org/apache/doris/common/CacheBulkLoaderTest.java index 0cc54817b346c4..c89eafda408685 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/common/CacheBulkLoaderTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/common/CacheBulkLoaderTest.java @@ -19,15 +19,14 @@ import org.apache.doris.common.util.CacheBulkLoader; -import com.google.common.cache.CacheBuilder; -import com.google.common.cache.LoadingCache; +import com.github.benmanes.caffeine.cache.Caffeine; +import com.github.benmanes.caffeine.cache.LoadingCache; import org.apache.commons.collections.MapUtils; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; import java.util.List; import java.util.Map; -import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; @@ -41,7 +40,7 @@ public void test() { ThreadPoolExecutor executor = ThreadPoolManager.newDaemonFixedThreadPool( 10, 10, "TestThreadPool", 120, true); - LoadingCache testCache = CacheBuilder.newBuilder().maximumSize(100) + LoadingCache testCache = Caffeine.newBuilder().maximumSize(100) .expireAfterAccess(1, TimeUnit.MINUTES) .build(new CacheBulkLoader() { @Override @@ -63,15 +62,10 @@ public String load(String key) { List testKeys = IntStream.range(1, 101).boxed() .map(i -> String.format("k%d", i)).collect(Collectors.toList()); - try { - Map vMap = testCache.getAll(testKeys); - Assertions.assertTrue(MapUtils.isNotEmpty(vMap) && vMap.size() == testKeys.size()); - for (String key : vMap.keySet()) { - Assertions.assertTrue(key.replace("k", "v").equals(vMap.get(key))); - } - } catch (ExecutionException e) { - e.printStackTrace(); - Assertions.fail(); + Map vMap = testCache.getAll(testKeys); + Assertions.assertTrue(MapUtils.isNotEmpty(vMap) && vMap.size() == testKeys.size()); + for (String key : vMap.keySet()) { + Assertions.assertTrue(key.replace("k", "v").equals(vMap.get(key))); } try { diff --git a/fe/fe-core/src/test/java/org/apache/doris/common/CacheFactoryTest.java b/fe/fe-core/src/test/java/org/apache/doris/common/CacheFactoryTest.java new file mode 100644 index 
00000000000000..60b73d6d5c8d4f --- /dev/null +++ b/fe/fe-core/src/test/java/org/apache/doris/common/CacheFactoryTest.java @@ -0,0 +1,291 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.common; + +import org.apache.doris.statistics.BasicAsyncCacheLoader; + +import com.github.benmanes.caffeine.cache.AsyncLoadingCache; +import com.github.benmanes.caffeine.cache.LoadingCache; +import com.google.common.collect.Lists; +import com.google.common.testing.FakeTicker; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.util.List; +import java.util.Optional; +import java.util.OptionalLong; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; + +public class CacheFactoryTest { + + private ExecutorService executor; + + public static class CacheValue { + private final String value; + + public CacheValue(String value) { + this.value = value; + } + + public String getValue() { + return value; + } + + public static CacheValue createValue(String val, AtomicLong counter) { + try { + System.out.println("before create value: " + val + ", Thread: " + Thread.currentThread().getName()); + Thread.sleep(2000); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + System.out.println("create value: " + val); + counter.incrementAndGet(); + return new CacheValue(val); + } + } + + public static class CacheValueLoader extends BasicAsyncCacheLoader> { + + private AtomicLong counter; + + public CacheValueLoader(AtomicLong counter) { + this.counter = counter; + } + + @Override + protected Optional doLoad(Integer key) { + return Optional.of(CacheValue.createValue("value" + key, counter)); + } + } + + public static class LoaderRunner implements Runnable { + private final AsyncLoadingCache> cache; + private final int key; + private final AtomicLong counter; + + public LoaderRunner(AsyncLoadingCache> cache, int key, AtomicLong counter) { + this.cache = cache; + this.key = key; + this.counter = counter; + } + + @Override + public void run() { + try { + CompletableFuture> cacheValue = cache.get(key); + System.out.println("key: " + key + ", value: " + cacheValue.get().get().getValue()); + } catch (RejectedExecutionException e) { + counter.incrementAndGet(); + throw e; + } catch (InterruptedException e) { + throw new RuntimeException(e); + } catch (ExecutionException e) { + throw new RuntimeException(e); + } + } + } + + @BeforeEach + public void setUp() { + executor = ThreadPoolManager.newDaemonFixedThreadPool( + 10, 1, + 
"testCacheFactory", 0, false); + } + + @Test + public void testLoadingCacheWithoutExpireAfterWrite() throws InterruptedException { + FakeTicker ticker = new FakeTicker(); + AtomicLong counter = new AtomicLong(0); + CacheFactory cacheFactory = new CacheFactory( + OptionalLong.empty(), + OptionalLong.of(10), + 1000, + false, + ticker::read); + LoadingCache loadingCache = cacheFactory.buildCache( + key -> CacheValue.createValue("value" + key, counter), executor); + CacheValue value = loadingCache.get(1); + Assertions.assertEquals("value1", value.getValue()); + Assertions.assertEquals(1, counter.get()); + value = loadingCache.get(1); + Assertions.assertEquals("value1", value.getValue()); + Assertions.assertEquals(1, counter.get()); + // advance 11 seconds to pass the refreshAfterWrite + ticker.advance(11, TimeUnit.SECONDS); + // trigger refresh + value = loadingCache.get(1); + // refresh in background, so still get value1 + Assertions.assertEquals("value1", value.getValue()); + // sleep longer to wait for refresh + Thread.sleep(2500); + value = loadingCache.get(1); + Assertions.assertEquals("value1", value.getValue()); + Assertions.assertEquals(2, counter.get()); + } + + @Test + public void testLoadingCacheWithExpireAfterWrite() throws InterruptedException { + FakeTicker ticker = new FakeTicker(); + AtomicLong counter = new AtomicLong(0); + CacheFactory cacheFactory = new CacheFactory( + OptionalLong.of(60L), + OptionalLong.of(10), + 1000, + false, + ticker::read); + LoadingCache loadingCache = cacheFactory.buildCache( + key -> CacheValue.createValue("value" + key, counter), executor); + CacheValue value = loadingCache.get(1); + Assertions.assertEquals("value1", value.getValue()); + Assertions.assertEquals(1, counter.get()); + value = loadingCache.get(1); + Assertions.assertEquals("value1", value.getValue()); + Assertions.assertEquals(1, counter.get()); + // advance 11 seconds to pass the refreshAfterWrite + ticker.advance(11, TimeUnit.SECONDS); + // trigger refresh + value = loadingCache.get(1); + // refresh in background, so still get value1 + Assertions.assertEquals("value1", value.getValue()); + // sleep longer to wait for refresh + Thread.sleep(2500); + value = loadingCache.get(1); + Assertions.assertEquals("value1", value.getValue()); + // refreshed, so counter +1 + Assertions.assertEquals(2, counter.get()); + // advance 61 seconds to pass the expireAfterWrite + ticker.advance(61, TimeUnit.SECONDS); + value = loadingCache.get(1); + Assertions.assertEquals("value1", value.getValue()); + // expired, so counter +1 + Assertions.assertEquals(3, counter.get()); + } + + @Test + public void testLoadingCacheWithoutRefreshAfterWrite() throws InterruptedException { + FakeTicker ticker = new FakeTicker(); + AtomicLong counter = new AtomicLong(0); + CacheFactory cacheFactory = new CacheFactory( + OptionalLong.of(60L), + OptionalLong.empty(), + 1000, + false, + ticker::read); + LoadingCache loadingCache = cacheFactory.buildCache( + key -> CacheValue.createValue("value" + key, counter), executor); + CacheValue value = loadingCache.get(1); + Assertions.assertEquals("value1", value.getValue()); + Assertions.assertEquals(1, counter.get()); + // advance 30 seconds, key still not expired + ticker.advance(30, TimeUnit.SECONDS); + value = loadingCache.get(1); + Assertions.assertEquals("value1", value.getValue()); + Assertions.assertEquals(1, counter.get()); + // advance 31 seconds to pass the expireAfterWrite + ticker.advance(31, TimeUnit.SECONDS); + value = loadingCache.get(1); + 
Assertions.assertEquals("value1", value.getValue()); + // expired, so counter +1 + Assertions.assertEquals(2, counter.get()); + } + + @Test + public void testAsyncLoadingCacheWithExpireAfterWrite() throws InterruptedException, ExecutionException { + FakeTicker ticker = new FakeTicker(); + AtomicLong counter = new AtomicLong(0); + CacheFactory cacheFactory = new CacheFactory( + OptionalLong.of(60L), + OptionalLong.of(10), + 1000, + false, + ticker::read); + CacheValueLoader loader = new CacheValueLoader(counter); + AsyncLoadingCache> loadingCache = cacheFactory.buildAsyncCache(loader, executor); + CompletableFuture> futureValue = loadingCache.get(1); + Assertions.assertFalse(futureValue.isDone()); + Assertions.assertEquals("value1", futureValue.get().get().getValue()); + Assertions.assertEquals(1, counter.get()); + futureValue = loadingCache.get(1); + Assertions.assertTrue(futureValue.isDone()); + Assertions.assertEquals("value1", futureValue.get().get().getValue()); + Assertions.assertEquals(1, counter.get()); + // advance 11 seconds to pass the refreshAfterWrite + ticker.advance(11, TimeUnit.SECONDS); + // trigger refresh + futureValue = loadingCache.get(1); + // refresh in background, so still get value1 + Assertions.assertTrue(futureValue.isDone()); + Assertions.assertEquals("value1", futureValue.get().get().getValue()); + // sleep longer to wait for refresh + Thread.sleep(2500); + futureValue = loadingCache.get(1); + Assertions.assertEquals("value1", futureValue.get().get().getValue()); + // refreshed, so counter +1 + Assertions.assertEquals(2, counter.get()); + // advance 61 seconds to pass the expireAfterWrite + ticker.advance(61, TimeUnit.SECONDS); + futureValue = loadingCache.get(1); + Assertions.assertFalse(futureValue.isDone()); + Assertions.assertEquals("value1", futureValue.get().get().getValue()); + // expired, so counter +1 + Assertions.assertEquals(3, counter.get()); + } + + @Test + public void testTooManyGetRequests() { + ExecutorService executor = ThreadPoolManager.newDaemonFixedThreadPool( + 10, 1, + "testCacheFactory", 0, false); + FakeTicker ticker = new FakeTicker(); + AtomicLong counter = new AtomicLong(0); + CacheFactory cacheFactory = new CacheFactory( + OptionalLong.of(60L), + OptionalLong.of(10), + 1000, + false, + ticker::read); + CacheValueLoader loader = new CacheValueLoader(counter); + AsyncLoadingCache> loadingCache = cacheFactory.buildAsyncCache(loader, executor); + + AtomicLong rejectCounter = new AtomicLong(0); + List threads = Lists.newArrayList(); + for (int i = 0; i < 12; i++) { + Thread thread = new Thread(new LoaderRunner(loadingCache, i, rejectCounter)); + threads.add(thread); + thread.start(); + } + + for (Thread thread : threads) { + try { + thread.join(); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + // The thread pool size is 10, add 1 queue size. + // So there will be 11 threads executed, and 1 thread will be rejected. 
+ Assertions.assertTrue(counter.get() < 12); + Assertions.assertTrue(rejectCounter.get() >= 1); + } +} diff --git a/fe/fe-core/src/test/java/org/apache/doris/datasource/CatalogMgrTest.java b/fe/fe-core/src/test/java/org/apache/doris/datasource/CatalogMgrTest.java index 2184e2ab37fda1..5faa9952b9502f 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/datasource/CatalogMgrTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/datasource/CatalogMgrTest.java @@ -50,6 +50,7 @@ import org.apache.doris.datasource.hive.HMSExternalTable; import org.apache.doris.datasource.hive.HiveMetaStoreCache; import org.apache.doris.datasource.hive.HiveMetaStoreCache.FileCacheKey; +import org.apache.doris.datasource.hive.HiveMetaStoreCache.FileCacheValue; import org.apache.doris.datasource.hive.HiveMetaStoreCache.HivePartitionValues; import org.apache.doris.datasource.hive.HiveMetaStoreCache.PartitionValueCacheKey; import org.apache.doris.mysql.privilege.Auth; @@ -60,8 +61,8 @@ import org.apache.doris.qe.ShowResultSet; import org.apache.doris.utframe.TestWithFeService; +import com.github.benmanes.caffeine.cache.LoadingCache; import com.google.common.base.Preconditions; -import com.google.common.cache.LoadingCache; import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.collect.Range; @@ -742,7 +743,7 @@ public void testAlterFileCache() throws Exception { HMSExternalCatalog hiveCatalog = (HMSExternalCatalog) mgr.getCatalog(catalogName); HiveMetaStoreCache metaStoreCache = externalMetaCacheMgr.getMetaStoreCache(hiveCatalog); - LoadingCache preFileCache = metaStoreCache.getFileCacheRef().get(); + LoadingCache preFileCache = metaStoreCache.getFileCacheRef().get(); // 1. properties contains `file.meta.cache.ttl-second`, it should not be equal @@ -759,8 +760,6 @@ public void testAlterFileCache() throws Exception { + " (\"type\" = \"hms\", \"hive.metastore.uris\" = \"thrift://172.16.5.9:9083\");"; mgr.alterCatalogProps((AlterCatalogPropertyStmt) parseAndAnalyzeStmt(alterCatalogProp)); Assertions.assertEquals(preFileCache, metaStoreCache.getFileCacheRef().get()); - - } @Test diff --git a/fe/fe-core/src/test/java/org/apache/doris/datasource/ExternalRowCountCacheTest.java b/fe/fe-core/src/test/java/org/apache/doris/datasource/ExternalRowCountCacheTest.java deleted file mode 100644 index e8622f6b59a239..00000000000000 --- a/fe/fe-core/src/test/java/org/apache/doris/datasource/ExternalRowCountCacheTest.java +++ /dev/null @@ -1,100 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. 
- -package org.apache.doris.datasource; - -import org.apache.doris.datasource.ExternalRowCountCache.RowCountKey; -import org.apache.doris.statistics.BasicAsyncCacheLoader; - -import org.junit.jupiter.api.Assertions; -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; - -import java.util.Optional; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.atomic.AtomicLong; - -public class ExternalRowCountCacheTest { - private ExternalRowCountCache cache; - private ExecutorService executorService; - - public static class TestLoader extends BasicAsyncCacheLoader> { - - private AtomicLong incr = new AtomicLong(333); - - @Override - protected Optional doLoad(RowCountKey rowCountKey) { - if (rowCountKey.getTableId() == 1) { - return Optional.of(111L); - } else if (rowCountKey.getTableId() == 2) { - return Optional.of(222L); - } else if (rowCountKey.getTableId() == 3) { - try { - Thread.sleep(1000); - } catch (InterruptedException e) { - throw new RuntimeException(e); - } - System.out.println("load: " + incr.get()); - return Optional.of(incr.incrementAndGet()); - } - return Optional.empty(); - } - } - - @BeforeEach - public void setUp() { - executorService = Executors.newFixedThreadPool(2); - cache = new ExternalRowCountCache(executorService, 2, new TestLoader()); - } - - @Test - public void test() throws Exception { - // table 1 - long rowCount = cache.getCachedRowCount(1, 1, 1); - Assertions.assertEquals(0, rowCount); - Thread.sleep(1000); - rowCount = cache.getCachedRowCount(1, 1, 1); - Assertions.assertEquals(111, rowCount); - - // table 2 - rowCount = cache.getCachedRowCount(1, 1, 2); - Assertions.assertEquals(0, rowCount); - Thread.sleep(1000); - rowCount = cache.getCachedRowCount(1, 1, 2); - Assertions.assertEquals(222, rowCount); - - // table 3 - rowCount = cache.getCachedRowCount(1, 1, 3); - // first get, it should be 0 because the loader is async - Assertions.assertEquals(0, rowCount); - // After sleep 2 sec and then get, it should be 1 - Thread.sleep(2000); - rowCount = cache.getCachedRowCount(1, 1, 3); - Assertions.assertEquals(334, rowCount); - // sleep 3 sec to trigger refresh - Thread.sleep(3000); - rowCount = cache.getCachedRowCount(1, 1, 3); - // the refresh will be triggered only when query it, so it should still be 1 - Assertions.assertEquals(334, rowCount); - // sleep 2 sec to wait for the doLoad - Thread.sleep(2000); - rowCount = cache.getCachedRowCount(1, 1, 3); - // refresh done, value should be 2 - Assertions.assertEquals(335, rowCount); - } -} diff --git a/fe/fe-core/src/test/java/org/apache/doris/datasource/HMSCachedClientTest.java b/fe/fe-core/src/test/java/org/apache/doris/datasource/TestHMSCachedClient.java similarity index 99% rename from fe/fe-core/src/test/java/org/apache/doris/datasource/HMSCachedClientTest.java rename to fe/fe-core/src/test/java/org/apache/doris/datasource/TestHMSCachedClient.java index 126df780dbf16a..3927c0db52460f 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/datasource/HMSCachedClientTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/datasource/TestHMSCachedClient.java @@ -46,7 +46,7 @@ import java.util.function.Function; import java.util.stream.Collectors; -public class HMSCachedClientTest implements HMSCachedClient { +public class TestHMSCachedClient implements HMSCachedClient { public Map> partitions = new ConcurrentHashMap<>(); public Map> tables = new HashMap<>(); diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/datasource/hive/HmsCommitTest.java b/fe/fe-core/src/test/java/org/apache/doris/datasource/hive/HmsCommitTest.java index 54bbf5eca3f12b..dedc7738c8601c 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/datasource/hive/HmsCommitTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/datasource/hive/HmsCommitTest.java @@ -19,7 +19,7 @@ import org.apache.doris.catalog.Column; import org.apache.doris.catalog.PrimitiveType; -import org.apache.doris.datasource.HMSCachedClientTest; +import org.apache.doris.datasource.TestHMSCachedClient; import org.apache.doris.fs.LocalDfsFileSystem; import org.apache.doris.qe.ConnectContext; import org.apache.doris.thrift.THiveLocationParams; @@ -90,7 +90,7 @@ public static void createTestHiveCatalog() throws IOException { entries.set("hive.metastore.uris", uri); hmsClient = new ThriftHMSCachedClient(entries, 2); } else { - hmsClient = new HMSCachedClientTest(); + hmsClient = new TestHMSCachedClient(); } hmsOps = new HiveMetadataOps(null, hmsClient, fs); } diff --git a/fe/pom.xml b/fe/pom.xml index a15b2709a1b4b8..e30fdc2f6cb6a4 100644 --- a/fe/pom.xml +++ b/fe/pom.xml @@ -680,6 +680,11 @@ under the License. guava ${guava.version} + + com.google.guava + guava-testlib + ${guava.version} + com.fasterxml.jackson.core
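For quick reference, every CacheFactory call site in this patch follows the same positional pattern; the sketch below (same imports as FileSystemCache above) names the parameters as inferred from those call sites, not from the class itself:

// Inferred parameter order: expire-after-write seconds, refresh-after-write
// seconds, maximum size, record-stats flag, ticker (null = system clock).
CacheFactory factory = new CacheFactory(
        OptionalLong.of(86400L),                    // expire after one day
        OptionalLong.empty(),                       // no refreshAfterWrite
        Config.max_remote_file_system_cache_num,    // maximum entries
        false,                                      // do not record stats
        null);                                      // default ticker
LoadingCache<FileSystemCacheKey, RemoteFileSystem> cache =
        factory.buildCache(key -> loadFileSystem(key));

Passing a non-null ticker, as the tests above do with FakeTicker's ticker::read, is what makes expiry and refresh deterministic under test.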