diff --git a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/CompareSchemasVisitor.java b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/CompareSchemasVisitor.java new file mode 100644 index 000000000000..bb0d32f8f644 --- /dev/null +++ b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/CompareSchemasVisitor.java @@ -0,0 +1,266 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.flink.sink.dynamic; + +import java.util.List; +import java.util.Map; +import org.apache.iceberg.Schema; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.apache.iceberg.schema.SchemaWithPartnerVisitor; +import org.apache.iceberg.types.Type; +import org.apache.iceberg.types.Types; + +/** + * Visitor class which compares an input schema to a table schema and emits a compatibility {@link + * Result}. + * + * + * + * The input schema fields are compared to the table schema via their names. 
+ */ +public class CompareSchemasVisitor + extends SchemaWithPartnerVisitor { + + private final Schema tableSchema; + + private CompareSchemasVisitor(Schema tableSchema) { + this.tableSchema = tableSchema; + } + + public static Result visit(Schema dataSchema, Schema tableSchema) { + return visit(dataSchema, tableSchema, true); + } + + public static Result visit(Schema dataSchema, Schema tableSchema, boolean caseSensitive) { + return visit( + dataSchema, + -1, + new CompareSchemasVisitor(tableSchema), + new PartnerIdByNameAccessors(tableSchema, caseSensitive)); + } + + @Override + public Result schema(Schema dataSchema, Integer tableSchemaId, Result downstream) { + if (tableSchemaId == null) { + return Result.SCHEMA_UPDATE_NEEDED; + } + + return downstream; + } + + @Override + public Result struct(Types.StructType struct, Integer tableSchemaId, List fields) { + if (tableSchemaId == null) { + return Result.SCHEMA_UPDATE_NEEDED; + } + + Result result = fields.stream().reduce(Result::merge).orElse(Result.SCHEMA_UPDATE_NEEDED); + + if (result == Result.SCHEMA_UPDATE_NEEDED) { + return Result.SCHEMA_UPDATE_NEEDED; + } + + Type tableSchemaType = + tableSchemaId == -1 ? 
tableSchema.asStruct() : tableSchema.findField(tableSchemaId).type(); + if (!tableSchemaType.isStructType()) { + return Result.SCHEMA_UPDATE_NEEDED; + } + + if (struct.fields().size() != tableSchemaType.asStructType().fields().size()) { + return Result.DATA_CONVERSION_NEEDED; + } + + for (int i = 0; i < struct.fields().size(); ++i) { + if (!struct + .fields() + .get(i) + .name() + .equals(tableSchemaType.asStructType().fields().get(i).name())) { + return Result.DATA_CONVERSION_NEEDED; + } + } + + return result; + } + + @Override + public Result field(Types.NestedField field, Integer tableSchemaId, Result typeResult) { + if (tableSchemaId == null) { + return Result.SCHEMA_UPDATE_NEEDED; + } + + if (typeResult != Result.SAME) { + return typeResult; + } + + if (tableSchema.findField(tableSchemaId).isRequired() && field.isOptional()) { + return Result.SCHEMA_UPDATE_NEEDED; + } else { + return Result.SAME; + } + } + + @Override + public Result list(Types.ListType list, Integer tableSchemaId, Result elementsResult) { + if (tableSchemaId == null) { + return Result.SCHEMA_UPDATE_NEEDED; + } + + return elementsResult; + } + + @Override + public Result map( + Types.MapType map, Integer tableSchemaId, Result keyResult, Result valueResult) { + if (tableSchemaId == null) { + return Result.SCHEMA_UPDATE_NEEDED; + } + + return keyResult.merge(valueResult); + } + + @Override + @SuppressWarnings("checkstyle:CyclomaticComplexity") + public Result primitive(Type.PrimitiveType primitive, Integer tableSchemaId) { + if (tableSchemaId == null) { + return Result.SCHEMA_UPDATE_NEEDED; + } + + Type tableSchemaType = tableSchema.findField(tableSchemaId).type(); + if (!tableSchemaType.isPrimitiveType()) { + return Result.SCHEMA_UPDATE_NEEDED; + } + + Type.PrimitiveType tableSchemaPrimitiveType = tableSchemaType.asPrimitiveType(); + if (primitive.equals(tableSchemaPrimitiveType)) { + return Result.SAME; + } else if (primitive.equals(Types.IntegerType.get()) + && 
tableSchemaPrimitiveType.equals(Types.LongType.get())) { + return Result.DATA_CONVERSION_NEEDED; + } else if (primitive.equals(Types.FloatType.get()) + && tableSchemaPrimitiveType.equals(Types.DoubleType.get())) { + return Result.DATA_CONVERSION_NEEDED; + } else if (primitive.equals(Types.DateType.get()) + && tableSchemaPrimitiveType.equals(Types.TimestampType.withoutZone())) { + return Result.DATA_CONVERSION_NEEDED; + } else if (primitive.typeId() == Type.TypeID.DECIMAL + && tableSchemaPrimitiveType.typeId() == Type.TypeID.DECIMAL) { + Types.DecimalType dataType = (Types.DecimalType) primitive; + Types.DecimalType tableType = (Types.DecimalType) tableSchemaPrimitiveType; + return dataType.scale() == tableType.scale() && dataType.precision() < tableType.precision() + ? Result.DATA_CONVERSION_NEEDED + : Result.SCHEMA_UPDATE_NEEDED; + } else { + return Result.SCHEMA_UPDATE_NEEDED; + } + } + + static class PartnerIdByNameAccessors implements PartnerAccessors { + private final Schema tableSchema; + private boolean caseSensitive = true; + + PartnerIdByNameAccessors(Schema tableSchema) { + this.tableSchema = tableSchema; + } + + private PartnerIdByNameAccessors(Schema tableSchema, boolean caseSensitive) { + this(tableSchema); + this.caseSensitive = caseSensitive; + } + + @Override + public Integer fieldPartner(Integer tableSchemaFieldId, int fieldId, String name) { + Types.StructType struct; + if (tableSchemaFieldId == -1) { + struct = tableSchema.asStruct(); + } else { + struct = tableSchema.findField(tableSchemaFieldId).type().asStructType(); + } + + Types.NestedField field = + caseSensitive ? 
struct.field(name) : struct.caseInsensitiveField(name); + if (field != null) { + return field.fieldId(); + } + + return null; + } + + @Override + public Integer mapKeyPartner(Integer tableSchemaMapId) { + Types.NestedField mapField = tableSchema.findField(tableSchemaMapId); + if (mapField != null) { + return mapField.type().asMapType().fields().get(0).fieldId(); + } + + return null; + } + + @Override + public Integer mapValuePartner(Integer tableSchemaMapId) { + Types.NestedField mapField = tableSchema.findField(tableSchemaMapId); + if (mapField != null) { + return mapField.type().asMapType().fields().get(1).fieldId(); + } + + return null; + } + + @Override + public Integer listElementPartner(Integer tableSchemaListId) { + Types.NestedField listField = tableSchema.findField(tableSchemaListId); + if (listField != null) { + return listField.type().asListType().fields().get(0).fieldId(); + } + + return null; + } + } + + public enum Result { + SAME(0), + DATA_CONVERSION_NEEDED(1), + SCHEMA_UPDATE_NEEDED(2); + + private static final Map BY_ID = Maps.newHashMap(); + + static { + for (Result e : Result.values()) { + if (BY_ID.put(e.id, e) != null) { + throw new IllegalArgumentException("Duplicate id: " + e.id); + } + } + } + + private final int id; + + Result(int id) { + this.id = id; + } + + private Result merge(Result other) { + return BY_ID.get(Math.max(this.id, other.id)); + } + } +} diff --git a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/EvolveSchemaVisitor.java b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/EvolveSchemaVisitor.java new file mode 100644 index 000000000000..85fd5753919e --- /dev/null +++ b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/EvolveSchemaVisitor.java @@ -0,0 +1,204 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.flink.sink.dynamic; + +import java.util.List; +import org.apache.iceberg.Schema; +import org.apache.iceberg.UpdateSchema; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.schema.SchemaWithPartnerVisitor; +import org.apache.iceberg.types.Type; +import org.apache.iceberg.types.Types; + +/** + * Visitor class that accumulates the set of changes needed to evolve an existing schema into the + * target schema. Changes are applied to an {@link UpdateSchema} operation. + * + *

We support: + * + *

    + *
  • Adding new columns + *
  • Widening the type of existing columns + *
  • Reordering columns + *
+ * + * We don't support: + * + *
    + *
  • Dropping columns + *
  • Renaming columns + *
+ * + * The reason is that dropping columns would create issues with late / out of order data. Once we + * drop fields, we wouldn't be able to easily add them back later without losing the associated + * data. Renaming columns is not supported because we compare schemas by name, which doesn't allow + * for renaming without additional hints. + */ +public class EvolveSchemaVisitor extends SchemaWithPartnerVisitor { + + private final UpdateSchema api; + private final Schema existingSchema; + private final Schema targetSchema; + + private EvolveSchemaVisitor(UpdateSchema api, Schema existingSchema, Schema targetSchema) { + this.api = api; + this.existingSchema = existingSchema; + this.targetSchema = targetSchema; + } + + /** + * Adds changes needed to produce the target schema to an {@link UpdateSchema} operation. + * + *

Changes are accumulated to evolve the existingSchema into a targetSchema. + * + * @param api an UpdateSchema for adding changes + * @param existingSchema an existing schema + * @param targetSchema a new schema to compare with the existing + */ + public static void visit(UpdateSchema api, Schema existingSchema, Schema targetSchema) { + visit( + targetSchema, + -1, + new EvolveSchemaVisitor(api, existingSchema, targetSchema), + new CompareSchemasVisitor.PartnerIdByNameAccessors(existingSchema)); + } + + @Override + public Boolean struct(Types.StructType struct, Integer partnerId, List existingFields) { + if (partnerId == null) { + return true; + } + + // Add, update and order fields in the struct + Types.StructType partnerStruct = findFieldType(partnerId).asStructType(); + String after = null; + for (Types.NestedField targetField : struct.fields()) { + Types.NestedField nestedField = partnerStruct.field(targetField.name()); + final String columnName; + if (nestedField != null) { + updateColumn(nestedField, targetField); + columnName = this.existingSchema.findColumnName(nestedField.fieldId()); + } else { + addColumn(partnerId, targetField); + columnName = this.targetSchema.findColumnName(targetField.fieldId()); + } + + setPosition(columnName, after); + after = columnName; + } + + // Ensure that unused fields are made optional + for (Types.NestedField existingField : partnerStruct.fields()) { + if (struct.field(existingField.name()) == null) { + if (existingField.isRequired()) { + this.api.makeColumnOptional(this.existingSchema.findColumnName(existingField.fieldId())); + } + } + } + + return false; + } + + @Override + public Boolean field(Types.NestedField field, Integer partnerId, Boolean isFieldMissing) { + return partnerId == null; + } + + @Override + public Boolean list(Types.ListType list, Integer partnerId, Boolean isElementMissing) { + if (partnerId == null) { + return true; + } + + Preconditions.checkState( + !isElementMissing, "Error traversing schemas: 
element is missing, but list is present"); + + Types.ListType partnerList = findFieldType(partnerId).asListType(); + updateColumn(partnerList.fields().get(0), list.fields().get(0)); + + return false; + } + + @Override + public Boolean map( + Types.MapType map, Integer partnerId, Boolean isKeyMissing, Boolean isValueMissing) { + if (partnerId == null) { + return true; + } + + Preconditions.checkState( + !isKeyMissing, "Error traversing schemas: key is missing, but map is present"); + Preconditions.checkState( + !isValueMissing, "Error traversing schemas: value is missing, but map is present"); + + Types.MapType partnerMap = findFieldType(partnerId).asMapType(); + updateColumn(partnerMap.fields().get(0), map.fields().get(0)); + updateColumn(partnerMap.fields().get(1), map.fields().get(1)); + + return false; + } + + @Override + public Boolean primitive(Type.PrimitiveType primitive, Integer partnerId) { + return partnerId == null; + } + + private Type findFieldType(int fieldId) { + if (fieldId == -1) { + return existingSchema.asStruct(); + } else { + return existingSchema.findField(fieldId).type(); + } + } + + private void addColumn(int parentId, Types.NestedField field) { + String parentName = targetSchema.findColumnName(parentId); + api.addColumn(parentName, field.name(), field.type(), field.doc()); + } + + private void updateColumn(Types.NestedField existingField, Types.NestedField targetField) { + String existingColumnName = this.existingSchema.findColumnName(existingField.fieldId()); + + boolean needsOptionalUpdate = targetField.isOptional() && existingField.isRequired(); + boolean needsTypeUpdate = + targetField.type().isPrimitiveType() && !targetField.type().equals(existingField.type()); + boolean needsDocUpdate = + targetField.doc() != null && !targetField.doc().equals(existingField.doc()); + + if (needsOptionalUpdate) { + api.makeColumnOptional(existingColumnName); + } + + if (needsTypeUpdate) { + api.updateColumn(existingColumnName, 
targetField.type().asPrimitiveType()); + } + + if (needsDocUpdate) { + api.updateColumnDoc(existingColumnName, targetField.doc()); + } + } + + private void setPosition(String columnName, String after) { + if (after == null) { + this.api.moveFirst(columnName); + } else { + this.api.moveAfter(columnName, after); + } + } +} diff --git a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/PartitionSpecEvolution.java b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/PartitionSpecEvolution.java new file mode 100644 index 000000000000..90b6c7295cb7 --- /dev/null +++ b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/PartitionSpecEvolution.java @@ -0,0 +1,137 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.iceberg.flink.sink.dynamic; + +import java.util.List; +import org.apache.iceberg.PartitionField; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.Schema; +import org.apache.iceberg.expressions.Expressions; +import org.apache.iceberg.expressions.Term; +import org.apache.iceberg.relocated.com.google.common.base.MoreObjects; +import org.apache.iceberg.relocated.com.google.common.collect.Iterables; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; + +/** Checks compatibility of PartitionSpecs and evolves one into the other. */ +public class PartitionSpecEvolution { + + private PartitionSpecEvolution() {} + + /** + * Checks whether two PartitionSpecs are compatible with each other. Less strict than {@code + * PartitionSpec#compatible} in the sense that it tolerates differently named partition fields, as + * long as their transforms and field names corresponding to their source ids match. + */ + public static boolean checkCompatibility(PartitionSpec spec1, PartitionSpec spec2) { + if (spec1.equals(spec2)) { + return true; + } + + if (spec1.fields().size() != spec2.fields().size()) { + return false; + } + + for (int i = 0; i < spec1.fields().size(); i++) { + PartitionField field1 = spec1.fields().get(i); + PartitionField field2 = spec2.fields().get(i); + if (!specFieldsAreCompatible(field1, spec1.schema(), field2, spec2.schema())) { + return false; + } + } + + return true; + } + + static PartitionSpecChanges evolve(PartitionSpec currentSpec, PartitionSpec targetSpec) { + if (currentSpec.compatibleWith(targetSpec)) { + return new PartitionSpecChanges(); + } + + PartitionSpecChanges result = new PartitionSpecChanges(); + + int maxNumFields = Math.max(currentSpec.fields().size(), targetSpec.fields().size()); + for (int i = 0; i < maxNumFields; i++) { + PartitionField currentField = Iterables.get(currentSpec.fields(), i, null); + PartitionField targetField = Iterables.get(targetSpec.fields(), i, null); + + if 
(!specFieldsAreCompatible( + currentField, currentSpec.schema(), targetField, targetSpec.schema())) { + + if (currentField != null) { + result.remove(toTerm(currentField, currentSpec.schema())); + } + + if (targetField != null) { + result.add(toTerm(targetField, targetSpec.schema())); + } + } + } + + return result; + } + + static class PartitionSpecChanges { + private final List termsToAdd = Lists.newArrayList(); + private final List termsToRemove = Lists.newArrayList(); + + public void add(Term term) { + termsToAdd.add(term); + } + + public void remove(Term term) { + termsToRemove.add(term); + } + + public List termsToAdd() { + return termsToAdd; + } + + public List termsToRemove() { + return termsToRemove; + } + + public boolean isEmpty() { + return termsToAdd.isEmpty() && termsToRemove.isEmpty(); + } + + @Override + public String toString() { + return MoreObjects.toStringHelper(PartitionSpecEvolution.class) + .add("termsToAdd", termsToAdd) + .add("termsToRemove", termsToRemove) + .toString(); + } + } + + private static Term toTerm(PartitionField field, Schema schema) { + String sourceName = schema.idToName().get(field.sourceId()); + return Expressions.transform(sourceName, field.transform()); + } + + private static boolean specFieldsAreCompatible( + PartitionField field1, Schema schemaField1, PartitionField field2, Schema schemaField2) { + if (field1 == null || field2 == null) { + return false; + } + String firstFieldSourceName = schemaField1.idToName().get(field1.sourceId()); + String secondFieldSourceName = schemaField2.idToName().get(field2.sourceId()); + return firstFieldSourceName.equals(secondFieldSourceName) + && field1.transform().toString().equals(field2.transform().toString()); + } +} diff --git a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/TableMetadataCache.java b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/TableMetadataCache.java new file mode 100644 index 000000000000..e9c77ea8c809 --- /dev/null +++ 
b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/TableMetadataCache.java @@ -0,0 +1,261 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.flink.sink.dynamic; + +import com.github.benmanes.caffeine.cache.Cache; +import com.github.benmanes.caffeine.cache.Caffeine; +import java.util.LinkedHashMap; +import java.util.Map; +import java.util.Set; +import org.apache.flink.annotation.Internal; +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.Schema; +import org.apache.iceberg.Table; +import org.apache.iceberg.catalog.Catalog; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.exceptions.NoSuchTableException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * TableMetadataCache is responsible for caching table metadata to avoid hitting the catalog too + * frequently. We store table identifier, schema, partition spec, and a set of past schema + * comparison results of the active table schema against the last input schemas. 
+ */ +@Internal +class TableMetadataCache { + + private static final Logger LOG = LoggerFactory.getLogger(TableMetadataCache.class); + private static final int MAX_SCHEMA_COMPARISON_RESULTS_TO_CACHE = 10; + private static final Tuple2 EXISTS = Tuple2.of(true, null); + private static final Tuple2 NOT_EXISTS = Tuple2.of(false, null); + static final Tuple2 NOT_FOUND = + Tuple2.of(null, CompareSchemasVisitor.Result.SCHEMA_UPDATE_NEEDED); + + private final Catalog catalog; + private final long refreshMs; + private final Cache cache; + + TableMetadataCache(Catalog catalog, int maximumSize, long refreshMs) { + this.catalog = catalog; + this.refreshMs = refreshMs; + this.cache = Caffeine.newBuilder().maximumSize(maximumSize).build(); + } + + Tuple2 exists(TableIdentifier identifier) { + CacheItem cached = cache.getIfPresent(identifier); + if (cached != null && Boolean.TRUE.equals(cached.tableExists)) { + return EXISTS; + } else if (needsRefresh(cached, true)) { + return refreshTable(identifier); + } else { + return NOT_EXISTS; + } + } + + String branch(TableIdentifier identifier, String branch) { + return branch(identifier, branch, true); + } + + Tuple2 schema(TableIdentifier identifier, Schema input) { + return schema(identifier, input, true); + } + + PartitionSpec spec(TableIdentifier identifier, PartitionSpec spec) { + return spec(identifier, spec, true); + } + + void update(TableIdentifier identifier, Table table) { + cache.put( + identifier, + new CacheItem(true, table.refs().keySet(), new SchemaInfo(table.schemas()), table.specs())); + } + + private String branch(TableIdentifier identifier, String branch, boolean allowRefresh) { + CacheItem cached = cache.getIfPresent(identifier); + if (cached != null && cached.tableExists && cached.branches.contains(branch)) { + return branch; + } + + if (needsRefresh(cached, allowRefresh)) { + refreshTable(identifier); + return branch(identifier, branch, false); + } else { + return null; + } + } + + private Tuple2 schema( + 
TableIdentifier identifier, Schema input, boolean allowRefresh) { + CacheItem cached = cache.getIfPresent(identifier); + Schema compatible = null; + if (cached != null && cached.tableExists) { + // This only works if the {@link Schema#equals(Object)} returns true for the old schema + // and a new schema. Performance is paramount as this code is on the hot path. Every other + // way for comparing 2 schemas were performing worse than the + // {@link CompareByNameVisitor#visit(Schema, Schema, boolean)}, so caching was useless. + Tuple2 lastResult = + cached.schema.lastResults.get(input); + if (lastResult != null) { + return lastResult; + } + + for (Map.Entry tableSchema : cached.schema.schemas.entrySet()) { + CompareSchemasVisitor.Result result = + CompareSchemasVisitor.visit(input, tableSchema.getValue(), true); + if (result == CompareSchemasVisitor.Result.SAME) { + Tuple2 newResult = + Tuple2.of(tableSchema.getValue(), CompareSchemasVisitor.Result.SAME); + cached.schema.update(input, newResult); + return newResult; + } else if (compatible == null + && result == CompareSchemasVisitor.Result.DATA_CONVERSION_NEEDED) { + compatible = tableSchema.getValue(); + } + } + } + + if (needsRefresh(cached, allowRefresh)) { + refreshTable(identifier); + return schema(identifier, input, false); + } else if (compatible != null) { + Tuple2 newResult = + Tuple2.of(compatible, CompareSchemasVisitor.Result.DATA_CONVERSION_NEEDED); + cached.schema.update(input, newResult); + return newResult; + } else if (cached != null && cached.tableExists) { + cached.schema.update(input, NOT_FOUND); + return NOT_FOUND; + } else { + return NOT_FOUND; + } + } + + private PartitionSpec spec(TableIdentifier identifier, PartitionSpec spec, boolean allowRefresh) { + CacheItem cached = cache.getIfPresent(identifier); + if (cached != null && cached.tableExists) { + for (PartitionSpec tableSpec : cached.specs.values()) { + if (PartitionSpecEvolution.checkCompatibility(tableSpec, spec)) { + return tableSpec; + 
} + } + } + + if (needsRefresh(cached, allowRefresh)) { + refreshTable(identifier); + return spec(identifier, spec, false); + } else { + return null; + } + } + + private Tuple2 refreshTable(TableIdentifier identifier) { + try { + Table table = catalog.loadTable(identifier); + update(identifier, table); + return EXISTS; + } catch (NoSuchTableException e) { + LOG.debug("Table doesn't exist {}", identifier, e); + cache.put(identifier, new CacheItem(false, null, null, null)); + return Tuple2.of(false, e); + } + } + + private boolean needsRefresh(CacheItem cacheItem, boolean allowRefresh) { + return allowRefresh + && (cacheItem == null || cacheItem.created + refreshMs > System.currentTimeMillis()); + } + + public void invalidate(TableIdentifier identifier) { + cache.invalidate(identifier); + } + + /** Handles timeout for missing items only. Caffeine performance causes noticeable delays. */ + static class CacheItem { + private final long created = System.currentTimeMillis(); + + private final boolean tableExists; + private final Set branches; + private final SchemaInfo schema; + private final Map specs; + + private CacheItem( + boolean tableExists, + Set branches, + SchemaInfo schema, + Map specs) { + this.tableExists = tableExists; + this.branches = branches; + this.schema = schema; + this.specs = specs; + } + + @VisibleForTesting + SchemaInfo getSchemaInfo() { + return schema; + } + } + + /** + * Stores precalculated results for {@link CompareSchemasVisitor#visit(Schema, Schema, boolean)} + * in the cache. 
+ */ + static class SchemaInfo { + private final Map schemas; + private final Map> lastResults; + + private SchemaInfo(Map schemas) { + this.schemas = schemas; + this.lastResults = new LimitedLinkedHashMap<>(); + } + + private void update( + Schema newLastSchema, Tuple2 newLastResult) { + lastResults.put(newLastSchema, newLastResult); + } + + @VisibleForTesting + Tuple2 getLastResult(Schema schema) { + return lastResults.get(schema); + } + } + + @SuppressWarnings("checkstyle:IllegalType") + private static class LimitedLinkedHashMap extends LinkedHashMap { + @Override + protected boolean removeEldestEntry(Map.Entry eldest) { + boolean remove = size() > MAX_SCHEMA_COMPARISON_RESULTS_TO_CACHE; + if (remove) { + LOG.warn( + "Performance degraded as records with different schema is generated for the same table. " + + "Likely the DynamicRecord.schema is not reused. " + + "Reuse the same instance if the record schema is the same to improve performance"); + } + + return remove; + } + } + + @VisibleForTesting + Cache getInternalCache() { + return cache; + } +} diff --git a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/TableUpdater.java b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/TableUpdater.java new file mode 100644 index 000000000000..40bb66f65125 --- /dev/null +++ b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/TableUpdater.java @@ -0,0 +1,207 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.flink.sink.dynamic; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.tuple.Tuple3; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.Schema; +import org.apache.iceberg.Table; +import org.apache.iceberg.UpdatePartitionSpec; +import org.apache.iceberg.UpdateSchema; +import org.apache.iceberg.catalog.Catalog; +import org.apache.iceberg.catalog.SupportsNamespaces; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.exceptions.AlreadyExistsException; +import org.apache.iceberg.exceptions.CommitFailedException; +import org.apache.iceberg.exceptions.NoSuchNamespaceException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** Updates the Iceberg tables in case of schema, branch, or partition changes. */ +@Internal +class TableUpdater { + + private static final Logger LOG = LoggerFactory.getLogger(TableUpdater.class); + private final TableMetadataCache cache; + private final Catalog catalog; + + TableUpdater(TableMetadataCache cache, Catalog catalog) { + this.cache = cache; + this.catalog = catalog; + } + + /** + * Creates or updates a table to make sure that the given branch, schema, spec exists. + * + * @return a {@link Tuple3} of the new {@link Schema}, the status of the schema compared to the + * requested one, and the new {@link PartitionSpec#specId()}. 
+ */ + Tuple3 update( + TableIdentifier tableIdentifier, String branch, Schema schema, PartitionSpec spec) { + findOrCreateTable(tableIdentifier, schema, spec); + findOrCreateBranch(tableIdentifier, branch); + Tuple2 newSchema = + findOrCreateSchema(tableIdentifier, schema); + PartitionSpec newSpec = findOrCreateSpec(tableIdentifier, spec); + return Tuple3.of(newSchema.f0, newSchema.f1, newSpec); + } + + private void findOrCreateTable(TableIdentifier identifier, Schema schema, PartitionSpec spec) { + Tuple2 exists = cache.exists(identifier); + if (Boolean.FALSE.equals(exists.f0)) { + if (exists.f1 instanceof NoSuchNamespaceException) { + SupportsNamespaces catalogWithNameSpace = (SupportsNamespaces) catalog; + LOG.info("Namespace {} not found during table search. Creating namespace", identifier); + try { + catalogWithNameSpace.createNamespace(identifier.namespace()); + } catch (AlreadyExistsException e) { + LOG.debug("Namespace {} created concurrently", identifier.namespace(), e); + } + } + + LOG.info("Table {} not found during table search. Creating table.", identifier); + try { + Table table = catalog.createTable(identifier, schema, spec); + cache.update(identifier, table); + } catch (AlreadyExistsException e) { + LOG.debug("Table {} created concurrently. 
Skipping creation.", identifier, e); + cache.invalidate(identifier); + findOrCreateTable(identifier, schema, spec); + } + } + } + + private void findOrCreateBranch(TableIdentifier identifier, String branch) { + String fromCache = cache.branch(identifier, branch); + if (fromCache == null) { + Table table = catalog.loadTable(identifier); + try { + table.manageSnapshots().createBranch(branch).commit(); + LOG.info("Branch {} for {} created", branch, identifier); + } catch (CommitFailedException e) { + table.refresh(); + if (table.refs().containsKey(branch)) { + LOG.debug("Branch {} concurrently created for {}.", branch, identifier); + } else { + LOG.error("Failed to create branch {} for {}.", branch, identifier, e); + throw e; + } + } + + cache.update(identifier, table); + } + } + + private Tuple2 findOrCreateSchema( + TableIdentifier identifier, Schema schema) { + Tuple2 fromCache = cache.schema(identifier, schema); + if (fromCache.f1 != CompareSchemasVisitor.Result.SCHEMA_UPDATE_NEEDED) { + return fromCache; + } else { + Table table = catalog.loadTable(identifier); + Schema tableSchema = table.schema(); + CompareSchemasVisitor.Result result = CompareSchemasVisitor.visit(schema, tableSchema, true); + switch (result) { + case SAME: + case DATA_CONVERSION_NEEDED: + cache.update(identifier, table); + return Tuple2.of(tableSchema, result); + case SCHEMA_UPDATE_NEEDED: + LOG.info( + "Triggering schema update for table {} {} to {}", identifier, tableSchema, schema); + UpdateSchema updateApi = table.updateSchema(); + EvolveSchemaVisitor.visit(updateApi, tableSchema, schema); + + try { + updateApi.commit(); + cache.update(identifier, table); + Tuple2 comparisonAfterMigration = + cache.schema(identifier, schema); + Schema newSchema = comparisonAfterMigration.f0; + LOG.info("Table {} schema updated from {} to {}", identifier, tableSchema, newSchema); + return comparisonAfterMigration; + } catch (CommitFailedException e) { + cache.invalidate(identifier); + Tuple2 newSchema = + 
cache.schema(identifier, schema); + if (newSchema.f1 != CompareSchemasVisitor.Result.SCHEMA_UPDATE_NEEDED) { + LOG.debug("Table {} schema updated concurrently to {}", identifier, schema); + return newSchema; + } else { + LOG.error( + "Schema update failed for {} from {} to {}", identifier, tableSchema, schema, e); + throw e; + } + } + default: + throw new IllegalArgumentException("Unknown comparison result"); + } + } + } + + private PartitionSpec findOrCreateSpec(TableIdentifier identifier, PartitionSpec targetSpec) { + PartitionSpec currentSpec = cache.spec(identifier, targetSpec); + if (currentSpec != null) { + return currentSpec; + } + + Table table = catalog.loadTable(identifier); + currentSpec = table.spec(); + + PartitionSpecEvolution.PartitionSpecChanges result = + PartitionSpecEvolution.evolve(currentSpec, targetSpec); + if (result.isEmpty()) { + LOG.info("Returning equivalent existing spec {} for {}", currentSpec, targetSpec); + return currentSpec; + } + + LOG.info( + "Spec for table {} has been altered. 
Updating from {} to {}", + identifier, + currentSpec, + targetSpec); + UpdatePartitionSpec updater = table.updateSpec(); + result.termsToRemove().forEach(updater::removeField); + result.termsToAdd().forEach(updater::addField); + + try { + updater.commit(); + cache.update(identifier, table); + } catch (CommitFailedException e) { + cache.invalidate(identifier); + PartitionSpec newSpec = cache.spec(identifier, targetSpec); + result = PartitionSpecEvolution.evolve(targetSpec, newSpec); + if (result.isEmpty()) { + LOG.debug("Table {} partition spec updated concurrently to {}", identifier, newSpec); + return newSpec; + } else { + LOG.error( + "Partition spec update failed for {} from {} to {}", + identifier, + currentSpec, + targetSpec, + e); + throw e; + } + } + return cache.spec(identifier, targetSpec); + } +} diff --git a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestCompareSchemasVisitor.java b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestCompareSchemasVisitor.java new file mode 100644 index 000000000000..487b0ee7d94a --- /dev/null +++ b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestCompareSchemasVisitor.java @@ -0,0 +1,209 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.flink.sink.dynamic; + +import static org.apache.iceberg.types.Types.NestedField.optional; +import static org.apache.iceberg.types.Types.NestedField.required; +import static org.assertj.core.api.Assertions.assertThat; + +import org.apache.iceberg.Schema; +import org.apache.iceberg.types.Types.IntegerType; +import org.apache.iceberg.types.Types.ListType; +import org.apache.iceberg.types.Types.LongType; +import org.apache.iceberg.types.Types.MapType; +import org.apache.iceberg.types.Types.StringType; +import org.apache.iceberg.types.Types.StructType; +import org.junit.jupiter.api.Test; + +class TestCompareSchemasVisitor { + + @Test + void testSchema() { + assertThat( + CompareSchemasVisitor.visit( + new Schema( + optional(1, "id", IntegerType.get(), "comment"), + optional(2, "data", StringType.get()), + optional(3, "extra", StringType.get())), + new Schema( + optional(1, "id", IntegerType.get(), "comment"), + optional(2, "data", StringType.get()), + optional(3, "extra", StringType.get())))) + .isEqualTo(CompareSchemasVisitor.Result.SAME); + } + + @Test + void testSchemaDifferentId() { + assertThat( + CompareSchemasVisitor.visit( + new Schema( + optional(0, "id", IntegerType.get()), + optional(1, "data", StringType.get()), + optional(2, "extra", StringType.get())), + new Schema( + optional(1, "id", IntegerType.get()), + optional(2, "data", StringType.get()), + optional(3, "extra", StringType.get())))) + .isEqualTo(CompareSchemasVisitor.Result.SAME); + } + + @Test + void testSchemaDifferent() { + assertThat( + CompareSchemasVisitor.visit( + new Schema( + optional(0, "id", IntegerType.get()), + optional(1, "data", StringType.get()), + optional(2, "extra", StringType.get())), + new Schema( + optional(0, "id", IntegerType.get()), optional(1, "data", StringType.get())))) + .isEqualTo(CompareSchemasVisitor.Result.SCHEMA_UPDATE_NEEDED); + 
} + + @Test + void testSchemaWithMoreColumns() { + assertThat( + CompareSchemasVisitor.visit( + new Schema( + optional(0, "id", IntegerType.get()), optional(1, "data", StringType.get())), + new Schema( + optional(0, "id", IntegerType.get()), + optional(1, "data", StringType.get()), + optional(2, "extra", StringType.get())))) + .isEqualTo(CompareSchemasVisitor.Result.DATA_CONVERSION_NEEDED); + } + + @Test + void testDifferentType() { + assertThat( + CompareSchemasVisitor.visit( + new Schema( + optional(1, "id", LongType.get()), optional(2, "extra", StringType.get())), + new Schema( + optional(1, "id", IntegerType.get()), optional(2, "extra", StringType.get())))) + .isEqualTo(CompareSchemasVisitor.Result.SCHEMA_UPDATE_NEEDED); + } + + @Test + void testCompatibleType() { + assertThat( + CompareSchemasVisitor.visit( + new Schema( + optional(1, "id", IntegerType.get()), optional(2, "extra", StringType.get())), + new Schema( + optional(1, "id", LongType.get()), optional(2, "extra", StringType.get())))) + .isEqualTo(CompareSchemasVisitor.Result.DATA_CONVERSION_NEEDED); + } + + @Test + void testWithRequiredChange() { + Schema dataSchema = + new Schema(optional(1, "id", IntegerType.get()), optional(2, "extra", StringType.get())); + Schema tableSchema = + new Schema(required(1, "id", IntegerType.get()), optional(2, "extra", StringType.get())); + assertThat(CompareSchemasVisitor.visit(dataSchema, tableSchema)) + .isEqualTo(CompareSchemasVisitor.Result.SCHEMA_UPDATE_NEEDED); + assertThat(CompareSchemasVisitor.visit(tableSchema, dataSchema)) + .isEqualTo(CompareSchemasVisitor.Result.SAME); + } + + @Test + void testStructDifferentId() { + assertThat( + CompareSchemasVisitor.visit( + new Schema( + optional(1, "id", IntegerType.get()), + optional(2, "struct1", StructType.of(optional(3, "extra", IntegerType.get())))), + new Schema( + optional(0, "id", IntegerType.get()), + optional( + 1, "struct1", StructType.of(optional(2, "extra", IntegerType.get())))))) + 
.isEqualTo(CompareSchemasVisitor.Result.SAME); + } + + @Test + void testStructChanged() { + assertThat( + CompareSchemasVisitor.visit( + new Schema( + optional(0, "id", IntegerType.get()), + optional(1, "struct1", StructType.of(optional(2, "extra", LongType.get())))), + new Schema( + optional(1, "id", IntegerType.get()), + optional( + 2, "struct1", StructType.of(optional(3, "extra", IntegerType.get())))))) + .isEqualTo(CompareSchemasVisitor.Result.SCHEMA_UPDATE_NEEDED); + } + + @Test + void testMapDifferentId() { + assertThat( + CompareSchemasVisitor.visit( + new Schema( + optional(1, "id", IntegerType.get()), + optional( + 2, "map1", MapType.ofOptional(3, 4, IntegerType.get(), StringType.get()))), + new Schema( + optional(0, "id", IntegerType.get()), + optional( + 1, "map1", MapType.ofOptional(2, 3, IntegerType.get(), StringType.get()))))) + .isEqualTo(CompareSchemasVisitor.Result.SAME); + } + + @Test + void testMapChanged() { + assertThat( + CompareSchemasVisitor.visit( + new Schema( + optional(1, "id", IntegerType.get()), + optional( + 2, "map1", MapType.ofOptional(3, 4, LongType.get(), StringType.get()))), + new Schema( + optional(1, "id", IntegerType.get()), + optional( + 2, "map1", MapType.ofOptional(3, 4, IntegerType.get(), StringType.get()))))) + .isEqualTo(CompareSchemasVisitor.Result.SCHEMA_UPDATE_NEEDED); + } + + @Test + void testListDifferentId() { + assertThat( + CompareSchemasVisitor.visit( + new Schema( + optional(1, "id", IntegerType.get()), + optional(2, "list1", ListType.ofOptional(3, IntegerType.get()))), + new Schema( + optional(0, "id", IntegerType.get()), + optional(1, "list1", ListType.ofOptional(2, IntegerType.get()))))) + .isEqualTo(CompareSchemasVisitor.Result.SAME); + } + + @Test + void testListChanged() { + assertThat( + CompareSchemasVisitor.visit( + new Schema( + optional(0, "id", IntegerType.get()), + optional(1, "list1", ListType.ofOptional(2, LongType.get()))), + new Schema( + optional(1, "id", IntegerType.get()), + optional(2, 
"list1", ListType.ofOptional(3, IntegerType.get()))))) + .isEqualTo(CompareSchemasVisitor.Result.SCHEMA_UPDATE_NEEDED); + } +} diff --git a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestEvolveSchemaVisitor.java b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestEvolveSchemaVisitor.java new file mode 100644 index 000000000000..58ee84529193 --- /dev/null +++ b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestEvolveSchemaVisitor.java @@ -0,0 +1,623 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.iceberg.flink.sink.dynamic; + +import static org.apache.iceberg.types.Types.NestedField.of; +import static org.apache.iceberg.types.Types.NestedField.optional; +import static org.apache.iceberg.types.Types.NestedField.required; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +import java.lang.reflect.Constructor; +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.atomic.AtomicInteger; +import org.apache.iceberg.Schema; +import org.apache.iceberg.UpdateSchema; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.types.Type.PrimitiveType; +import org.apache.iceberg.types.Types; +import org.apache.iceberg.types.Types.DecimalType; +import org.apache.iceberg.types.Types.DoubleType; +import org.apache.iceberg.types.Types.FloatType; +import org.apache.iceberg.types.Types.IntegerType; +import org.apache.iceberg.types.Types.ListType; +import org.apache.iceberg.types.Types.LongType; +import org.apache.iceberg.types.Types.MapType; +import org.apache.iceberg.types.Types.StringType; +import org.apache.iceberg.types.Types.StructType; +import org.apache.iceberg.types.Types.TimeType; +import org.apache.iceberg.types.Types.UUIDType; +import org.junit.jupiter.api.Test; + +public class TestEvolveSchemaVisitor { + + private static List primitiveTypes() { + return Lists.newArrayList( + StringType.get(), + TimeType.get(), + Types.TimestampType.withoutZone(), + Types.TimestampType.withZone(), + UUIDType.get(), + Types.DateType.get(), + Types.BooleanType.get(), + Types.BinaryType.get(), + DoubleType.get(), + IntegerType.get(), + Types.FixedType.ofLength(10), + DecimalType.of(10, 2), + LongType.get(), + FloatType.get()); + } + + private static Types.NestedField[] primitiveFields( + Integer initialValue, List primitiveTypes) { + return primitiveFields(initialValue, primitiveTypes, true); + } + + private static 
Types.NestedField[] primitiveFields( + Integer initialValue, List primitiveTypes, boolean optional) { + AtomicInteger atomicInteger = new AtomicInteger(initialValue); + return primitiveTypes.stream() + .map( + type -> + of( + atomicInteger.incrementAndGet(), + optional, + type.toString(), + Types.fromPrimitiveString(type.toString()))) + .toArray(Types.NestedField[]::new); + } + + @Test + public void testAddTopLevelPrimitives() { + Schema targetSchema = new Schema(primitiveFields(0, primitiveTypes())); + UpdateSchema updateApi = loadUpdateApi(new Schema(), 0); + EvolveSchemaVisitor.visit(updateApi, new Schema(), targetSchema); + assertThat(targetSchema.asStruct()).isEqualTo(updateApi.apply().asStruct()); + } + + @Test + public void testMakeTopLevelPrimitivesOptional() { + Schema existingSchema = new Schema(primitiveFields(0, primitiveTypes(), false)); + assertThat(existingSchema.columns().stream().allMatch(Types.NestedField::isRequired)).isTrue(); + + UpdateSchema updateApi = loadUpdateApi(existingSchema, 0); + EvolveSchemaVisitor.visit(updateApi, existingSchema, new Schema()); + Schema newSchema = updateApi.apply(); + assertThat(newSchema.asStruct().fields().size()).isEqualTo(14); + assertThat(newSchema.columns().stream().allMatch(Types.NestedField::isOptional)).isTrue(); + } + + @Test + public void testIdentifyFieldsByName() { + Schema existingSchema = + new Schema(Types.NestedField.optional(42, "myField", Types.LongType.get())); + UpdateSchema updateApi = loadUpdateApi(existingSchema, 0); + Schema newSchema = + new Schema(Arrays.asList(Types.NestedField.optional(-1, "myField", Types.LongType.get()))); + EvolveSchemaVisitor.visit(updateApi, existingSchema, newSchema); + assertThat(updateApi.apply().sameSchema(existingSchema)).isTrue(); + } + + @Test + public void testChangeOrderTopLevelPrimitives() { + Schema existingSchema = + new Schema( + Arrays.asList(optional(1, "a", StringType.get()), optional(2, "b", StringType.get()))); + Schema targetSchema = + new 
Schema( + Arrays.asList(optional(2, "b", StringType.get()), optional(1, "a", StringType.get()))); + UpdateSchema updateApi = loadUpdateApi(existingSchema, 0); + EvolveSchemaVisitor.visit(updateApi, existingSchema, targetSchema); + assertThat(updateApi.apply().asStruct()).isEqualTo(targetSchema.asStruct()); + } + + @Test + public void testAddTopLevelListOfPrimitives() { + for (PrimitiveType primitiveType : primitiveTypes()) { + Schema targetSchema = new Schema(optional(1, "aList", ListType.ofOptional(2, primitiveType))); + UpdateSchema updateApi = loadUpdateApi(new Schema(), 0); + EvolveSchemaVisitor.visit(updateApi, new Schema(), targetSchema); + assertThat(updateApi.apply().asStruct()).isEqualTo(targetSchema.asStruct()); + } + } + + @Test + public void testMakeTopLevelListOfPrimitivesOptional() { + for (PrimitiveType primitiveType : primitiveTypes()) { + Schema existingSchema = + new Schema(optional(1, "aList", ListType.ofRequired(2, primitiveType))); + Schema targetSchema = new Schema(); + UpdateSchema updateApi = loadUpdateApi(existingSchema, 0); + EvolveSchemaVisitor.visit(updateApi, existingSchema, targetSchema); + Schema expectedSchema = + new Schema(optional(1, "aList", ListType.ofRequired(2, primitiveType))); + assertThat(updateApi.apply().asStruct()).isEqualTo(expectedSchema.asStruct()); + } + } + + @Test + public void testAddTopLevelMapOfPrimitives() { + for (PrimitiveType primitiveType : primitiveTypes()) { + Schema targetSchema = + new Schema(optional(1, "aMap", MapType.ofOptional(2, 3, primitiveType, primitiveType))); + UpdateSchema updateApi = loadUpdateApi(new Schema(), 0); + EvolveSchemaVisitor.visit(updateApi, new Schema(), targetSchema); + assertThat(updateApi.apply().asStruct()).isEqualTo(targetSchema.asStruct()); + } + } + + @Test + public void testAddTopLevelStructOfPrimitives() { + for (PrimitiveType primitiveType : primitiveTypes()) { + Schema currentSchema = + new Schema( + optional(1, "aStruct", StructType.of(optional(2, "primitive", 
primitiveType)))); + UpdateSchema updateApi = loadUpdateApi(new Schema(), 0); + EvolveSchemaVisitor.visit(updateApi, new Schema(), currentSchema); + assertThat(updateApi.apply().asStruct()).isEqualTo(currentSchema.asStruct()); + } + } + + @Test + public void testAddNestedPrimitive() { + for (PrimitiveType primitiveType : primitiveTypes()) { + Schema currentSchema = new Schema(optional(1, "aStruct", StructType.of())); + Schema targetSchema = + new Schema( + optional(1, "aStruct", StructType.of(optional(2, "primitive", primitiveType)))); + UpdateSchema updateApi = loadUpdateApi(currentSchema, 1); + EvolveSchemaVisitor.visit(updateApi, currentSchema, targetSchema); + assertThat(updateApi.apply().asStruct()).isEqualTo(targetSchema.asStruct()); + } + } + + @Test + public void testMakeNestedPrimitiveOptional() { + for (PrimitiveType primitiveType : primitiveTypes()) { + Schema currentSchema = + new Schema( + optional(1, "aStruct", StructType.of(required(2, "primitive", primitiveType)))); + Schema targetSchema = + new Schema( + optional(1, "aStruct", StructType.of(optional(2, "primitive", primitiveType)))); + UpdateSchema updateApi = loadUpdateApi(currentSchema, 1); + EvolveSchemaVisitor.visit(updateApi, currentSchema, targetSchema); + assertThat(updateApi.apply().asStruct()).isEqualTo(targetSchema.asStruct()); + } + } + + @Test + public void testAddNestedPrimitives() { + Schema currentSchema = new Schema(optional(1, "aStruct", StructType.of())); + Schema targetSchema = + new Schema(optional(1, "aStruct", StructType.of(primitiveFields(1, primitiveTypes())))); + UpdateSchema updateApi = loadUpdateApi(currentSchema, 1); + EvolveSchemaVisitor.visit(updateApi, currentSchema, targetSchema); + assertThat(updateApi.apply().asStruct()).isEqualTo(targetSchema.asStruct()); + } + + @Test + public void testAddNestedLists() { + Schema targetSchema = + new Schema( + optional( + 1, + "aList", + ListType.ofOptional( + 2, + ListType.ofOptional( + 3, + ListType.ofOptional( + 4, + 
ListType.ofOptional( + 5, + ListType.ofOptional( + 6, + ListType.ofOptional( + 7, + ListType.ofOptional( + 8, + ListType.ofOptional( + 9, + ListType.ofOptional( + 10, DecimalType.of(11, 20)))))))))))); + UpdateSchema updateApi = loadUpdateApi(new Schema(), 0); + EvolveSchemaVisitor.visit(updateApi, new Schema(), targetSchema); + assertThat(updateApi.apply().asStruct()).isEqualTo(targetSchema.asStruct()); + } + + @Test + public void testAddNestedStruct() { + Schema targetSchema = + new Schema( + optional( + 1, + "struct1", + StructType.of( + optional( + 2, + "struct2", + StructType.of( + optional( + 3, + "struct3", + StructType.of( + optional( + 4, + "struct4", + StructType.of( + optional( + 5, + "struct5", + StructType.of( + optional( + 6, + "struct6", + StructType.of( + optional( + 7, + "aString", + StringType.get())))))))))))))); + UpdateSchema updateApi = loadUpdateApi(new Schema(), 0); + EvolveSchemaVisitor.visit(updateApi, new Schema(), targetSchema); + assertThat(updateApi.apply().asStruct()).isEqualTo(targetSchema.asStruct()); + } + + @Test + public void testAddNestedMaps() { + Schema targetSchema = + new Schema( + optional( + 1, + "struct", + MapType.ofOptional( + 2, + 3, + StringType.get(), + MapType.ofOptional( + 4, + 5, + StringType.get(), + MapType.ofOptional( + 6, + 7, + StringType.get(), + MapType.ofOptional( + 8, + 9, + StringType.get(), + MapType.ofOptional( + 10, + 11, + StringType.get(), + MapType.ofOptional( + 12, 13, StringType.get(), StringType.get())))))))); + UpdateSchema updateApi = loadUpdateApi(new Schema(), 0); + EvolveSchemaVisitor.visit(updateApi, new Schema(), targetSchema); + assertThat(updateApi.apply().asStruct()).isEqualTo(targetSchema.asStruct()); + } + + @Test + public void testDetectInvalidTopLevelList() { + Schema currentSchema = + new Schema(optional(1, "aList", ListType.ofOptional(2, StringType.get()))); + Schema targetSchema = new Schema(optional(1, "aList", ListType.ofOptional(2, LongType.get()))); + assertThatThrownBy( + 
() -> + EvolveSchemaVisitor.visit( + loadUpdateApi(currentSchema, 2), currentSchema, targetSchema)) + .hasMessage("Cannot change column type: aList.element: string -> long") + .isInstanceOf(IllegalArgumentException.class); + } + + @Test + public void testDetectInvalidTopLevelMapValue() { + + Schema currentSchema = + new Schema( + optional(1, "aMap", MapType.ofOptional(2, 3, StringType.get(), StringType.get()))); + Schema targetSchema = + new Schema(optional(1, "aMap", MapType.ofOptional(2, 3, StringType.get(), LongType.get()))); + + assertThatThrownBy( + () -> + EvolveSchemaVisitor.visit( + loadUpdateApi(currentSchema, 3), currentSchema, targetSchema)) + .hasMessage("Cannot change column type: aMap.value: string -> long") + .isInstanceOf(IllegalArgumentException.class); + } + + @Test + public void testDetectInvalidTopLevelMapKey() { + Schema currentSchema = + new Schema( + optional(1, "aMap", MapType.ofOptional(2, 3, StringType.get(), StringType.get()))); + Schema targetSchema = + new Schema(optional(1, "aMap", MapType.ofOptional(2, 3, UUIDType.get(), StringType.get()))); + assertThatThrownBy( + () -> + EvolveSchemaVisitor.visit( + loadUpdateApi(currentSchema, 3), currentSchema, targetSchema)) + .hasMessage("Cannot change column type: aMap.key: string -> uuid") + .isInstanceOf(IllegalArgumentException.class); + } + + @Test + // int 32-bit signed integers -> Can promote to long + public void testTypePromoteIntegerToLong() { + Schema currentSchema = new Schema(required(1, "aCol", IntegerType.get())); + Schema targetSchema = new Schema(required(1, "aCol", LongType.get())); + + UpdateSchema updateApi = loadUpdateApi(currentSchema, 0); + EvolveSchemaVisitor.visit(updateApi, currentSchema, targetSchema); + Schema applied = updateApi.apply(); + assertThat(applied.asStruct().fields().size()).isEqualTo(1); + assertThat(applied.asStruct().fields().get(0).type()).isEqualTo(LongType.get()); + } + + @Test + // float 32-bit IEEE 754 floating point -> Can promote to double + 
public void testTypePromoteFloatToDouble() { + Schema currentSchema = new Schema(required(1, "aCol", FloatType.get())); + Schema targetSchema = new Schema(required(1, "aCol", DoubleType.get())); + + UpdateSchema updateApi = loadUpdateApi(currentSchema, 0); + EvolveSchemaVisitor.visit(updateApi, currentSchema, targetSchema); + Schema applied = updateApi.apply(); + assertThat(applied.asStruct().fields().size()).isEqualTo(1); + assertThat(applied.asStruct().fields().get(0).type()).isEqualTo(DoubleType.get()); + } + + @Test + public void testInvalidTypePromoteDoubleToFloat() { + Schema currentSchema = new Schema(required(1, "aCol", DoubleType.get())); + Schema targetSchema = new Schema(required(1, "aCol", FloatType.get())); + assertThatThrownBy( + () -> + EvolveSchemaVisitor.visit( + loadUpdateApi(currentSchema, 3), currentSchema, targetSchema)) + .hasMessage("Cannot change column type: aCol: double -> float") + .isInstanceOf(IllegalArgumentException.class); + } + + @Test + // decimal(P,S) Fixed-point decimal; precision P, scale S -> Scale is fixed [1], precision must be + // 38 or less + public void testTypePromoteDecimalToFixedScaleWithWiderPrecision() { + Schema currentSchema = new Schema(required(1, "aCol", DecimalType.of(20, 1))); + Schema targetSchema = new Schema(required(1, "aCol", DecimalType.of(22, 1))); + + UpdateSchema updateApi = loadUpdateApi(currentSchema, 1); + EvolveSchemaVisitor.visit(updateApi, currentSchema, targetSchema); + assertThat(updateApi.apply().asStruct()).isEqualTo(targetSchema.asStruct()); + } + + @Test + public void testAddPrimitiveToNestedStruct() { + Schema existingSchema = + new Schema( + required( + 1, + "struct1", + StructType.of( + optional( + 2, + "struct2", + StructType.of( + optional( + 3, + "list", + ListType.ofOptional( + 4, + StructType.of(optional(5, "number", IntegerType.get()))))))))); + + Schema targetSchema = + new Schema( + required( + 1, + "struct1", + StructType.of( + optional( + 2, + "struct2", + StructType.of( + 
optional( + 3, + "list", + ListType.ofOptional( + 4, + StructType.of( + optional(5, "number", LongType.get()), + optional(6, "time", TimeType.get()))))))))); + + UpdateSchema updateApi = loadUpdateApi(existingSchema, 5); + EvolveSchemaVisitor.visit(updateApi, existingSchema, targetSchema); + assertThat(updateApi.apply().asStruct()).isEqualTo(targetSchema.asStruct()); + } + + @Test + public void testReplaceListWithPrimitive() { + Schema currentSchema = + new Schema(optional(1, "aColumn", ListType.ofOptional(2, StringType.get()))); + Schema targetSchema = new Schema(optional(1, "aColumn", StringType.get())); + assertThatThrownBy( + () -> + EvolveSchemaVisitor.visit( + loadUpdateApi(currentSchema, 3), currentSchema, targetSchema)) + .hasMessage("Cannot change column type: aColumn: list -> string") + .isInstanceOf(IllegalArgumentException.class); + } + + @Test + public void addNewTopLevelStruct() { + Schema currentSchema = + new Schema( + optional( + 1, + "map1", + MapType.ofOptional( + 2, + 3, + StringType.get(), + ListType.ofOptional( + 4, StructType.of(optional(5, "string1", StringType.get())))))); + + Schema targetSchema = + new Schema( + optional( + 1, + "map1", + MapType.ofOptional( + 2, + 3, + StringType.get(), + ListType.ofOptional( + 4, StructType.of(optional(5, "string1", StringType.get()))))), + optional( + 6, + "struct1", + StructType.of( + optional(7, "d1", StructType.of(optional(8, "d2", StringType.get())))))); + + UpdateSchema updateApi = loadUpdateApi(currentSchema, 5); + EvolveSchemaVisitor.visit(updateApi, currentSchema, targetSchema); + assertThat(updateApi.apply().asStruct()).isEqualTo(targetSchema.asStruct()); + } + + @Test + public void testAppendNestedStruct() { + Schema currentSchema = + new Schema( + required( + 1, + "s1", + StructType.of( + optional( + 2, + "s2", + StructType.of( + optional( + 3, "s3", StructType.of(optional(4, "s4", StringType.get())))))))); + + Schema targetSchema = + new Schema( + required( + 1, + "s1", + StructType.of( + 
optional( + 2, + "s2", + StructType.of( + optional(3, "s3", StructType.of(optional(4, "s4", StringType.get()))), + optional( + 5, + "repeat", + StructType.of( + optional( + 6, + "s1", + StructType.of( + optional( + 7, + "s2", + StructType.of( + optional( + 8, + "s3", + StructType.of( + optional( + 9, + "s4", + StringType.get())))))))))))))); + + UpdateSchema updateApi = loadUpdateApi(currentSchema, 4); + EvolveSchemaVisitor.visit(updateApi, currentSchema, targetSchema); + assertThat(updateApi.apply().asStruct()).isEqualTo(targetSchema.asStruct()); + } + + @Test + public void testMakeNestedStructOptional() { + Schema currentSchema = getNestedSchemaWithOptionalModifier(false); + Schema targetSchema = + new Schema( + required( + 1, + "s1", + StructType.of( + optional( + 2, + "s2", + StructType.of( + optional( + 3, "s3", StructType.of(optional(4, "s4", StringType.get())))))))); + UpdateSchema updateApi = loadUpdateApi(currentSchema, 9); + EvolveSchemaVisitor.visit(updateApi, currentSchema, targetSchema); + assertThat(getNestedSchemaWithOptionalModifier(true).asStruct()) + .isEqualTo(updateApi.apply().asStruct()); + } + + private static Schema getNestedSchemaWithOptionalModifier(boolean nestedIsOptional) { + return new Schema( + required( + 1, + "s1", + StructType.of( + optional( + 2, + "s2", + StructType.of( + optional(3, "s3", StructType.of(optional(4, "s4", StringType.get()))), + of( + 5, + nestedIsOptional, + "repeat", + StructType.of( + optional( + 6, + "s1", + StructType.of( + optional( + 7, + "s2", + StructType.of( + optional( + 8, + "s3", + StructType.of( + optional( + 9, "s4", StringType.get())))))))))))))); + } + + private static UpdateSchema loadUpdateApi(Schema schema, int lastColumnId) { + try { + Constructor constructor = + TestEvolveSchemaVisitor.class + .getClassLoader() + .loadClass("org.apache.iceberg.SchemaUpdate") + .getDeclaredConstructor(Schema.class, int.class); + constructor.setAccessible(true); + return (UpdateSchema) 
constructor.newInstance(schema, lastColumnId); + } catch (Exception e) { + throw new RuntimeException("Failed to instantiate SchemaUpdate class", e); + } + } +} diff --git a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestPartitionSpecEvolution.java b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestPartitionSpecEvolution.java new file mode 100644 index 000000000000..3e7025de6f91 --- /dev/null +++ b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestPartitionSpecEvolution.java @@ -0,0 +1,188 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */
+package org.apache.iceberg.flink.sink.dynamic;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.types.Types;
+import org.junit.jupiter.api.Test;
+
+public class TestPartitionSpecEvolution {
+
+  @Test
+  void testCompatible() {
+    Schema schema =
+        new Schema(
+            Types.NestedField.required(0, "id", Types.IntegerType.get()),
+            Types.NestedField.required(1, "data", Types.StringType.get()));
+
+    PartitionSpec spec1 = PartitionSpec.builderFor(schema).bucket("id", 10).build();
+    PartitionSpec spec2 = PartitionSpec.builderFor(schema).bucket("id", 10).build();
+
+    // Happy case, source ids and names match
+    assertThat(PartitionSpecEvolution.checkCompatibility(spec1, spec2)).isTrue();
+  }
+
+  @Test
+  void testNotCompatibleDifferentTransform() {
+    Schema schema =
+        new Schema(
+            Types.NestedField.required(0, "id", Types.IntegerType.get()),
+            Types.NestedField.required(1, "data", Types.StringType.get()));
+
+    PartitionSpec spec1 = PartitionSpec.builderFor(schema).bucket("id", 10).build();
+    // Same spec as spec1 but different number of buckets
+    PartitionSpec spec2 = PartitionSpec.builderFor(schema).bucket("id", 23).build();
+
+    assertThat(PartitionSpecEvolution.checkCompatibility(spec1, spec2)).isFalse();
+  }
+
+  @Test
+  void testNotCompatibleMoreFields() {
+    Schema schema =
+        new Schema(
+            Types.NestedField.required(0, "id", Types.IntegerType.get()),
+            Types.NestedField.required(1, "data", Types.StringType.get()));
+
+    PartitionSpec spec1 = PartitionSpec.builderFor(schema).bucket("id", 10).build();
+    // Additional field
+    PartitionSpec spec2 =
+        PartitionSpec.builderFor(schema).bucket("id", 10).truncate("data", 1).build();
+
+    assertThat(PartitionSpecEvolution.checkCompatibility(spec1, spec2)).isFalse();
+  }
+
+  @Test
+  void testCompatibleWithNonMatchingSourceIds() {
+    Schema schema1 =
+        new Schema(
+            // Use zero-based field ids
+            Types.NestedField.required(0, "id", Types.IntegerType.get()),
+            Types.NestedField.required(1, "data", Types.StringType.get()));
+
+    PartitionSpec spec1 = PartitionSpec.builderFor(schema1).bucket("id", 10).build();
+
+    Schema schema2 =
+        new Schema(
+            Types.NestedField.required(1, "id", Types.IntegerType.get()),
+            Types.NestedField.required(2, "data", Types.StringType.get()));
+
+    // Same spec as spec1 but bound to a different schema
+    PartitionSpec spec2 = PartitionSpec.builderFor(schema2).bucket("id", 10).build();
+
+    // Compatible because the source names match
+    assertThat(PartitionSpecEvolution.checkCompatibility(spec1, spec2)).isTrue();
+  }
+
+  @Test
+  void testPartitionSpecEvolution() {
+    Schema schema1 =
+        new Schema(
+            Types.NestedField.required(0, "id", Types.IntegerType.get()),
+            Types.NestedField.required(1, "data", Types.StringType.get()));
+
+    PartitionSpec spec1 = PartitionSpec.builderFor(schema1).bucket("id", 10).build();
+
+    Schema schema2 =
+        new Schema(
+            Types.NestedField.required(1, "id", Types.IntegerType.get()),
+            Types.NestedField.required(2, "data", Types.StringType.get()));
+
+    // Change num buckets
+    PartitionSpec spec2 = PartitionSpec.builderFor(schema2).bucket("id", 23).build();
+
+    assertThat(PartitionSpecEvolution.checkCompatibility(spec1, spec2)).isFalse();
+    PartitionSpecEvolution.PartitionSpecChanges result =
+        PartitionSpecEvolution.evolve(spec1, spec2);
+
+    assertThat(result.termsToAdd().toString()).isEqualTo("[bucket[23](ref(name=\"id\"))]");
+    assertThat(result.termsToRemove().toString()).isEqualTo("[bucket[10](ref(name=\"id\"))]");
+  }
+
+  @Test
+  void testPartitionSpecEvolutionAddField() {
+    Schema schema =
+        new Schema(
+            Types.NestedField.required(0, "id", Types.IntegerType.get()),
+            Types.NestedField.required(1, "data", Types.StringType.get()));
+
+    PartitionSpec spec1 = PartitionSpec.builderFor(schema).build();
+    // Add field
+    PartitionSpec spec2 = PartitionSpec.builderFor(schema).bucket("id", 23).build();
+
+    assertThat(PartitionSpecEvolution.checkCompatibility(spec1, spec2)).isFalse();
+    PartitionSpecEvolution.PartitionSpecChanges result =
+        PartitionSpecEvolution.evolve(spec1, spec2);
+
+    assertThat(result.termsToAdd().toString()).isEqualTo("[bucket[23](ref(name=\"id\"))]");
+    assertThat(result.termsToRemove().toString()).isEqualTo("[]");
+  }
+
+  @Test
+  void testPartitionSpecEvolutionRemoveField() {
+    Schema schema =
+        new Schema(
+            Types.NestedField.required(0, "id", Types.IntegerType.get()),
+            Types.NestedField.required(1, "data", Types.StringType.get()));
+
+    PartitionSpec spec1 = PartitionSpec.builderFor(schema).bucket("id", 23).build();
+    // Remove field
+    PartitionSpec spec2 = PartitionSpec.builderFor(schema).build();
+
+    assertThat(PartitionSpecEvolution.checkCompatibility(spec1, spec2)).isFalse();
+    PartitionSpecEvolution.PartitionSpecChanges result =
+        PartitionSpecEvolution.evolve(spec1, spec2);
+
+    assertThat(result.termsToAdd().toString()).isEqualTo("[]");
+    assertThat(result.termsToRemove().toString()).isEqualTo("[bucket[23](ref(name=\"id\"))]");
+  }
+
+  @Test
+  void testPartitionSpecEvolutionWithNestedFields() {
+    Schema schema1 =
+        new Schema(
+            Types.NestedField.required(0, "id", Types.IntegerType.get()),
+            Types.NestedField.required(
+                1,
+                "data",
+                Types.StructType.of(Types.NestedField.required(2, "str", Types.StringType.get()))));
+
+    PartitionSpec spec1 = PartitionSpec.builderFor(schema1).bucket("data.str", 10).build();
+
+    Schema schema2 =
+        new Schema(
+            Types.NestedField.required(1, "id", Types.IntegerType.get()),
+            Types.NestedField.required(
+                2,
+                "data",
+                Types.StructType.of(Types.NestedField.required(3, "str", Types.StringType.get()))));
+
+    // Change num buckets
+    PartitionSpec spec2 = PartitionSpec.builderFor(schema2).bucket("data.str", 23).build();
+
+    assertThat(PartitionSpecEvolution.checkCompatibility(spec1, spec2)).isFalse();
+    PartitionSpecEvolution.PartitionSpecChanges result =
+        PartitionSpecEvolution.evolve(spec1, spec2);
+
+    assertThat(result.termsToAdd().toString()).isEqualTo("[bucket[23](ref(name=\"data.str\"))]");
+    assertThat(result.termsToRemove().toString()).isEqualTo("[bucket[10](ref(name=\"data.str\"))]");
+  }
+}
diff --git a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestTableMetadataCache.java b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestTableMetadataCache.java
new file mode 100644
index 000000000000..cedae887041e
--- /dev/null
+++ b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestTableMetadataCache.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.sink.dynamic;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import org.apache.commons.lang3.SerializationUtils;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.catalog.Catalog;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.flink.sink.TestFlinkIcebergSinkBase;
+import org.apache.iceberg.types.Types;
+import org.junit.jupiter.api.Test;
+
+public class TestTableMetadataCache extends TestFlinkIcebergSinkBase {
+
+  static final Schema SCHEMA =
+      new Schema(
+          Types.NestedField.optional(1, "id", Types.IntegerType.get()),
+          Types.NestedField.optional(2, "data", Types.StringType.get()));
+
+  static final Schema SCHEMA2 =
+      new Schema(
+          Types.NestedField.optional(1, "id", Types.IntegerType.get()),
+          Types.NestedField.optional(2, "data", Types.StringType.get()),
+          Types.NestedField.optional(3, "extra", Types.StringType.get()));
+
+  @Test
+  void testCaching() {
+    Catalog catalog = CATALOG_EXTENSION.catalog();
+    TableIdentifier tableIdentifier = TableIdentifier.parse("default.myTable");
+    catalog.createTable(tableIdentifier, SCHEMA);
+    TableMetadataCache cache = new TableMetadataCache(catalog, 10, Long.MAX_VALUE);
+
+    Schema schema1 = cache.schema(tableIdentifier, SCHEMA).f0;
+    assertThat(schema1.sameSchema(SCHEMA)).isTrue();
+    assertThat(cache.schema(tableIdentifier, SerializationUtils.clone(SCHEMA)).f0)
+        .isEqualTo(schema1); // a clone (equal but not identical) must hit the same cached entry
+
+    assertThat(cache.schema(tableIdentifier, SCHEMA2)).isEqualTo(TableMetadataCache.NOT_FOUND);
+
+    schema1 = cache.schema(tableIdentifier, SCHEMA).f0;
+    assertThat(cache.schema(tableIdentifier, SerializationUtils.clone(SCHEMA)).f0)
+        .isEqualTo(schema1);
+  }
+
+  @Test
+  void testCacheInvalidationAfterSchemaChange() {
+    Catalog catalog = CATALOG_EXTENSION.catalog();
+    TableIdentifier tableIdentifier = TableIdentifier.parse("default.myTable");
+    catalog.createTable(tableIdentifier, SCHEMA);
+    TableMetadataCache cache = new TableMetadataCache(catalog, 10, Long.MAX_VALUE);
+    TableUpdater tableUpdater = new TableUpdater(cache, catalog);
+
+    Schema schema1 = cache.schema(tableIdentifier, SCHEMA).f0;
+    assertThat(schema1.sameSchema(SCHEMA)).isTrue();
+
+    catalog.dropTable(tableIdentifier); // recreate the table behind the cache's back
+    catalog.createTable(tableIdentifier, SCHEMA2);
+    tableUpdater.update(tableIdentifier, "main", SCHEMA2, PartitionSpec.unpartitioned());
+
+    Schema schema2 = cache.schema(tableIdentifier, SCHEMA2).f0;
+    assertThat(schema2.sameSchema(SCHEMA2)).isTrue();
+  }
+
+  @Test
+  void testCachingDisabled() {
+    Catalog catalog = CATALOG_EXTENSION.catalog();
+    TableIdentifier tableIdentifier = TableIdentifier.parse("default.myTable");
+    catalog.createTable(tableIdentifier, SCHEMA);
+    TableMetadataCache cache = new TableMetadataCache(catalog, 0, Long.MAX_VALUE); // size 0 disables caching
+
+    // Cleanup routine doesn't run after every write, so trigger it before checking the size
+    cache.getInternalCache().cleanUp();
+    assertThat(cache.getInternalCache().estimatedSize()).isEqualTo(0);
+  }
+}
diff --git a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestTableUpdater.java b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestTableUpdater.java
new file mode 100644
index 000000000000..f7a3596462f1
--- /dev/null
+++ b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestTableUpdater.java
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.flink.sink.dynamic; + +import static org.assertj.core.api.Assertions.assertThat; + +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.tuple.Tuple3; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.Schema; +import org.apache.iceberg.Table; +import org.apache.iceberg.catalog.Catalog; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.flink.sink.TestFlinkIcebergSinkBase; +import org.apache.iceberg.types.Types; +import org.junit.jupiter.api.Test; + +public class TestTableUpdater extends TestFlinkIcebergSinkBase { + + static final Schema SCHEMA = + new Schema( + Types.NestedField.optional(1, "id", Types.IntegerType.get()), + Types.NestedField.optional(2, "data", Types.StringType.get())); + + static final Schema SCHEMA2 = + new Schema( + Types.NestedField.optional(1, "id", Types.IntegerType.get()), + Types.NestedField.optional(2, "data", Types.StringType.get()), + Types.NestedField.optional(3, "extra", Types.StringType.get())); + + @Test + void testTableCreation() { + Catalog catalog = CATALOG_EXTENSION.catalog(); + TableIdentifier tableIdentifier = TableIdentifier.parse("myTable"); + TableMetadataCache cache = new TableMetadataCache(catalog, 10, Long.MAX_VALUE); + TableUpdater tableUpdater = new TableUpdater(cache, catalog); + + tableUpdater.update(tableIdentifier, "main", SCHEMA, PartitionSpec.unpartitioned()); + assertThat(catalog.tableExists(tableIdentifier)).isTrue(); + + Tuple2 cachedSchema = + cache.schema(tableIdentifier, SCHEMA); + 
assertThat(cachedSchema.f0.sameSchema(SCHEMA)).isTrue(); + } + + @Test + void testTableAlreadyExists() { + Catalog catalog = CATALOG_EXTENSION.catalog(); + TableIdentifier tableIdentifier = TableIdentifier.parse("myTable"); + TableMetadataCache cache = new TableMetadataCache(catalog, 10, Long.MAX_VALUE); + TableUpdater tableUpdater = new TableUpdater(cache, catalog); + + // Make the table non-existent in cache + cache.exists(tableIdentifier); + // Create the table + catalog.createTable(tableIdentifier, SCHEMA); + // Make sure that the cache is invalidated and the table refreshed without an error + Tuple3 result = + tableUpdater.update(tableIdentifier, "main", SCHEMA, PartitionSpec.unpartitioned()); + assertThat(result.f0.sameSchema(SCHEMA)).isTrue(); + assertThat(result.f1).isEqualTo(CompareSchemasVisitor.Result.SAME); + assertThat(result.f2).isEqualTo(PartitionSpec.unpartitioned()); + } + + @Test + void testBranchCreationAndCaching() { + Catalog catalog = CATALOG_EXTENSION.catalog(); + TableIdentifier tableIdentifier = TableIdentifier.parse("myTable"); + TableMetadataCache cache = new TableMetadataCache(catalog, 10, Long.MAX_VALUE); + TableUpdater tableUpdater = new TableUpdater(cache, catalog); + + catalog.createTable(tableIdentifier, SCHEMA); + tableUpdater.update(tableIdentifier, "myBranch", SCHEMA, PartitionSpec.unpartitioned()); + TableMetadataCache.CacheItem cacheItem = cache.getInternalCache().getIfPresent(tableIdentifier); + assertThat(cacheItem).isNotNull(); + + tableUpdater.update(tableIdentifier, "myBranch", SCHEMA, PartitionSpec.unpartitioned()); + assertThat(cache.getInternalCache().getIfPresent(tableIdentifier)).isEqualTo(cacheItem); + } + + @Test + void testSpecCreation() { + Catalog catalog = CATALOG_EXTENSION.catalog(); + TableIdentifier tableIdentifier = TableIdentifier.parse("myTable"); + TableMetadataCache cache = new TableMetadataCache(catalog, 10, Long.MAX_VALUE); + TableUpdater tableUpdater = new TableUpdater(cache, catalog); + + 
PartitionSpec spec = PartitionSpec.builderFor(SCHEMA).bucket("data", 10).build(); + Tuple3 result = + tableUpdater.update(tableIdentifier, "main", SCHEMA, spec); + + Table table = catalog.loadTable(tableIdentifier); + assertThat(table).isNotNull(); + assertThat(table.spec()).isEqualTo(spec); + } + + @Test + void testInvalidateOldCacheEntryOnUpdate() { + Catalog catalog = CATALOG_EXTENSION.catalog(); + TableIdentifier tableIdentifier = TableIdentifier.parse("default.myTable"); + catalog.createTable(tableIdentifier, SCHEMA); + TableMetadataCache cache = new TableMetadataCache(catalog, 10, Long.MAX_VALUE); + cache.schema(tableIdentifier, SCHEMA); + TableUpdater tableUpdater = new TableUpdater(cache, catalog); + + Schema updated = + tableUpdater.update(tableIdentifier, "main", SCHEMA2, PartitionSpec.unpartitioned()).f0; + assertThat(updated.sameSchema(SCHEMA2)); + assertThat(cache.schema(tableIdentifier, SCHEMA2).f0.sameSchema(SCHEMA2)).isTrue(); + } + + @Test + void testLastResultInvalidation() { + Catalog catalog = CATALOG_EXTENSION.catalog(); + TableIdentifier tableIdentifier = TableIdentifier.parse("default.myTable"); + catalog.createTable(tableIdentifier, SCHEMA); + TableMetadataCache cache = new TableMetadataCache(catalog, 10, Long.MAX_VALUE); + TableUpdater tableUpdater = new TableUpdater(cache, catalog); + + // Initialize cache + tableUpdater.update(tableIdentifier, "main", SCHEMA, PartitionSpec.unpartitioned()); + + // Update table behind the scenes + catalog.dropTable(tableIdentifier); + catalog.createTable(tableIdentifier, SCHEMA2); + + // Cache still stores the old information + assertThat(cache.schema(tableIdentifier, SCHEMA2).f1) + .isEqualTo(CompareSchemasVisitor.Result.SCHEMA_UPDATE_NEEDED); + + assertThat( + tableUpdater.update(tableIdentifier, "main", SCHEMA2, PartitionSpec.unpartitioned()).f1) + .isEqualTo(CompareSchemasVisitor.Result.SAME); + + // Last result cache should be cleared + assertThat( + cache + .getInternalCache() + 
.getIfPresent(tableIdentifier) + .getSchemaInfo() + .getLastResult(SCHEMA2)) + .isNull(); + } +}