@@ -18,8 +18,6 @@
  */
 package org.apache.iceberg.flink.sink.dynamic;
 
-import com.github.benmanes.caffeine.cache.Cache;
-import com.github.benmanes.caffeine.cache.Caffeine;
 import java.io.IOException;
 import java.util.Collection;
 import java.util.Collections;
@@ -56,7 +54,7 @@ class DynamicWriter implements CommittingSinkWriter<DynamicRecordInternal, Dynam
 
   private static final Logger LOG = LoggerFactory.getLogger(DynamicWriter.class);
 
-  private final Cache<WriteTarget, RowDataTaskWriterFactory> taskWriterFactories;
+  private final Map<WriteTarget, RowDataTaskWriterFactory> taskWriterFactories;
   private final Map<WriteTarget, TaskWriter<RowData>> writers;
   private final DynamicWriterMetrics metrics;
   private final int subTaskId;
@@ -82,7 +80,7 @@ class DynamicWriter implements CommittingSinkWriter<DynamicRecordInternal, Dynam
     this.metrics = metrics;
     this.subTaskId = subTaskId;
     this.attemptId = attemptId;
-    this.taskWriterFactories = Caffeine.newBuilder().maximumSize(cacheMaximumSize).build();
+    this.taskWriterFactories = new LRUCache<>(cacheMaximumSize);
     this.writers = Maps.newHashMap();
 
     LOG.debug("DynamicIcebergSinkWriter created for subtask {} attemptId {}", subTaskId, attemptId);
@@ -102,7 +100,7 @@ public void write(DynamicRecordInternal element, Context context)
                 element.equalityFields()),
         writerKey -> {
           RowDataTaskWriterFactory taskWriterFactory =
-              taskWriterFactories.get(
+              taskWriterFactories.computeIfAbsent(
                   writerKey,
                   factoryKey -> {
                     Table table =
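
A note on the pattern above: Caffeine's Cache.get(key, mappingFunction) and java.util.Map.computeIfAbsent share the same populate-on-miss contract, so the call sites in this PR swap over one-for-one (get becomes computeIfAbsent, getIfPresent becomes get, invalidate becomes remove, put stays put). A minimal sketch of that correspondence, using placeholder key and value types rather than the real Iceberg classes:

import java.util.HashMap;
import java.util.Map;

public class ComputeIfAbsentSketch {
  public static void main(String[] args) {
    // Caffeine before: factory = cache.get(key, k -> expensiveCreate(k));
    // Plain Map after: identical miss-then-populate semantics.
    Map<String, String> factories = new HashMap<>();

    // First call misses, so the mapping function runs and the result is stored.
    String first = factories.computeIfAbsent("db.table", k -> "factory:" + k);

    // Second call hits; the mapping function is not invoked again.
    String second =
        factories.computeIfAbsent(
            "db.table",
            k -> {
              throw new AssertionError("never reached on a cache hit");
            });

    System.out.println(first == second); // true: the cached instance is reused
  }
}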

@@ -20,9 +20,8 @@
 
 import static org.apache.iceberg.TableProperties.WRITE_DISTRIBUTION_MODE;
 
-import com.github.benmanes.caffeine.cache.Cache;
-import com.github.benmanes.caffeine.cache.Caffeine;
 import java.util.Collections;
+import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
 import java.util.stream.Collectors;
@@ -62,11 +61,11 @@ class HashKeyGenerator {
   private static final Logger LOG = LoggerFactory.getLogger(HashKeyGenerator.class);
 
   private final int maxWriteParallelism;
-  private final Cache<SelectorKey, KeySelector<RowData, Integer>> keySelectorCache;
+  private final Map<SelectorKey, KeySelector<RowData, Integer>> keySelectorCache;
 
   HashKeyGenerator(int maxCacheSize, int maxWriteParallelism) {
     this.maxWriteParallelism = maxWriteParallelism;
-    this.keySelectorCache = Caffeine.newBuilder().maximumSize(maxCacheSize).build();
+    this.keySelectorCache = new LRUCache<>(maxCacheSize);
   }
 
   int generateKey(DynamicRecord dynamicRecord) throws Exception {
@@ -89,7 +88,7 @@ int generateKey(
             dynamicRecord.spec(),
             dynamicRecord.equalityFields());
     KeySelector<RowData, Integer> keySelector =
-        keySelectorCache.get(
+        keySelectorCache.computeIfAbsent(
             cacheKey,
             k ->
                 getKeySelector(
@@ -377,7 +376,7 @@ public String toString() {
   }
 
   @VisibleForTesting
-  Cache<SelectorKey, KeySelector<RowData, Integer>> getKeySelectorCache() {
+  Map<SelectorKey, KeySelector<RowData, Integer>> getKeySelectorCache() {
     return keySelectorCache;
   }
 }

@@ -42,6 +42,10 @@ class LRUCache<K, V> extends LinkedHashMap<K, V> {
   private final int maximumSize;
   private final Consumer<Map.Entry<K, V>> evictionCallback;
 
+  LRUCache(int maximumSize) {
+    this(maximumSize, ignored -> {});
+  }
+
   LRUCache(int maximumSize, Consumer<Map.Entry<K, V>> evictionCallback) {
     super(Math.min(maximumSize, DEFAULT_INITIAL_CAPACITY), DEFAULT_LOAD_FACTOR, true);
     this.maximumSize = maximumSize;
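
The hunk above only adds a single-argument convenience constructor; the rest of LRUCache is not shown. For orientation, here is a sketch of how such an access-ordered LinkedHashMap cache is typically completed. The field names and constructors mirror the diff, but the constant values and the removeEldestEntry override are assumptions based on the standard LinkedHashMap idiom, not a copy of the Iceberg class:

import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Consumer;

// Sketch of an access-ordered LinkedHashMap LRU cache in the shape of LRUCache.
class LruCacheSketch<K, V> extends LinkedHashMap<K, V> {
  private static final int DEFAULT_INITIAL_CAPACITY = 16; // assumed value
  private static final float DEFAULT_LOAD_FACTOR = 0.75f; // assumed value

  private final int maximumSize;
  private final Consumer<Map.Entry<K, V>> evictionCallback;

  LruCacheSketch(int maximumSize) {
    this(maximumSize, ignored -> {});
  }

  LruCacheSketch(int maximumSize, Consumer<Map.Entry<K, V>> evictionCallback) {
    // accessOrder=true: iteration order runs from least- to most-recently used.
    super(Math.min(maximumSize, DEFAULT_INITIAL_CAPACITY), DEFAULT_LOAD_FACTOR, true);
    this.maximumSize = maximumSize;
    this.evictionCallback = evictionCallback;
  }

  @Override
  protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
    // LinkedHashMap calls this after every insertion (including those made by
    // computeIfAbsent); returning true drops the least-recently-used entry
    // synchronously, on the calling thread.
    boolean evict = size() > maximumSize;
    if (evict) {
      evictionCallback.accept(eldest);
    }
    return evict;
  }
}

Because removeEldestEntry runs inside the mutating call, eviction is synchronous and size() is always exact, which is what lets the test diffs below drop the cleanUp() and estimatedSize() workarounds. The trade-off is that, unlike Caffeine, LinkedHashMap is not thread-safe; that appears acceptable here since each of these caches is confined to a single Flink task.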

@@ -18,8 +18,6 @@
  */
 package org.apache.iceberg.flink.sink.dynamic;
 
-import com.github.benmanes.caffeine.cache.Cache;
-import com.github.benmanes.caffeine.cache.Caffeine;
 import java.util.Map;
 import java.util.Set;
 import org.apache.flink.annotation.Internal;
@@ -53,18 +51,18 @@ class TableMetadataCache {
   private final Catalog catalog;
   private final long refreshMs;
   private final int inputSchemasPerTableCacheMaximumSize;
-  private final Cache<TableIdentifier, CacheItem> cache;
+  private final Map<TableIdentifier, CacheItem> tableCache;
 
   TableMetadataCache(
       Catalog catalog, int maximumSize, long refreshMs, int inputSchemasPerTableCacheMaximumSize) {
     this.catalog = catalog;
     this.refreshMs = refreshMs;
     this.inputSchemasPerTableCacheMaximumSize = inputSchemasPerTableCacheMaximumSize;
-    this.cache = Caffeine.newBuilder().maximumSize(maximumSize).build();
+    this.tableCache = new LRUCache<>(maximumSize);
   }
 
   Tuple2<Boolean, Exception> exists(TableIdentifier identifier) {
-    CacheItem cached = cache.getIfPresent(identifier);
+    CacheItem cached = tableCache.get(identifier);
     if (cached != null && Boolean.TRUE.equals(cached.tableExists)) {
       return EXISTS;
     } else if (needsRefresh(cached, true)) {
@@ -87,7 +85,7 @@ PartitionSpec spec(TableIdentifier identifier, PartitionSpec spec) {
   }
 
   void update(TableIdentifier identifier, Table table) {
-    cache.put(
+    tableCache.put(
         identifier,
         new CacheItem(
             true,
@@ -98,7 +96,7 @@ void update(TableIdentifier identifier, Table table) {
   }
 
   private String branch(TableIdentifier identifier, String branch, boolean allowRefresh) {
-    CacheItem cached = cache.getIfPresent(identifier);
+    CacheItem cached = tableCache.get(identifier);
     if (cached != null && cached.tableExists && cached.branches.contains(branch)) {
       return branch;
     }
@@ -113,7 +111,7 @@ private String branch(TableIdentifier identifier, String branch, boolean allowRe
 
   private ResolvedSchemaInfo schema(
       TableIdentifier identifier, Schema input, boolean allowRefresh) {
-    CacheItem cached = cache.getIfPresent(identifier);
+    CacheItem cached = tableCache.get(identifier);
     Schema compatible = null;
     if (cached != null && cached.tableExists) {
       // This only works if the {@link Schema#equals(Object)} returns true for the old schema
@@ -164,7 +162,7 @@ private ResolvedSchemaInfo schema(
   }
 
   private PartitionSpec spec(TableIdentifier identifier, PartitionSpec spec, boolean allowRefresh) {
-    CacheItem cached = cache.getIfPresent(identifier);
+    CacheItem cached = tableCache.get(identifier);
     if (cached != null && cached.tableExists) {
       for (PartitionSpec tableSpec : cached.specs.values()) {
         if (PartitionSpecEvolution.checkCompatibility(tableSpec, spec)) {
@@ -188,7 +186,7 @@ private Tuple2<Boolean, Exception> refreshTable(TableIdentifier identifier) {
       return EXISTS;
     } catch (NoSuchTableException e) {
       LOG.debug("Table doesn't exist {}", identifier, e);
-      cache.put(identifier, new CacheItem(false, null, null, null, 1));
+      tableCache.put(identifier, new CacheItem(false, null, null, null, 1));
       return Tuple2.of(false, e);
     }
   }
@@ -199,7 +197,7 @@ private boolean needsRefresh(CacheItem cacheItem, boolean allowRefresh) {
   }
 
   public void invalidate(TableIdentifier identifier) {
-    cache.invalidate(identifier);
+    tableCache.remove(identifier);
   }
 
   /** Handles timeout for missing items only. Caffeine performance causes noticeable delays. */
@@ -210,7 +208,7 @@ static class CacheItem {
     private final Set<String> branches;
     private final Map<Integer, Schema> tableSchemas;
    private final Map<Integer, PartitionSpec> specs;
-    private final LRUCache<Schema, ResolvedSchemaInfo> inputSchemas;
+    private final Map<Schema, ResolvedSchemaInfo> inputSchemas;
 
     private CacheItem(
         boolean tableExists,
@@ -268,7 +266,7 @@ DataConverter recordConverter() {
   }
 
   @VisibleForTesting
-  Cache<TableIdentifier, CacheItem> getInternalCache() {
-    return cache;
+  Map<TableIdentifier, CacheItem> getInternalCache() {
+    return tableCache;
   }
 }

@@ -18,8 +18,6 @@
  */
 package org.apache.iceberg.flink.sink.dynamic;
 
-import com.github.benmanes.caffeine.cache.Cache;
-import com.github.benmanes.caffeine.cache.Caffeine;
 import java.io.Serializable;
 import java.util.Map;
 import javax.annotation.Nullable;
@@ -52,7 +50,7 @@ class TableSerializerCache implements Serializable {
 
   private final CatalogLoader catalogLoader;
   private final int maximumSize;
-  private transient Cache<String, SerializerInfo> serializers;
+  private transient Map<String, SerializerInfo> serializers;
 
   TableSerializerCache(CatalogLoader catalogLoader, int maximumSize) {
     this.catalogLoader = catalogLoader;
@@ -80,10 +78,10 @@ private Tuple3<RowDataSerializer, Schema, PartitionSpec> serializer(
 
     if (serializers == null) {
       // We need to initialize the cache at the first time
-      this.serializers = Caffeine.newBuilder().maximumSize(maximumSize).build();
+      this.serializers = new LRUCache<>(maximumSize);
     }
 
-    SerializerInfo info = serializers.get(tableName, SerializerInfo::new);
+    SerializerInfo info = serializers.computeIfAbsent(tableName, SerializerInfo::new);
     Schema schema = unknownSchema != null ? unknownSchema : info.schemas.get(schemaId);
     PartitionSpec spec = unknownSpec != null ? unknownSpec : info.specs.get(specId);
 
@@ -129,7 +127,7 @@ private void update() {
   }
 
   @VisibleForTesting
-  Cache<String, SerializerInfo> getCache() {
+  Map<String, SerializerInfo> getCache() {
     return serializers;
   }
 }
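
One detail in TableSerializerCache worth calling out: serializers is transient, so after Flink serializes the cache holder and deserializes it on a task manager, the field comes back null; the null check above rebuilds the map lazily on first lookup. A condensed sketch of that idiom (class name, value type, and cache parameters are simplified stand-ins, not the Iceberg originals):

import java.io.Serializable;
import java.util.LinkedHashMap;
import java.util.Map;

class LazyCacheHolder implements Serializable {
  private final int maximumSize;

  // Excluded from serialization; recreated lazily after deserialization.
  private transient Map<String, Object> serializers;

  LazyCacheHolder(int maximumSize) {
    this.maximumSize = maximumSize;
  }

  Object lookup(String tableName) {
    if (serializers == null) {
      // First use on this instance, e.g. right after Flink deserialized it.
      serializers =
          new LinkedHashMap<String, Object>(16, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<String, Object> eldest) {
              return size() > maximumSize; // capped, LRU eviction
            }
          };
    }

    return serializers.computeIfAbsent(tableName, key -> new Object());
  }
}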

@@ -20,8 +20,8 @@
 
 import static org.assertj.core.api.Assertions.assertThat;
 
-import com.github.benmanes.caffeine.cache.Cache;
 import java.util.Collections;
+import java.util.Map;
 import java.util.Set;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
@@ -302,7 +302,7 @@ void testCaching() throws Exception {
     int writeParallelism = 2;
     int maxWriteParallelism = 8;
     HashKeyGenerator generator = new HashKeyGenerator(maxCacheSize, maxWriteParallelism);
-    Cache<HashKeyGenerator.SelectorKey, KeySelector<RowData, Integer>> keySelectorCache =
+    Map<HashKeyGenerator.SelectorKey, KeySelector<RowData, Integer>> keySelectorCache =
         generator.getKeySelectorCache();
 
     PartitionSpec unpartitioned = PartitionSpec.unpartitioned();
@@ -317,18 +317,14 @@
             writeParallelism);
 
     int writeKey1 = generator.generateKey(record);
-    assertThat(keySelectorCache.estimatedSize()).isEqualTo(1);
+    assertThat(keySelectorCache).hasSize(1);
 
     int writeKey2 = generator.generateKey(record);
     assertThat(writeKey2).isNotEqualTo(writeKey1);
-    // Manually clean up because the cleanup is not always triggered
-    keySelectorCache.cleanUp();
-    assertThat(keySelectorCache.estimatedSize()).isEqualTo(1);
+    assertThat(keySelectorCache).hasSize(1);
 
     int writeKey3 = generator.generateKey(record);
-    // Manually clean up because the cleanup is not always triggered
-    keySelectorCache.cleanUp();
-    assertThat(keySelectorCache.estimatedSize()).isEqualTo(1);
+    assertThat(keySelectorCache).hasSize(1);
     // We create a new key selector which will start off at the same position
     assertThat(writeKey1).isEqualTo(writeKey3);
   }
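
The test changes above follow directly from the cache swap: Caffeine evicts asynchronously, so the old assertions had to force cleanUp() and settle for estimatedSize(), while a LinkedHashMap-based cache evicts synchronously inside the mutating call, making plain size assertions deterministic. A small demonstration, assuming a capacity-1 LRU map comparable to LRUCache(1):

import java.util.LinkedHashMap;
import java.util.Map;

public class SynchronousEvictionDemo {
  public static void main(String[] args) {
    Map<String, Integer> cache =
        new LinkedHashMap<String, Integer>(16, 0.75f, true) {
          @Override
          protected boolean removeEldestEntry(Map.Entry<String, Integer> eldest) {
            return size() > 1; // capacity of one, as in LRUCache(1)
          }
        };

    cache.put("a", 1);
    cache.put("b", 2); // "a" is evicted before this put() returns

    // No cleanUp() needed: size() is exact immediately after the write.
    System.out.println(cache.size());           // 1
    System.out.println(cache.containsKey("b")); // true
  }
}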

@@ -89,8 +89,6 @@ void testCachingDisabled() {
     catalog.createTable(tableIdentifier, SCHEMA);
     TableMetadataCache cache = new TableMetadataCache(catalog, 0, Long.MAX_VALUE, 10);
 
-    // Cleanup routine doesn't run after every write
-    cache.getInternalCache().cleanUp();
-    assertThat(cache.getInternalCache().estimatedSize()).isEqualTo(0);
+    assertThat(cache.getInternalCache()).isEmpty();
   }
 }

@@ -111,7 +111,7 @@ void testCacheEviction() {
     RowDataSerializer serializer1 = creator1.get();
     RowDataSerializer serializer2 = creator2.get();
 
-    cache.getCache().cleanUp();
+    cache.getCache().clear();
     assertThat(serializer1).isNotSameAs(creator1.get());
     assertThat(serializer2).isNotSameAs(creator2.get());
   }

@@ -20,6 +20,7 @@
 
 import static org.assertj.core.api.Assertions.assertThat;
 
+import java.util.Map;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Schema;
@@ -85,11 +86,11 @@ void testBranchCreationAndCaching() {
 
     catalog.createTable(tableIdentifier, SCHEMA);
     tableUpdater.update(tableIdentifier, "myBranch", SCHEMA, PartitionSpec.unpartitioned());
-    TableMetadataCache.CacheItem cacheItem = cache.getInternalCache().getIfPresent(tableIdentifier);
+    TableMetadataCache.CacheItem cacheItem = cache.getInternalCache().get(tableIdentifier);
     assertThat(cacheItem).isNotNull();
 
     tableUpdater.update(tableIdentifier, "myBranch", SCHEMA, PartitionSpec.unpartitioned());
-    assertThat(cache.getInternalCache().getIfPresent(tableIdentifier)).isEqualTo(cacheItem);
+    assertThat(cache.getInternalCache()).contains(Map.entry(tableIdentifier, cacheItem));
   }
 
   @Test
@@ -153,7 +154,7 @@ void testLastResultInvalidation() {
         .isEqualTo(CompareSchemasVisitor.Result.SAME);
 
     // Last result cache should be cleared
-    assertThat(cache.getInternalCache().getIfPresent(tableIdentifier).inputSchemas().get(SCHEMA2))
-        .isNull();
+    assertThat(cache.getInternalCache().get(tableIdentifier).inputSchemas())
+        .doesNotContainKey(SCHEMA2);
   }
 }