diff --git a/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DataConverter.java b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DataConverter.java new file mode 100644 index 000000000000..34da5efd940f --- /dev/null +++ b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DataConverter.java @@ -0,0 +1,235 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.flink.sink.dynamic; + +import java.time.LocalDate; +import java.time.LocalDateTime; +import java.time.LocalTime; +import java.util.Map; +import org.apache.flink.table.data.ArrayData; +import org.apache.flink.table.data.DecimalData; +import org.apache.flink.table.data.GenericArrayData; +import org.apache.flink.table.data.GenericMapData; +import org.apache.flink.table.data.GenericRowData; +import org.apache.flink.table.data.MapData; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.data.TimestampData; +import org.apache.flink.table.types.logical.ArrayType; +import org.apache.flink.table.types.logical.DecimalType; +import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.table.types.logical.MapType; +import org.apache.flink.table.types.logical.RowType; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; + +/** + * {@link org.apache.iceberg.flink.sink.dynamic.DataConverter} is responsible to change the input + * data to make it compatible with the target schema. This is done when + * + * + * + *

+ * <ul>
+ *   <li>The input schema has fewer fields than the target schema.
+ *   <li>The table types are wider than the input type.
+ *   <li>The field order differs for source and target schema.
+ * </ul>
+ *
+ * <p>The resolution is as follows:
+ *
+ * <ol>
+ *   <li>In the first case, we would add a null value for the missing field (if the field is
+ *       optional).
+ *   <li>In the second case, we would convert the data for the input field to a wider type, e.g. int
+ *       (input type) => long (table type).
+ *   <li>In the third case, we would rearrange the input data to match the target table.
+ * </ol>
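+ *
+ * <p>A rough usage sketch (illustrative only, not part of the sink's public API): a converter that
+ * widens an {@code INT} field to {@code BIGINT} can be obtained from the source and target Flink
+ * logical types and applied to a value as follows.
+ *
+ * <pre>{@code
+ * DataConverter widen = DataConverter.get(new IntType(), new BigIntType());
+ * Object widened = widen.convert(42); // yields 42L
+ * }</pre>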

+ */ +interface DataConverter { + Object convert(Object object); + + static DataConverter identity() { + return object -> object; + } + + static DataConverter getNullable(LogicalType sourceType, LogicalType targetType) { + return nullable(get(sourceType, targetType)); + } + + static DataConverter get(LogicalType sourceType, LogicalType targetType) { + switch (targetType.getTypeRoot()) { + case BOOLEAN: + case INTEGER: + case FLOAT: + case VARCHAR: + case DATE: + case TIME_WITHOUT_TIME_ZONE: + case TIMESTAMP_WITH_LOCAL_TIME_ZONE: + case BINARY: + case VARBINARY: + return object -> object; + case DOUBLE: + return object -> { + if (object instanceof Float) { + return ((Float) object).doubleValue(); + } else { + return object; + } + }; + case BIGINT: + return object -> { + if (object instanceof Integer) { + return ((Integer) object).longValue(); + } else { + return object; + } + }; + case DECIMAL: + return object -> { + DecimalType toDecimalType = (DecimalType) targetType; + DecimalData decimalData = (DecimalData) object; + if (((DecimalType) sourceType).getPrecision() == toDecimalType.getPrecision()) { + return object; + } else { + return DecimalData.fromBigDecimal( + decimalData.toBigDecimal(), toDecimalType.getPrecision(), toDecimalType.getScale()); + } + }; + case TIMESTAMP_WITHOUT_TIME_ZONE: + return object -> { + if (object instanceof Integer) { + LocalDateTime dateTime = + LocalDateTime.of(LocalDate.ofEpochDay((Integer) object), LocalTime.MIN); + return TimestampData.fromLocalDateTime(dateTime); + } else { + return object; + } + }; + case ROW: + return new RowDataConverter((RowType) sourceType, (RowType) targetType); + case ARRAY: + return new ArrayConverter((ArrayType) sourceType, (ArrayType) targetType); + case MAP: + return new MapConverter((MapType) sourceType, (MapType) targetType); + default: + throw new UnsupportedOperationException("Not a supported type: " + targetType); + } + } + + static DataConverter nullable(DataConverter converter) { + return value -> value == null ? 
null : converter.convert(value); + } + + class RowDataConverter implements DataConverter { + private final RowData.FieldGetter[] fieldGetters; + private final DataConverter[] dataConverters; + + RowDataConverter(RowType sourceType, RowType targetType) { + this.fieldGetters = new RowData.FieldGetter[targetType.getFields().size()]; + this.dataConverters = new DataConverter[targetType.getFields().size()]; + + for (int i = 0; i < targetType.getFields().size(); i++) { + RowData.FieldGetter fieldGetter; + DataConverter dataConverter; + RowType.RowField targetField = targetType.getFields().get(i); + int sourceFieldIndex = sourceType.getFieldIndex(targetField.getName()); + if (sourceFieldIndex == -1) { + if (targetField.getType().isNullable()) { + fieldGetter = row -> null; + dataConverter = value -> null; + } else { + throw new IllegalArgumentException( + String.format( + "Field %s in target schema %s is non-nullable but does not exist in source schema.", + i + 1, targetType)); + } + } else { + RowType.RowField sourceField = sourceType.getFields().get(sourceFieldIndex); + fieldGetter = RowData.createFieldGetter(sourceField.getType(), sourceFieldIndex); + dataConverter = DataConverter.getNullable(sourceField.getType(), targetField.getType()); + } + + this.fieldGetters[i] = fieldGetter; + this.dataConverters[i] = dataConverter; + } + } + + @Override + public RowData convert(Object object) { + RowData sourceData = (RowData) object; + GenericRowData targetData = new GenericRowData(fieldGetters.length); + for (int i = 0; i < fieldGetters.length; i++) { + Object value = fieldGetters[i].getFieldOrNull(sourceData); + targetData.setField(i, dataConverters[i].convert(value)); + } + + return targetData; + } + } + + class ArrayConverter implements DataConverter { + private final ArrayData.ElementGetter elementGetter; + private final DataConverter elementConverter; + + ArrayConverter(ArrayType sourceType, ArrayType targetType) { + this.elementGetter = ArrayData.createElementGetter(sourceType.getElementType()); + this.elementConverter = + DataConverter.getNullable(sourceType.getElementType(), targetType.getElementType()); + } + + @Override + public ArrayData convert(Object object) { + ArrayData arrayData = (ArrayData) object; + Object[] convertedArray = new Object[arrayData.size()]; + for (int i = 0; i < convertedArray.length; i++) { + Object element = elementGetter.getElementOrNull(arrayData, i); + convertedArray[i] = elementConverter.convert(element); + } + + return new GenericArrayData(convertedArray); + } + } + + class MapConverter implements DataConverter { + private final ArrayData.ElementGetter keyGetter; + private final ArrayData.ElementGetter valueGetter; + private final DataConverter keyConverter; + private final DataConverter valueConverter; + + MapConverter(MapType sourceType, MapType targetType) { + this.keyGetter = ArrayData.createElementGetter(sourceType.getKeyType()); + this.valueGetter = ArrayData.createElementGetter(sourceType.getValueType()); + this.keyConverter = + DataConverter.getNullable(sourceType.getKeyType(), targetType.getKeyType()); + this.valueConverter = + DataConverter.getNullable(sourceType.getValueType(), targetType.getValueType()); + } + + @Override + public MapData convert(Object object) { + MapData sourceData = (MapData) object; + ArrayData keyArray = sourceData.keyArray(); + ArrayData valueArray = sourceData.valueArray(); + Map convertedMap = Maps.newLinkedHashMap(); + for (int i = 0; i < keyArray.size(); ++i) { + convertedMap.put( + 
keyConverter.convert(keyGetter.getElementOrNull(keyArray, i)), + valueConverter.convert(valueGetter.getElementOrNull(valueArray, i))); + } + + return new GenericMapData(convertedMap); + } + } +} diff --git a/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicIcebergSink.java b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicIcebergSink.java index 5591cbe4ffc4..9547de78d6ba 100644 --- a/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicIcebergSink.java +++ b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicIcebergSink.java @@ -188,6 +188,7 @@ public static class Builder { private boolean immediateUpdate = false; private int cacheMaximumSize = 100; private long cacheRefreshMs = 1_000; + private int inputSchemasPerTableCacheMaximumSize = 10; Builder() {} @@ -315,6 +316,16 @@ public Builder cacheRefreshMs(long refreshMs) { return this; } + /** + * Maximum input {@link org.apache.iceberg.Schema} objects to cache per each Iceberg table. The + * cache improves Dynamic Sink performance by storing {@link org.apache.iceberg.Schema} + * comparison results. + */ + public Builder inputSchemasPerTableCacheMaxSize(int inputSchemasPerTableCacheMaxSize) { + this.inputSchemasPerTableCacheMaximumSize = inputSchemasPerTableCacheMaxSize; + return this; + } + private String operatorName(String suffix) { return uidPrefix != null ? uidPrefix + "-" + suffix : suffix; } @@ -358,7 +369,12 @@ public DataStreamSink append() { input .process( new DynamicRecordProcessor<>( - generator, catalogLoader, immediateUpdate, cacheMaximumSize, cacheRefreshMs)) + generator, + catalogLoader, + immediateUpdate, + cacheMaximumSize, + cacheRefreshMs, + inputSchemasPerTableCacheMaximumSize)) .uid(prefixIfNotNull(uidPrefix, "-generator")) .name(operatorName("generator")) .returns(type); @@ -370,7 +386,12 @@ public DataStreamSink append() { DynamicRecordProcessor.DYNAMIC_TABLE_UPDATE_STREAM, new DynamicRecordInternalType(catalogLoader, true, cacheMaximumSize))) .keyBy((KeySelector) DynamicRecordInternal::tableName) - .map(new DynamicTableUpdateOperator(catalogLoader, cacheMaximumSize, cacheRefreshMs)) + .map( + new DynamicTableUpdateOperator( + catalogLoader, + cacheMaximumSize, + cacheRefreshMs, + inputSchemasPerTableCacheMaximumSize)) .uid(prefixIfNotNull(uidPrefix, "-updater")) .name(operatorName("Updater")) .returns(type) diff --git a/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicRecordProcessor.java b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicRecordProcessor.java index d6c33c95e75b..166217a0140e 100644 --- a/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicRecordProcessor.java +++ b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicRecordProcessor.java @@ -22,7 +22,6 @@ import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.api.common.functions.OpenContext; import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.streaming.api.functions.ProcessFunction; import org.apache.flink.table.data.RowData; import org.apache.flink.util.Collector; @@ -43,6 +42,7 @@ class DynamicRecordProcessor extends ProcessFunction extends ProcessFunction foundSchema = + TableMetadataCache.ResolvedSchemaInfo foundSchema = exists ? 
tableCache.schema(data.tableIdentifier(), data.schema()) : TableMetadataCache.NOT_FOUND; @@ -107,16 +111,23 @@ public void collect(DynamicRecord data) { if (!exists || foundBranch == null || foundSpec == null - || foundSchema.f1 == CompareSchemasVisitor.Result.SCHEMA_UPDATE_NEEDED) { + || foundSchema.compareResult() == CompareSchemasVisitor.Result.SCHEMA_UPDATE_NEEDED) { if (immediateUpdate) { - Tuple3 newData = + Tuple2 newData = updater.update(data.tableIdentifier(), data.branch(), data.schema(), data.spec()); - emit(collector, data, newData.f0, newData.f1, newData.f2); + emit( + collector, + data, + newData.f0.resolvedTableSchema(), + newData.f0.recordConverter(), + newData.f1); } else { int writerKey = hashKeyGenerator.generateKey( data, - foundSchema.f0 != null ? foundSchema.f0 : data.schema(), + foundSchema.resolvedTableSchema() != null + ? foundSchema.resolvedTableSchema() + : data.schema(), foundSpec != null ? foundSpec : data.spec(), data.rowData()); context.output( @@ -132,7 +143,12 @@ public void collect(DynamicRecord data) { DynamicSinkUtil.getEqualityFieldIds(data.equalityFields(), data.schema()))); } } else { - emit(collector, data, foundSchema.f0, foundSchema.f1, foundSpec); + emit( + collector, + data, + foundSchema.resolvedTableSchema(), + foundSchema.recordConverter(), + foundSpec); } } @@ -140,12 +156,9 @@ private void emit( Collector out, DynamicRecord data, Schema schema, - CompareSchemasVisitor.Result result, + DataConverter recordConverter, PartitionSpec spec) { - RowData rowData = - result == CompareSchemasVisitor.Result.SAME - ? data.rowData() - : RowDataEvolver.convert(data.rowData(), data.schema(), schema); + RowData rowData = (RowData) recordConverter.convert(data.rowData()); int writerKey = hashKeyGenerator.generateKey(data, schema, spec, rowData); String tableName = data.tableIdentifier().toString(); out.collect( diff --git a/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicTableUpdateOperator.java b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicTableUpdateOperator.java index c37532714d48..6057d773c3f0 100644 --- a/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicTableUpdateOperator.java +++ b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicTableUpdateOperator.java @@ -21,10 +21,9 @@ import org.apache.flink.annotation.Internal; import org.apache.flink.api.common.functions.OpenContext; import org.apache.flink.api.common.functions.RichMapFunction; -import org.apache.flink.api.java.tuple.Tuple3; +import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.table.data.RowData; import org.apache.iceberg.PartitionSpec; -import org.apache.iceberg.Schema; import org.apache.iceberg.catalog.Catalog; import org.apache.iceberg.catalog.TableIdentifier; import org.apache.iceberg.flink.CatalogLoader; @@ -41,13 +40,19 @@ class DynamicTableUpdateOperator private final CatalogLoader catalogLoader; private final int cacheMaximumSize; private final long cacheRefreshMs; + private final int inputSchemasPerTableCacheMaximumSize; + private transient TableUpdater updater; DynamicTableUpdateOperator( - CatalogLoader catalogLoader, int cacheMaximumSize, long cacheRefreshMs) { + CatalogLoader catalogLoader, + int cacheMaximumSize, + long cacheRefreshMs, + int inputSchemasPerTableCacheMaximumSize) { this.catalogLoader = catalogLoader; this.cacheMaximumSize = cacheMaximumSize; this.cacheRefreshMs = cacheRefreshMs; + this.inputSchemasPerTableCacheMaximumSize = 
inputSchemasPerTableCacheMaximumSize; } @Override @@ -56,22 +61,23 @@ public void open(OpenContext openContext) throws Exception { Catalog catalog = catalogLoader.loadCatalog(); this.updater = new TableUpdater( - new TableMetadataCache(catalog, cacheMaximumSize, cacheRefreshMs), catalog); + new TableMetadataCache( + catalog, cacheMaximumSize, cacheRefreshMs, inputSchemasPerTableCacheMaximumSize), + catalog); } @Override public DynamicRecordInternal map(DynamicRecordInternal data) throws Exception { - Tuple3 newData = + Tuple2 newData = updater.update( TableIdentifier.parse(data.tableName()), data.branch(), data.schema(), data.spec()); + TableMetadataCache.ResolvedSchemaInfo compareInfo = newData.f0; - data.setSchema(newData.f0); - data.setSpec(newData.f2); + data.setSchema(compareInfo.resolvedTableSchema()); + data.setSpec(newData.f1); - if (newData.f1 == CompareSchemasVisitor.Result.DATA_CONVERSION_NEEDED) { - RowData newRowData = RowDataEvolver.convert(data.rowData(), data.schema(), newData.f0); - data.setRowData(newRowData); - } + RowData newRowData = (RowData) newData.f0.recordConverter().convert(data.rowData()); + data.setRowData(newRowData); return data; } diff --git a/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/LRUCache.java b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/LRUCache.java new file mode 100644 index 000000000000..bb1e17405377 --- /dev/null +++ b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/LRUCache.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.flink.sink.dynamic; + +import java.util.LinkedHashMap; +import java.util.Map; +import java.util.function.Consumer; + +/** + * A performant, fixed size least recently used (LRU) cache implementation. + * + *

This cache has O(1) time complexity for get/put operations and provides eviction notifications + * when entries are removed due to size constraints. It offers better performance than similarly + * configured Caffeine caches, making it ideal for hot path operations. + * + *
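+ * <p>Usage sketch (illustrative only; the key and value types are arbitrary): once the maximum
+ * size is exceeded, the least recently used entry is removed and handed to the eviction callback.
+ *
+ * <pre>{@code
+ * LRUCache<String, Integer> cache = new LRUCache<>(2, evicted -> {});
+ * cache.put("a", 1);
+ * cache.put("b", 2);
+ * cache.get("a");    // "a" becomes the most recently used entry
+ * cache.put("c", 3); // evicts ("b", 2) and passes it to the eviction callback
+ * }</pre>
+ *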

This implementation extends {@link LinkedHashMap} with access-order traversal and automated + * removal of least recently used entries when the maximum size is reached. + */ +@SuppressWarnings("checkstyle:IllegalType") +class LRUCache extends LinkedHashMap { + /** Defaults from {@link java.util.HashMap} */ + private static final int DEFAULT_INITIAL_CAPACITY = 1 << 4; + + private static final float DEFAULT_LOAD_FACTOR = 0.75f; + + private final int maximumSize; + private final Consumer> evictionCallback; + + LRUCache(int maximumSize, Consumer> evictionCallback) { + super(Math.min(maximumSize, DEFAULT_INITIAL_CAPACITY), DEFAULT_LOAD_FACTOR, true); + this.maximumSize = maximumSize; + this.evictionCallback = evictionCallback; + } + + @Override + protected boolean removeEldestEntry(Map.Entry eldest) { + boolean remove = size() > maximumSize; + if (remove) { + evictionCallback.accept(eldest); + } + + return remove; + } +} diff --git a/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/RowDataEvolver.java b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/RowDataEvolver.java deleted file mode 100644 index fe670c54ebd2..000000000000 --- a/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/RowDataEvolver.java +++ /dev/null @@ -1,190 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.iceberg.flink.sink.dynamic; - -import java.time.LocalDate; -import java.time.LocalDateTime; -import java.time.LocalTime; -import java.util.List; -import java.util.Map; -import org.apache.flink.table.data.ArrayData; -import org.apache.flink.table.data.DecimalData; -import org.apache.flink.table.data.GenericArrayData; -import org.apache.flink.table.data.GenericMapData; -import org.apache.flink.table.data.GenericRowData; -import org.apache.flink.table.data.MapData; -import org.apache.flink.table.data.RowData; -import org.apache.flink.table.data.TimestampData; -import org.apache.flink.table.types.logical.ArrayType; -import org.apache.flink.table.types.logical.DecimalType; -import org.apache.flink.table.types.logical.LogicalType; -import org.apache.flink.table.types.logical.MapType; -import org.apache.flink.table.types.logical.RowType; -import org.apache.iceberg.Schema; -import org.apache.iceberg.flink.FlinkSchemaUtil; -import org.apache.iceberg.relocated.com.google.common.collect.Maps; - -/** - * A RowDataEvolver is responsible to change the input RowData to make it compatible with the target - * schema. This is done when - * - *

- * <ul>
- *   <li>The input schema has fewer fields than the target schema.
- *   <li>The table types are wider than the input type.
- *   <li>The field order differs for source and target schema.
- * </ul>
- *
- * <p>The resolution is as follows:
- *
- * <ol>
- *   <li>In the first case, we would add a null values for the missing field (if the field is
- *       optional).
- *   <li>In the second case, we would convert the data for the input field to a wider type, e.g. int
- *       (input type) => long (table type).
- *   <li>In the third case, we would rearrange the input data to match the target table.
- * </ol>
- */ -class RowDataEvolver { - private RowDataEvolver() {} - - public static RowData convert(RowData sourceData, Schema sourceSchema, Schema targetSchema) { - return convertStruct( - sourceData, FlinkSchemaUtil.convert(sourceSchema), FlinkSchemaUtil.convert(targetSchema)); - } - - private static Object convert(Object object, LogicalType sourceType, LogicalType targetType) { - if (object == null) { - return null; - } - - switch (targetType.getTypeRoot()) { - case BOOLEAN: - case INTEGER: - case FLOAT: - case VARCHAR: - case DATE: - case TIME_WITHOUT_TIME_ZONE: - case TIMESTAMP_WITH_LOCAL_TIME_ZONE: - case BINARY: - case VARBINARY: - return object; - case DOUBLE: - if (object instanceof Float) { - return ((Float) object).doubleValue(); - } else { - return object; - } - case BIGINT: - if (object instanceof Integer) { - return ((Integer) object).longValue(); - } else { - return object; - } - case DECIMAL: - DecimalType toDecimalType = (DecimalType) targetType; - DecimalData decimalData = (DecimalData) object; - if (((DecimalType) sourceType).getPrecision() == toDecimalType.getPrecision()) { - return object; - } else { - return DecimalData.fromBigDecimal( - decimalData.toBigDecimal(), toDecimalType.getPrecision(), toDecimalType.getScale()); - } - case TIMESTAMP_WITHOUT_TIME_ZONE: - if (object instanceof Integer) { - LocalDateTime dateTime = - LocalDateTime.of(LocalDate.ofEpochDay((Integer) object), LocalTime.MIN); - return TimestampData.fromLocalDateTime(dateTime); - } else { - return object; - } - case ROW: - return convertStruct((RowData) object, (RowType) sourceType, (RowType) targetType); - case ARRAY: - return convertArray((ArrayData) object, (ArrayType) sourceType, (ArrayType) targetType); - case MAP: - return convertMap((MapData) object, (MapType) sourceType, (MapType) targetType); - default: - throw new UnsupportedOperationException("Not a supported type: " + targetType); - } - } - - private static RowData convertStruct(RowData sourceData, RowType sourceType, RowType targetType) { - GenericRowData targetData = new GenericRowData(targetType.getFields().size()); - List targetFields = targetType.getFields(); - for (int i = 0; i < targetFields.size(); i++) { - RowType.RowField targetField = targetFields.get(i); - - int sourceFieldId = sourceType.getFieldIndex(targetField.getName()); - if (sourceFieldId == -1) { - if (targetField.getType().isNullable()) { - targetData.setField(i, null); - } else { - throw new IllegalArgumentException( - String.format( - "Field %s in target schema %s is non-nullable but does not exist in source schema.", - i + 1, targetType)); - } - } else { - RowData.FieldGetter getter = - RowData.createFieldGetter(sourceType.getTypeAt(sourceFieldId), sourceFieldId); - targetData.setField( - i, - convert( - getter.getFieldOrNull(sourceData), - sourceType.getFields().get(sourceFieldId).getType(), - targetField.getType())); - } - } - - return targetData; - } - - private static ArrayData convertArray( - ArrayData sourceData, ArrayType sourceType, ArrayType targetType) { - LogicalType fromElementType = sourceType.getElementType(); - LogicalType toElementType = targetType.getElementType(); - ArrayData.ElementGetter elementGetter = ArrayData.createElementGetter(fromElementType); - Object[] convertedArray = new Object[sourceData.size()]; - for (int i = 0; i < convertedArray.length; i++) { - convertedArray[i] = - convert(elementGetter.getElementOrNull(sourceData, i), fromElementType, toElementType); - } - - return new GenericArrayData(convertedArray); - } - - private static MapData 
convertMap(MapData sourceData, MapType sourceType, MapType targetType) { - LogicalType fromMapKeyType = sourceType.getKeyType(); - LogicalType fromMapValueType = sourceType.getValueType(); - LogicalType toMapKeyType = targetType.getKeyType(); - LogicalType toMapValueType = targetType.getValueType(); - ArrayData keyArray = sourceData.keyArray(); - ArrayData valueArray = sourceData.valueArray(); - ArrayData.ElementGetter keyGetter = ArrayData.createElementGetter(fromMapKeyType); - ArrayData.ElementGetter valueGetter = ArrayData.createElementGetter(fromMapValueType); - Map convertedMap = Maps.newLinkedHashMap(); - for (int i = 0; i < keyArray.size(); ++i) { - convertedMap.put( - convert(keyGetter.getElementOrNull(keyArray, i), fromMapKeyType, toMapKeyType), - convert(valueGetter.getElementOrNull(valueArray, i), fromMapValueType, toMapValueType)); - } - - return new GenericMapData(convertedMap); - } -} diff --git a/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/TableMetadataCache.java b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/TableMetadataCache.java index e9c77ea8c809..479982bc41ba 100644 --- a/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/TableMetadataCache.java +++ b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/TableMetadataCache.java @@ -20,7 +20,6 @@ import com.github.benmanes.caffeine.cache.Cache; import com.github.benmanes.caffeine.cache.Caffeine; -import java.util.LinkedHashMap; import java.util.Map; import java.util.Set; import org.apache.flink.annotation.Internal; @@ -32,6 +31,7 @@ import org.apache.iceberg.catalog.Catalog; import org.apache.iceberg.catalog.TableIdentifier; import org.apache.iceberg.exceptions.NoSuchTableException; +import org.apache.iceberg.flink.FlinkSchemaUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -44,19 +44,22 @@ class TableMetadataCache { private static final Logger LOG = LoggerFactory.getLogger(TableMetadataCache.class); - private static final int MAX_SCHEMA_COMPARISON_RESULTS_TO_CACHE = 10; private static final Tuple2 EXISTS = Tuple2.of(true, null); private static final Tuple2 NOT_EXISTS = Tuple2.of(false, null); - static final Tuple2 NOT_FOUND = - Tuple2.of(null, CompareSchemasVisitor.Result.SCHEMA_UPDATE_NEEDED); + static final ResolvedSchemaInfo NOT_FOUND = + new ResolvedSchemaInfo( + null, CompareSchemasVisitor.Result.SCHEMA_UPDATE_NEEDED, DataConverter.identity()); private final Catalog catalog; private final long refreshMs; + private final int inputSchemasPerTableCacheMaximumSize; private final Cache cache; - TableMetadataCache(Catalog catalog, int maximumSize, long refreshMs) { + TableMetadataCache( + Catalog catalog, int maximumSize, long refreshMs, int inputSchemasPerTableCacheMaximumSize) { this.catalog = catalog; this.refreshMs = refreshMs; + this.inputSchemasPerTableCacheMaximumSize = inputSchemasPerTableCacheMaximumSize; this.cache = Caffeine.newBuilder().maximumSize(maximumSize).build(); } @@ -75,7 +78,7 @@ String branch(TableIdentifier identifier, String branch) { return branch(identifier, branch, true); } - Tuple2 schema(TableIdentifier identifier, Schema input) { + ResolvedSchemaInfo schema(TableIdentifier identifier, Schema input) { return schema(identifier, input, true); } @@ -86,7 +89,12 @@ PartitionSpec spec(TableIdentifier identifier, PartitionSpec spec) { void update(TableIdentifier identifier, Table table) { cache.put( identifier, - new CacheItem(true, table.refs().keySet(), new SchemaInfo(table.schemas()), 
table.specs())); + new CacheItem( + true, + table.refs().keySet(), + table.schemas(), + table.specs(), + inputSchemasPerTableCacheMaximumSize)); } private String branch(TableIdentifier identifier, String branch, boolean allowRefresh) { @@ -103,7 +111,7 @@ private String branch(TableIdentifier identifier, String branch, boolean allowRe } } - private Tuple2 schema( + private ResolvedSchemaInfo schema( TableIdentifier identifier, Schema input, boolean allowRefresh) { CacheItem cached = cache.getIfPresent(identifier); Schema compatible = null; @@ -112,19 +120,21 @@ private Tuple2 schema( // and a new schema. Performance is paramount as this code is on the hot path. Every other // way for comparing 2 schemas were performing worse than the // {@link CompareByNameVisitor#visit(Schema, Schema, boolean)}, so caching was useless. - Tuple2 lastResult = - cached.schema.lastResults.get(input); + ResolvedSchemaInfo lastResult = cached.inputSchemas.get(input); if (lastResult != null) { return lastResult; } - for (Map.Entry tableSchema : cached.schema.schemas.entrySet()) { + for (Map.Entry tableSchema : cached.tableSchemas.entrySet()) { CompareSchemasVisitor.Result result = CompareSchemasVisitor.visit(input, tableSchema.getValue(), true); if (result == CompareSchemasVisitor.Result.SAME) { - Tuple2 newResult = - Tuple2.of(tableSchema.getValue(), CompareSchemasVisitor.Result.SAME); - cached.schema.update(input, newResult); + ResolvedSchemaInfo newResult = + new ResolvedSchemaInfo( + tableSchema.getValue(), + CompareSchemasVisitor.Result.SAME, + DataConverter.identity()); + cached.inputSchemas.put(input, newResult); return newResult; } else if (compatible == null && result == CompareSchemasVisitor.Result.DATA_CONVERSION_NEEDED) { @@ -137,12 +147,16 @@ private Tuple2 schema( refreshTable(identifier); return schema(identifier, input, false); } else if (compatible != null) { - Tuple2 newResult = - Tuple2.of(compatible, CompareSchemasVisitor.Result.DATA_CONVERSION_NEEDED); - cached.schema.update(input, newResult); + ResolvedSchemaInfo newResult = + new ResolvedSchemaInfo( + compatible, + CompareSchemasVisitor.Result.DATA_CONVERSION_NEEDED, + DataConverter.get( + FlinkSchemaUtil.convert(input), FlinkSchemaUtil.convert(compatible))); + cached.inputSchemas.put(input, newResult); return newResult; } else if (cached != null && cached.tableExists) { - cached.schema.update(input, NOT_FOUND); + cached.inputSchemas.put(input, NOT_FOUND); return NOT_FOUND; } else { return NOT_FOUND; @@ -174,7 +188,7 @@ private Tuple2 refreshTable(TableIdentifier identifier) { return EXISTS; } catch (NoSuchTableException e) { LOG.debug("Table doesn't exist {}", identifier, e); - cache.put(identifier, new CacheItem(false, null, null, null)); + cache.put(identifier, new CacheItem(false, null, null, null, 1)); return Tuple2.of(false, e); } } @@ -194,63 +208,62 @@ static class CacheItem { private final boolean tableExists; private final Set branches; - private final SchemaInfo schema; + private final Map tableSchemas; private final Map specs; + private final LRUCache inputSchemas; private CacheItem( boolean tableExists, Set branches, - SchemaInfo schema, - Map specs) { + Map tableSchemas, + Map specs, + int inputSchemaCacheMaximumSize) { this.tableExists = tableExists; this.branches = branches; - this.schema = schema; + this.tableSchemas = tableSchemas; this.specs = specs; + this.inputSchemas = + new LRUCache<>(inputSchemaCacheMaximumSize, CacheItem::inputSchemaEvictionListener); + } + + private static void inputSchemaEvictionListener( + 
Map.Entry evictedEntry) { + LOG.warn( + "Performance degraded as records with different schema is generated for the same table. " + + "Likely the DynamicRecord.schema is not reused. " + + "Reuse the same instance if the record schema is the same to improve performance"); } @VisibleForTesting - SchemaInfo getSchemaInfo() { - return schema; + Map inputSchemas() { + return inputSchemas; } } - /** - * Stores precalculated results for {@link CompareSchemasVisitor#visit(Schema, Schema, boolean)} - * in the cache. - */ - static class SchemaInfo { - private final Map schemas; - private final Map> lastResults; + static class ResolvedSchemaInfo { + private final Schema resolvedTableSchema; + private final CompareSchemasVisitor.Result compareResult; + private final DataConverter recordConverter; - private SchemaInfo(Map schemas) { - this.schemas = schemas; - this.lastResults = new LimitedLinkedHashMap<>(); + ResolvedSchemaInfo( + Schema tableSchema, + CompareSchemasVisitor.Result compareResult, + DataConverter recordConverter) { + this.resolvedTableSchema = tableSchema; + this.compareResult = compareResult; + this.recordConverter = recordConverter; } - private void update( - Schema newLastSchema, Tuple2 newLastResult) { - lastResults.put(newLastSchema, newLastResult); + Schema resolvedTableSchema() { + return resolvedTableSchema; } - @VisibleForTesting - Tuple2 getLastResult(Schema schema) { - return lastResults.get(schema); + CompareSchemasVisitor.Result compareResult() { + return compareResult; } - } - - @SuppressWarnings("checkstyle:IllegalType") - private static class LimitedLinkedHashMap extends LinkedHashMap { - @Override - protected boolean removeEldestEntry(Map.Entry eldest) { - boolean remove = size() > MAX_SCHEMA_COMPARISON_RESULTS_TO_CACHE; - if (remove) { - LOG.warn( - "Performance degraded as records with different schema is generated for the same table. " - + "Likely the DynamicRecord.schema is not reused. " - + "Reuse the same instance if the record schema is the same to improve performance"); - } - return remove; + DataConverter recordConverter() { + return recordConverter; } } diff --git a/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/TableUpdater.java b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/TableUpdater.java index 40bb66f65125..fdd182830b2c 100644 --- a/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/TableUpdater.java +++ b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/TableUpdater.java @@ -32,6 +32,7 @@ import org.apache.iceberg.exceptions.AlreadyExistsException; import org.apache.iceberg.exceptions.CommitFailedException; import org.apache.iceberg.exceptions.NoSuchNamespaceException; +import org.apache.iceberg.flink.FlinkSchemaUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -54,14 +55,14 @@ class TableUpdater { * @return a {@link Tuple3} of the new {@link Schema}, the status of the schema compared to the * requested one, and the new {@link PartitionSpec#specId()}. 
*/ - Tuple3 update( + Tuple2 update( TableIdentifier tableIdentifier, String branch, Schema schema, PartitionSpec spec) { findOrCreateTable(tableIdentifier, schema, spec); findOrCreateBranch(tableIdentifier, branch); - Tuple2 newSchema = + TableMetadataCache.ResolvedSchemaInfo newSchemaInfo = findOrCreateSchema(tableIdentifier, schema); PartitionSpec newSpec = findOrCreateSpec(tableIdentifier, spec); - return Tuple3.of(newSchema.f0, newSchema.f1, newSpec); + return Tuple2.of(newSchemaInfo, newSpec); } private void findOrCreateTable(TableIdentifier identifier, Schema schema, PartitionSpec spec) { @@ -110,10 +111,10 @@ private void findOrCreateBranch(TableIdentifier identifier, String branch) { } } - private Tuple2 findOrCreateSchema( + private TableMetadataCache.ResolvedSchemaInfo findOrCreateSchema( TableIdentifier identifier, Schema schema) { - Tuple2 fromCache = cache.schema(identifier, schema); - if (fromCache.f1 != CompareSchemasVisitor.Result.SCHEMA_UPDATE_NEEDED) { + TableMetadataCache.ResolvedSchemaInfo fromCache = cache.schema(identifier, schema); + if (fromCache.compareResult() != CompareSchemasVisitor.Result.SCHEMA_UPDATE_NEEDED) { return fromCache; } else { Table table = catalog.loadTable(identifier); @@ -121,9 +122,16 @@ private Tuple2 findOrCreateSchema( CompareSchemasVisitor.Result result = CompareSchemasVisitor.visit(schema, tableSchema, true); switch (result) { case SAME: + cache.update(identifier, table); + return new TableMetadataCache.ResolvedSchemaInfo( + tableSchema, result, DataConverter.identity()); case DATA_CONVERSION_NEEDED: cache.update(identifier, table); - return Tuple2.of(tableSchema, result); + return new TableMetadataCache.ResolvedSchemaInfo( + tableSchema, + result, + DataConverter.get( + FlinkSchemaUtil.convert(schema), FlinkSchemaUtil.convert(tableSchema))); case SCHEMA_UPDATE_NEEDED: LOG.info( "Triggering schema update for table {} {} to {}", identifier, tableSchema, schema); @@ -133,16 +141,15 @@ private Tuple2 findOrCreateSchema( try { updateApi.commit(); cache.update(identifier, table); - Tuple2 comparisonAfterMigration = + TableMetadataCache.ResolvedSchemaInfo comparisonAfterMigration = cache.schema(identifier, schema); - Schema newSchema = comparisonAfterMigration.f0; + Schema newSchema = comparisonAfterMigration.resolvedTableSchema(); LOG.info("Table {} schema updated from {} to {}", identifier, tableSchema, newSchema); return comparisonAfterMigration; } catch (CommitFailedException e) { cache.invalidate(identifier); - Tuple2 newSchema = - cache.schema(identifier, schema); - if (newSchema.f1 != CompareSchemasVisitor.Result.SCHEMA_UPDATE_NEEDED) { + TableMetadataCache.ResolvedSchemaInfo newSchema = cache.schema(identifier, schema); + if (newSchema.compareResult() != CompareSchemasVisitor.Result.SCHEMA_UPDATE_NEEDED) { LOG.debug("Table {} schema updated concurrently to {}", identifier, schema); return newSchema; } else { diff --git a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicTableUpdateOperator.java b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicTableUpdateOperator.java index 22b6476b9c25..01e2c440df67 100644 --- a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicTableUpdateOperator.java +++ b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicTableUpdateOperator.java @@ -52,13 +52,17 @@ class TestDynamicTableUpdateOperator { void testDynamicTableUpdateOperatorNewTable() throws Exception { int cacheMaximumSize = 10; 
int cacheRefreshMs = 1000; + int inputSchemaCacheMaximumSize = 10; Catalog catalog = CATALOG_EXTENSION.catalog(); TableIdentifier table = TableIdentifier.of(TABLE); assertThat(catalog.tableExists(table)).isFalse(); DynamicTableUpdateOperator operator = new DynamicTableUpdateOperator( - CATALOG_EXTENSION.catalogLoader(), cacheMaximumSize, cacheRefreshMs); + CATALOG_EXTENSION.catalogLoader(), + cacheMaximumSize, + cacheRefreshMs, + inputSchemaCacheMaximumSize); operator.open((OpenContext) null); DynamicRecordInternal input = @@ -81,12 +85,16 @@ void testDynamicTableUpdateOperatorNewTable() throws Exception { void testDynamicTableUpdateOperatorSchemaChange() throws Exception { int cacheMaximumSize = 10; int cacheRefreshMs = 1000; + int inputSchemaCacheMaximumSize = 10; Catalog catalog = CATALOG_EXTENSION.catalog(); TableIdentifier table = TableIdentifier.of(TABLE); DynamicTableUpdateOperator operator = new DynamicTableUpdateOperator( - CATALOG_EXTENSION.catalogLoader(), cacheMaximumSize, cacheRefreshMs); + CATALOG_EXTENSION.catalogLoader(), + cacheMaximumSize, + cacheRefreshMs, + inputSchemaCacheMaximumSize); operator.open((OpenContext) null); catalog.createTable(table, SCHEMA1); diff --git a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestLRUCache.java b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestLRUCache.java new file mode 100644 index 000000000000..59a020a496b5 --- /dev/null +++ b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestLRUCache.java @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.iceberg.flink.sink.dynamic; + +import static org.assertj.core.api.Assertions.assertThat; + +import java.util.List; +import java.util.Map; +import java.util.function.Consumer; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.junit.jupiter.api.Test; + +class TestLRUCache { + private static final Consumer> NO_OP_CALLBACK = ignored -> {}; + + @Test + void testPut() { + LRUCache cache = new LRUCache<>(1, NO_OP_CALLBACK); + cache.put(1, 1); + + assertThat(cache.size()).isEqualTo(1); + assertThat(cache).containsExactlyEntriesOf(Map.of(1, 1)); + } + + @Test + void testGet() { + LRUCache cache = new LRUCache<>(1, NO_OP_CALLBACK); + cache.put(1, 123); + + assertThat(cache.size()).isEqualTo(1); + assertThat(cache.get(1)).isEqualTo(123); + } + + @Test + void testElementEviction() { + int maxSize = 2; + LRUCache cache = new LRUCache<>(maxSize, NO_OP_CALLBACK); + + cache.put(1, 1); + cache.put(2, 2); + Integer value = cache.get(1); + assertThat(value).isEqualTo(1); + + cache.put(3, 3); // "2" should be evicted + + assertThat(cache.size()).isEqualTo(2); + assertThat(cache).containsExactly(Map.entry(1, 1), Map.entry(3, 3)); + } + + @Test + void testEvictionCallback() { + int maxSize = 2; + TestEvictionCallback callback = new TestEvictionCallback(); + LRUCache cache = new LRUCache<>(maxSize, callback); + + cache.put(1, 1); + cache.put(2, 2); + Integer value = cache.get(1); + assertThat(value).isEqualTo(1); + + cache.put(3, 3); // "2" should be evicted + + assertThat(callback.evictedEntries).containsExactly(Map.entry(2, 2)); + } + + private static class TestEvictionCallback implements Consumer> { + private final List> evictedEntries = Lists.newArrayList(); + + @Override + public void accept(Map.Entry entry) { + evictedEntries.add(entry); + } + } +} diff --git a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestRowDataEvolver.java b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestRowDataConverter.java similarity index 88% rename from flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestRowDataEvolver.java rename to flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestRowDataConverter.java index 2553575f1893..73f36414f5eb 100644 --- a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestRowDataEvolver.java +++ b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestRowDataConverter.java @@ -34,6 +34,7 @@ import org.apache.iceberg.Schema; import org.apache.iceberg.flink.DataGenerator; import org.apache.iceberg.flink.DataGenerators; +import org.apache.iceberg.flink.FlinkSchemaUtil; import org.apache.iceberg.flink.SimpleDataUtil; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; import org.apache.iceberg.types.Types; @@ -42,7 +43,7 @@ import org.joda.time.Days; import org.junit.jupiter.api.Test; -class TestRowDataEvolver { +class TestRowDataConverter { static final Schema SCHEMA = new Schema( @@ -59,7 +60,7 @@ class TestRowDataEvolver { void testPrimitiveTypes() { DataGenerator generator = new DataGenerators.Primitives(); assertThat( - RowDataEvolver.convert( + convert( generator.generateFlinkRowData(), generator.icebergSchema(), generator.icebergSchema())) @@ -68,7 +69,7 @@ void testPrimitiveTypes() { @Test void testAddColumn() { - assertThat(RowDataEvolver.convert(SimpleDataUtil.createRowData(1, "a"), SCHEMA, SCHEMA2)) + assertThat(convert(SimpleDataUtil.createRowData(1, "a"), SCHEMA, SCHEMA2)) 
.isEqualTo(GenericRowData.of(1, StringData.fromString("a"), null)); } @@ -82,7 +83,7 @@ void testAddRequiredColumn() { assertThrows( IllegalArgumentException.class, - () -> RowDataEvolver.convert(GenericRowData.of(42), currentSchema, targetSchema)); + () -> convert(GenericRowData.of(42), currentSchema, targetSchema)); } @Test @@ -92,9 +93,7 @@ void testIntToLong() { Types.NestedField.optional(2, "id", Types.LongType.get()), Types.NestedField.optional(4, "data", Types.StringType.get())); - assertThat( - RowDataEvolver.convert( - SimpleDataUtil.createRowData(1, "a"), SimpleDataUtil.SCHEMA, schemaWithLong)) + assertThat(convert(SimpleDataUtil.createRowData(1, "a"), SimpleDataUtil.SCHEMA, schemaWithLong)) .isEqualTo(GenericRowData.of(1L, StringData.fromString("a"))); } @@ -105,7 +104,7 @@ void testFloatToDouble() { Schema schemaWithDouble = new Schema(Types.NestedField.optional(2, "float2double", Types.DoubleType.get())); - assertThat(RowDataEvolver.convert(GenericRowData.of(1.5f), schemaWithFloat, schemaWithDouble)) + assertThat(convert(GenericRowData.of(1.5f), schemaWithFloat, schemaWithDouble)) .isEqualTo(GenericRowData.of(1.5d)); } @@ -121,7 +120,7 @@ void testDateToTimestamp() { int days = Days.daysBetween(new DateTime(1970, 1, 1, 0, 0, 0, 0, DateTimeZone.UTC), time).getDays(); - assertThat(RowDataEvolver.convert(GenericRowData.of(days), schemaWithFloat, schemaWithDouble)) + assertThat(convert(GenericRowData.of(days), schemaWithFloat, schemaWithDouble)) .isEqualTo(GenericRowData.of(TimestampData.fromEpochMillis(time.getMillis()))); } @@ -133,7 +132,7 @@ void testIncreasePrecision() { new Schema(Types.NestedField.required(14, "decimal_field", Types.DecimalType.of(10, 2))); assertThat( - RowDataEvolver.convert( + convert( GenericRowData.of(DecimalData.fromBigDecimal(new BigDecimal("-1.50"), 9, 2)), before, after)) @@ -161,7 +160,7 @@ void testStructAddOptionalFields() { StringData.fromString("row_id_value"), GenericRowData.of(1, null, StringData.fromString("Jane"))); - assertThat(RowDataEvolver.convert(oldData, oldSchema, newSchema)).isEqualTo(newData); + assertThat(convert(oldData, oldSchema, newSchema)).isEqualTo(newData); } @Test @@ -188,7 +187,7 @@ void testStructAddRequiredFieldsWithOptionalRoot() { RowData expectedData = GenericRowData.of(StringData.fromString("row_id_value"), null); - assertThat(RowDataEvolver.convert(oldData, oldSchema, newSchema)).isEqualTo(expectedData); + assertThat(convert(oldData, oldSchema, newSchema)).isEqualTo(expectedData); } @Test @@ -208,9 +207,7 @@ void testStructAddRequiredFields() { required(103, "required", Types.StringType.get()), required(102, "name", Types.StringType.get())))); - assertThrows( - IllegalArgumentException.class, - () -> RowDataEvolver.convert(oldData, oldSchema, newSchema)); + assertThrows(IllegalArgumentException.class, () -> convert(oldData, oldSchema, newSchema)); } @Test @@ -233,7 +230,7 @@ void testMap() { ImmutableMap.of( StringData.fromString("Jane"), 1L, StringData.fromString("Joe"), 2L))); - assertThat(RowDataEvolver.convert(oldData, oldSchema, newSchema)).isEqualTo(newData); + assertThat(convert(oldData, oldSchema, newSchema)).isEqualTo(newData); } @Test @@ -251,6 +248,13 @@ void testArray() { GenericRowData.of( StringData.fromString("row_id_value"), new GenericArrayData(new Long[] {1L, 2L, 3L})); - assertThat(RowDataEvolver.convert(oldData, oldSchema, newSchema)).isEqualTo(newData); + assertThat(convert(oldData, oldSchema, newSchema)).isEqualTo(newData); + } + + private static RowData convert(RowData sourceData, Schema 
sourceSchema, Schema targetSchema) { + return (RowData) + DataConverter.get( + FlinkSchemaUtil.convert(sourceSchema), FlinkSchemaUtil.convert(targetSchema)) + .convert(sourceData); } } diff --git a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestTableMetadataCache.java b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestTableMetadataCache.java index cedae887041e..5d222a212fff 100644 --- a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestTableMetadataCache.java +++ b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestTableMetadataCache.java @@ -47,17 +47,19 @@ void testCaching() { Catalog catalog = CATALOG_EXTENSION.catalog(); TableIdentifier tableIdentifier = TableIdentifier.parse("default.myTable"); catalog.createTable(tableIdentifier, SCHEMA); - TableMetadataCache cache = new TableMetadataCache(catalog, 10, Long.MAX_VALUE); + TableMetadataCache cache = new TableMetadataCache(catalog, 10, Long.MAX_VALUE, 10); - Schema schema1 = cache.schema(tableIdentifier, SCHEMA).f0; + Schema schema1 = cache.schema(tableIdentifier, SCHEMA).resolvedTableSchema(); assertThat(schema1.sameSchema(SCHEMA)).isTrue(); - assertThat(cache.schema(tableIdentifier, SerializationUtils.clone(SCHEMA)).f0) + assertThat( + cache.schema(tableIdentifier, SerializationUtils.clone(SCHEMA)).resolvedTableSchema()) .isEqualTo(schema1); assertThat(cache.schema(tableIdentifier, SCHEMA2)).isEqualTo(TableMetadataCache.NOT_FOUND); - schema1 = cache.schema(tableIdentifier, SCHEMA).f0; - assertThat(cache.schema(tableIdentifier, SerializationUtils.clone(SCHEMA)).f0) + schema1 = cache.schema(tableIdentifier, SCHEMA).resolvedTableSchema(); + assertThat( + cache.schema(tableIdentifier, SerializationUtils.clone(SCHEMA)).resolvedTableSchema()) .isEqualTo(schema1); } @@ -66,17 +68,17 @@ void testCacheInvalidationAfterSchemaChange() { Catalog catalog = CATALOG_EXTENSION.catalog(); TableIdentifier tableIdentifier = TableIdentifier.parse("default.myTable"); catalog.createTable(tableIdentifier, SCHEMA); - TableMetadataCache cache = new TableMetadataCache(catalog, 10, Long.MAX_VALUE); + TableMetadataCache cache = new TableMetadataCache(catalog, 10, Long.MAX_VALUE, 10); TableUpdater tableUpdater = new TableUpdater(cache, catalog); - Schema schema1 = cache.schema(tableIdentifier, SCHEMA).f0; + Schema schema1 = cache.schema(tableIdentifier, SCHEMA).resolvedTableSchema(); assertThat(schema1.sameSchema(SCHEMA)).isTrue(); catalog.dropTable(tableIdentifier); catalog.createTable(tableIdentifier, SCHEMA2); tableUpdater.update(tableIdentifier, "main", SCHEMA2, PartitionSpec.unpartitioned()); - Schema schema2 = cache.schema(tableIdentifier, SCHEMA2).f0; + Schema schema2 = cache.schema(tableIdentifier, SCHEMA2).resolvedTableSchema(); assertThat(schema2.sameSchema(SCHEMA2)).isTrue(); } @@ -85,7 +87,7 @@ void testCachingDisabled() { Catalog catalog = CATALOG_EXTENSION.catalog(); TableIdentifier tableIdentifier = TableIdentifier.parse("default.myTable"); catalog.createTable(tableIdentifier, SCHEMA); - TableMetadataCache cache = new TableMetadataCache(catalog, 0, Long.MAX_VALUE); + TableMetadataCache cache = new TableMetadataCache(catalog, 0, Long.MAX_VALUE, 10); // Cleanup routine doesn't run after every write cache.getInternalCache().cleanUp(); diff --git a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestTableUpdater.java b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestTableUpdater.java index 
f7a3596462f1..9099fd1f3ae6 100644 --- a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestTableUpdater.java +++ b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestTableUpdater.java @@ -21,7 +21,6 @@ import static org.assertj.core.api.Assertions.assertThat; import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.api.java.tuple.Tuple3; import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.Schema; import org.apache.iceberg.Table; @@ -48,22 +47,21 @@ public class TestTableUpdater extends TestFlinkIcebergSinkBase { void testTableCreation() { Catalog catalog = CATALOG_EXTENSION.catalog(); TableIdentifier tableIdentifier = TableIdentifier.parse("myTable"); - TableMetadataCache cache = new TableMetadataCache(catalog, 10, Long.MAX_VALUE); + TableMetadataCache cache = new TableMetadataCache(catalog, 10, Long.MAX_VALUE, 10); TableUpdater tableUpdater = new TableUpdater(cache, catalog); tableUpdater.update(tableIdentifier, "main", SCHEMA, PartitionSpec.unpartitioned()); assertThat(catalog.tableExists(tableIdentifier)).isTrue(); - Tuple2 cachedSchema = - cache.schema(tableIdentifier, SCHEMA); - assertThat(cachedSchema.f0.sameSchema(SCHEMA)).isTrue(); + TableMetadataCache.ResolvedSchemaInfo cachedSchema = cache.schema(tableIdentifier, SCHEMA); + assertThat(cachedSchema.resolvedTableSchema().sameSchema(SCHEMA)).isTrue(); } @Test void testTableAlreadyExists() { Catalog catalog = CATALOG_EXTENSION.catalog(); TableIdentifier tableIdentifier = TableIdentifier.parse("myTable"); - TableMetadataCache cache = new TableMetadataCache(catalog, 10, Long.MAX_VALUE); + TableMetadataCache cache = new TableMetadataCache(catalog, 10, Long.MAX_VALUE, 10); TableUpdater tableUpdater = new TableUpdater(cache, catalog); // Make the table non-existent in cache @@ -71,18 +69,18 @@ void testTableAlreadyExists() { // Create the table catalog.createTable(tableIdentifier, SCHEMA); // Make sure that the cache is invalidated and the table refreshed without an error - Tuple3 result = + Tuple2 result = tableUpdater.update(tableIdentifier, "main", SCHEMA, PartitionSpec.unpartitioned()); - assertThat(result.f0.sameSchema(SCHEMA)).isTrue(); - assertThat(result.f1).isEqualTo(CompareSchemasVisitor.Result.SAME); - assertThat(result.f2).isEqualTo(PartitionSpec.unpartitioned()); + assertThat(result.f0.resolvedTableSchema().sameSchema(SCHEMA)).isTrue(); + assertThat(result.f0.compareResult()).isEqualTo(CompareSchemasVisitor.Result.SAME); + assertThat(result.f1).isEqualTo(PartitionSpec.unpartitioned()); } @Test void testBranchCreationAndCaching() { Catalog catalog = CATALOG_EXTENSION.catalog(); TableIdentifier tableIdentifier = TableIdentifier.parse("myTable"); - TableMetadataCache cache = new TableMetadataCache(catalog, 10, Long.MAX_VALUE); + TableMetadataCache cache = new TableMetadataCache(catalog, 10, Long.MAX_VALUE, 10); TableUpdater tableUpdater = new TableUpdater(cache, catalog); catalog.createTable(tableIdentifier, SCHEMA); @@ -98,12 +96,11 @@ void testBranchCreationAndCaching() { void testSpecCreation() { Catalog catalog = CATALOG_EXTENSION.catalog(); TableIdentifier tableIdentifier = TableIdentifier.parse("myTable"); - TableMetadataCache cache = new TableMetadataCache(catalog, 10, Long.MAX_VALUE); + TableMetadataCache cache = new TableMetadataCache(catalog, 10, Long.MAX_VALUE, 10); TableUpdater tableUpdater = new TableUpdater(cache, catalog); PartitionSpec spec = PartitionSpec.builderFor(SCHEMA).bucket("data", 10).build(); - Tuple3 result = - 
tableUpdater.update(tableIdentifier, "main", SCHEMA, spec); + tableUpdater.update(tableIdentifier, "main", SCHEMA, spec); Table table = catalog.loadTable(tableIdentifier); assertThat(table).isNotNull(); @@ -115,14 +112,18 @@ void testInvalidateOldCacheEntryOnUpdate() { Catalog catalog = CATALOG_EXTENSION.catalog(); TableIdentifier tableIdentifier = TableIdentifier.parse("default.myTable"); catalog.createTable(tableIdentifier, SCHEMA); - TableMetadataCache cache = new TableMetadataCache(catalog, 10, Long.MAX_VALUE); + TableMetadataCache cache = new TableMetadataCache(catalog, 10, Long.MAX_VALUE, 10); cache.schema(tableIdentifier, SCHEMA); TableUpdater tableUpdater = new TableUpdater(cache, catalog); Schema updated = - tableUpdater.update(tableIdentifier, "main", SCHEMA2, PartitionSpec.unpartitioned()).f0; - assertThat(updated.sameSchema(SCHEMA2)); - assertThat(cache.schema(tableIdentifier, SCHEMA2).f0.sameSchema(SCHEMA2)).isTrue(); + tableUpdater + .update(tableIdentifier, "main", SCHEMA2, PartitionSpec.unpartitioned()) + .f0 + .resolvedTableSchema(); + assertThat(updated.sameSchema(SCHEMA2)).isTrue(); + assertThat(cache.schema(tableIdentifier, SCHEMA2).resolvedTableSchema().sameSchema(SCHEMA2)) + .isTrue(); } @Test @@ -130,7 +131,7 @@ void testLastResultInvalidation() { Catalog catalog = CATALOG_EXTENSION.catalog(); TableIdentifier tableIdentifier = TableIdentifier.parse("default.myTable"); catalog.createTable(tableIdentifier, SCHEMA); - TableMetadataCache cache = new TableMetadataCache(catalog, 10, Long.MAX_VALUE); + TableMetadataCache cache = new TableMetadataCache(catalog, 10, Long.MAX_VALUE, 10); TableUpdater tableUpdater = new TableUpdater(cache, catalog); // Initialize cache @@ -141,20 +142,18 @@ void testLastResultInvalidation() { catalog.createTable(tableIdentifier, SCHEMA2); // Cache still stores the old information - assertThat(cache.schema(tableIdentifier, SCHEMA2).f1) + assertThat(cache.schema(tableIdentifier, SCHEMA2).compareResult()) .isEqualTo(CompareSchemasVisitor.Result.SCHEMA_UPDATE_NEEDED); assertThat( - tableUpdater.update(tableIdentifier, "main", SCHEMA2, PartitionSpec.unpartitioned()).f1) + tableUpdater + .update(tableIdentifier, "main", SCHEMA2, PartitionSpec.unpartitioned()) + .f0 + .compareResult()) .isEqualTo(CompareSchemasVisitor.Result.SAME); // Last result cache should be cleared - assertThat( - cache - .getInternalCache() - .getIfPresent(tableIdentifier) - .getSchemaInfo() - .getLastResult(SCHEMA2)) + assertThat(cache.getInternalCache().getIfPresent(tableIdentifier).inputSchemas().get(SCHEMA2)) .isNull(); } } diff --git a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DataConverter.java b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DataConverter.java new file mode 100644 index 000000000000..34da5efd940f --- /dev/null +++ b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DataConverter.java @@ -0,0 +1,235 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.flink.sink.dynamic; + +import java.time.LocalDate; +import java.time.LocalDateTime; +import java.time.LocalTime; +import java.util.Map; +import org.apache.flink.table.data.ArrayData; +import org.apache.flink.table.data.DecimalData; +import org.apache.flink.table.data.GenericArrayData; +import org.apache.flink.table.data.GenericMapData; +import org.apache.flink.table.data.GenericRowData; +import org.apache.flink.table.data.MapData; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.data.TimestampData; +import org.apache.flink.table.types.logical.ArrayType; +import org.apache.flink.table.types.logical.DecimalType; +import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.table.types.logical.MapType; +import org.apache.flink.table.types.logical.RowType; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; + +/** + * {@link org.apache.iceberg.flink.sink.dynamic.DataConverter} is responsible to change the input + * data to make it compatible with the target schema. This is done when + * + *
    + *
+ * <ul>
+ *   <li>The input schema has fewer fields than the target schema.
+ *   <li>The table types are wider than the input type.
+ *   <li>The field order differs for source and target schema.
+ * </ul>
+ *
+ * <p>The resolution is as follows:
+ *
+ * <ul>
+ *   <li>In the first case, we would add a null value for the missing field (if the field is
+ *       optional).
+ *   <li>In the second case, we would convert the data for the input field to a wider type, e.g. int
+ *       (input type) => long (table type).
+ *   <li>In the third case, we would rearrange the input data to match the target table.
+ * </ul>
+ */ +interface DataConverter { + Object convert(Object object); + + static DataConverter identity() { + return object -> object; + } + + static DataConverter getNullable(LogicalType sourceType, LogicalType targetType) { + return nullable(get(sourceType, targetType)); + } + + static DataConverter get(LogicalType sourceType, LogicalType targetType) { + switch (targetType.getTypeRoot()) { + case BOOLEAN: + case INTEGER: + case FLOAT: + case VARCHAR: + case DATE: + case TIME_WITHOUT_TIME_ZONE: + case TIMESTAMP_WITH_LOCAL_TIME_ZONE: + case BINARY: + case VARBINARY: + return object -> object; + case DOUBLE: + return object -> { + if (object instanceof Float) { + return ((Float) object).doubleValue(); + } else { + return object; + } + }; + case BIGINT: + return object -> { + if (object instanceof Integer) { + return ((Integer) object).longValue(); + } else { + return object; + } + }; + case DECIMAL: + return object -> { + DecimalType toDecimalType = (DecimalType) targetType; + DecimalData decimalData = (DecimalData) object; + if (((DecimalType) sourceType).getPrecision() == toDecimalType.getPrecision()) { + return object; + } else { + return DecimalData.fromBigDecimal( + decimalData.toBigDecimal(), toDecimalType.getPrecision(), toDecimalType.getScale()); + } + }; + case TIMESTAMP_WITHOUT_TIME_ZONE: + return object -> { + if (object instanceof Integer) { + LocalDateTime dateTime = + LocalDateTime.of(LocalDate.ofEpochDay((Integer) object), LocalTime.MIN); + return TimestampData.fromLocalDateTime(dateTime); + } else { + return object; + } + }; + case ROW: + return new RowDataConverter((RowType) sourceType, (RowType) targetType); + case ARRAY: + return new ArrayConverter((ArrayType) sourceType, (ArrayType) targetType); + case MAP: + return new MapConverter((MapType) sourceType, (MapType) targetType); + default: + throw new UnsupportedOperationException("Not a supported type: " + targetType); + } + } + + static DataConverter nullable(DataConverter converter) { + return value -> value == null ? 
null : converter.convert(value); + } + + class RowDataConverter implements DataConverter { + private final RowData.FieldGetter[] fieldGetters; + private final DataConverter[] dataConverters; + + RowDataConverter(RowType sourceType, RowType targetType) { + this.fieldGetters = new RowData.FieldGetter[targetType.getFields().size()]; + this.dataConverters = new DataConverter[targetType.getFields().size()]; + + for (int i = 0; i < targetType.getFields().size(); i++) { + RowData.FieldGetter fieldGetter; + DataConverter dataConverter; + RowType.RowField targetField = targetType.getFields().get(i); + int sourceFieldIndex = sourceType.getFieldIndex(targetField.getName()); + if (sourceFieldIndex == -1) { + if (targetField.getType().isNullable()) { + fieldGetter = row -> null; + dataConverter = value -> null; + } else { + throw new IllegalArgumentException( + String.format( + "Field %s in target schema %s is non-nullable but does not exist in source schema.", + i + 1, targetType)); + } + } else { + RowType.RowField sourceField = sourceType.getFields().get(sourceFieldIndex); + fieldGetter = RowData.createFieldGetter(sourceField.getType(), sourceFieldIndex); + dataConverter = DataConverter.getNullable(sourceField.getType(), targetField.getType()); + } + + this.fieldGetters[i] = fieldGetter; + this.dataConverters[i] = dataConverter; + } + } + + @Override + public RowData convert(Object object) { + RowData sourceData = (RowData) object; + GenericRowData targetData = new GenericRowData(fieldGetters.length); + for (int i = 0; i < fieldGetters.length; i++) { + Object value = fieldGetters[i].getFieldOrNull(sourceData); + targetData.setField(i, dataConverters[i].convert(value)); + } + + return targetData; + } + } + + class ArrayConverter implements DataConverter { + private final ArrayData.ElementGetter elementGetter; + private final DataConverter elementConverter; + + ArrayConverter(ArrayType sourceType, ArrayType targetType) { + this.elementGetter = ArrayData.createElementGetter(sourceType.getElementType()); + this.elementConverter = + DataConverter.getNullable(sourceType.getElementType(), targetType.getElementType()); + } + + @Override + public ArrayData convert(Object object) { + ArrayData arrayData = (ArrayData) object; + Object[] convertedArray = new Object[arrayData.size()]; + for (int i = 0; i < convertedArray.length; i++) { + Object element = elementGetter.getElementOrNull(arrayData, i); + convertedArray[i] = elementConverter.convert(element); + } + + return new GenericArrayData(convertedArray); + } + } + + class MapConverter implements DataConverter { + private final ArrayData.ElementGetter keyGetter; + private final ArrayData.ElementGetter valueGetter; + private final DataConverter keyConverter; + private final DataConverter valueConverter; + + MapConverter(MapType sourceType, MapType targetType) { + this.keyGetter = ArrayData.createElementGetter(sourceType.getKeyType()); + this.valueGetter = ArrayData.createElementGetter(sourceType.getValueType()); + this.keyConverter = + DataConverter.getNullable(sourceType.getKeyType(), targetType.getKeyType()); + this.valueConverter = + DataConverter.getNullable(sourceType.getValueType(), targetType.getValueType()); + } + + @Override + public MapData convert(Object object) { + MapData sourceData = (MapData) object; + ArrayData keyArray = sourceData.keyArray(); + ArrayData valueArray = sourceData.valueArray(); + Map convertedMap = Maps.newLinkedHashMap(); + for (int i = 0; i < keyArray.size(); ++i) { + convertedMap.put( + 
keyConverter.convert(keyGetter.getElementOrNull(keyArray, i)), + valueConverter.convert(valueGetter.getElementOrNull(valueArray, i))); + } + + return new GenericMapData(convertedMap); + } + } +} diff --git a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicIcebergSink.java b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicIcebergSink.java index 5591cbe4ffc4..9547de78d6ba 100644 --- a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicIcebergSink.java +++ b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicIcebergSink.java @@ -188,6 +188,7 @@ public static class Builder { private boolean immediateUpdate = false; private int cacheMaximumSize = 100; private long cacheRefreshMs = 1_000; + private int inputSchemasPerTableCacheMaximumSize = 10; Builder() {} @@ -315,6 +316,16 @@ public Builder cacheRefreshMs(long refreshMs) { return this; } + /** + * Maximum input {@link org.apache.iceberg.Schema} objects to cache per each Iceberg table. The + * cache improves Dynamic Sink performance by storing {@link org.apache.iceberg.Schema} + * comparison results. + */ + public Builder inputSchemasPerTableCacheMaxSize(int inputSchemasPerTableCacheMaxSize) { + this.inputSchemasPerTableCacheMaximumSize = inputSchemasPerTableCacheMaxSize; + return this; + } + private String operatorName(String suffix) { return uidPrefix != null ? uidPrefix + "-" + suffix : suffix; } @@ -358,7 +369,12 @@ public DataStreamSink append() { input .process( new DynamicRecordProcessor<>( - generator, catalogLoader, immediateUpdate, cacheMaximumSize, cacheRefreshMs)) + generator, + catalogLoader, + immediateUpdate, + cacheMaximumSize, + cacheRefreshMs, + inputSchemasPerTableCacheMaximumSize)) .uid(prefixIfNotNull(uidPrefix, "-generator")) .name(operatorName("generator")) .returns(type); @@ -370,7 +386,12 @@ public DataStreamSink append() { DynamicRecordProcessor.DYNAMIC_TABLE_UPDATE_STREAM, new DynamicRecordInternalType(catalogLoader, true, cacheMaximumSize))) .keyBy((KeySelector) DynamicRecordInternal::tableName) - .map(new DynamicTableUpdateOperator(catalogLoader, cacheMaximumSize, cacheRefreshMs)) + .map( + new DynamicTableUpdateOperator( + catalogLoader, + cacheMaximumSize, + cacheRefreshMs, + inputSchemasPerTableCacheMaximumSize)) .uid(prefixIfNotNull(uidPrefix, "-updater")) .name(operatorName("Updater")) .returns(type) diff --git a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicRecordProcessor.java b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicRecordProcessor.java index d6c33c95e75b..166217a0140e 100644 --- a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicRecordProcessor.java +++ b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicRecordProcessor.java @@ -22,7 +22,6 @@ import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.api.common.functions.OpenContext; import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.streaming.api.functions.ProcessFunction; import org.apache.flink.table.data.RowData; import org.apache.flink.util.Collector; @@ -43,6 +42,7 @@ class DynamicRecordProcessor extends ProcessFunction extends ProcessFunction foundSchema = + TableMetadataCache.ResolvedSchemaInfo foundSchema = exists ? 
tableCache.schema(data.tableIdentifier(), data.schema()) : TableMetadataCache.NOT_FOUND; @@ -107,16 +111,23 @@ public void collect(DynamicRecord data) { if (!exists || foundBranch == null || foundSpec == null - || foundSchema.f1 == CompareSchemasVisitor.Result.SCHEMA_UPDATE_NEEDED) { + || foundSchema.compareResult() == CompareSchemasVisitor.Result.SCHEMA_UPDATE_NEEDED) { if (immediateUpdate) { - Tuple3 newData = + Tuple2 newData = updater.update(data.tableIdentifier(), data.branch(), data.schema(), data.spec()); - emit(collector, data, newData.f0, newData.f1, newData.f2); + emit( + collector, + data, + newData.f0.resolvedTableSchema(), + newData.f0.recordConverter(), + newData.f1); } else { int writerKey = hashKeyGenerator.generateKey( data, - foundSchema.f0 != null ? foundSchema.f0 : data.schema(), + foundSchema.resolvedTableSchema() != null + ? foundSchema.resolvedTableSchema() + : data.schema(), foundSpec != null ? foundSpec : data.spec(), data.rowData()); context.output( @@ -132,7 +143,12 @@ public void collect(DynamicRecord data) { DynamicSinkUtil.getEqualityFieldIds(data.equalityFields(), data.schema()))); } } else { - emit(collector, data, foundSchema.f0, foundSchema.f1, foundSpec); + emit( + collector, + data, + foundSchema.resolvedTableSchema(), + foundSchema.recordConverter(), + foundSpec); } } @@ -140,12 +156,9 @@ private void emit( Collector out, DynamicRecord data, Schema schema, - CompareSchemasVisitor.Result result, + DataConverter recordConverter, PartitionSpec spec) { - RowData rowData = - result == CompareSchemasVisitor.Result.SAME - ? data.rowData() - : RowDataEvolver.convert(data.rowData(), data.schema(), schema); + RowData rowData = (RowData) recordConverter.convert(data.rowData()); int writerKey = hashKeyGenerator.generateKey(data, schema, spec, rowData); String tableName = data.tableIdentifier().toString(); out.collect( diff --git a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicTableUpdateOperator.java b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicTableUpdateOperator.java index c37532714d48..6057d773c3f0 100644 --- a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicTableUpdateOperator.java +++ b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicTableUpdateOperator.java @@ -21,10 +21,9 @@ import org.apache.flink.annotation.Internal; import org.apache.flink.api.common.functions.OpenContext; import org.apache.flink.api.common.functions.RichMapFunction; -import org.apache.flink.api.java.tuple.Tuple3; +import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.table.data.RowData; import org.apache.iceberg.PartitionSpec; -import org.apache.iceberg.Schema; import org.apache.iceberg.catalog.Catalog; import org.apache.iceberg.catalog.TableIdentifier; import org.apache.iceberg.flink.CatalogLoader; @@ -41,13 +40,19 @@ class DynamicTableUpdateOperator private final CatalogLoader catalogLoader; private final int cacheMaximumSize; private final long cacheRefreshMs; + private final int inputSchemasPerTableCacheMaximumSize; + private transient TableUpdater updater; DynamicTableUpdateOperator( - CatalogLoader catalogLoader, int cacheMaximumSize, long cacheRefreshMs) { + CatalogLoader catalogLoader, + int cacheMaximumSize, + long cacheRefreshMs, + int inputSchemasPerTableCacheMaximumSize) { this.catalogLoader = catalogLoader; this.cacheMaximumSize = cacheMaximumSize; this.cacheRefreshMs = cacheRefreshMs; + this.inputSchemasPerTableCacheMaximumSize = 
inputSchemasPerTableCacheMaximumSize; } @Override @@ -56,22 +61,23 @@ public void open(OpenContext openContext) throws Exception { Catalog catalog = catalogLoader.loadCatalog(); this.updater = new TableUpdater( - new TableMetadataCache(catalog, cacheMaximumSize, cacheRefreshMs), catalog); + new TableMetadataCache( + catalog, cacheMaximumSize, cacheRefreshMs, inputSchemasPerTableCacheMaximumSize), + catalog); } @Override public DynamicRecordInternal map(DynamicRecordInternal data) throws Exception { - Tuple3 newData = + Tuple2 newData = updater.update( TableIdentifier.parse(data.tableName()), data.branch(), data.schema(), data.spec()); + TableMetadataCache.ResolvedSchemaInfo compareInfo = newData.f0; - data.setSchema(newData.f0); - data.setSpec(newData.f2); + data.setSchema(compareInfo.resolvedTableSchema()); + data.setSpec(newData.f1); - if (newData.f1 == CompareSchemasVisitor.Result.DATA_CONVERSION_NEEDED) { - RowData newRowData = RowDataEvolver.convert(data.rowData(), data.schema(), newData.f0); - data.setRowData(newRowData); - } + RowData newRowData = (RowData) newData.f0.recordConverter().convert(data.rowData()); + data.setRowData(newRowData); return data; } diff --git a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/LRUCache.java b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/LRUCache.java new file mode 100644 index 000000000000..bb1e17405377 --- /dev/null +++ b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/LRUCache.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.flink.sink.dynamic; + +import java.util.LinkedHashMap; +import java.util.Map; +import java.util.function.Consumer; + +/** + * A performant, fixed size least recently used (LRU) cache implementation. + * + *

This cache has O(1) time complexity for get/put operations and provides eviction notifications + * when entries are removed due to size constraints. It offers better performance than similarly + * configured Caffeine caches, making it ideal for hot path operations. + * + *

This implementation extends {@link LinkedHashMap} with access-order traversal and automated + * removal of least recently used entries when the maximum size is reached. + */ +@SuppressWarnings("checkstyle:IllegalType") +class LRUCache extends LinkedHashMap { + /** Defaults from {@link java.util.HashMap} */ + private static final int DEFAULT_INITIAL_CAPACITY = 1 << 4; + + private static final float DEFAULT_LOAD_FACTOR = 0.75f; + + private final int maximumSize; + private final Consumer> evictionCallback; + + LRUCache(int maximumSize, Consumer> evictionCallback) { + super(Math.min(maximumSize, DEFAULT_INITIAL_CAPACITY), DEFAULT_LOAD_FACTOR, true); + this.maximumSize = maximumSize; + this.evictionCallback = evictionCallback; + } + + @Override + protected boolean removeEldestEntry(Map.Entry eldest) { + boolean remove = size() > maximumSize; + if (remove) { + evictionCallback.accept(eldest); + } + + return remove; + } +} diff --git a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/RowDataEvolver.java b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/RowDataEvolver.java deleted file mode 100644 index fe670c54ebd2..000000000000 --- a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/RowDataEvolver.java +++ /dev/null @@ -1,190 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.iceberg.flink.sink.dynamic; - -import java.time.LocalDate; -import java.time.LocalDateTime; -import java.time.LocalTime; -import java.util.List; -import java.util.Map; -import org.apache.flink.table.data.ArrayData; -import org.apache.flink.table.data.DecimalData; -import org.apache.flink.table.data.GenericArrayData; -import org.apache.flink.table.data.GenericMapData; -import org.apache.flink.table.data.GenericRowData; -import org.apache.flink.table.data.MapData; -import org.apache.flink.table.data.RowData; -import org.apache.flink.table.data.TimestampData; -import org.apache.flink.table.types.logical.ArrayType; -import org.apache.flink.table.types.logical.DecimalType; -import org.apache.flink.table.types.logical.LogicalType; -import org.apache.flink.table.types.logical.MapType; -import org.apache.flink.table.types.logical.RowType; -import org.apache.iceberg.Schema; -import org.apache.iceberg.flink.FlinkSchemaUtil; -import org.apache.iceberg.relocated.com.google.common.collect.Maps; - -/** - * A RowDataEvolver is responsible to change the input RowData to make it compatible with the target - * schema. This is done when - * - *

    - *
- * <ol>
- *   <li>The input schema has fewer fields than the target schema.
- *   <li>The table types are wider than the input type.
- *   <li>The field order differs for source and target schema.
- * </ol>
- *
- * <p>The resolution is as follows:
- *
- * <ol>
- *   <li>In the first case, we would add a null values for the missing field (if the field is
- *       optional).
- *   <li>In the second case, we would convert the data for the input field to a wider type, e.g. int
- *       (input type) => long (table type).
- *   <li>In the third case, we would rearrange the input data to match the target table.
- * </ol>
- */ -class RowDataEvolver { - private RowDataEvolver() {} - - public static RowData convert(RowData sourceData, Schema sourceSchema, Schema targetSchema) { - return convertStruct( - sourceData, FlinkSchemaUtil.convert(sourceSchema), FlinkSchemaUtil.convert(targetSchema)); - } - - private static Object convert(Object object, LogicalType sourceType, LogicalType targetType) { - if (object == null) { - return null; - } - - switch (targetType.getTypeRoot()) { - case BOOLEAN: - case INTEGER: - case FLOAT: - case VARCHAR: - case DATE: - case TIME_WITHOUT_TIME_ZONE: - case TIMESTAMP_WITH_LOCAL_TIME_ZONE: - case BINARY: - case VARBINARY: - return object; - case DOUBLE: - if (object instanceof Float) { - return ((Float) object).doubleValue(); - } else { - return object; - } - case BIGINT: - if (object instanceof Integer) { - return ((Integer) object).longValue(); - } else { - return object; - } - case DECIMAL: - DecimalType toDecimalType = (DecimalType) targetType; - DecimalData decimalData = (DecimalData) object; - if (((DecimalType) sourceType).getPrecision() == toDecimalType.getPrecision()) { - return object; - } else { - return DecimalData.fromBigDecimal( - decimalData.toBigDecimal(), toDecimalType.getPrecision(), toDecimalType.getScale()); - } - case TIMESTAMP_WITHOUT_TIME_ZONE: - if (object instanceof Integer) { - LocalDateTime dateTime = - LocalDateTime.of(LocalDate.ofEpochDay((Integer) object), LocalTime.MIN); - return TimestampData.fromLocalDateTime(dateTime); - } else { - return object; - } - case ROW: - return convertStruct((RowData) object, (RowType) sourceType, (RowType) targetType); - case ARRAY: - return convertArray((ArrayData) object, (ArrayType) sourceType, (ArrayType) targetType); - case MAP: - return convertMap((MapData) object, (MapType) sourceType, (MapType) targetType); - default: - throw new UnsupportedOperationException("Not a supported type: " + targetType); - } - } - - private static RowData convertStruct(RowData sourceData, RowType sourceType, RowType targetType) { - GenericRowData targetData = new GenericRowData(targetType.getFields().size()); - List targetFields = targetType.getFields(); - for (int i = 0; i < targetFields.size(); i++) { - RowType.RowField targetField = targetFields.get(i); - - int sourceFieldId = sourceType.getFieldIndex(targetField.getName()); - if (sourceFieldId == -1) { - if (targetField.getType().isNullable()) { - targetData.setField(i, null); - } else { - throw new IllegalArgumentException( - String.format( - "Field %s in target schema %s is non-nullable but does not exist in source schema.", - i + 1, targetType)); - } - } else { - RowData.FieldGetter getter = - RowData.createFieldGetter(sourceType.getTypeAt(sourceFieldId), sourceFieldId); - targetData.setField( - i, - convert( - getter.getFieldOrNull(sourceData), - sourceType.getFields().get(sourceFieldId).getType(), - targetField.getType())); - } - } - - return targetData; - } - - private static ArrayData convertArray( - ArrayData sourceData, ArrayType sourceType, ArrayType targetType) { - LogicalType fromElementType = sourceType.getElementType(); - LogicalType toElementType = targetType.getElementType(); - ArrayData.ElementGetter elementGetter = ArrayData.createElementGetter(fromElementType); - Object[] convertedArray = new Object[sourceData.size()]; - for (int i = 0; i < convertedArray.length; i++) { - convertedArray[i] = - convert(elementGetter.getElementOrNull(sourceData, i), fromElementType, toElementType); - } - - return new GenericArrayData(convertedArray); - } - - private static MapData 
convertMap(MapData sourceData, MapType sourceType, MapType targetType) { - LogicalType fromMapKeyType = sourceType.getKeyType(); - LogicalType fromMapValueType = sourceType.getValueType(); - LogicalType toMapKeyType = targetType.getKeyType(); - LogicalType toMapValueType = targetType.getValueType(); - ArrayData keyArray = sourceData.keyArray(); - ArrayData valueArray = sourceData.valueArray(); - ArrayData.ElementGetter keyGetter = ArrayData.createElementGetter(fromMapKeyType); - ArrayData.ElementGetter valueGetter = ArrayData.createElementGetter(fromMapValueType); - Map convertedMap = Maps.newLinkedHashMap(); - for (int i = 0; i < keyArray.size(); ++i) { - convertedMap.put( - convert(keyGetter.getElementOrNull(keyArray, i), fromMapKeyType, toMapKeyType), - convert(valueGetter.getElementOrNull(valueArray, i), fromMapValueType, toMapValueType)); - } - - return new GenericMapData(convertedMap); - } -} diff --git a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/TableMetadataCache.java b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/TableMetadataCache.java index e9c77ea8c809..479982bc41ba 100644 --- a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/TableMetadataCache.java +++ b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/TableMetadataCache.java @@ -20,7 +20,6 @@ import com.github.benmanes.caffeine.cache.Cache; import com.github.benmanes.caffeine.cache.Caffeine; -import java.util.LinkedHashMap; import java.util.Map; import java.util.Set; import org.apache.flink.annotation.Internal; @@ -32,6 +31,7 @@ import org.apache.iceberg.catalog.Catalog; import org.apache.iceberg.catalog.TableIdentifier; import org.apache.iceberg.exceptions.NoSuchTableException; +import org.apache.iceberg.flink.FlinkSchemaUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -44,19 +44,22 @@ class TableMetadataCache { private static final Logger LOG = LoggerFactory.getLogger(TableMetadataCache.class); - private static final int MAX_SCHEMA_COMPARISON_RESULTS_TO_CACHE = 10; private static final Tuple2 EXISTS = Tuple2.of(true, null); private static final Tuple2 NOT_EXISTS = Tuple2.of(false, null); - static final Tuple2 NOT_FOUND = - Tuple2.of(null, CompareSchemasVisitor.Result.SCHEMA_UPDATE_NEEDED); + static final ResolvedSchemaInfo NOT_FOUND = + new ResolvedSchemaInfo( + null, CompareSchemasVisitor.Result.SCHEMA_UPDATE_NEEDED, DataConverter.identity()); private final Catalog catalog; private final long refreshMs; + private final int inputSchemasPerTableCacheMaximumSize; private final Cache cache; - TableMetadataCache(Catalog catalog, int maximumSize, long refreshMs) { + TableMetadataCache( + Catalog catalog, int maximumSize, long refreshMs, int inputSchemasPerTableCacheMaximumSize) { this.catalog = catalog; this.refreshMs = refreshMs; + this.inputSchemasPerTableCacheMaximumSize = inputSchemasPerTableCacheMaximumSize; this.cache = Caffeine.newBuilder().maximumSize(maximumSize).build(); } @@ -75,7 +78,7 @@ String branch(TableIdentifier identifier, String branch) { return branch(identifier, branch, true); } - Tuple2 schema(TableIdentifier identifier, Schema input) { + ResolvedSchemaInfo schema(TableIdentifier identifier, Schema input) { return schema(identifier, input, true); } @@ -86,7 +89,12 @@ PartitionSpec spec(TableIdentifier identifier, PartitionSpec spec) { void update(TableIdentifier identifier, Table table) { cache.put( identifier, - new CacheItem(true, table.refs().keySet(), new SchemaInfo(table.schemas()), 
table.specs())); + new CacheItem( + true, + table.refs().keySet(), + table.schemas(), + table.specs(), + inputSchemasPerTableCacheMaximumSize)); } private String branch(TableIdentifier identifier, String branch, boolean allowRefresh) { @@ -103,7 +111,7 @@ private String branch(TableIdentifier identifier, String branch, boolean allowRe } } - private Tuple2 schema( + private ResolvedSchemaInfo schema( TableIdentifier identifier, Schema input, boolean allowRefresh) { CacheItem cached = cache.getIfPresent(identifier); Schema compatible = null; @@ -112,19 +120,21 @@ private Tuple2 schema( // and a new schema. Performance is paramount as this code is on the hot path. Every other // way for comparing 2 schemas were performing worse than the // {@link CompareByNameVisitor#visit(Schema, Schema, boolean)}, so caching was useless. - Tuple2 lastResult = - cached.schema.lastResults.get(input); + ResolvedSchemaInfo lastResult = cached.inputSchemas.get(input); if (lastResult != null) { return lastResult; } - for (Map.Entry tableSchema : cached.schema.schemas.entrySet()) { + for (Map.Entry tableSchema : cached.tableSchemas.entrySet()) { CompareSchemasVisitor.Result result = CompareSchemasVisitor.visit(input, tableSchema.getValue(), true); if (result == CompareSchemasVisitor.Result.SAME) { - Tuple2 newResult = - Tuple2.of(tableSchema.getValue(), CompareSchemasVisitor.Result.SAME); - cached.schema.update(input, newResult); + ResolvedSchemaInfo newResult = + new ResolvedSchemaInfo( + tableSchema.getValue(), + CompareSchemasVisitor.Result.SAME, + DataConverter.identity()); + cached.inputSchemas.put(input, newResult); return newResult; } else if (compatible == null && result == CompareSchemasVisitor.Result.DATA_CONVERSION_NEEDED) { @@ -137,12 +147,16 @@ private Tuple2 schema( refreshTable(identifier); return schema(identifier, input, false); } else if (compatible != null) { - Tuple2 newResult = - Tuple2.of(compatible, CompareSchemasVisitor.Result.DATA_CONVERSION_NEEDED); - cached.schema.update(input, newResult); + ResolvedSchemaInfo newResult = + new ResolvedSchemaInfo( + compatible, + CompareSchemasVisitor.Result.DATA_CONVERSION_NEEDED, + DataConverter.get( + FlinkSchemaUtil.convert(input), FlinkSchemaUtil.convert(compatible))); + cached.inputSchemas.put(input, newResult); return newResult; } else if (cached != null && cached.tableExists) { - cached.schema.update(input, NOT_FOUND); + cached.inputSchemas.put(input, NOT_FOUND); return NOT_FOUND; } else { return NOT_FOUND; @@ -174,7 +188,7 @@ private Tuple2 refreshTable(TableIdentifier identifier) { return EXISTS; } catch (NoSuchTableException e) { LOG.debug("Table doesn't exist {}", identifier, e); - cache.put(identifier, new CacheItem(false, null, null, null)); + cache.put(identifier, new CacheItem(false, null, null, null, 1)); return Tuple2.of(false, e); } } @@ -194,63 +208,62 @@ static class CacheItem { private final boolean tableExists; private final Set branches; - private final SchemaInfo schema; + private final Map tableSchemas; private final Map specs; + private final LRUCache inputSchemas; private CacheItem( boolean tableExists, Set branches, - SchemaInfo schema, - Map specs) { + Map tableSchemas, + Map specs, + int inputSchemaCacheMaximumSize) { this.tableExists = tableExists; this.branches = branches; - this.schema = schema; + this.tableSchemas = tableSchemas; this.specs = specs; + this.inputSchemas = + new LRUCache<>(inputSchemaCacheMaximumSize, CacheItem::inputSchemaEvictionListener); + } + + private static void inputSchemaEvictionListener( + 
Map.Entry evictedEntry) { + LOG.warn( + "Performance degraded as records with different schema is generated for the same table. " + + "Likely the DynamicRecord.schema is not reused. " + + "Reuse the same instance if the record schema is the same to improve performance"); } @VisibleForTesting - SchemaInfo getSchemaInfo() { - return schema; + Map inputSchemas() { + return inputSchemas; } } - /** - * Stores precalculated results for {@link CompareSchemasVisitor#visit(Schema, Schema, boolean)} - * in the cache. - */ - static class SchemaInfo { - private final Map schemas; - private final Map> lastResults; + static class ResolvedSchemaInfo { + private final Schema resolvedTableSchema; + private final CompareSchemasVisitor.Result compareResult; + private final DataConverter recordConverter; - private SchemaInfo(Map schemas) { - this.schemas = schemas; - this.lastResults = new LimitedLinkedHashMap<>(); + ResolvedSchemaInfo( + Schema tableSchema, + CompareSchemasVisitor.Result compareResult, + DataConverter recordConverter) { + this.resolvedTableSchema = tableSchema; + this.compareResult = compareResult; + this.recordConverter = recordConverter; } - private void update( - Schema newLastSchema, Tuple2 newLastResult) { - lastResults.put(newLastSchema, newLastResult); + Schema resolvedTableSchema() { + return resolvedTableSchema; } - @VisibleForTesting - Tuple2 getLastResult(Schema schema) { - return lastResults.get(schema); + CompareSchemasVisitor.Result compareResult() { + return compareResult; } - } - - @SuppressWarnings("checkstyle:IllegalType") - private static class LimitedLinkedHashMap extends LinkedHashMap { - @Override - protected boolean removeEldestEntry(Map.Entry eldest) { - boolean remove = size() > MAX_SCHEMA_COMPARISON_RESULTS_TO_CACHE; - if (remove) { - LOG.warn( - "Performance degraded as records with different schema is generated for the same table. " - + "Likely the DynamicRecord.schema is not reused. " - + "Reuse the same instance if the record schema is the same to improve performance"); - } - return remove; + DataConverter recordConverter() { + return recordConverter; } } diff --git a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/TableUpdater.java b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/TableUpdater.java index 40bb66f65125..fdd182830b2c 100644 --- a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/TableUpdater.java +++ b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/TableUpdater.java @@ -32,6 +32,7 @@ import org.apache.iceberg.exceptions.AlreadyExistsException; import org.apache.iceberg.exceptions.CommitFailedException; import org.apache.iceberg.exceptions.NoSuchNamespaceException; +import org.apache.iceberg.flink.FlinkSchemaUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -54,14 +55,14 @@ class TableUpdater { * @return a {@link Tuple3} of the new {@link Schema}, the status of the schema compared to the * requested one, and the new {@link PartitionSpec#specId()}. 
*/ - Tuple3 update( + Tuple2 update( TableIdentifier tableIdentifier, String branch, Schema schema, PartitionSpec spec) { findOrCreateTable(tableIdentifier, schema, spec); findOrCreateBranch(tableIdentifier, branch); - Tuple2 newSchema = + TableMetadataCache.ResolvedSchemaInfo newSchemaInfo = findOrCreateSchema(tableIdentifier, schema); PartitionSpec newSpec = findOrCreateSpec(tableIdentifier, spec); - return Tuple3.of(newSchema.f0, newSchema.f1, newSpec); + return Tuple2.of(newSchemaInfo, newSpec); } private void findOrCreateTable(TableIdentifier identifier, Schema schema, PartitionSpec spec) { @@ -110,10 +111,10 @@ private void findOrCreateBranch(TableIdentifier identifier, String branch) { } } - private Tuple2 findOrCreateSchema( + private TableMetadataCache.ResolvedSchemaInfo findOrCreateSchema( TableIdentifier identifier, Schema schema) { - Tuple2 fromCache = cache.schema(identifier, schema); - if (fromCache.f1 != CompareSchemasVisitor.Result.SCHEMA_UPDATE_NEEDED) { + TableMetadataCache.ResolvedSchemaInfo fromCache = cache.schema(identifier, schema); + if (fromCache.compareResult() != CompareSchemasVisitor.Result.SCHEMA_UPDATE_NEEDED) { return fromCache; } else { Table table = catalog.loadTable(identifier); @@ -121,9 +122,16 @@ private Tuple2 findOrCreateSchema( CompareSchemasVisitor.Result result = CompareSchemasVisitor.visit(schema, tableSchema, true); switch (result) { case SAME: + cache.update(identifier, table); + return new TableMetadataCache.ResolvedSchemaInfo( + tableSchema, result, DataConverter.identity()); case DATA_CONVERSION_NEEDED: cache.update(identifier, table); - return Tuple2.of(tableSchema, result); + return new TableMetadataCache.ResolvedSchemaInfo( + tableSchema, + result, + DataConverter.get( + FlinkSchemaUtil.convert(schema), FlinkSchemaUtil.convert(tableSchema))); case SCHEMA_UPDATE_NEEDED: LOG.info( "Triggering schema update for table {} {} to {}", identifier, tableSchema, schema); @@ -133,16 +141,15 @@ private Tuple2 findOrCreateSchema( try { updateApi.commit(); cache.update(identifier, table); - Tuple2 comparisonAfterMigration = + TableMetadataCache.ResolvedSchemaInfo comparisonAfterMigration = cache.schema(identifier, schema); - Schema newSchema = comparisonAfterMigration.f0; + Schema newSchema = comparisonAfterMigration.resolvedTableSchema(); LOG.info("Table {} schema updated from {} to {}", identifier, tableSchema, newSchema); return comparisonAfterMigration; } catch (CommitFailedException e) { cache.invalidate(identifier); - Tuple2 newSchema = - cache.schema(identifier, schema); - if (newSchema.f1 != CompareSchemasVisitor.Result.SCHEMA_UPDATE_NEEDED) { + TableMetadataCache.ResolvedSchemaInfo newSchema = cache.schema(identifier, schema); + if (newSchema.compareResult() != CompareSchemasVisitor.Result.SCHEMA_UPDATE_NEEDED) { LOG.debug("Table {} schema updated concurrently to {}", identifier, schema); return newSchema; } else { diff --git a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicTableUpdateOperator.java b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicTableUpdateOperator.java index 22b6476b9c25..01e2c440df67 100644 --- a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicTableUpdateOperator.java +++ b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicTableUpdateOperator.java @@ -52,13 +52,17 @@ class TestDynamicTableUpdateOperator { void testDynamicTableUpdateOperatorNewTable() throws Exception { int cacheMaximumSize = 10; 
int cacheRefreshMs = 1000; + int inputSchemaCacheMaximumSize = 10; Catalog catalog = CATALOG_EXTENSION.catalog(); TableIdentifier table = TableIdentifier.of(TABLE); assertThat(catalog.tableExists(table)).isFalse(); DynamicTableUpdateOperator operator = new DynamicTableUpdateOperator( - CATALOG_EXTENSION.catalogLoader(), cacheMaximumSize, cacheRefreshMs); + CATALOG_EXTENSION.catalogLoader(), + cacheMaximumSize, + cacheRefreshMs, + inputSchemaCacheMaximumSize); operator.open((OpenContext) null); DynamicRecordInternal input = @@ -81,12 +85,16 @@ void testDynamicTableUpdateOperatorNewTable() throws Exception { void testDynamicTableUpdateOperatorSchemaChange() throws Exception { int cacheMaximumSize = 10; int cacheRefreshMs = 1000; + int inputSchemaCacheMaximumSize = 10; Catalog catalog = CATALOG_EXTENSION.catalog(); TableIdentifier table = TableIdentifier.of(TABLE); DynamicTableUpdateOperator operator = new DynamicTableUpdateOperator( - CATALOG_EXTENSION.catalogLoader(), cacheMaximumSize, cacheRefreshMs); + CATALOG_EXTENSION.catalogLoader(), + cacheMaximumSize, + cacheRefreshMs, + inputSchemaCacheMaximumSize); operator.open((OpenContext) null); catalog.createTable(table, SCHEMA1); diff --git a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestLRUCache.java b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestLRUCache.java new file mode 100644 index 000000000000..59a020a496b5 --- /dev/null +++ b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestLRUCache.java @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.iceberg.flink.sink.dynamic; + +import static org.assertj.core.api.Assertions.assertThat; + +import java.util.List; +import java.util.Map; +import java.util.function.Consumer; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.junit.jupiter.api.Test; + +class TestLRUCache { + private static final Consumer> NO_OP_CALLBACK = ignored -> {}; + + @Test + void testPut() { + LRUCache cache = new LRUCache<>(1, NO_OP_CALLBACK); + cache.put(1, 1); + + assertThat(cache.size()).isEqualTo(1); + assertThat(cache).containsExactlyEntriesOf(Map.of(1, 1)); + } + + @Test + void testGet() { + LRUCache cache = new LRUCache<>(1, NO_OP_CALLBACK); + cache.put(1, 123); + + assertThat(cache.size()).isEqualTo(1); + assertThat(cache.get(1)).isEqualTo(123); + } + + @Test + void testElementEviction() { + int maxSize = 2; + LRUCache cache = new LRUCache<>(maxSize, NO_OP_CALLBACK); + + cache.put(1, 1); + cache.put(2, 2); + Integer value = cache.get(1); + assertThat(value).isEqualTo(1); + + cache.put(3, 3); // "2" should be evicted + + assertThat(cache.size()).isEqualTo(2); + assertThat(cache).containsExactly(Map.entry(1, 1), Map.entry(3, 3)); + } + + @Test + void testEvictionCallback() { + int maxSize = 2; + TestEvictionCallback callback = new TestEvictionCallback(); + LRUCache cache = new LRUCache<>(maxSize, callback); + + cache.put(1, 1); + cache.put(2, 2); + Integer value = cache.get(1); + assertThat(value).isEqualTo(1); + + cache.put(3, 3); // "2" should be evicted + + assertThat(callback.evictedEntries).containsExactly(Map.entry(2, 2)); + } + + private static class TestEvictionCallback implements Consumer> { + private final List> evictedEntries = Lists.newArrayList(); + + @Override + public void accept(Map.Entry entry) { + evictedEntries.add(entry); + } + } +} diff --git a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestRowDataEvolver.java b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestRowDataConverter.java similarity index 88% rename from flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestRowDataEvolver.java rename to flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestRowDataConverter.java index 2553575f1893..73f36414f5eb 100644 --- a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestRowDataEvolver.java +++ b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestRowDataConverter.java @@ -34,6 +34,7 @@ import org.apache.iceberg.Schema; import org.apache.iceberg.flink.DataGenerator; import org.apache.iceberg.flink.DataGenerators; +import org.apache.iceberg.flink.FlinkSchemaUtil; import org.apache.iceberg.flink.SimpleDataUtil; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; import org.apache.iceberg.types.Types; @@ -42,7 +43,7 @@ import org.joda.time.Days; import org.junit.jupiter.api.Test; -class TestRowDataEvolver { +class TestRowDataConverter { static final Schema SCHEMA = new Schema( @@ -59,7 +60,7 @@ class TestRowDataEvolver { void testPrimitiveTypes() { DataGenerator generator = new DataGenerators.Primitives(); assertThat( - RowDataEvolver.convert( + convert( generator.generateFlinkRowData(), generator.icebergSchema(), generator.icebergSchema())) @@ -68,7 +69,7 @@ void testPrimitiveTypes() { @Test void testAddColumn() { - assertThat(RowDataEvolver.convert(SimpleDataUtil.createRowData(1, "a"), SCHEMA, SCHEMA2)) + assertThat(convert(SimpleDataUtil.createRowData(1, "a"), SCHEMA, SCHEMA2)) 
.isEqualTo(GenericRowData.of(1, StringData.fromString("a"), null)); } @@ -82,7 +83,7 @@ void testAddRequiredColumn() { assertThrows( IllegalArgumentException.class, - () -> RowDataEvolver.convert(GenericRowData.of(42), currentSchema, targetSchema)); + () -> convert(GenericRowData.of(42), currentSchema, targetSchema)); } @Test @@ -92,9 +93,7 @@ void testIntToLong() { Types.NestedField.optional(2, "id", Types.LongType.get()), Types.NestedField.optional(4, "data", Types.StringType.get())); - assertThat( - RowDataEvolver.convert( - SimpleDataUtil.createRowData(1, "a"), SimpleDataUtil.SCHEMA, schemaWithLong)) + assertThat(convert(SimpleDataUtil.createRowData(1, "a"), SimpleDataUtil.SCHEMA, schemaWithLong)) .isEqualTo(GenericRowData.of(1L, StringData.fromString("a"))); } @@ -105,7 +104,7 @@ void testFloatToDouble() { Schema schemaWithDouble = new Schema(Types.NestedField.optional(2, "float2double", Types.DoubleType.get())); - assertThat(RowDataEvolver.convert(GenericRowData.of(1.5f), schemaWithFloat, schemaWithDouble)) + assertThat(convert(GenericRowData.of(1.5f), schemaWithFloat, schemaWithDouble)) .isEqualTo(GenericRowData.of(1.5d)); } @@ -121,7 +120,7 @@ void testDateToTimestamp() { int days = Days.daysBetween(new DateTime(1970, 1, 1, 0, 0, 0, 0, DateTimeZone.UTC), time).getDays(); - assertThat(RowDataEvolver.convert(GenericRowData.of(days), schemaWithFloat, schemaWithDouble)) + assertThat(convert(GenericRowData.of(days), schemaWithFloat, schemaWithDouble)) .isEqualTo(GenericRowData.of(TimestampData.fromEpochMillis(time.getMillis()))); } @@ -133,7 +132,7 @@ void testIncreasePrecision() { new Schema(Types.NestedField.required(14, "decimal_field", Types.DecimalType.of(10, 2))); assertThat( - RowDataEvolver.convert( + convert( GenericRowData.of(DecimalData.fromBigDecimal(new BigDecimal("-1.50"), 9, 2)), before, after)) @@ -161,7 +160,7 @@ void testStructAddOptionalFields() { StringData.fromString("row_id_value"), GenericRowData.of(1, null, StringData.fromString("Jane"))); - assertThat(RowDataEvolver.convert(oldData, oldSchema, newSchema)).isEqualTo(newData); + assertThat(convert(oldData, oldSchema, newSchema)).isEqualTo(newData); } @Test @@ -188,7 +187,7 @@ void testStructAddRequiredFieldsWithOptionalRoot() { RowData expectedData = GenericRowData.of(StringData.fromString("row_id_value"), null); - assertThat(RowDataEvolver.convert(oldData, oldSchema, newSchema)).isEqualTo(expectedData); + assertThat(convert(oldData, oldSchema, newSchema)).isEqualTo(expectedData); } @Test @@ -208,9 +207,7 @@ void testStructAddRequiredFields() { required(103, "required", Types.StringType.get()), required(102, "name", Types.StringType.get())))); - assertThrows( - IllegalArgumentException.class, - () -> RowDataEvolver.convert(oldData, oldSchema, newSchema)); + assertThrows(IllegalArgumentException.class, () -> convert(oldData, oldSchema, newSchema)); } @Test @@ -233,7 +230,7 @@ void testMap() { ImmutableMap.of( StringData.fromString("Jane"), 1L, StringData.fromString("Joe"), 2L))); - assertThat(RowDataEvolver.convert(oldData, oldSchema, newSchema)).isEqualTo(newData); + assertThat(convert(oldData, oldSchema, newSchema)).isEqualTo(newData); } @Test @@ -251,6 +248,13 @@ void testArray() { GenericRowData.of( StringData.fromString("row_id_value"), new GenericArrayData(new Long[] {1L, 2L, 3L})); - assertThat(RowDataEvolver.convert(oldData, oldSchema, newSchema)).isEqualTo(newData); + assertThat(convert(oldData, oldSchema, newSchema)).isEqualTo(newData); + } + + private static RowData convert(RowData sourceData, Schema 
sourceSchema, Schema targetSchema) { + return (RowData) + DataConverter.get( + FlinkSchemaUtil.convert(sourceSchema), FlinkSchemaUtil.convert(targetSchema)) + .convert(sourceData); } } diff --git a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestTableMetadataCache.java b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestTableMetadataCache.java index cedae887041e..5d222a212fff 100644 --- a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestTableMetadataCache.java +++ b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestTableMetadataCache.java @@ -47,17 +47,19 @@ void testCaching() { Catalog catalog = CATALOG_EXTENSION.catalog(); TableIdentifier tableIdentifier = TableIdentifier.parse("default.myTable"); catalog.createTable(tableIdentifier, SCHEMA); - TableMetadataCache cache = new TableMetadataCache(catalog, 10, Long.MAX_VALUE); + TableMetadataCache cache = new TableMetadataCache(catalog, 10, Long.MAX_VALUE, 10); - Schema schema1 = cache.schema(tableIdentifier, SCHEMA).f0; + Schema schema1 = cache.schema(tableIdentifier, SCHEMA).resolvedTableSchema(); assertThat(schema1.sameSchema(SCHEMA)).isTrue(); - assertThat(cache.schema(tableIdentifier, SerializationUtils.clone(SCHEMA)).f0) + assertThat( + cache.schema(tableIdentifier, SerializationUtils.clone(SCHEMA)).resolvedTableSchema()) .isEqualTo(schema1); assertThat(cache.schema(tableIdentifier, SCHEMA2)).isEqualTo(TableMetadataCache.NOT_FOUND); - schema1 = cache.schema(tableIdentifier, SCHEMA).f0; - assertThat(cache.schema(tableIdentifier, SerializationUtils.clone(SCHEMA)).f0) + schema1 = cache.schema(tableIdentifier, SCHEMA).resolvedTableSchema(); + assertThat( + cache.schema(tableIdentifier, SerializationUtils.clone(SCHEMA)).resolvedTableSchema()) .isEqualTo(schema1); } @@ -66,17 +68,17 @@ void testCacheInvalidationAfterSchemaChange() { Catalog catalog = CATALOG_EXTENSION.catalog(); TableIdentifier tableIdentifier = TableIdentifier.parse("default.myTable"); catalog.createTable(tableIdentifier, SCHEMA); - TableMetadataCache cache = new TableMetadataCache(catalog, 10, Long.MAX_VALUE); + TableMetadataCache cache = new TableMetadataCache(catalog, 10, Long.MAX_VALUE, 10); TableUpdater tableUpdater = new TableUpdater(cache, catalog); - Schema schema1 = cache.schema(tableIdentifier, SCHEMA).f0; + Schema schema1 = cache.schema(tableIdentifier, SCHEMA).resolvedTableSchema(); assertThat(schema1.sameSchema(SCHEMA)).isTrue(); catalog.dropTable(tableIdentifier); catalog.createTable(tableIdentifier, SCHEMA2); tableUpdater.update(tableIdentifier, "main", SCHEMA2, PartitionSpec.unpartitioned()); - Schema schema2 = cache.schema(tableIdentifier, SCHEMA2).f0; + Schema schema2 = cache.schema(tableIdentifier, SCHEMA2).resolvedTableSchema(); assertThat(schema2.sameSchema(SCHEMA2)).isTrue(); } @@ -85,7 +87,7 @@ void testCachingDisabled() { Catalog catalog = CATALOG_EXTENSION.catalog(); TableIdentifier tableIdentifier = TableIdentifier.parse("default.myTable"); catalog.createTable(tableIdentifier, SCHEMA); - TableMetadataCache cache = new TableMetadataCache(catalog, 0, Long.MAX_VALUE); + TableMetadataCache cache = new TableMetadataCache(catalog, 0, Long.MAX_VALUE, 10); // Cleanup routine doesn't run after every write cache.getInternalCache().cleanUp(); diff --git a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestTableUpdater.java b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestTableUpdater.java index 
f7a3596462f1..9099fd1f3ae6 100644 --- a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestTableUpdater.java +++ b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestTableUpdater.java @@ -21,7 +21,6 @@ import static org.assertj.core.api.Assertions.assertThat; import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.api.java.tuple.Tuple3; import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.Schema; import org.apache.iceberg.Table; @@ -48,22 +47,21 @@ public class TestTableUpdater extends TestFlinkIcebergSinkBase { void testTableCreation() { Catalog catalog = CATALOG_EXTENSION.catalog(); TableIdentifier tableIdentifier = TableIdentifier.parse("myTable"); - TableMetadataCache cache = new TableMetadataCache(catalog, 10, Long.MAX_VALUE); + TableMetadataCache cache = new TableMetadataCache(catalog, 10, Long.MAX_VALUE, 10); TableUpdater tableUpdater = new TableUpdater(cache, catalog); tableUpdater.update(tableIdentifier, "main", SCHEMA, PartitionSpec.unpartitioned()); assertThat(catalog.tableExists(tableIdentifier)).isTrue(); - Tuple2 cachedSchema = - cache.schema(tableIdentifier, SCHEMA); - assertThat(cachedSchema.f0.sameSchema(SCHEMA)).isTrue(); + TableMetadataCache.ResolvedSchemaInfo cachedSchema = cache.schema(tableIdentifier, SCHEMA); + assertThat(cachedSchema.resolvedTableSchema().sameSchema(SCHEMA)).isTrue(); } @Test void testTableAlreadyExists() { Catalog catalog = CATALOG_EXTENSION.catalog(); TableIdentifier tableIdentifier = TableIdentifier.parse("myTable"); - TableMetadataCache cache = new TableMetadataCache(catalog, 10, Long.MAX_VALUE); + TableMetadataCache cache = new TableMetadataCache(catalog, 10, Long.MAX_VALUE, 10); TableUpdater tableUpdater = new TableUpdater(cache, catalog); // Make the table non-existent in cache @@ -71,18 +69,18 @@ void testTableAlreadyExists() { // Create the table catalog.createTable(tableIdentifier, SCHEMA); // Make sure that the cache is invalidated and the table refreshed without an error - Tuple3 result = + Tuple2 result = tableUpdater.update(tableIdentifier, "main", SCHEMA, PartitionSpec.unpartitioned()); - assertThat(result.f0.sameSchema(SCHEMA)).isTrue(); - assertThat(result.f1).isEqualTo(CompareSchemasVisitor.Result.SAME); - assertThat(result.f2).isEqualTo(PartitionSpec.unpartitioned()); + assertThat(result.f0.resolvedTableSchema().sameSchema(SCHEMA)).isTrue(); + assertThat(result.f0.compareResult()).isEqualTo(CompareSchemasVisitor.Result.SAME); + assertThat(result.f1).isEqualTo(PartitionSpec.unpartitioned()); } @Test void testBranchCreationAndCaching() { Catalog catalog = CATALOG_EXTENSION.catalog(); TableIdentifier tableIdentifier = TableIdentifier.parse("myTable"); - TableMetadataCache cache = new TableMetadataCache(catalog, 10, Long.MAX_VALUE); + TableMetadataCache cache = new TableMetadataCache(catalog, 10, Long.MAX_VALUE, 10); TableUpdater tableUpdater = new TableUpdater(cache, catalog); catalog.createTable(tableIdentifier, SCHEMA); @@ -98,12 +96,11 @@ void testBranchCreationAndCaching() { void testSpecCreation() { Catalog catalog = CATALOG_EXTENSION.catalog(); TableIdentifier tableIdentifier = TableIdentifier.parse("myTable"); - TableMetadataCache cache = new TableMetadataCache(catalog, 10, Long.MAX_VALUE); + TableMetadataCache cache = new TableMetadataCache(catalog, 10, Long.MAX_VALUE, 10); TableUpdater tableUpdater = new TableUpdater(cache, catalog); PartitionSpec spec = PartitionSpec.builderFor(SCHEMA).bucket("data", 10).build(); - Tuple3 result = - 
tableUpdater.update(tableIdentifier, "main", SCHEMA, spec); + tableUpdater.update(tableIdentifier, "main", SCHEMA, spec); Table table = catalog.loadTable(tableIdentifier); assertThat(table).isNotNull(); @@ -115,14 +112,18 @@ void testInvalidateOldCacheEntryOnUpdate() { Catalog catalog = CATALOG_EXTENSION.catalog(); TableIdentifier tableIdentifier = TableIdentifier.parse("default.myTable"); catalog.createTable(tableIdentifier, SCHEMA); - TableMetadataCache cache = new TableMetadataCache(catalog, 10, Long.MAX_VALUE); + TableMetadataCache cache = new TableMetadataCache(catalog, 10, Long.MAX_VALUE, 10); cache.schema(tableIdentifier, SCHEMA); TableUpdater tableUpdater = new TableUpdater(cache, catalog); Schema updated = - tableUpdater.update(tableIdentifier, "main", SCHEMA2, PartitionSpec.unpartitioned()).f0; - assertThat(updated.sameSchema(SCHEMA2)); - assertThat(cache.schema(tableIdentifier, SCHEMA2).f0.sameSchema(SCHEMA2)).isTrue(); + tableUpdater + .update(tableIdentifier, "main", SCHEMA2, PartitionSpec.unpartitioned()) + .f0 + .resolvedTableSchema(); + assertThat(updated.sameSchema(SCHEMA2)).isTrue(); + assertThat(cache.schema(tableIdentifier, SCHEMA2).resolvedTableSchema().sameSchema(SCHEMA2)) + .isTrue(); } @Test @@ -130,7 +131,7 @@ void testLastResultInvalidation() { Catalog catalog = CATALOG_EXTENSION.catalog(); TableIdentifier tableIdentifier = TableIdentifier.parse("default.myTable"); catalog.createTable(tableIdentifier, SCHEMA); - TableMetadataCache cache = new TableMetadataCache(catalog, 10, Long.MAX_VALUE); + TableMetadataCache cache = new TableMetadataCache(catalog, 10, Long.MAX_VALUE, 10); TableUpdater tableUpdater = new TableUpdater(cache, catalog); // Initialize cache @@ -141,20 +142,18 @@ void testLastResultInvalidation() { catalog.createTable(tableIdentifier, SCHEMA2); // Cache still stores the old information - assertThat(cache.schema(tableIdentifier, SCHEMA2).f1) + assertThat(cache.schema(tableIdentifier, SCHEMA2).compareResult()) .isEqualTo(CompareSchemasVisitor.Result.SCHEMA_UPDATE_NEEDED); assertThat( - tableUpdater.update(tableIdentifier, "main", SCHEMA2, PartitionSpec.unpartitioned()).f1) + tableUpdater + .update(tableIdentifier, "main", SCHEMA2, PartitionSpec.unpartitioned()) + .f0 + .compareResult()) .isEqualTo(CompareSchemasVisitor.Result.SAME); // Last result cache should be cleared - assertThat( - cache - .getInternalCache() - .getIfPresent(tableIdentifier) - .getSchemaInfo() - .getLastResult(SCHEMA2)) + assertThat(cache.getInternalCache().getIfPresent(tableIdentifier).inputSchemas().get(SCHEMA2)) .isNull(); } }
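
For reviewers who want to see the new conversion path end to end, here is a minimal sketch (not part of the change set) of how the DataConverter introduced above widens input data to the resolved table schema, which is what DynamicRecordProcessor#emit and DynamicTableUpdateOperator#map now do through ResolvedSchemaInfo#recordConverter(). The DataConverterSketch class and its main method are illustrative assumptions; the sketch sits in the sink package because DataConverter is package-private, just like the tests in this diff.

package org.apache.iceberg.flink.sink.dynamic;

import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.BigIntType;
import org.apache.flink.table.types.logical.IntType;
import org.apache.flink.table.types.logical.RowType;

// Illustrative sketch only: DataConverter and its factory methods come from the new
// DataConverter.java above; the class name and main method are assumptions.
class DataConverterSketch {

  public static void main(String[] args) {
    // Input records carry a single INT field; the resolved table schema uses BIGINT.
    RowType inputType = RowType.of(new IntType());
    RowType tableType = RowType.of(new BigIntType());

    // Built once per (input schema, table schema) pair and cached in
    // TableMetadataCache.ResolvedSchemaInfo, so the per-record cost is one convert() call.
    DataConverter converter = DataConverter.get(inputType, tableType);

    // The INT value is widened to a long while the row structure is preserved.
    RowData converted = (RowData) converter.convert(GenericRowData.of(1));
    System.out.println(converted); // prints +I(1), now backed by a Long
  }
}

Compared to the removed RowDataEvolver, which re-derived the Flink types and walked the schemas on every record, the precomputed converter (identity when the schemas already match) keeps the per-record work on the hot path to a single call.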