diff --git a/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicWriter.java b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicWriter.java
index c4c4e61de151..341bdac03ca9 100644
--- a/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicWriter.java
+++ b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicWriter.java
@@ -18,8 +18,6 @@
  */
 package org.apache.iceberg.flink.sink.dynamic;
 
-import com.github.benmanes.caffeine.cache.Cache;
-import com.github.benmanes.caffeine.cache.Caffeine;
 import java.io.IOException;
 import java.util.Collection;
 import java.util.Collections;
@@ -56,7 +54,7 @@ class DynamicWriter implements CommittingSinkWriter<DynamicRecordInternal, DynamicWriteResult>
-  private final Cache<WriteTarget, RowDataTaskWriterFactory> taskWriterFactories;
+  private final Map<WriteTarget, RowDataTaskWriterFactory> taskWriterFactories;
   private final Map<WriteTarget, TaskWriter<RowData>> writers;
   private final DynamicWriterMetrics metrics;
   private final int subTaskId;
@@ -82,7 +80,7 @@ class DynamicWriter implements CommittingSinkWriter<DynamicRecordInternal, DynamicWriteResult>
-    this.taskWriterFactories = Caffeine.newBuilder().maximumSize(cacheMaximumSize).build();
+    this.taskWriterFactories = new LRUCache<>(cacheMaximumSize);
     this.writers = Maps.newHashMap();
 
     LOG.debug("DynamicIcebergSinkWriter created for subtask {} attemptId {}", subTaskId, attemptId);
@@ -102,7 +100,7 @@ public void write(DynamicRecordInternal element, Context context)
                 element.equalityFields()),
             writerKey -> {
               RowDataTaskWriterFactory taskWriterFactory =
-                  taskWriterFactories.get(
+                  taskWriterFactories.computeIfAbsent(
                       writerKey,
                       factoryKey -> {
                         Table table =
diff --git a/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/HashKeyGenerator.java b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/HashKeyGenerator.java
index d0909e0605d4..91aa4a91710c 100644
--- a/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/HashKeyGenerator.java
+++ b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/HashKeyGenerator.java
@@ -20,9 +20,8 @@
 
 import static org.apache.iceberg.TableProperties.WRITE_DISTRIBUTION_MODE;
 
-import com.github.benmanes.caffeine.cache.Cache;
-import com.github.benmanes.caffeine.cache.Caffeine;
 import java.util.Collections;
+import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
 import java.util.stream.Collectors;
@@ -62,11 +61,11 @@ class HashKeyGenerator {
   private static final Logger LOG = LoggerFactory.getLogger(HashKeyGenerator.class);
 
   private final int maxWriteParallelism;
-  private final Cache<SelectorKey, KeySelector<RowData, Integer>> keySelectorCache;
+  private final Map<SelectorKey, KeySelector<RowData, Integer>> keySelectorCache;
 
   HashKeyGenerator(int maxCacheSize, int maxWriteParallelism) {
     this.maxWriteParallelism = maxWriteParallelism;
-    this.keySelectorCache = Caffeine.newBuilder().maximumSize(maxCacheSize).build();
+    this.keySelectorCache = new LRUCache<>(maxCacheSize);
   }
 
   int generateKey(DynamicRecord dynamicRecord) throws Exception {
@@ -89,7 +88,7 @@ int generateKey(
             dynamicRecord.spec(),
             dynamicRecord.equalityFields());
     KeySelector<RowData, Integer> keySelector =
-        keySelectorCache.get(
+        keySelectorCache.computeIfAbsent(
             cacheKey,
             k ->
                 getKeySelector(
@@ -377,7 +376,7 @@ public String toString() {
   }
 
   @VisibleForTesting
-  Cache<SelectorKey, KeySelector<RowData, Integer>> getKeySelectorCache() {
+  Map<SelectorKey, KeySelector<RowData, Integer>> getKeySelectorCache() {
     return keySelectorCache;
   }
 }
diff --git a/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/LRUCache.java b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/LRUCache.java
index bb1e17405377..be2866dc4e19 100644
--- a/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/LRUCache.java
+++ b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/LRUCache.java
@@ -42,6 +42,10 @@ class LRUCache<K, V> extends LinkedHashMap<K, V> {
   private final int maximumSize;
   private final Consumer<Map.Entry<K, V>> evictionCallback;
 
+  LRUCache(int maximumSize) {
+    this(maximumSize, ignored -> {});
+  }
+
   LRUCache(int maximumSize, Consumer<Map.Entry<K, V>> evictionCallback) {
     super(Math.min(maximumSize, DEFAULT_INITIAL_CAPACITY), DEFAULT_LOAD_FACTOR, true);
     this.maximumSize = maximumSize;
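The one-argument constructor added above simply delegates to the existing two-argument constructor with a no-op eviction callback. For readers who have not seen the idiom, the stand-alone sketch below shows the LinkedHashMap pattern that LRUCache builds on; it is an illustration under assumed names (SimpleLruCache, the literal initial capacity 16), not the Iceberg class itself:

import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Consumer;

// Illustrative LinkedHashMap-based LRU cache; names are invented for this sketch.
class SimpleLruCache<K, V> extends LinkedHashMap<K, V> {
  private final int maximumSize;
  private final Consumer<Map.Entry<K, V>> evictionCallback;

  SimpleLruCache(int maximumSize, Consumer<Map.Entry<K, V>> evictionCallback) {
    // The 'true' flag selects access order, so get()/computeIfAbsent() hits
    // move an entry to the most-recently-used position.
    super(16, 0.75f, true);
    this.maximumSize = maximumSize;
    this.evictionCallback = evictionCallback;
  }

  @Override
  protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
    // Invoked synchronously after every insertion; returning true drops the
    // least-recently-used entry, keeping the map bounded at maximumSize.
    if (size() > maximumSize) {
      evictionCallback.accept(eldest);
      return true;
    }

    return false;
  }
}

Because eviction happens inline during insertion, the map never temporarily exceeds its bound, which is what lets the rewritten tests further down assert on the map size directly instead of forcing a Caffeine cleanUp() first.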
diff --git a/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/TableMetadataCache.java b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/TableMetadataCache.java
index 479982bc41ba..85a5a4abf29c 100644
--- a/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/TableMetadataCache.java
+++ b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/TableMetadataCache.java
@@ -18,8 +18,6 @@
  */
 package org.apache.iceberg.flink.sink.dynamic;
 
-import com.github.benmanes.caffeine.cache.Cache;
-import com.github.benmanes.caffeine.cache.Caffeine;
 import java.util.Map;
 import java.util.Set;
 import org.apache.flink.annotation.Internal;
@@ -53,18 +51,18 @@ class TableMetadataCache {
   private final Catalog catalog;
   private final long refreshMs;
   private final int inputSchemasPerTableCacheMaximumSize;
-  private final Cache<TableIdentifier, CacheItem> cache;
+  private final Map<TableIdentifier, CacheItem> tableCache;
 
   TableMetadataCache(
       Catalog catalog, int maximumSize, long refreshMs, int inputSchemasPerTableCacheMaximumSize) {
     this.catalog = catalog;
     this.refreshMs = refreshMs;
     this.inputSchemasPerTableCacheMaximumSize = inputSchemasPerTableCacheMaximumSize;
-    this.cache = Caffeine.newBuilder().maximumSize(maximumSize).build();
+    this.tableCache = new LRUCache<>(maximumSize);
   }
 
   Tuple2<Boolean, Exception> exists(TableIdentifier identifier) {
-    CacheItem cached = cache.getIfPresent(identifier);
+    CacheItem cached = tableCache.get(identifier);
     if (cached != null && Boolean.TRUE.equals(cached.tableExists)) {
       return EXISTS;
     } else if (needsRefresh(cached, true)) {
@@ -87,7 +85,7 @@ PartitionSpec spec(TableIdentifier identifier, PartitionSpec spec) {
   }
 
   void update(TableIdentifier identifier, Table table) {
-    cache.put(
+    tableCache.put(
         identifier,
         new CacheItem(
             true,
@@ -98,7 +96,7 @@
   private String branch(TableIdentifier identifier, String branch, boolean allowRefresh) {
-    CacheItem cached = cache.getIfPresent(identifier);
+    CacheItem cached = tableCache.get(identifier);
     if (cached != null && cached.tableExists && cached.branches.contains(branch)) {
       return branch;
     }
@@ -113,7 +111,7 @@ private String branch(TableIdentifier identifier, String branch, boolean allowRe
 
   private ResolvedSchemaInfo schema(
       TableIdentifier identifier, Schema input, boolean allowRefresh) {
-    CacheItem cached = cache.getIfPresent(identifier);
+    CacheItem cached = tableCache.get(identifier);
     Schema compatible = null;
     if (cached != null && cached.tableExists) {
       // This only works if the {@link Schema#equals(Object)} returns true for the old schema
@@ -164,7 +162,7 @@ private ResolvedSchemaInfo schema(
 
   private PartitionSpec spec(TableIdentifier identifier, PartitionSpec spec, boolean allowRefresh) {
-    CacheItem cached = cache.getIfPresent(identifier);
+    CacheItem cached = tableCache.get(identifier);
     if (cached != null && cached.tableExists) {
       for (PartitionSpec tableSpec : cached.specs.values()) {
         if (PartitionSpecEvolution.checkCompatibility(tableSpec, spec)) {
@@ -188,7 +186,7 @@ private Tuple2<Boolean, Exception> refreshTable(TableIdentifier identifier) {
       return EXISTS;
     } catch (NoSuchTableException e) {
       LOG.debug("Table doesn't exist {}", identifier, e);
-      cache.put(identifier, new CacheItem(false, null, null, null, 1));
+      tableCache.put(identifier, new CacheItem(false, null, null, null, 1));
       return Tuple2.of(false, e);
     }
   }
@@ -199,7 +197,7 @@ private boolean needsRefresh(CacheItem cacheItem, boolean allowRefresh) {
   }
 
   public void invalidate(TableIdentifier identifier) {
-    cache.invalidate(identifier);
+    tableCache.remove(identifier);
   }
 
   /** Handles timeout for missing items only. Caffeine performance causes noticeable delays. */
@@ -210,7 +208,7 @@ static class CacheItem {
   private final Set<String> branches;
   private final Map<Integer, Schema> tableSchemas;
   private final Map<Integer, PartitionSpec> specs;
-  private final LRUCache<Schema, ResolvedSchemaInfo> inputSchemas;
+  private final Map<Schema, ResolvedSchemaInfo> inputSchemas;
 
   private CacheItem(
       boolean tableExists,
@@ -268,7 +266,7 @@ DataConverter recordConverter() {
   }
 
   @VisibleForTesting
-  Cache<TableIdentifier, CacheItem> getInternalCache() {
-    return cache;
+  Map<TableIdentifier, CacheItem> getInternalCache() {
+    return tableCache;
   }
 }
diff --git a/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/TableSerializerCache.java b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/TableSerializerCache.java
index 6acf873a94dc..84d0ed9be5d0 100644
--- a/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/TableSerializerCache.java
+++ b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/TableSerializerCache.java
@@ -18,8 +18,6 @@
  */
 package org.apache.iceberg.flink.sink.dynamic;
 
-import com.github.benmanes.caffeine.cache.Cache;
-import com.github.benmanes.caffeine.cache.Caffeine;
 import java.io.Serializable;
 import java.util.Map;
 import javax.annotation.Nullable;
@@ -52,7 +50,7 @@ class TableSerializerCache implements Serializable {
 
   private final CatalogLoader catalogLoader;
   private final int maximumSize;
-  private transient Cache<String, SerializerInfo> serializers;
+  private transient Map<String, SerializerInfo> serializers;
 
   TableSerializerCache(CatalogLoader catalogLoader, int maximumSize) {
     this.catalogLoader = catalogLoader;
@@ -80,10 +78,10 @@ private Tuple3<RowDataSerializer, Schema, PartitionSpec> serializer(
 
     if (serializers == null) {
       // We need to initialize the cache at the first time
-      this.serializers = Caffeine.newBuilder().maximumSize(maximumSize).build();
+      this.serializers = new LRUCache<>(maximumSize);
     }
 
-    SerializerInfo info = serializers.get(tableName, SerializerInfo::new);
+    SerializerInfo info = serializers.computeIfAbsent(tableName, SerializerInfo::new);
     Schema schema = unknownSchema != null ? unknownSchema : info.schemas.get(schemaId);
     PartitionSpec spec = unknownSpec != null ? unknownSpec : info.specs.get(specId);
@@ -129,7 +127,7 @@ private void update() {
   }
 
   @VisibleForTesting
-  Cache<String, SerializerInfo> getCache() {
+  Map<String, SerializerInfo> getCache() {
     return serializers;
   }
 }
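Every hunk in these files applies the same mechanical translation from Caffeine's Cache API to plain java.util.Map calls. The sketch below spells out the correspondence on a hypothetical Map<String, Integer>; the method pairs are the point, not the types:

import java.util.HashMap;
import java.util.Map;

class CaffeineToMapExamples {
  public static void main(String[] args) {
    Map<String, Integer> cache = new HashMap<>();

    // Caffeine cache.get(key, loader) -> Map.computeIfAbsent(key, loader):
    // both return the cached value, or compute, store, and return a new one.
    int length = cache.computeIfAbsent("table", String::length);

    // Caffeine cache.getIfPresent(key) -> Map.get(key): null when absent.
    Integer cached = cache.get("table");

    // Caffeine cache.invalidate(key) -> Map.remove(key).
    cache.remove("table");

    // Caffeine cache.cleanUp() + estimatedSize() -> Map.size(), always exact.
    System.out.println(length + ", " + cached + ", size=" + cache.size());
  }
}

Two behavioral differences are worth keeping in mind. Caffeine may defer evictions until a maintenance cycle runs, which is why its size is only an estimate and why the old tests called cleanUp(); the LinkedHashMap-based cache evicts inline, so size() is exact. In exchange, LinkedHashMap is not thread-safe the way Caffeine is, so the swap assumes each cache is touched from a single task thread, which appears to be the case for this per-subtask writer state.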
diff --git a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestHashKeyGenerator.java b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestHashKeyGenerator.java
index b8d86feb99cf..8d559e920620 100644
--- a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestHashKeyGenerator.java
+++ b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestHashKeyGenerator.java
@@ -20,8 +20,8 @@
 
 import static org.assertj.core.api.Assertions.assertThat;
 
-import com.github.benmanes.caffeine.cache.Cache;
 import java.util.Collections;
+import java.util.Map;
 import java.util.Set;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
@@ -302,7 +302,7 @@ void testCaching() throws Exception {
     int writeParallelism = 2;
     int maxWriteParallelism = 8;
     HashKeyGenerator generator = new HashKeyGenerator(maxCacheSize, maxWriteParallelism);
-    Cache<SelectorKey, KeySelector<RowData, Integer>> keySelectorCache =
+    Map<SelectorKey, KeySelector<RowData, Integer>> keySelectorCache =
         generator.getKeySelectorCache();
 
     PartitionSpec unpartitioned = PartitionSpec.unpartitioned();
@@ -317,18 +317,14 @@ void testCaching() throws Exception {
             writeParallelism);
 
     int writeKey1 = generator.generateKey(record);
-    assertThat(keySelectorCache.estimatedSize()).isEqualTo(1);
+    assertThat(keySelectorCache).hasSize(1);
 
     int writeKey2 = generator.generateKey(record);
     assertThat(writeKey2).isNotEqualTo(writeKey1);
-    // Manually clean up because the cleanup is not always triggered
-    keySelectorCache.cleanUp();
-    assertThat(keySelectorCache.estimatedSize()).isEqualTo(1);
+    assertThat(keySelectorCache).hasSize(1);
 
     int writeKey3 = generator.generateKey(record);
-    // Manually clean up because the cleanup is not always triggered
-    keySelectorCache.cleanUp();
-    assertThat(keySelectorCache.estimatedSize()).isEqualTo(1);
+    assertThat(keySelectorCache).hasSize(1);
 
     // We create a new key selector which will start off at the same position
     assertThat(writeKey1).isEqualTo(writeKey3);
   }
diff --git a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestTableMetadataCache.java b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestTableMetadataCache.java
index 5d222a212fff..2264cc3a8db0 100644
--- a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestTableMetadataCache.java
+++ b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestTableMetadataCache.java
@@ -89,8 +89,6 @@ void testCachingDisabled() {
     catalog.createTable(tableIdentifier, SCHEMA);
 
     TableMetadataCache cache = new TableMetadataCache(catalog, 0, Long.MAX_VALUE, 10);
-    // Cleanup routine doesn't run after every write
-    cache.getInternalCache().cleanUp();
-    assertThat(cache.getInternalCache().estimatedSize()).isEqualTo(0);
+    assertThat(cache.getInternalCache()).isEmpty();
   }
 }
diff --git a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestTableSerializerCache.java b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestTableSerializerCache.java
index 720a0a582e94..ec610a3357ba 100644
--- a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestTableSerializerCache.java
+++ b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestTableSerializerCache.java
@@ -111,7 +111,7 @@ void testCacheEviction() {
     RowDataSerializer serializer1 = creator1.get();
     RowDataSerializer serializer2 = creator2.get();
 
-    cache.getCache().cleanUp();
+    cache.getCache().clear();
     assertThat(serializer1).isNotSameAs(creator1.get());
     assertThat(serializer2).isNotSameAs(creator2.get());
   }
diff --git a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestTableUpdater.java b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestTableUpdater.java
index 9099fd1f3ae6..ad35d929728d 100644
--- a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestTableUpdater.java
+++ b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestTableUpdater.java
@@ -20,6 +20,7 @@
 
 import static org.assertj.core.api.Assertions.assertThat;
 
+import java.util.Map;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Schema;
@@ -85,11 +86,11 @@ void testBranchCreationAndCaching() {
     catalog.createTable(tableIdentifier, SCHEMA);
 
     tableUpdater.update(tableIdentifier, "myBranch", SCHEMA, PartitionSpec.unpartitioned());
-    TableMetadataCache.CacheItem cacheItem = cache.getInternalCache().getIfPresent(tableIdentifier);
+    TableMetadataCache.CacheItem cacheItem = cache.getInternalCache().get(tableIdentifier);
     assertThat(cacheItem).isNotNull();
 
     tableUpdater.update(tableIdentifier, "myBranch", SCHEMA, PartitionSpec.unpartitioned());
-    assertThat(cache.getInternalCache().getIfPresent(tableIdentifier)).isEqualTo(cacheItem);
+    assertThat(cache.getInternalCache()).contains(Map.entry(tableIdentifier, cacheItem));
   }
 
   @Test
@@ -153,7 +154,7 @@ void testLastResultInvalidation() {
         .isEqualTo(CompareSchemasVisitor.Result.SAME);
 
     // Last result cache should be cleared
-    assertThat(cache.getInternalCache().getIfPresent(tableIdentifier).inputSchemas().get(SCHEMA2))
-        .isNull();
+    assertThat(cache.getInternalCache().get(tableIdentifier).inputSchemas())
+        .doesNotContainKey(SCHEMA2);
   }
 }
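The rewritten tests above lean on exactly two properties of the new cache: eviction happens inline during insertion, and the cache is a plain Map that AssertJ can inspect directly (hasSize, containsEntry, doesNotContainKey, isEmpty). A minimal, hypothetical JUnit 5 sketch of both properties, using the one-argument constructor introduced in LRUCache.java and assuming it lives in the same package as the test:

import static org.assertj.core.api.Assertions.assertThat;

import java.util.Map;
import org.junit.jupiter.api.Test;

// Illustrative only; not part of the diff. Assumes LRUCache evicts the eldest
// entry as soon as an insertion pushes the map past its maximum size.
class TestLruCacheSketch {
  @Test
  void evictsEldestSynchronously() {
    Map<String, String> cache = new LRUCache<>(1);

    cache.put("a", "1");
    cache.put("b", "2"); // exceeds maximumSize, so "a" is evicted immediately

    // No cleanUp()/estimatedSize() dance needed: the size is exact right away.
    assertThat(cache).hasSize(1);
    assertThat(cache).doesNotContainKey("a");
    assertThat(cache).containsEntry("b", "2");
  }
}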
diff --git a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicWriter.java b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicWriter.java
index c4c4e61de151..341bdac03ca9 100644
--- a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicWriter.java
+++ b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicWriter.java
@@ -18,8 +18,6 @@
  */
 package org.apache.iceberg.flink.sink.dynamic;
 
-import com.github.benmanes.caffeine.cache.Cache;
-import com.github.benmanes.caffeine.cache.Caffeine;
 import java.io.IOException;
 import java.util.Collection;
 import java.util.Collections;
@@ -56,7 +54,7 @@ class DynamicWriter implements CommittingSinkWriter<DynamicRecordInternal, DynamicWriteResult>
-  private final Cache<WriteTarget, RowDataTaskWriterFactory> taskWriterFactories;
+  private final Map<WriteTarget, RowDataTaskWriterFactory> taskWriterFactories;
   private final Map<WriteTarget, TaskWriter<RowData>> writers;
   private final DynamicWriterMetrics metrics;
   private final int subTaskId;
@@ -82,7 +80,7 @@ class DynamicWriter implements CommittingSinkWriter<DynamicRecordInternal, DynamicWriteResult>
-    this.taskWriterFactories = Caffeine.newBuilder().maximumSize(cacheMaximumSize).build();
+    this.taskWriterFactories = new LRUCache<>(cacheMaximumSize);
     this.writers = Maps.newHashMap();
 
     LOG.debug("DynamicIcebergSinkWriter created for subtask {} attemptId {}", subTaskId, attemptId);
@@ -102,7 +100,7 @@ public void write(DynamicRecordInternal element, Context context)
                 element.equalityFields()),
             writerKey -> {
               RowDataTaskWriterFactory taskWriterFactory =
-                  taskWriterFactories.get(
+                  taskWriterFactories.computeIfAbsent(
                       writerKey,
                       factoryKey -> {
                         Table table =
diff --git a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/HashKeyGenerator.java b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/HashKeyGenerator.java
index d0909e0605d4..91aa4a91710c 100644
--- a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/HashKeyGenerator.java
+++ b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/HashKeyGenerator.java
@@ -20,9 +20,8 @@
 
 import static org.apache.iceberg.TableProperties.WRITE_DISTRIBUTION_MODE;
 
-import com.github.benmanes.caffeine.cache.Cache;
-import com.github.benmanes.caffeine.cache.Caffeine;
 import java.util.Collections;
+import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
 import java.util.stream.Collectors;
@@ -62,11 +61,11 @@ class HashKeyGenerator {
   private static final Logger LOG = LoggerFactory.getLogger(HashKeyGenerator.class);
 
   private final int maxWriteParallelism;
-  private final Cache<SelectorKey, KeySelector<RowData, Integer>> keySelectorCache;
+  private final Map<SelectorKey, KeySelector<RowData, Integer>> keySelectorCache;
 
   HashKeyGenerator(int maxCacheSize, int maxWriteParallelism) {
     this.maxWriteParallelism = maxWriteParallelism;
-    this.keySelectorCache = Caffeine.newBuilder().maximumSize(maxCacheSize).build();
+    this.keySelectorCache = new LRUCache<>(maxCacheSize);
   }
 
   int generateKey(DynamicRecord dynamicRecord) throws Exception {
@@ -89,7 +88,7 @@ int generateKey(
             dynamicRecord.spec(),
             dynamicRecord.equalityFields());
     KeySelector<RowData, Integer> keySelector =
-        keySelectorCache.get(
+        keySelectorCache.computeIfAbsent(
             cacheKey,
             k ->
                 getKeySelector(
@@ -377,7 +376,7 @@ public String toString() {
   }
 
   @VisibleForTesting
-  Cache<SelectorKey, KeySelector<RowData, Integer>> getKeySelectorCache() {
+  Map<SelectorKey, KeySelector<RowData, Integer>> getKeySelectorCache() {
     return keySelectorCache;
   }
 }
diff --git a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/LRUCache.java b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/LRUCache.java
index bb1e17405377..be2866dc4e19 100644
--- a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/LRUCache.java
+++ b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/LRUCache.java
@@ -42,6 +42,10 @@ class LRUCache<K, V> extends LinkedHashMap<K, V> {
   private final int maximumSize;
   private final Consumer<Map.Entry<K, V>> evictionCallback;
 
+  LRUCache(int maximumSize) {
+    this(maximumSize, ignored -> {});
+  }
+
   LRUCache(int maximumSize, Consumer<Map.Entry<K, V>> evictionCallback) {
     super(Math.min(maximumSize, DEFAULT_INITIAL_CAPACITY), DEFAULT_LOAD_FACTOR, true);
     this.maximumSize = maximumSize;
diff --git a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/TableMetadataCache.java b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/TableMetadataCache.java
index 479982bc41ba..85a5a4abf29c 100644
--- a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/TableMetadataCache.java
+++ b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/TableMetadataCache.java
@@ -18,8 +18,6 @@
  */
 package org.apache.iceberg.flink.sink.dynamic;
 
-import com.github.benmanes.caffeine.cache.Cache;
-import com.github.benmanes.caffeine.cache.Caffeine;
 import java.util.Map;
 import java.util.Set;
 import org.apache.flink.annotation.Internal;
@@ -53,18 +51,18 @@ class TableMetadataCache {
   private final Catalog catalog;
   private final long refreshMs;
   private final int inputSchemasPerTableCacheMaximumSize;
-  private final Cache<TableIdentifier, CacheItem> cache;
+  private final Map<TableIdentifier, CacheItem> tableCache;
 
   TableMetadataCache(
       Catalog catalog, int maximumSize, long refreshMs, int inputSchemasPerTableCacheMaximumSize) {
     this.catalog = catalog;
     this.refreshMs = refreshMs;
     this.inputSchemasPerTableCacheMaximumSize = inputSchemasPerTableCacheMaximumSize;
-    this.cache = Caffeine.newBuilder().maximumSize(maximumSize).build();
+    this.tableCache = new LRUCache<>(maximumSize);
   }
 
   Tuple2<Boolean, Exception> exists(TableIdentifier identifier) {
-    CacheItem cached = cache.getIfPresent(identifier);
+    CacheItem cached = tableCache.get(identifier);
     if (cached != null && Boolean.TRUE.equals(cached.tableExists)) {
       return EXISTS;
     } else if (needsRefresh(cached, true)) {
@@ -87,7 +85,7 @@ PartitionSpec spec(TableIdentifier identifier, PartitionSpec spec) {
   }
 
   void update(TableIdentifier identifier, Table table) {
-    cache.put(
+    tableCache.put(
         identifier,
         new CacheItem(
             true,
@@ -98,7 +96,7 @@
   private String branch(TableIdentifier identifier, String branch, boolean allowRefresh) {
-    CacheItem cached = cache.getIfPresent(identifier);
+    CacheItem cached = tableCache.get(identifier);
     if (cached != null && cached.tableExists && cached.branches.contains(branch)) {
       return branch;
     }
@@ -113,7 +111,7 @@ private String branch(TableIdentifier identifier, String branch, boolean allowRe
 
   private ResolvedSchemaInfo schema(
       TableIdentifier identifier, Schema input, boolean allowRefresh) {
-    CacheItem cached = cache.getIfPresent(identifier);
+    CacheItem cached = tableCache.get(identifier);
     Schema compatible = null;
     if (cached != null && cached.tableExists) {
       // This only works if the {@link Schema#equals(Object)} returns true for the old schema
@@ -164,7 +162,7 @@ private ResolvedSchemaInfo schema(
 
   private PartitionSpec spec(TableIdentifier identifier, PartitionSpec spec, boolean allowRefresh) {
-    CacheItem cached = cache.getIfPresent(identifier);
+    CacheItem cached = tableCache.get(identifier);
     if (cached != null && cached.tableExists) {
       for (PartitionSpec tableSpec : cached.specs.values()) {
         if (PartitionSpecEvolution.checkCompatibility(tableSpec, spec)) {
@@ -188,7 +186,7 @@ private Tuple2<Boolean, Exception> refreshTable(TableIdentifier identifier) {
       return EXISTS;
     } catch (NoSuchTableException e) {
      LOG.debug("Table doesn't exist {}", identifier, e);
-      cache.put(identifier, new CacheItem(false, null, null, null, 1));
+      tableCache.put(identifier, new CacheItem(false, null, null, null, 1));
       return Tuple2.of(false, e);
     }
   }
@@ -199,7 +197,7 @@ private boolean needsRefresh(CacheItem cacheItem, boolean allowRefresh) {
   }
 
   public void invalidate(TableIdentifier identifier) {
-    cache.invalidate(identifier);
+    tableCache.remove(identifier);
   }
 
   /** Handles timeout for missing items only. Caffeine performance causes noticeable delays. */
@@ -210,7 +208,7 @@ static class CacheItem {
   private final Set<String> branches;
   private final Map<Integer, Schema> tableSchemas;
   private final Map<Integer, PartitionSpec> specs;
-  private final LRUCache<Schema, ResolvedSchemaInfo> inputSchemas;
+  private final Map<Schema, ResolvedSchemaInfo> inputSchemas;
 
   private CacheItem(
       boolean tableExists,
@@ -268,7 +266,7 @@ DataConverter recordConverter() {
   }
 
   @VisibleForTesting
-  Cache<TableIdentifier, CacheItem> getInternalCache() {
-    return cache;
+  Map<TableIdentifier, CacheItem> getInternalCache() {
+    return tableCache;
   }
 }
diff --git a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/TableSerializerCache.java b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/TableSerializerCache.java
index 6acf873a94dc..84d0ed9be5d0 100644
--- a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/TableSerializerCache.java
+++ b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/TableSerializerCache.java
@@ -18,8 +18,6 @@
  */
 package org.apache.iceberg.flink.sink.dynamic;
 
-import com.github.benmanes.caffeine.cache.Cache;
-import com.github.benmanes.caffeine.cache.Caffeine;
 import java.io.Serializable;
 import java.util.Map;
 import javax.annotation.Nullable;
@@ -52,7 +50,7 @@ class TableSerializerCache implements Serializable {
 
   private final CatalogLoader catalogLoader;
   private final int maximumSize;
-  private transient Cache<String, SerializerInfo> serializers;
+  private transient Map<String, SerializerInfo> serializers;
 
   TableSerializerCache(CatalogLoader catalogLoader, int maximumSize) {
     this.catalogLoader = catalogLoader;
@@ -80,10 +78,10 @@ private Tuple3<RowDataSerializer, Schema, PartitionSpec> serializer(
 
     if (serializers == null) {
       // We need to initialize the cache at the first time
-      this.serializers = Caffeine.newBuilder().maximumSize(maximumSize).build();
+      this.serializers = new LRUCache<>(maximumSize);
     }
 
-    SerializerInfo info = serializers.get(tableName, SerializerInfo::new);
+    SerializerInfo info = serializers.computeIfAbsent(tableName, SerializerInfo::new);
     Schema schema = unknownSchema != null ? unknownSchema : info.schemas.get(schemaId);
     PartitionSpec spec = unknownSpec != null ? unknownSpec : info.specs.get(specId);
@@ -129,7 +127,7 @@ private void update() {
   }
 
   @VisibleForTesting
-  Cache<String, SerializerInfo> getCache() {
+  Map<String, SerializerInfo> getCache() {
     return serializers;
   }
 }
diff --git a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestHashKeyGenerator.java b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestHashKeyGenerator.java
index b8d86feb99cf..8d559e920620 100644
--- a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestHashKeyGenerator.java
+++ b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestHashKeyGenerator.java
@@ -20,8 +20,8 @@
 
 import static org.assertj.core.api.Assertions.assertThat;
 
-import com.github.benmanes.caffeine.cache.Cache;
 import java.util.Collections;
+import java.util.Map;
 import java.util.Set;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
@@ -302,7 +302,7 @@ void testCaching() throws Exception {
     int writeParallelism = 2;
     int maxWriteParallelism = 8;
     HashKeyGenerator generator = new HashKeyGenerator(maxCacheSize, maxWriteParallelism);
-    Cache<SelectorKey, KeySelector<RowData, Integer>> keySelectorCache =
+    Map<SelectorKey, KeySelector<RowData, Integer>> keySelectorCache =
         generator.getKeySelectorCache();
 
     PartitionSpec unpartitioned = PartitionSpec.unpartitioned();
@@ -317,18 +317,14 @@ void testCaching() throws Exception {
             writeParallelism);
 
     int writeKey1 = generator.generateKey(record);
-    assertThat(keySelectorCache.estimatedSize()).isEqualTo(1);
+    assertThat(keySelectorCache).hasSize(1);
 
     int writeKey2 = generator.generateKey(record);
     assertThat(writeKey2).isNotEqualTo(writeKey1);
-    // Manually clean up because the cleanup is not always triggered
-    keySelectorCache.cleanUp();
-    assertThat(keySelectorCache.estimatedSize()).isEqualTo(1);
+    assertThat(keySelectorCache).hasSize(1);
 
     int writeKey3 = generator.generateKey(record);
-    // Manually clean up because the cleanup is not always triggered
-    keySelectorCache.cleanUp();
-    assertThat(keySelectorCache.estimatedSize()).isEqualTo(1);
+    assertThat(keySelectorCache).hasSize(1);
 
     // We create a new key selector which will start off at the same position
     assertThat(writeKey1).isEqualTo(writeKey3);
   }
diff --git a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestTableMetadataCache.java b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestTableMetadataCache.java
index 5d222a212fff..2264cc3a8db0 100644
--- a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestTableMetadataCache.java
+++ b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestTableMetadataCache.java
@@ -89,8 +89,6 @@ void testCachingDisabled() {
     catalog.createTable(tableIdentifier, SCHEMA);
 
     TableMetadataCache cache = new TableMetadataCache(catalog, 0, Long.MAX_VALUE, 10);
-    // Cleanup routine doesn't run after every write
-    cache.getInternalCache().cleanUp();
-    assertThat(cache.getInternalCache().estimatedSize()).isEqualTo(0);
+    assertThat(cache.getInternalCache()).isEmpty();
   }
 }
diff --git a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestTableSerializerCache.java b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestTableSerializerCache.java
index 720a0a582e94..ec610a3357ba 100644
--- a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestTableSerializerCache.java
+++ b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestTableSerializerCache.java
@@ -111,7 +111,7 @@ void testCacheEviction() {
     RowDataSerializer serializer1 = creator1.get();
     RowDataSerializer serializer2 = creator2.get();
 
-    cache.getCache().cleanUp();
+    cache.getCache().clear();
     assertThat(serializer1).isNotSameAs(creator1.get());
     assertThat(serializer2).isNotSameAs(creator2.get());
   }
diff --git a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestTableUpdater.java b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestTableUpdater.java
index 9099fd1f3ae6..ad35d929728d 100644
--- a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestTableUpdater.java
+++ b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestTableUpdater.java
@@ -20,6 +20,7 @@
 
 import static org.assertj.core.api.Assertions.assertThat;
 
+import java.util.Map;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Schema;
@@ -85,11 +86,11 @@ void testBranchCreationAndCaching() {
     catalog.createTable(tableIdentifier, SCHEMA);
 
     tableUpdater.update(tableIdentifier, "myBranch", SCHEMA, PartitionSpec.unpartitioned());
-    TableMetadataCache.CacheItem cacheItem = cache.getInternalCache().getIfPresent(tableIdentifier);
+    TableMetadataCache.CacheItem cacheItem = cache.getInternalCache().get(tableIdentifier);
     assertThat(cacheItem).isNotNull();
 
     tableUpdater.update(tableIdentifier, "myBranch", SCHEMA, PartitionSpec.unpartitioned());
-    assertThat(cache.getInternalCache().getIfPresent(tableIdentifier)).isEqualTo(cacheItem);
+    assertThat(cache.getInternalCache()).contains(Map.entry(tableIdentifier, cacheItem));
   }
 
   @Test
@@ -153,7 +154,7 @@ void testLastResultInvalidation() {
         .isEqualTo(CompareSchemasVisitor.Result.SAME);
 
     // Last result cache should be cleared
-    assertThat(cache.getInternalCache().getIfPresent(tableIdentifier).inputSchemas().get(SCHEMA2))
-        .isNull();
+    assertThat(cache.getInternalCache().get(tableIdentifier).inputSchemas())
+        .doesNotContainKey(SCHEMA2);
   }
 }