5 changes: 5 additions & 0 deletions fe/fe-core/pom.xml
@@ -157,6 +157,11 @@ under the License.
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava-testlib</artifactId>
<scope>test</scope>
</dependency>
<!-- https://mvnrepository.com/artifact/com.fasterxml.jackson.core/jackson-core -->
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
112 changes: 112 additions & 0 deletions fe/fe-core/src/main/java/org/apache/doris/common/CacheFactory.java
@@ -0,0 +1,112 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.common;

import com.github.benmanes.caffeine.cache.AsyncCacheLoader;
import com.github.benmanes.caffeine.cache.AsyncLoadingCache;
import com.github.benmanes.caffeine.cache.CacheLoader;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.LoadingCache;
import com.github.benmanes.caffeine.cache.Ticker;
import org.jetbrains.annotations.NotNull;

import java.time.Duration;
import java.util.OptionalLong;
import java.util.concurrent.ExecutorService;

/**
* Factory to create Caffeine caches.
* <p>
* This class creates Caffeine caches, both sync and async, from a common set of parameters:
* - expireAfterWriteSec: the duration after which cache entries expire.
* - refreshAfterWriteSec: the duration after which cache entries become eligible for refresh.
* - maxSize: the maximum number of entries in the cache.
* - enableStats: whether to record statistics for the cache.
* - ticker: the time source to use for the cache.
* Caches are built from these parameters via the buildCache and buildAsyncCache methods.
* </p>
*/
public class CacheFactory {

private OptionalLong expireAfterWriteSec;
private OptionalLong refreshAfterWriteSec;
private long maxSize;
private boolean enableStats;
// Ticker provides the time source for the cache.
// It is only used in tests, to supply a fake time source;
// if not provided, the system time is used.
private Ticker ticker;

public CacheFactory(
OptionalLong expireAfterWriteSec,
OptionalLong refreshAfterWriteSec,
long maxSize,
boolean enableStats,
Ticker ticker) {
this.expireAfterWriteSec = expireAfterWriteSec;
this.refreshAfterWriteSec = refreshAfterWriteSec;
this.maxSize = maxSize;
this.enableStats = enableStats;
this.ticker = ticker;
}

// Build a loading cache. Without an explicit executor, Caffeine uses the common ForkJoinPool for refresh
public <K, V> LoadingCache<K, V> buildCache(CacheLoader<K, V> cacheLoader) {
Caffeine<Object, Object> builder = buildWithParams();
return builder.build(cacheLoader);
}

// Build a loading cache that uses the given executor for refresh
public <K, V> LoadingCache<K, V> buildCache(CacheLoader<K, V> cacheLoader, ExecutorService executor) {
Caffeine<Object, Object> builder = buildWithParams();
builder.executor(executor);
return builder.build(cacheLoader);
}

// Build an async loading cache
public <K, V> AsyncLoadingCache<K, V> buildAsyncCache(AsyncCacheLoader<K, V> cacheLoader,
ExecutorService executor) {
Caffeine<Object, Object> builder = buildWithParams();
builder.executor(executor);
return builder.buildAsync(cacheLoader);
}

@NotNull
private Caffeine<Object, Object> buildWithParams() {
Caffeine<Object, Object> builder = Caffeine.newBuilder();
builder.maximumSize(maxSize);

if (expireAfterWriteSec.isPresent()) {
builder.expireAfterWrite(Duration.ofSeconds(expireAfterWriteSec.getAsLong()));
}
if (refreshAfterWriteSec.isPresent()) {
builder.refreshAfterWrite(Duration.ofSeconds(refreshAfterWriteSec.getAsLong()));
}

if (enableStats) {
builder.recordStats();
}

if (ticker != null) {
builder.ticker(ticker);
}
return builder;
}
}
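Not part of the PR: a minimal usage sketch of the factory above, including the test-only ticker hook. `FakeTicker` is Guava's `com.google.common.testing.FakeTicker`, presumably what the `guava-testlib` dependency added in the pom change is for; Caffeine's `Ticker` is a functional interface, so `fakeTicker::read` adapts it. The class name and values here are illustrative.

```java
import org.apache.doris.common.CacheFactory;

import com.github.benmanes.caffeine.cache.LoadingCache;
import com.google.common.testing.FakeTicker;

import java.util.OptionalLong;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class CacheFactoryExample {
    public static void main(String[] args) {
        ExecutorService executor = Executors.newFixedThreadPool(2);
        // Fake time source so a test can advance time without sleeping.
        FakeTicker fakeTicker = new FakeTicker();

        CacheFactory factory = new CacheFactory(
                OptionalLong.of(60),    // expire entries 60s after write
                OptionalLong.of(10),    // refresh entries 10s after write
                1000,                   // keep at most 1000 entries
                true,                   // record hit/miss stats
                fakeTicker::read);      // adapt Guava's FakeTicker to Caffeine's Ticker

        LoadingCache<String, Integer> cache = factory.buildCache(String::length, executor);
        System.out.println(cache.get("doris"));          // miss: loads synchronously -> 5

        fakeTicker.advance(61, TimeUnit.SECONDS);        // jump past the expiry window
        System.out.println(cache.getIfPresent("doris")); // null: the entry has expired
        executor.shutdown();
    }
}
```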
CacheBulkLoader.java
@@ -19,7 +19,7 @@

import org.apache.doris.common.Pair;

import com.google.common.cache.CacheLoader;
import com.github.benmanes.caffeine.cache.CacheLoader;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
import com.google.common.collect.Streams;
@@ -31,7 +31,7 @@
import java.util.concurrent.Future;
import java.util.stream.Collectors;

public abstract class CacheBulkLoader<K, V> extends CacheLoader<K, V> {
public abstract class CacheBulkLoader<K, V> implements CacheLoader<K, V> {

protected abstract ExecutorService getExecutor();

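Context for the `extends` → `implements` swap above: Guava's `CacheLoader` is an abstract class, while Caffeine's `CacheLoader` is an interface whose only abstract method is `load`, with `loadAll` provided as a default. Below is a hedged sketch of a bulk loader in the same spirit as `CacheBulkLoader`, not its exact code; it assumes Caffeine 3.x, where `loadAll` takes a `Set` (in 2.x it takes an `Iterable`).

```java
import com.github.benmanes.caffeine.cache.CacheLoader;

import java.util.AbstractMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.stream.Collectors;

// Sketch: fan each key out to an executor, then join the results into a map.
public abstract class BulkLoaderSketch<K, V> implements CacheLoader<K, V> {

    protected abstract ExecutorService getExecutor();

    @Override
    public Map<K, V> loadAll(Set<? extends K> keys) {
        List<CompletableFuture<Map.Entry<K, V>>> futures = keys.stream()
                .map(key -> CompletableFuture.<Map.Entry<K, V>>supplyAsync(() -> {
                    try {
                        K k = key;
                        return new AbstractMap.SimpleEntry<>(k, load(k));
                    } catch (Exception e) {
                        throw new RuntimeException(e);
                    }
                }, getExecutor()))
                .collect(Collectors.toList());
        // Preserve key order and block until every per-key load completes.
        Map<K, V> result = new LinkedHashMap<>();
        for (CompletableFuture<Map.Entry<K, V>> future : futures) {
            Map.Entry<K, V> entry = future.join();
            result.put(entry.getKey(), entry.getValue());
        }
        return result;
    }
}
```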
ExternalCatalog.java
@@ -104,7 +104,7 @@ public abstract class ExternalCatalog
@SerializedName(value = "catalogProperty")
protected CatalogProperty catalogProperty;
@SerializedName(value = "initialized")
private boolean initialized = false;
protected boolean initialized = false;
@SerializedName(value = "idToDb")
protected Map<Long, ExternalDatabase<? extends ExternalTable>> idToDb = Maps.newConcurrentMap();
@SerializedName(value = "lastUpdateTime")
ExternalMetaCacheMgr.java
@@ -50,13 +50,35 @@
public class ExternalMetaCacheMgr {
private static final Logger LOG = LogManager.getLogger(ExternalMetaCacheMgr.class);

/**
* Executors for loading caches.
* 1. rowCountRefreshExecutor
* For the row count cache.
* The row count cache is an async loading cache, and the result can be ignored
* if the cache misses or the thread pool is full.
* So a separate executor is used for this cache.
* <p>
* 2. commonRefreshExecutor
* For the other caches, which are sync loading caches.
* commonRefreshExecutor is only used for async refresh:
* if a cache entry is missing, the value is loaded synchronously in the caller thread;
* if an entry needs refresh, it is reloaded in commonRefreshExecutor.
* <p>
* 3. fileListingExecutor
* File listing is a heavy operation, so a separate executor is used for it.
* For the file cache, refresh is still triggered via commonRefreshExecutor,
* and fileListingExecutor does the actual file listing.
*/
private ExecutorService rowCountRefreshExecutor;
private ExecutorService commonRefreshExecutor;
private ExecutorService fileListingExecutor;

// catalog id -> HiveMetaStoreCache
private final Map<Long, HiveMetaStoreCache> cacheMap = Maps.newConcurrentMap();
// catalog id -> table schema cache
private Map<Long, ExternalSchemaCache> schemaCacheMap = Maps.newHashMap();
// hudi partition manager
private final HudiPartitionMgr hudiPartitionMgr;
private ExecutorService executor;
// all catalogs could share the same fsCache.
private FileSystemCache fsCache;
// all external table row count cache.
@@ -65,24 +87,42 @@ public class ExternalMetaCacheMgr {
private final MaxComputeMetadataCacheMgr maxComputeMetadataCacheMgr;

public ExternalMetaCacheMgr() {
executor = ThreadPoolManager.newDaemonFixedThreadPool(
rowCountRefreshExecutor = ThreadPoolManager.newDaemonFixedThreadPool(
Config.max_external_cache_loader_thread_pool_size,
Config.max_external_cache_loader_thread_pool_size,
"RowCountRefreshExecutor", 0, true);

commonRefreshExecutor = ThreadPoolManager.newDaemonFixedThreadPool(
Config.max_external_cache_loader_thread_pool_size,
Config.max_external_cache_loader_thread_pool_size * 1000,
"ExternalMetaCacheMgr", 120, true);
hudiPartitionMgr = HudiPartitionMgr.get(executor);
fsCache = new FileSystemCache(executor);
rowCountCache = new ExternalRowCountCache(executor,
Config.external_cache_expire_time_minutes_after_access * 60, null);
icebergMetadataCacheMgr = new IcebergMetadataCacheMgr();
"CommonRefreshExecutor", 10, true);

// The queue size should be large enough,
// because there may be thousands of partitions being queried at the same time.
fileListingExecutor = ThreadPoolManager.newDaemonFixedThreadPool(
Config.max_external_cache_loader_thread_pool_size,
Config.max_external_cache_loader_thread_pool_size * 1000,
"FileListingExecutor", 10, true);

fsCache = new FileSystemCache();
rowCountCache = new ExternalRowCountCache(rowCountRefreshExecutor);

hudiPartitionMgr = new HudiPartitionMgr(commonRefreshExecutor);
icebergMetadataCacheMgr = new IcebergMetadataCacheMgr(commonRefreshExecutor);
maxComputeMetadataCacheMgr = new MaxComputeMetadataCacheMgr();
}

public ExecutorService getFileListingExecutor() {
return fileListingExecutor;
}

public HiveMetaStoreCache getMetaStoreCache(HMSExternalCatalog catalog) {
HiveMetaStoreCache cache = cacheMap.get(catalog.getId());
if (cache == null) {
synchronized (cacheMap) {
if (!cacheMap.containsKey(catalog.getId())) {
cacheMap.put(catalog.getId(), new HiveMetaStoreCache(catalog, executor));
cacheMap.put(catalog.getId(),
new HiveMetaStoreCache(catalog, commonRefreshExecutor, fileListingExecutor));
}
cache = cacheMap.get(catalog.getId());
}
@@ -95,7 +135,7 @@ public ExternalSchemaCache getSchemaCache(ExternalCatalog catalog) {
if (cache == null) {
synchronized (schemaCacheMap) {
if (!schemaCacheMap.containsKey(catalog.getId())) {
schemaCacheMap.put(catalog.getId(), new ExternalSchemaCache(catalog));
schemaCacheMap.put(catalog.getId(), new ExternalSchemaCache(catalog, commonRefreshExecutor));
}
cache = schemaCacheMap.get(catalog.getId());
}
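A standalone sketch of the sync-load / async-refresh behavior described in the executor comment above, following Caffeine's documented `refreshAfterWrite` semantics: a miss blocks the caller, while a refresh serves the stale value immediately and reloads on the cache's executor. Names and timings here are illustrative.

```java
import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.LoadingCache;

import java.time.Duration;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class RefreshSemanticsSketch {
    public static void main(String[] args) throws Exception {
        ExecutorService refreshExecutor = Executors.newFixedThreadPool(2);
        LoadingCache<String, String> cache = Caffeine.newBuilder()
                .maximumSize(100)
                .refreshAfterWrite(Duration.ofSeconds(1))
                .executor(refreshExecutor) // refreshes run here, not in the caller
                .build(key -> {
                    System.out.println("loading on " + Thread.currentThread().getName());
                    return key.toUpperCase();
                });

        cache.get("a");     // miss: loads synchronously in the main thread
        Thread.sleep(1500); // let the entry become refresh-eligible
        cache.get("a");     // hit: returns stale "A" at once, schedules a reload
        Thread.sleep(200);  // give the async reload a moment to print
        refreshExecutor.shutdown();
    }
}
```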
ExternalRowCountCache.java
@@ -18,18 +18,18 @@
package org.apache.doris.datasource;

import org.apache.doris.catalog.TableIf;
import org.apache.doris.common.CacheFactory;
import org.apache.doris.common.Config;
import org.apache.doris.statistics.BasicAsyncCacheLoader;
import org.apache.doris.statistics.util.StatisticsUtil;

import com.github.benmanes.caffeine.cache.AsyncLoadingCache;
import com.github.benmanes.caffeine.cache.Caffeine;
import lombok.Getter;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.time.Duration;
import java.util.Optional;
import java.util.OptionalLong;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;

Expand All @@ -38,16 +38,16 @@ public class ExternalRowCountCache {
private static final Logger LOG = LogManager.getLogger(ExternalRowCountCache.class);
private final AsyncLoadingCache<RowCountKey, Optional<Long>> rowCountCache;

public ExternalRowCountCache(ExecutorService executor, long refreshAfterWriteSeconds,
BasicAsyncCacheLoader<RowCountKey, Optional<Long>> loader) {
public ExternalRowCountCache(ExecutorService executor) {
// 1. set expireAfterWrite to 1 day, to avoid keeping too many entries
// 2. set refreshAfterWrite to 10 min (default), so that entries are refreshed 10 min after write
rowCountCache = Caffeine.newBuilder()
.maximumSize(Config.max_external_table_row_count_cache_num)
.expireAfterAccess(Duration.ofDays(1))
.refreshAfterWrite(Duration.ofSeconds(refreshAfterWriteSeconds))
.executor(executor)
.buildAsync(loader == null ? new RowCountCacheLoader() : loader);
CacheFactory rowCountCacheFactory = new CacheFactory(
OptionalLong.of(86400L),
OptionalLong.of(Config.external_cache_expire_time_minutes_after_access * 60),
Config.max_external_table_row_count_cache_num,
false,
null);
rowCountCache = rowCountCacheFactory.buildAsyncCache(new RowCountCacheLoader(), executor);
}

@Getter
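For context, a hedged sketch (not the PR's code) of why an async loading cache suits row counts: `get` returns a `CompletableFuture`, so a caller can fall back to an "unknown" row count instead of blocking while the value loads on the dedicated executor.

```java
import com.github.benmanes.caffeine.cache.AsyncLoadingCache;
import com.github.benmanes.caffeine.cache.Caffeine;

import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class AsyncRowCountSketch {
    public static void main(String[] args) {
        ExecutorService executor = Executors.newFixedThreadPool(2);
        AsyncLoadingCache<String, Optional<Long>> cache = Caffeine.newBuilder()
                .maximumSize(1000)
                .executor(executor) // loads run on this pool, never in the caller
                .buildAsync(key -> Optional.of(42L)); // stand-in for a slow row-count fetch

        CompletableFuture<Optional<Long>> future = cache.get("db.table");
        // Use the value only if it is already loaded; otherwise report "unknown" (-1).
        long rowCount = future.getNow(Optional.empty()).orElse(-1L);
        System.out.println(rowCount); // likely -1 on first access, 42 once loaded
        executor.shutdown();
    }
}
```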
ExternalSchemaCache.java
@@ -18,26 +18,24 @@
package org.apache.doris.datasource;

import org.apache.doris.catalog.Column;
import org.apache.doris.common.CacheFactory;
import org.apache.doris.common.Config;
import org.apache.doris.common.util.Util;
import org.apache.doris.metric.GaugeMetric;
import org.apache.doris.metric.Metric;
import org.apache.doris.metric.MetricLabel;
import org.apache.doris.metric.MetricRepo;

import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import com.github.benmanes.caffeine.cache.LoadingCache;
import com.google.common.collect.ImmutableList;
import lombok.Data;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.util.List;
import java.util.Objects;
import java.util.OptionalLong;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.ExecutorService;

// The schema cache for external table
public class ExternalSchemaCache {
@@ -46,21 +44,20 @@ public class ExternalSchemaCache {

private LoadingCache<SchemaCacheKey, ImmutableList<Column>> schemaCache;

public ExternalSchemaCache(ExternalCatalog catalog) {
public ExternalSchemaCache(ExternalCatalog catalog, ExecutorService executor) {
this.catalog = catalog;
init();
init(executor);
initMetrics();
}

private void init() {
schemaCache = CacheBuilder.newBuilder().maximumSize(Config.max_external_schema_cache_num)
.expireAfterAccess(Config.external_cache_expire_time_minutes_after_access, TimeUnit.MINUTES)
.build(new CacheLoader<SchemaCacheKey, ImmutableList<Column>>() {
@Override
public ImmutableList<Column> load(SchemaCacheKey key) {
return loadSchema(key);
}
});
private void init(ExecutorService executor) {
CacheFactory schemaCacheFactory = new CacheFactory(
OptionalLong.of(86400L),
OptionalLong.of(Config.external_cache_expire_time_minutes_after_access * 60),
Config.max_external_schema_cache_num,
false,
null);
schemaCache = schemaCacheFactory.buildCache(key -> loadSchema(key), executor);
}

private void initMetrics() {
@@ -69,7 +66,7 @@ private void initMetrics() {
Metric.MetricUnit.NOUNIT, "external schema cache number") {
@Override
public Long getValue() {
return schemaCache.size();
return schemaCache.estimatedSize();
}
};
schemaCacheGauge.addLabel(new MetricLabel("catalog", catalog.getName()));
@@ -86,12 +83,7 @@ private ImmutableList<Column> loadSchema(SchemaCacheKey key) {

public List<Column> getSchema(String dbName, String tblName) {
SchemaCacheKey key = new SchemaCacheKey(dbName, tblName);
try {
return schemaCache.get(key);
} catch (ExecutionException e) {
throw new CacheException("failed to get schema for %s in catalog %s. err: %s",
e, key, catalog.getName(), Util.getRootCauseMessage(e));
}
return schemaCache.get(key);
}

public void addSchemaForTest(String dbName, String tblName, ImmutableList<Column> schema) {
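Why the `try`/`catch` around `schemaCache.get` could be dropped: Guava's `LoadingCache.get` throws the checked `ExecutionException`, while Caffeine's `LoadingCache.get` wraps checked loader failures in an unchecked `CompletionException`. A minimal sketch of the Caffeine behavior:

```java
import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.LoadingCache;

import java.util.concurrent.CompletionException;

public class CaffeineGetSketch {
    public static void main(String[] args) {
        LoadingCache<String, String> cache = Caffeine.newBuilder()
                .maximumSize(10)
                .build(key -> {
                    if (key.isEmpty()) {
                        throw new Exception("empty key"); // checked: Caffeine wraps it
                    }
                    return key.toUpperCase();
                });

        System.out.println(cache.get("doris")); // "DORIS", no checked exception to handle

        try {
            cache.get(""); // checked loader failure surfaces unchecked
        } catch (CompletionException e) {
            System.out.println("load failed: " + e.getCause().getMessage());
        }
    }
}
```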