From 7ba99609bc41e1e21efa29cb05bbf860c321989e Mon Sep 17 00:00:00 2001
From: aiborodin
Date: Wed, 25 Jun 2025 18:07:58 +1000
Subject: [PATCH] Replace Caffeine maxSize cache with LRUCache

We recently found that LRUCache, which is backed by a LinkedHashMap,
performs almost twice as fast as the Caffeine maximum-size cache.
Replace the Caffeine caches with LRUCache to optimise performance.
---
 .../flink/sink/dynamic/DynamicWriter.java     |  8 +++---
 .../flink/sink/dynamic/HashKeyGenerator.java  | 11 ++++----
 .../iceberg/flink/sink/dynamic/LRUCache.java  |  4 +++
 .../sink/dynamic/TableMetadataCache.java      | 26 +++++++++----------
 .../sink/dynamic/TableSerializerCache.java    | 10 +++----
 .../sink/dynamic/TestHashKeyGenerator.java    | 14 ++++------
 .../sink/dynamic/TestTableMetadataCache.java  |  4 +--
 .../dynamic/TestTableSerializerCache.java     |  2 +-
 .../flink/sink/dynamic/TestTableUpdater.java  |  9 ++++---
 9 files changed, 40 insertions(+), 48 deletions(-)

diff --git a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicWriter.java b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicWriter.java
index c4c4e61de151..341bdac03ca9 100644
--- a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicWriter.java
+++ b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicWriter.java
@@ -18,8 +18,6 @@
  */
 package org.apache.iceberg.flink.sink.dynamic;
 
-import com.github.benmanes.caffeine.cache.Cache;
-import com.github.benmanes.caffeine.cache.Caffeine;
 import java.io.IOException;
 import java.util.Collection;
 import java.util.Collections;
@@ -56,7 +54,7 @@ class DynamicWriter implements CommittingSinkWriter<DynamicRecordInternal, Dyna
 
-  private final Cache<WriteTarget, RowDataTaskWriterFactory> taskWriterFactories;
+  private final Map<WriteTarget, RowDataTaskWriterFactory> taskWriterFactories;
   private final Map<WriteTarget, TaskWriter<RowData>> writers;
   private final DynamicWriterMetrics metrics;
   private final int subTaskId;
@@ -82,7 +80,7 @@ class DynamicWriter implements CommittingSinkWriter<DynamicRecordInternal, Dyna
 
-    this.taskWriterFactories = Caffeine.newBuilder().maximumSize(cacheMaximumSize).build();
+    this.taskWriterFactories = new LRUCache<>(cacheMaximumSize);
     this.writers = Maps.newHashMap();
 
     LOG.debug("DynamicIcebergSinkWriter created for subtask {} attemptId {}", subTaskId, attemptId);
@@ -102,7 +100,7 @@ public void write(DynamicRecordInternal element, Context context)
                 element.equalityFields()),
             writerKey -> {
               RowDataTaskWriterFactory taskWriterFactory =
-                  taskWriterFactories.get(
+                  taskWriterFactories.computeIfAbsent(
                       writerKey,
                       factoryKey -> {
                         Table table =
diff --git a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/HashKeyGenerator.java b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/HashKeyGenerator.java
index d0909e0605d4..91aa4a91710c 100644
--- a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/HashKeyGenerator.java
+++ b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/HashKeyGenerator.java
@@ -20,9 +20,8 @@
 
 import static org.apache.iceberg.TableProperties.WRITE_DISTRIBUTION_MODE;
 
-import com.github.benmanes.caffeine.cache.Cache;
-import com.github.benmanes.caffeine.cache.Caffeine;
 import java.util.Collections;
+import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
 import java.util.stream.Collectors;
@@ -62,11 +61,11 @@ class HashKeyGenerator {
   private static final Logger LOG = LoggerFactory.getLogger(HashKeyGenerator.class);
 
   private final int maxWriteParallelism;
-  private final Cache<SelectorKey, KeySelector<RowData, Integer>> keySelectorCache;
+  private final Map<SelectorKey, KeySelector<RowData, Integer>> keySelectorCache;
 
   HashKeyGenerator(int maxCacheSize, int maxWriteParallelism) {
     this.maxWriteParallelism = maxWriteParallelism;
-    this.keySelectorCache = Caffeine.newBuilder().maximumSize(maxCacheSize).build();
+    this.keySelectorCache = new LRUCache<>(maxCacheSize);
   }
 
   int generateKey(DynamicRecord dynamicRecord) throws Exception {
@@ -89,7 +88,7 @@ int generateKey(
             dynamicRecord.spec(),
             dynamicRecord.equalityFields());
     KeySelector<RowData, Integer> keySelector =
-        keySelectorCache.get(
+        keySelectorCache.computeIfAbsent(
             cacheKey,
             k ->
                 getKeySelector(
@@ -377,7 +376,7 @@ public String toString() {
   }
 
   @VisibleForTesting
-  Cache<SelectorKey, KeySelector<RowData, Integer>> getKeySelectorCache() {
+  Map<SelectorKey, KeySelector<RowData, Integer>> getKeySelectorCache() {
     return keySelectorCache;
   }
 }
diff --git a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/LRUCache.java b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/LRUCache.java
index bb1e17405377..be2866dc4e19 100644
--- a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/LRUCache.java
+++ b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/LRUCache.java
@@ -42,6 +42,10 @@ class LRUCache<K, V> extends LinkedHashMap<K, V> {
   private final int maximumSize;
   private final Consumer<Map.Entry<K, V>> evictionCallback;
 
+  LRUCache(int maximumSize) {
+    this(maximumSize, ignored -> {});
+  }
+
   LRUCache(int maximumSize, Consumer<Map.Entry<K, V>> evictionCallback) {
     super(Math.min(maximumSize, DEFAULT_INITIAL_CAPACITY), DEFAULT_LOAD_FACTOR, true);
     this.maximumSize = maximumSize;
diff --git a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/TableMetadataCache.java b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/TableMetadataCache.java
index 479982bc41ba..85a5a4abf29c 100644
--- a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/TableMetadataCache.java
+++ b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/TableMetadataCache.java
@@ -18,8 +18,6 @@
  */
 package org.apache.iceberg.flink.sink.dynamic;
 
-import com.github.benmanes.caffeine.cache.Cache;
-import com.github.benmanes.caffeine.cache.Caffeine;
 import java.util.Map;
 import java.util.Set;
 import org.apache.flink.annotation.Internal;
@@ -53,18 +51,18 @@ class TableMetadataCache {
   private final Catalog catalog;
   private final long refreshMs;
   private final int inputSchemasPerTableCacheMaximumSize;
-  private final Cache<TableIdentifier, CacheItem> cache;
+  private final Map<TableIdentifier, CacheItem> tableCache;
 
   TableMetadataCache(
       Catalog catalog, int maximumSize, long refreshMs, int inputSchemasPerTableCacheMaximumSize) {
     this.catalog = catalog;
     this.refreshMs = refreshMs;
     this.inputSchemasPerTableCacheMaximumSize = inputSchemasPerTableCacheMaximumSize;
-    this.cache = Caffeine.newBuilder().maximumSize(maximumSize).build();
+    this.tableCache = new LRUCache<>(maximumSize);
   }
 
   Tuple2<Boolean, Exception> exists(TableIdentifier identifier) {
-    CacheItem cached = cache.getIfPresent(identifier);
+    CacheItem cached = tableCache.get(identifier);
     if (cached != null && Boolean.TRUE.equals(cached.tableExists)) {
       return EXISTS;
     } else if (needsRefresh(cached, true)) {
@@ -87,7 +85,7 @@ PartitionSpec spec(TableIdentifier identifier, PartitionSpec spec) {
   }
 
   void update(TableIdentifier identifier, Table table) {
-    cache.put(
+    tableCache.put(
         identifier,
         new CacheItem(
             true,
@@ -98,7 +96,7 @@ void update(TableIdentifier identifier, Table table) {
   }
 
   private String branch(TableIdentifier identifier, String branch, boolean allowRefresh) {
-    CacheItem cached = cache.getIfPresent(identifier);
+    CacheItem cached = tableCache.get(identifier);
     if (cached != null && cached.tableExists && cached.branches.contains(branch)) {
       return branch;
     }
@@ -113,7 +111,7 @@ private String branch(TableIdentifier identifier, String branch, boolean allowRe
 
   private ResolvedSchemaInfo schema(
       TableIdentifier identifier, Schema input, boolean allowRefresh) {
-    CacheItem cached = cache.getIfPresent(identifier);
+    CacheItem cached = tableCache.get(identifier);
     Schema compatible = null;
     if (cached != null && cached.tableExists) {
       // This only works if the {@link Schema#equals(Object)} returns true for the old schema
@@ -164,7 +162,7 @@ private ResolvedSchemaInfo schema(
   }
 
   private PartitionSpec spec(TableIdentifier identifier, PartitionSpec spec, boolean allowRefresh) {
-    CacheItem cached = cache.getIfPresent(identifier);
+    CacheItem cached = tableCache.get(identifier);
     if (cached != null && cached.tableExists) {
       for (PartitionSpec tableSpec : cached.specs.values()) {
         if (PartitionSpecEvolution.checkCompatibility(tableSpec, spec)) {
@@ -188,7 +186,7 @@ private Tuple2<Boolean, Exception> refreshTable(TableIdentifier identifier) {
       return EXISTS;
     } catch (NoSuchTableException e) {
       LOG.debug("Table doesn't exist {}", identifier, e);
-      cache.put(identifier, new CacheItem(false, null, null, null, 1));
+      tableCache.put(identifier, new CacheItem(false, null, null, null, 1));
       return Tuple2.of(false, e);
     }
   }
@@ -199,7 +197,7 @@ private boolean needsRefresh(CacheItem cacheItem, boolean allowRefresh) {
   }
 
   public void invalidate(TableIdentifier identifier) {
-    cache.invalidate(identifier);
+    tableCache.remove(identifier);
   }
 
   /** Handles timeout for missing items only. Caffeine performance causes noticeable delays. */
@@ -210,7 +208,7 @@ static class CacheItem {
     private final Set<String> branches;
     private final Map<Integer, Schema> tableSchemas;
     private final Map<Integer, PartitionSpec> specs;
-    private final LRUCache<Schema, ResolvedSchemaInfo> inputSchemas;
+    private final Map<Schema, ResolvedSchemaInfo> inputSchemas;
 
     private CacheItem(
         boolean tableExists,
@@ -268,7 +266,7 @@ DataConverter recordConverter() {
   }
 
   @VisibleForTesting
-  Cache<TableIdentifier, CacheItem> getInternalCache() {
-    return cache;
+  Map<TableIdentifier, CacheItem> getInternalCache() {
+    return tableCache;
   }
 }
diff --git a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/TableSerializerCache.java b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/TableSerializerCache.java
index 6acf873a94dc..84d0ed9be5d0 100644
--- a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/TableSerializerCache.java
+++ b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/TableSerializerCache.java
@@ -18,8 +18,6 @@
  */
 package org.apache.iceberg.flink.sink.dynamic;
 
-import com.github.benmanes.caffeine.cache.Cache;
-import com.github.benmanes.caffeine.cache.Caffeine;
 import java.io.Serializable;
 import java.util.Map;
 import javax.annotation.Nullable;
@@ -52,7 +50,7 @@ class TableSerializerCache implements Serializable {
 
   private final CatalogLoader catalogLoader;
   private final int maximumSize;
-  private transient Cache<String, SerializerInfo> serializers;
+  private transient Map<String, SerializerInfo> serializers;
 
   TableSerializerCache(CatalogLoader catalogLoader, int maximumSize) {
     this.catalogLoader = catalogLoader;
@@ -80,10 +78,10 @@ private Tuple3<RowDataSerializer, Schema, PartitionSpec> serializer(
 
     if (serializers == null) {
       // We need to initialize the cache at the first time
-      this.serializers = Caffeine.newBuilder().maximumSize(maximumSize).build();
+      this.serializers = new LRUCache<>(maximumSize);
     }
 
-    SerializerInfo info = serializers.get(tableName, SerializerInfo::new);
+    SerializerInfo info = serializers.computeIfAbsent(tableName, SerializerInfo::new);
     Schema schema = unknownSchema != null ? unknownSchema : info.schemas.get(schemaId);
     PartitionSpec spec = unknownSpec != null ? unknownSpec : info.specs.get(specId);
@@ -129,7 +127,7 @@ private void update() {
   }
 
   @VisibleForTesting
-  Cache<String, SerializerInfo> getCache() {
+  Map<String, SerializerInfo> getCache() {
     return serializers;
   }
 }
diff --git a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestHashKeyGenerator.java b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestHashKeyGenerator.java
index b8d86feb99cf..8d559e920620 100644
--- a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestHashKeyGenerator.java
+++ b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestHashKeyGenerator.java
@@ -20,8 +20,8 @@
 
 import static org.assertj.core.api.Assertions.assertThat;
 
-import com.github.benmanes.caffeine.cache.Cache;
 import java.util.Collections;
+import java.util.Map;
 import java.util.Set;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
@@ -302,7 +302,7 @@ void testCaching() throws Exception {
     int writeParallelism = 2;
     int maxWriteParallelism = 8;
     HashKeyGenerator generator = new HashKeyGenerator(maxCacheSize, maxWriteParallelism);
-    Cache<HashKeyGenerator.SelectorKey, KeySelector<RowData, Integer>> keySelectorCache =
+    Map<HashKeyGenerator.SelectorKey, KeySelector<RowData, Integer>> keySelectorCache =
         generator.getKeySelectorCache();
 
     PartitionSpec unpartitioned = PartitionSpec.unpartitioned();
@@ -317,18 +317,14 @@ void testCaching() throws Exception {
             writeParallelism);
 
     int writeKey1 = generator.generateKey(record);
-    assertThat(keySelectorCache.estimatedSize()).isEqualTo(1);
+    assertThat(keySelectorCache).hasSize(1);
 
     int writeKey2 = generator.generateKey(record);
     assertThat(writeKey2).isNotEqualTo(writeKey1);
-    // Manually clean up because the cleanup is not always triggered
-    keySelectorCache.cleanUp();
-    assertThat(keySelectorCache.estimatedSize()).isEqualTo(1);
+    assertThat(keySelectorCache).hasSize(1);
 
     int writeKey3 = generator.generateKey(record);
-    // Manually clean up because the cleanup is not always triggered
-    keySelectorCache.cleanUp();
-    assertThat(keySelectorCache.estimatedSize()).isEqualTo(1);
+    assertThat(keySelectorCache).hasSize(1);
     // We create a new key selector which will start off at the same position
     assertThat(writeKey1).isEqualTo(writeKey3);
   }
diff --git a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestTableMetadataCache.java b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestTableMetadataCache.java
index 5d222a212fff..2264cc3a8db0 100644
--- a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestTableMetadataCache.java
+++ b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestTableMetadataCache.java
@@ -89,8 +89,6 @@ void testCachingDisabled() {
     catalog.createTable(tableIdentifier, SCHEMA);
 
     TableMetadataCache cache = new TableMetadataCache(catalog, 0, Long.MAX_VALUE, 10);
-    // Cleanup routine doesn't run after every write
-    cache.getInternalCache().cleanUp();
-    assertThat(cache.getInternalCache().estimatedSize()).isEqualTo(0);
+    assertThat(cache.getInternalCache()).isEmpty();
   }
 }
diff --git a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestTableSerializerCache.java b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestTableSerializerCache.java
index 70a512e7c7da..1cf2c8bae001 100644
--- a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestTableSerializerCache.java
+++ b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestTableSerializerCache.java
@@ -111,7 +111,7 @@ void testCacheEviction() {
     RowDataSerializer serializer1 = creator1.get();
     RowDataSerializer serializer2 = creator2.get();
 
-    cache.getCache().cleanUp();
+    cache.getCache().clear();
     assertThat(serializer1).isNotSameAs(creator1.get());
     assertThat(serializer2).isNotSameAs(creator2.get());
   }
diff --git a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestTableUpdater.java b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestTableUpdater.java
index 9099fd1f3ae6..ad35d929728d 100644
--- a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestTableUpdater.java
+++ b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestTableUpdater.java
@@ -20,6 +20,7 @@
 
 import static org.assertj.core.api.Assertions.assertThat;
 
+import java.util.Map;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Schema;
@@ -85,11 +86,11 @@ void testBranchCreationAndCaching() {
     catalog.createTable(tableIdentifier, SCHEMA);
 
     tableUpdater.update(tableIdentifier, "myBranch", SCHEMA, PartitionSpec.unpartitioned());
-    TableMetadataCache.CacheItem cacheItem = cache.getInternalCache().getIfPresent(tableIdentifier);
+    TableMetadataCache.CacheItem cacheItem = cache.getInternalCache().get(tableIdentifier);
     assertThat(cacheItem).isNotNull();
 
     tableUpdater.update(tableIdentifier, "myBranch", SCHEMA, PartitionSpec.unpartitioned());
-    assertThat(cache.getInternalCache().getIfPresent(tableIdentifier)).isEqualTo(cacheItem);
+    assertThat(cache.getInternalCache()).contains(Map.entry(tableIdentifier, cacheItem));
   }
 
   @Test
@@ -153,7 +154,7 @@ void testLastResultInvalidation() {
         .isEqualTo(CompareSchemasVisitor.Result.SAME);
 
     // Last result cache should be cleared
-    assertThat(cache.getInternalCache().getIfPresent(tableIdentifier).inputSchemas().get(SCHEMA2))
-        .isNull();
+    assertThat(cache.getInternalCache().get(tableIdentifier).inputSchemas())
+        .doesNotContainKey(SCHEMA2);
   }
 }
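
Note on the mechanism: the LRUCache this patch switches to builds on LinkedHashMap's access-order mode and its removeEldestEntry hook, visible in the LRUCache.java hunk above. The self-contained sketch below illustrates how that eviction scheme behaves. It is an illustration only, not code from this patch; the class name SimpleLruCache and the main driver are hypothetical, and the initial capacity and load factor are the LinkedHashMap defaults rather than the constants LRUCache uses.

    import java.util.LinkedHashMap;
    import java.util.Map;

    // Minimal sketch of a LinkedHashMap-based LRU cache, the technique LRUCache relies on.
    class SimpleLruCache<K, V> extends LinkedHashMap<K, V> {
      private final int maximumSize;

      SimpleLruCache(int maximumSize) {
        // accessOrder=true makes get() move an entry to the back of the iteration
        // order, so the head of the map is always the least recently used entry.
        super(16, 0.75f, true);
        this.maximumSize = maximumSize;
      }

      @Override
      protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
        // Invoked by LinkedHashMap after every insertion; returning true drops
        // the least recently used entry once the maximum size is exceeded.
        return size() > maximumSize;
      }

      public static void main(String[] args) {
        SimpleLruCache<String, Integer> cache = new SimpleLruCache<>(2);
        cache.put("a", 1);
        cache.put("b", 2);
        cache.get("a"); // touch "a", so "b" becomes the eldest entry
        cache.put("c", 3); // exceeds the maximum size and evicts "b"
        System.out.println(cache.keySet()); // prints [a, c]
      }
    }

One trade-off worth noting: unlike Caffeine, a LinkedHashMap-based cache is not thread-safe, so this approach assumes each cache instance stays confined to a single task, as the per-subtask caches touched by this patch appear to be.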