Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1314,12 +1314,18 @@ public class Config extends ConfigBase {
* Minimum interval between last version when caching results,
* This parameter distinguishes between offline and real-time updates
*/
@ConfField(mutable = true, masterOnly = false)
public static int cache_last_version_interval_second = 30;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

add description in annotation


/**
* Expire sql sql in frontend time
*/
@ConfField(
mutable = true,
masterOnly = false,
callbackClassString = "org.apache.doris.common.NereidsSqlCacheManager$UpdateConfig"
)
public static int cache_last_version_interval_second = 30;
public static int expire_sql_cache_in_fe_second = 300;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

add description in annotation


/**
* Set the maximum number of rows that can be cached
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -780,9 +780,7 @@ public Env(boolean isCheckpointCatalog) {
this.mtmvService = new MTMVService();
this.insertOverwriteManager = new InsertOverwriteManager();
this.dnsCache = new DNSCache();
this.sqlCacheManager = new NereidsSqlCacheManager(
Config.sql_cache_manage_num, Config.cache_last_version_interval_second
);
this.sqlCacheManager = new NereidsSqlCacheManager();
}

public static void destroyCheckpoint() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,8 +74,11 @@ public class NereidsSqlCacheManager {
// value: SqlCacheContext
private volatile Cache<String, SqlCacheContext> sqlCaches;

public NereidsSqlCacheManager(int sqlCacheNum, long cacheIntervalSeconds) {
sqlCaches = buildSqlCaches(sqlCacheNum, cacheIntervalSeconds);
public NereidsSqlCacheManager() {
sqlCaches = buildSqlCaches(
Config.sql_cache_manage_num,
Config.expire_sql_cache_in_fe_second
);
}

public static synchronized void updateConfig() {
Expand All @@ -90,22 +93,24 @@ public static synchronized void updateConfig() {

Cache<String, SqlCacheContext> sqlCaches = buildSqlCaches(
Config.sql_cache_manage_num,
Config.cache_last_version_interval_second
Config.expire_sql_cache_in_fe_second
);
sqlCaches.putAll(sqlCacheManager.sqlCaches.asMap());
sqlCacheManager.sqlCaches = sqlCaches;
}

private static Cache<String, SqlCacheContext> buildSqlCaches(int sqlCacheNum, long cacheIntervalSeconds) {
sqlCacheNum = sqlCacheNum < 0 ? 100 : sqlCacheNum;
cacheIntervalSeconds = cacheIntervalSeconds < 0 ? 30 : cacheIntervalSeconds;

return Caffeine.newBuilder()
.maximumSize(sqlCacheNum)
.expireAfterAccess(Duration.ofSeconds(cacheIntervalSeconds))
private static Cache<String, SqlCacheContext> buildSqlCaches(int sqlCacheNum, long expireAfterAccessSeconds) {
Caffeine<Object, Object> cacheBuilder = Caffeine.newBuilder()
// auto evict cache when jvm memory too low
.softValues()
.build();
.softValues();
if (sqlCacheNum > 0) {
cacheBuilder = cacheBuilder.maximumSize(sqlCacheNum);
}
if (expireAfterAccessSeconds > 0) {
cacheBuilder = cacheBuilder.expireAfterAccess(Duration.ofSeconds(expireAfterAccessSeconds));
}

return cacheBuilder.build();
}

/** tryAddFeCache */
Expand Down Expand Up @@ -237,9 +242,6 @@ public Optional<LogicalSqlCache> tryParseSql(ConnectContext connectContext, Stri
}

private boolean tablesOrDataChanged(Env env, SqlCacheContext sqlCacheContext) {
long latestPartitionTime = sqlCacheContext.getLatestPartitionTime();
long latestPartitionVersion = sqlCacheContext.getLatestPartitionVersion();

if (sqlCacheContext.hasUnsupportedTables()) {
return true;
}
Expand All @@ -255,7 +257,7 @@ private boolean tablesOrDataChanged(Env env, SqlCacheContext sqlCacheContext) {
long cacheTableTime = scanTable.latestTimestamp;
long currentTableVersion = olapTable.getVisibleVersion();
long cacheTableVersion = scanTable.latestVersion;
// some partitions have been dropped, or delete or update or insert rows into new partition?
// some partitions have been dropped, or delete or updated or replaced, or insert rows into new partition?
if (currentTableTime > cacheTableTime
|| (currentTableTime == cacheTableTime && currentTableVersion > cacheTableVersion)) {
return true;
Expand All @@ -264,9 +266,7 @@ private boolean tablesOrDataChanged(Env env, SqlCacheContext sqlCacheContext) {
for (Long scanPartitionId : scanTable.getScanPartitions()) {
Partition partition = olapTable.getPartition(scanPartitionId);
// partition == null: is this partition truncated?
if (partition == null || partition.getVisibleVersionTime() > latestPartitionTime
|| (partition.getVisibleVersionTime() == latestPartitionTime
&& partition.getVisibleVersion() > latestPartitionVersion)) {
if (partition == null) {
return true;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -701,11 +701,11 @@ private CacheTable buildCacheTableForOlapScanNode(OlapScanNode node) {
scanTables.add(scanTable);
for (Long partitionId : node.getSelectedPartitionIds()) {
Partition partition = olapTable.getPartition(partitionId);
scanTable.addScanPartition(partitionId);
if (partition.getVisibleVersionTime() >= cacheTable.latestPartitionTime) {
cacheTable.latestPartitionId = partition.getId();
cacheTable.latestPartitionTime = partition.getVisibleVersionTime();
cacheTable.latestPartitionVersion = partition.getVisibleVersion();
scanTable.addScanPartition(partitionId);
}
}
return cacheTable;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ suite("parse_sql_from_sql_cache") {
}
}


sql "ADMIN SET FRONTEND CONFIG ('cache_last_version_interval_second' = '10')"

combineFutures(
Expand Down Expand Up @@ -704,6 +703,43 @@ suite("parse_sql_from_sql_cache") {
assertHasCache "select * from test_use_plan_cache20 where id=999"
def result6 = sql "select * from test_use_plan_cache20 where id=999"
assertTrue(result6.isEmpty())
}),
extraThread("test_truncate_partition", {
sql "drop table if exists test_use_plan_cache21"
sql """create table test_use_plan_cache21 (
id int,
dt int
)
partition by range(dt)
(
partition dt1 values [('1'), ('2')),
partition dt2 values [('2'), ('3'))
)
distributed by hash(id)
properties('replication_num'='1')"""



sql "insert into test_use_plan_cache21 values('2', '2')"
sleep(100)
sql "insert into test_use_plan_cache21 values('1', '1')"

// after partition changed 10s, the sql cache can be used
sleep(10000)

sql "set enable_nereids_planner=true"
sql "set enable_fallback_to_original_planner=false"
sql "set enable_sql_cache=true"

assertNoCache "select * from test_use_plan_cache21"
def result1 = sql "select * from test_use_plan_cache21"
assertTrue(result1.size() == 2)
assertHasCache "select * from test_use_plan_cache21"

sql "truncate table test_use_plan_cache21 partition dt2"
assertNoCache "select * from test_use_plan_cache21"
def result2 = sql "select * from test_use_plan_cache21"
assertTrue(result2.size() == 1)
})
).get()
}