From 1804b0ac4ff97c3c943463725e91a1e24b0f8c44 Mon Sep 17 00:00:00 2001 From: Maximilian Michels Date: Mon, 12 May 2025 13:19:33 +0200 Subject: [PATCH 01/16] Flink: Required Iceberg core changes to rewrite PartitionSpecs and test schema updates --- .../java/org/apache/iceberg/expressions/NamedReference.java | 2 +- .../java/org/apache/iceberg/expressions/UnboundTransform.java | 2 +- core/src/main/java/org/apache/iceberg/SchemaUpdate.java | 4 ++-- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/api/src/main/java/org/apache/iceberg/expressions/NamedReference.java b/api/src/main/java/org/apache/iceberg/expressions/NamedReference.java index cc5ba3ceaf4f..a05370f347dc 100644 --- a/api/src/main/java/org/apache/iceberg/expressions/NamedReference.java +++ b/api/src/main/java/org/apache/iceberg/expressions/NamedReference.java @@ -26,7 +26,7 @@ public class NamedReference implements UnboundTerm, Reference { private final String name; - NamedReference(String name) { + public NamedReference(String name) { Preconditions.checkNotNull(name, "Name cannot be null"); this.name = name; } diff --git a/api/src/main/java/org/apache/iceberg/expressions/UnboundTransform.java b/api/src/main/java/org/apache/iceberg/expressions/UnboundTransform.java index cae84733c8d5..05f52cfff31e 100644 --- a/api/src/main/java/org/apache/iceberg/expressions/UnboundTransform.java +++ b/api/src/main/java/org/apache/iceberg/expressions/UnboundTransform.java @@ -26,7 +26,7 @@ public class UnboundTransform implements UnboundTerm, Term { private final NamedReference ref; private final Transform transform; - UnboundTransform(NamedReference ref, Transform transform) { + public UnboundTransform(NamedReference ref, Transform transform) { this.ref = ref; this.transform = transform; } diff --git a/core/src/main/java/org/apache/iceberg/SchemaUpdate.java b/core/src/main/java/org/apache/iceberg/SchemaUpdate.java index 8f2bfe184cab..17a9979f3064 100644 --- 
a/core/src/main/java/org/apache/iceberg/SchemaUpdate.java +++ b/core/src/main/java/org/apache/iceberg/SchemaUpdate.java @@ -46,7 +46,7 @@ import org.slf4j.LoggerFactory; /** Schema evolution API implementation. */ -class SchemaUpdate implements UpdateSchema { +public class SchemaUpdate implements UpdateSchema { private static final Logger LOG = LoggerFactory.getLogger(SchemaUpdate.class); private static final int TABLE_ROOT_ID = -1; @@ -71,7 +71,7 @@ class SchemaUpdate implements UpdateSchema { } /** For testing only. */ - SchemaUpdate(Schema schema, int lastColumnId) { + public SchemaUpdate(Schema schema, int lastColumnId) { this(null, null, schema, lastColumnId); } From 61fedb11399ee344f9621362521e8a5071e91c05 Mon Sep 17 00:00:00 2001 From: Maximilian Michels Date: Mon, 12 May 2025 13:19:41 +0200 Subject: [PATCH 02/16] Flink: Add table update code for schema comparison and evolution This adds the classes around schema / spec comparison and evolution. A breakdown of the classes follows: # CompareSchemasVisitor Compares the user-provided schema against the current table schema. # EvolveSchemaVisitor Computes the changes required to the table schema to be compatible with the user-provided schema. # PartitionSpecEvolution Code for checking compatibility with the user-provided PartitionSpec and computing a set of changes to rewrite the PartitionSpec. # TableDataCache Cache which holds all relevant metadata of a table like its name, branch, schema, partition spec. Also holds a cache of past comparison results for a given table's schema and the user-provided input schema. # TableUpdater Core logic to compare and create/update a table given a user-provided input schema. Broken out of #12424, depends on #12996. 
--- .../sink/dynamic/CompareSchemasVisitor.java | 254 ++++++++ .../sink/dynamic/EvolveSchemaVisitor.java | 184 ++++++ .../sink/dynamic/PartitionSpecEvolution.java | 139 ++++ .../flink/sink/dynamic/TableDataCache.java | 258 ++++++++ .../flink/sink/dynamic/TableUpdater.java | 205 ++++++ .../dynamic/TestCompareSchemasVisitor.java | 209 ++++++ .../sink/dynamic/TestEvolveSchemaVisitor.java | 608 ++++++++++++++++++ .../dynamic/TestPartitionSpecEvolution.java | 188 ++++++ .../sink/dynamic/TestTableDataCache.java | 94 +++ .../flink/sink/dynamic/TestTableUpdater.java | 91 +++ 10 files changed, 2230 insertions(+) create mode 100644 flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/CompareSchemasVisitor.java create mode 100644 flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/EvolveSchemaVisitor.java create mode 100644 flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/PartitionSpecEvolution.java create mode 100644 flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/TableDataCache.java create mode 100644 flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/TableUpdater.java create mode 100644 flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestCompareSchemasVisitor.java create mode 100644 flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestEvolveSchemaVisitor.java create mode 100644 flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestPartitionSpecEvolution.java create mode 100644 flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestTableDataCache.java create mode 100644 flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestTableUpdater.java diff --git a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/CompareSchemasVisitor.java b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/CompareSchemasVisitor.java new file mode 100644 index 
000000000000..99abc95d7102 --- /dev/null +++ b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/CompareSchemasVisitor.java @@ -0,0 +1,254 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.flink.sink.dynamic; + +import java.util.List; +import java.util.Map; +import org.apache.iceberg.Schema; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.apache.iceberg.schema.SchemaWithPartnerVisitor; +import org.apache.iceberg.types.Type; +import org.apache.iceberg.types.Types; + +/** Visitor class which compares two schemas and decides whether they are compatible. 
*/ +public class CompareSchemasVisitor + extends SchemaWithPartnerVisitor { + + private final Schema tableSchema; + + private CompareSchemasVisitor(Schema tableSchema) { + this.tableSchema = tableSchema; + } + + public static Result visit(Schema dataSchema, Schema tableSchema) { + return visit(dataSchema, tableSchema, true); + } + + public static Result visit(Schema dataSchema, Schema tableSchema, boolean caseSensitive) { + return visit( + dataSchema, + -1, + new CompareSchemasVisitor(tableSchema), + new PartnerIdByNameAccessors(tableSchema, caseSensitive)); + } + + @Override + public Result schema(Schema dataSchema, Integer tableSchemaId, Result downstream) { + if (tableSchemaId == null) { + return Result.INCOMPATIBLE; + } + + return downstream; + } + + @Override + public Result struct(Types.StructType struct, Integer tableSchemaId, List fields) { + if (tableSchemaId == null) { + return Result.INCOMPATIBLE; + } + + Result result = fields.stream().reduce(Result::merge).orElse(Result.INCOMPATIBLE); + + if (result == Result.INCOMPATIBLE) { + return Result.INCOMPATIBLE; + } + + Type tableSchemaType = + tableSchemaId == -1 ? 
tableSchema.asStruct() : tableSchema.findField(tableSchemaId).type(); + if (!tableSchemaType.isStructType()) { + return Result.INCOMPATIBLE; + } + + if (struct.fields().size() != tableSchemaType.asStructType().fields().size()) { + return Result.CONVERSION_NEEDED; + } + + for (int i = 0; i < struct.fields().size(); ++i) { + if (!struct + .fields() + .get(i) + .name() + .equals(tableSchemaType.asStructType().fields().get(i).name())) { + return Result.CONVERSION_NEEDED; + } + } + + return result; + } + + @Override + public Result field(Types.NestedField field, Integer tableSchemaId, Result typeResult) { + if (tableSchemaId == null) { + return Result.INCOMPATIBLE; + } + + if (typeResult != Result.SAME) { + return typeResult; + } + + if (tableSchema.findField(tableSchemaId).isRequired() && field.isOptional()) { + return Result.INCOMPATIBLE; + } else { + return Result.SAME; + } + } + + @Override + public Result list(Types.ListType list, Integer tableSchemaId, Result elementsResult) { + if (tableSchemaId == null) { + return Result.INCOMPATIBLE; + } + + return elementsResult; + } + + @Override + public Result map( + Types.MapType map, Integer tableSchemaId, Result keyResult, Result valueResult) { + if (tableSchemaId == null) { + return Result.INCOMPATIBLE; + } + + return keyResult.merge(valueResult); + } + + @Override + @SuppressWarnings("checkstyle:CyclomaticComplexity") + public Result primitive(Type.PrimitiveType primitive, Integer tableSchemaId) { + if (tableSchemaId == null) { + return Result.INCOMPATIBLE; + } + + Type tableSchemaType = tableSchema.findField(tableSchemaId).type(); + if (!tableSchemaType.isPrimitiveType()) { + return Result.INCOMPATIBLE; + } + + Type.PrimitiveType tableSchemaPrimitiveType = tableSchemaType.asPrimitiveType(); + if (primitive.equals(tableSchemaPrimitiveType)) { + return Result.SAME; + } else if (primitive.equals(Types.IntegerType.get()) + && tableSchemaPrimitiveType.equals(Types.LongType.get())) { + return Result.CONVERSION_NEEDED; + } 
else if (primitive.equals(Types.FloatType.get()) + && tableSchemaPrimitiveType.equals(Types.DoubleType.get())) { + return Result.CONVERSION_NEEDED; + } else if (primitive.equals(Types.DateType.get()) + && tableSchemaPrimitiveType.equals(Types.TimestampType.withoutZone())) { + return Result.CONVERSION_NEEDED; + } else if (primitive.typeId() == Type.TypeID.DECIMAL + && tableSchemaPrimitiveType.typeId() == Type.TypeID.DECIMAL) { + Types.DecimalType dataType = (Types.DecimalType) primitive; + Types.DecimalType tableType = (Types.DecimalType) tableSchemaPrimitiveType; + return dataType.scale() == tableType.scale() && dataType.precision() < tableType.precision() + ? Result.CONVERSION_NEEDED + : Result.INCOMPATIBLE; + } else { + return Result.INCOMPATIBLE; + } + } + + static class PartnerIdByNameAccessors implements PartnerAccessors { + private final Schema tableSchema; + private boolean caseSensitive = true; + + PartnerIdByNameAccessors(Schema tableSchema) { + this.tableSchema = tableSchema; + } + + private PartnerIdByNameAccessors(Schema tableSchema, boolean caseSensitive) { + this(tableSchema); + this.caseSensitive = caseSensitive; + } + + @Override + public Integer fieldPartner(Integer tableSchemaFieldId, int fieldId, String name) { + Types.StructType struct; + if (tableSchemaFieldId == -1) { + struct = tableSchema.asStruct(); + } else { + struct = tableSchema.findField(tableSchemaFieldId).type().asStructType(); + } + + Types.NestedField field = + caseSensitive ? 
struct.field(name) : struct.caseInsensitiveField(name); + if (field != null) { + return field.fieldId(); + } + + return null; + } + + @Override + public Integer mapKeyPartner(Integer tableSchemaMapId) { + Types.NestedField mapField = tableSchema.findField(tableSchemaMapId); + if (mapField != null) { + return mapField.type().asMapType().fields().get(0).fieldId(); + } + + return null; + } + + @Override + public Integer mapValuePartner(Integer tableSchemaMapId) { + Types.NestedField mapField = tableSchema.findField(tableSchemaMapId); + if (mapField != null) { + return mapField.type().asMapType().fields().get(1).fieldId(); + } + + return null; + } + + @Override + public Integer listElementPartner(Integer tableSchemaListId) { + Types.NestedField listField = tableSchema.findField(tableSchemaListId); + if (listField != null) { + return listField.type().asListType().fields().get(0).fieldId(); + } + + return null; + } + } + + public enum Result { + SAME(0), + CONVERSION_NEEDED(1), + INCOMPATIBLE(2); + + private static final Map BY_ID = Maps.newHashMap(); + + static { + for (Result e : Result.values()) { + if (BY_ID.put(e.id, e) != null) { + throw new IllegalArgumentException("Duplicate id: " + e.id); + } + } + } + + private final int id; + + Result(int id) { + this.id = id; + } + + private Result merge(Result other) { + return BY_ID.get(Math.max(this.id, other.id)); + } + } +} diff --git a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/EvolveSchemaVisitor.java b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/EvolveSchemaVisitor.java new file mode 100644 index 000000000000..6b020a5bda73 --- /dev/null +++ b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/EvolveSchemaVisitor.java @@ -0,0 +1,184 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.flink.sink.dynamic; + +import java.util.List; +import org.apache.iceberg.Schema; +import org.apache.iceberg.UpdateSchema; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.schema.SchemaWithPartnerVisitor; +import org.apache.iceberg.types.Type; +import org.apache.iceberg.types.Types; + +/** + * Visitor class that accumulates the set of changes needed to evolve an existing schema into the + * target schema. Changes are applied to an {@link UpdateSchema} operation. + */ +public class EvolveSchemaVisitor extends SchemaWithPartnerVisitor { + + private final UpdateSchema api; + private final Schema existingSchema; + private final Schema targetSchema; + + private EvolveSchemaVisitor(UpdateSchema api, Schema existingSchema, Schema targetSchema) { + this.api = api; + this.existingSchema = existingSchema; + this.targetSchema = targetSchema; + } + + /** + * Adds changes needed to produce the target schema to an {@link UpdateSchema} operation. + * + *

Changes are accumulated to evolve the existingSchema into a targetSchema. + * + * @param api an UpdateSchema for adding changes + * @param existingSchema an existing schema + * @param targetSchema a new schema to compare with the existing + */ + public static void visit(UpdateSchema api, Schema existingSchema, Schema targetSchema) { + visit( + targetSchema, + -1, + new EvolveSchemaVisitor(api, existingSchema, targetSchema), + new CompareSchemasVisitor.PartnerIdByNameAccessors(existingSchema)); + } + + @Override + public Boolean struct(Types.StructType struct, Integer partnerId, List existingFields) { + if (partnerId == null) { + return true; + } + + // Add, update and order fields in the struct + Types.StructType partnerStruct = findFieldType(partnerId).asStructType(); + String after = null; + for (Types.NestedField targetField : struct.fields()) { + Types.NestedField nestedField = partnerStruct.field(targetField.name()); + final String columnName; + if (nestedField != null) { + updateColumn(nestedField, targetField); + columnName = this.existingSchema.findColumnName(nestedField.fieldId()); + } else { + addColumn(partnerId, targetField); + columnName = this.targetSchema.findColumnName(targetField.fieldId()); + } + + setPosition(columnName, after); + after = columnName; + } + + // Ensure that unused fields are made optional + for (Types.NestedField existingField : partnerStruct.fields()) { + if (struct.field(existingField.name()) == null) { + if (existingField.isRequired()) { + this.api.makeColumnOptional(this.existingSchema.findColumnName(existingField.fieldId())); + } + } + } + + return false; + } + + @Override + public Boolean field(Types.NestedField field, Integer partnerId, Boolean isFieldMissing) { + return partnerId == null; + } + + @Override + public Boolean list(Types.ListType list, Integer partnerId, Boolean isElementMissing) { + if (partnerId == null) { + return true; + } + + Preconditions.checkState( + !isElementMissing, "Error traversing schemas: 
element is missing, but list is present"); + + Types.ListType partnerList = findFieldType(partnerId).asListType(); + updateColumn(partnerList.fields().get(0), list.fields().get(0)); + + return false; + } + + @Override + public Boolean map( + Types.MapType map, Integer partnerId, Boolean isKeyMissing, Boolean isValueMissing) { + if (partnerId == null) { + return true; + } + + Preconditions.checkState( + !isKeyMissing, "Error traversing schemas: key is missing, but map is present"); + Preconditions.checkState( + !isValueMissing, "Error traversing schemas: value is missing, but map is present"); + + Types.MapType partnerMap = findFieldType(partnerId).asMapType(); + updateColumn(partnerMap.fields().get(0), map.fields().get(0)); + updateColumn(partnerMap.fields().get(1), map.fields().get(1)); + + return false; + } + + @Override + public Boolean primitive(Type.PrimitiveType primitive, Integer partnerId) { + return partnerId == null; + } + + private Type findFieldType(int fieldId) { + if (fieldId == -1) { + return existingSchema.asStruct(); + } else { + return existingSchema.findField(fieldId).type(); + } + } + + private void addColumn(int parentId, Types.NestedField field) { + String parentName = targetSchema.findColumnName(parentId); + api.addColumn(parentName, field.name(), field.type(), field.doc()); + } + + private void updateColumn(Types.NestedField existingField, Types.NestedField targetField) { + String existingColumnName = this.existingSchema.findColumnName(existingField.fieldId()); + + boolean needsOptionalUpdate = targetField.isOptional() && existingField.isRequired(); + boolean needsTypeUpdate = + targetField.type().isPrimitiveType() && !targetField.type().equals(existingField.type()); + boolean needsDocUpdate = + targetField.doc() != null && !targetField.doc().equals(existingField.doc()); + + if (needsOptionalUpdate) { + api.makeColumnOptional(existingColumnName); + } + + if (needsTypeUpdate) { + api.updateColumn(existingColumnName, 
targetField.type().asPrimitiveType()); + } + + if (needsDocUpdate) { + api.updateColumnDoc(existingColumnName, targetField.doc()); + } + } + + private void setPosition(String columnName, String after) { + if (after == null) { + this.api.moveFirst(columnName); + } else { + this.api.moveAfter(columnName, after); + } + } +} diff --git a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/PartitionSpecEvolution.java b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/PartitionSpecEvolution.java new file mode 100644 index 000000000000..760340686a8c --- /dev/null +++ b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/PartitionSpecEvolution.java @@ -0,0 +1,139 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.iceberg.flink.sink.dynamic; + +import java.util.List; +import org.apache.iceberg.PartitionField; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.Schema; +import org.apache.iceberg.expressions.NamedReference; +import org.apache.iceberg.expressions.Term; +import org.apache.iceberg.expressions.UnboundTransform; +import org.apache.iceberg.relocated.com.google.common.collect.Iterables; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; + +/** Checks compatibility of PartitionSpecs and evolves one into the other. */ +public class PartitionSpecEvolution { + + private PartitionSpecEvolution() {} + + /** + * Checks whether two PartitionSpecs are compatible with each other. Less strict than {@code + * PartitionSpec#compatible} in the sense that it tolerates differently named partition fields, as + * long as their transforms and field names corresponding to their source ids match. + */ + public static boolean checkCompatibility(PartitionSpec spec1, PartitionSpec spec2) { + if (spec1.equals(spec2)) { + return true; + } + + if (spec1.fields().size() != spec2.fields().size()) { + return false; + } + + for (int i = 0; i < spec1.fields().size(); i++) { + PartitionField field1 = spec1.fields().get(i); + PartitionField field2 = spec2.fields().get(i); + if (!specFieldsAreCompatible(field1, spec1.schema(), field2, spec2.schema())) { + return false; + } + } + + return true; + } + + static PartitionSpecChanges evolve(PartitionSpec currentSpec, PartitionSpec targetSpec) { + if (currentSpec.compatibleWith(targetSpec)) { + return new PartitionSpecChanges(); + } + + PartitionSpecChanges result = new PartitionSpecChanges(); + + int maxNumFields = Math.max(currentSpec.fields().size(), targetSpec.fields().size()); + for (int i = 0; i < maxNumFields; i++) { + PartitionField currentField = Iterables.get(currentSpec.fields(), i, null); + PartitionField targetField = Iterables.get(targetSpec.fields(), i, null); + + if 
(!specFieldsAreCompatible( + currentField, currentSpec.schema(), targetField, targetSpec.schema())) { + + if (currentField != null) { + result.remove(toTerm(currentField, currentSpec.schema())); + } + + if (targetField != null) { + result.add(toTerm(targetField, targetSpec.schema())); + } + } + } + + return result; + } + + static class PartitionSpecChanges { + private final List termsToAdd = Lists.newArrayList(); + private final List termsToRemove = Lists.newArrayList(); + + public void add(Term term) { + termsToAdd.add(term); + } + + public void remove(Term term) { + termsToRemove.add(term); + } + + public List termsToAdd() { + return termsToAdd; + } + + public List termsToRemove() { + return termsToRemove; + } + + public boolean isEmpty() { + return termsToAdd.isEmpty() && termsToRemove.isEmpty(); + } + + @Override + public String toString() { + return "PartitionSpecChanges{" + + "termsToAdd=" + + termsToAdd + + ", termsToRemove=" + + termsToRemove + + '}'; + } + } + + private static Term toTerm(PartitionField field, Schema schema) { + String sourceName = schema.idToName().get(field.sourceId()); + return new UnboundTransform<>(new NamedReference<>(sourceName), field.transform()); + } + + private static boolean specFieldsAreCompatible( + PartitionField field1, Schema schemaField1, PartitionField field2, Schema schemaField2) { + if (field1 == null || field2 == null) { + return false; + } + String firstFieldSourceName = schemaField1.idToName().get(field1.sourceId()); + String secondFieldSourceName = schemaField2.idToName().get(field2.sourceId()); + return firstFieldSourceName.equals(secondFieldSourceName) + && field1.transform().toString().equals(field2.transform().toString()); + } +} diff --git a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/TableDataCache.java b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/TableDataCache.java new file mode 100644 index 000000000000..fa494635efff --- /dev/null +++ 
b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/TableDataCache.java @@ -0,0 +1,258 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.flink.sink.dynamic; + +import com.github.benmanes.caffeine.cache.Cache; +import com.github.benmanes.caffeine.cache.Caffeine; +import java.util.LinkedHashMap; +import java.util.Map; +import java.util.Set; +import org.apache.flink.annotation.Internal; +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.Schema; +import org.apache.iceberg.Table; +import org.apache.iceberg.catalog.Catalog; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.exceptions.NoSuchTableException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +@Internal +class TableDataCache { + + private static final Logger LOG = LoggerFactory.getLogger(TableDataCache.class); + private static final int MAX_SIZE = 10; + private static final Tuple2 EXISTS = Tuple2.of(true, null); + private static final Tuple2 NOT_EXISTS = Tuple2.of(false, null); + static final Tuple2 NOT_FOUND = + Tuple2.of(null, 
CompareSchemasVisitor.Result.INCOMPATIBLE); + + private final Catalog catalog; + private final long refreshMs; + private final Cache cache; + + TableDataCache(Catalog catalog, int maximumSize, long refreshMs) { + this.catalog = catalog; + this.refreshMs = refreshMs; + this.cache = Caffeine.newBuilder().maximumSize(maximumSize).build(); + } + + Tuple2 exists(TableIdentifier identifier) { + CacheItem cached = cache.getIfPresent(identifier); + if (cached != null && Boolean.TRUE.equals(cached.tableExists)) { + return EXISTS; + } else if (needsRefresh(cached, true)) { + return refreshTable(identifier); + } else { + return NOT_EXISTS; + } + } + + String branch(TableIdentifier identifier, String branch) { + return branch(identifier, branch, true); + } + + Tuple2 schema(TableIdentifier identifier, Schema input) { + return schema(identifier, input, true); + } + + PartitionSpec spec(TableIdentifier identifier, PartitionSpec spec) { + return spec(identifier, spec, true); + } + + void update(TableIdentifier identifier, Table table) { + cache.put( + identifier, + new CacheItem(true, table.refs().keySet(), new SchemaInfo(table.schemas()), table.specs())); + } + + private String branch(TableIdentifier identifier, String branch, boolean allowRefresh) { + CacheItem cached = cache.getIfPresent(identifier); + if (cached != null && cached.tableExists && cached.branches.contains(branch)) { + return branch; + } + + if (needsRefresh(cached, allowRefresh)) { + refreshTable(identifier); + return branch(identifier, branch, false); + } else { + return null; + } + } + + private Tuple2 schema( + TableIdentifier identifier, Schema input, boolean allowRefresh) { + CacheItem cached = cache.getIfPresent(identifier); + Schema compatible = null; + if (cached != null && cached.tableExists) { + // This only works if the {@link Schema#equals(Object)} returns true for the old schema + // and a new schema. Performance is paramount as this code is on the hot path. 
Every other + // way for comparing 2 schemas were performing worse than the + // {@link CompareByNameVisitor#visit(Schema, Schema, boolean)}, so caching was useless. + Tuple2 lastResult = + cached.schema.lastResults.get(input); + if (lastResult != null) { + return lastResult; + } + + for (Map.Entry tableSchema : cached.schema.schemas.entrySet()) { + CompareSchemasVisitor.Result result = + CompareSchemasVisitor.visit(input, tableSchema.getValue(), true); + if (result == CompareSchemasVisitor.Result.SAME) { + Tuple2 newResult = + Tuple2.of(tableSchema.getValue(), CompareSchemasVisitor.Result.SAME); + cached.schema.update(input, newResult); + return newResult; + } else if (compatible == null && result == CompareSchemasVisitor.Result.CONVERSION_NEEDED) { + compatible = tableSchema.getValue(); + } + } + } + + if (needsRefresh(cached, allowRefresh)) { + refreshTable(identifier); + return schema(identifier, input, false); + } else if (compatible != null) { + Tuple2 newResult = + Tuple2.of(compatible, CompareSchemasVisitor.Result.CONVERSION_NEEDED); + cached.schema.update(input, newResult); + return newResult; + } else if (cached != null && cached.tableExists) { + cached.schema.update(input, NOT_FOUND); + return NOT_FOUND; + } else { + return NOT_FOUND; + } + } + + private PartitionSpec spec(TableIdentifier identifier, PartitionSpec spec, boolean allowRefresh) { + CacheItem cached = cache.getIfPresent(identifier); + if (cached != null && cached.tableExists) { + for (PartitionSpec tableSpec : cached.specs.values()) { + if (PartitionSpecEvolution.checkCompatibility(tableSpec, spec)) { + return tableSpec; + } + } + } + + if (needsRefresh(cached, allowRefresh)) { + refreshTable(identifier); + return spec(identifier, spec, false); + } else { + return null; + } + } + + private Tuple2 refreshTable(TableIdentifier identifier) { + try { + Table table = catalog.loadTable(identifier); + cache.put( + identifier, + new CacheItem( + true, table.refs().keySet(), new 
SchemaInfo(table.schemas()), table.specs())); + return EXISTS; + } catch (NoSuchTableException e) { + LOG.debug("Table doesn't exist {}", identifier, e); + cache.put(identifier, new CacheItem(false, null, null, null)); + return Tuple2.of(false, e); + } + } + + private boolean needsRefresh(CacheItem cacheItem, boolean allowRefresh) { + return allowRefresh + && (cacheItem == null || cacheItem.created + refreshMs > System.currentTimeMillis()); + } + + public void invalidate(TableIdentifier identifier) { + cache.invalidate(identifier); + } + + /** Handles timeout for missing items only. Caffeine performance causes noticeable delays. */ + static class CacheItem { + private final long created = System.currentTimeMillis(); + + private final boolean tableExists; + private final Set branches; + private final SchemaInfo schema; + private final Map specs; + + private CacheItem( + boolean tableExists, + Set branches, + SchemaInfo schema, + Map specs) { + this.tableExists = tableExists; + this.branches = branches; + this.schema = schema; + this.specs = specs; + } + + @VisibleForTesting + SchemaInfo getSchemaInfo() { + return schema; + } + } + + /** + * Stores precalculated results for {@link CompareSchemasVisitor#visit(Schema, Schema, boolean)} + * in the cache. 
+ */ + static class SchemaInfo { + private final Map schemas; + private final Map> lastResults; + + private SchemaInfo(Map schemas) { + this.schemas = schemas; + this.lastResults = new LimitedLinkedHashMap<>(); + } + + private void update( + Schema newLastSchema, Tuple2 newLastResult) { + lastResults.put(newLastSchema, newLastResult); + } + + @VisibleForTesting + Tuple2 getLastResult(Schema schema) { + return lastResults.get(schema); + } + } + + @SuppressWarnings("checkstyle:IllegalType") + private static class LimitedLinkedHashMap extends LinkedHashMap { + @Override + protected boolean removeEldestEntry(Map.Entry eldest) { + boolean remove = size() > MAX_SIZE; + if (remove) { + LOG.warn( + "Performance degraded as records with different schema is generated for the same table. " + + "Likely the DynamicRecord.schema is not reused. " + + "Reuse the same instance if the record schema is the same to improve performance"); + } + + return remove; + } + } + + @VisibleForTesting + Cache getInternalCache() { + return cache; + } +} diff --git a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/TableUpdater.java b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/TableUpdater.java new file mode 100644 index 000000000000..e95f4be0a2ae --- /dev/null +++ b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/TableUpdater.java @@ -0,0 +1,205 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.flink.sink.dynamic; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.tuple.Tuple3; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.Schema; +import org.apache.iceberg.Table; +import org.apache.iceberg.UpdatePartitionSpec; +import org.apache.iceberg.UpdateSchema; +import org.apache.iceberg.catalog.Catalog; +import org.apache.iceberg.catalog.SupportsNamespaces; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.exceptions.AlreadyExistsException; +import org.apache.iceberg.exceptions.CommitFailedException; +import org.apache.iceberg.exceptions.NoSuchNamespaceException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +@Internal +class TableUpdater { + + private static final Logger LOG = LoggerFactory.getLogger(TableUpdater.class); + private final TableDataCache cache; + private final Catalog catalog; + + TableUpdater(TableDataCache cache, Catalog catalog) { + this.cache = cache; + this.catalog = catalog; + } + + /** + * Creates or updates a table to make sure that the given branch, schema, spec exists. + * + * @return a {@link Tuple3} of the new {@link Schema}, the status of the schema compared to the + * requested one, and the new {@link PartitionSpec#specId()}. 
+ */ + Tuple3 update( + TableIdentifier tableIdentifier, String branch, Schema schema, PartitionSpec spec) { + findOrCreateTable(tableIdentifier, schema, spec); + findOrCreateBranch(tableIdentifier, branch); + Tuple2 newSchema = + findOrCreateSchema(tableIdentifier, schema); + PartitionSpec newSpec = findOrCreateSpec(tableIdentifier, spec); + return Tuple3.of(newSchema.f0, newSchema.f1, newSpec); + } + + private void findOrCreateTable(TableIdentifier identifier, Schema schema, PartitionSpec spec) { + Tuple2 exists = cache.exists(identifier); + if (Boolean.FALSE.equals(exists.f0)) { + if (exists.f1 instanceof NoSuchNamespaceException) { + SupportsNamespaces catalogWithNameSpace = (SupportsNamespaces) catalog; + LOG.info("Namespace {} not found during table search. Creating namespace", identifier); + try { + catalogWithNameSpace.createNamespace(identifier.namespace()); + } catch (AlreadyExistsException e) { + LOG.debug("Namespace {} created concurrently", identifier.namespace(), e); + } + + createTable(identifier, schema, spec); + } else { + LOG.info("Table {} not found during table search. Creating table.", identifier); + createTable(identifier, schema, spec); + } + } + } + + private void createTable(TableIdentifier identifier, Schema schema, PartitionSpec spec) { + try { + Table table = catalog.createTable(identifier, schema, spec); + cache.update(identifier, table); + } catch (AlreadyExistsException e) { + LOG.info("Table {} created concurrently. Skipping creation.", identifier, e); + } + } + + private void findOrCreateBranch(TableIdentifier identifier, String branch) { + String fromCache = cache.branch(identifier, branch); + if (fromCache == null) { + try { + // TODO: Which snapshot should be used to create the branch? + catalog.loadTable(identifier).manageSnapshots().createBranch(branch).commit(); + LOG.info("Branch {} for {} created", branch, identifier); + } catch (Exception e) { + LOG.info( + "Failed to create branch {} for {}. 
Maybe created concurrently?", + branch, + identifier, + e); + } + } + } + + private Tuple2 findOrCreateSchema( + TableIdentifier identifier, Schema schema) { + Tuple2 fromCache = cache.schema(identifier, schema); + if (fromCache.f1 != CompareSchemasVisitor.Result.INCOMPATIBLE) { + return fromCache; + } else { + Table table = catalog.loadTable(identifier); + Schema tableSchema = table.schema(); + CompareSchemasVisitor.Result result = CompareSchemasVisitor.visit(schema, tableSchema, true); + switch (result) { + case SAME: + case CONVERSION_NEEDED: + cache.update(identifier, table); + return Tuple2.of(tableSchema, result); + case INCOMPATIBLE: + LOG.info( + "Triggering schema update for table {} {} to {}", identifier, tableSchema, schema); + UpdateSchema updateApi = table.updateSchema(); + EvolveSchemaVisitor.visit(updateApi, tableSchema, schema); + + try { + updateApi.commit(); + cache.invalidate(identifier); + Tuple2 comparisonAfterMigration = + cache.schema(identifier, schema); + Schema newSchema = comparisonAfterMigration.f0; + LOG.info("Table {} schema updated from {} to {}", identifier, tableSchema, newSchema); + return comparisonAfterMigration; + } catch (CommitFailedException e) { + LOG.info( + "Schema update failed for {} from {} to {}", identifier, tableSchema, schema, e); + Tuple2 newSchema = + cache.schema(identifier, schema); + if (newSchema.f1 != CompareSchemasVisitor.Result.INCOMPATIBLE) { + LOG.info("Table {} schema updated concurrently to {}", identifier, schema); + return newSchema; + } else { + throw e; + } + } + default: + throw new IllegalArgumentException("Unknown comparison result"); + } + } + } + + private PartitionSpec findOrCreateSpec(TableIdentifier identifier, PartitionSpec targetSpec) { + PartitionSpec currentSpec = cache.spec(identifier, targetSpec); + if (currentSpec != null) { + return currentSpec; + } + + Table table = catalog.loadTable(identifier); + currentSpec = table.spec(); + + PartitionSpecEvolution.PartitionSpecChanges result = 
+ PartitionSpecEvolution.evolve(currentSpec, targetSpec); + if (result.isEmpty()) { + LOG.info("Returning equivalent existing spec {} for {}", currentSpec, targetSpec); + return currentSpec; + } + + LOG.info( + "Spec for table {} has been altered. Updating from {} to {}", + identifier, + currentSpec, + targetSpec); + UpdatePartitionSpec updater = table.updateSpec(); + result.termsToRemove().forEach(updater::removeField); + result.termsToAdd().forEach(updater::addField); + + try { + updater.commit(); + } catch (CommitFailedException e) { + LOG.info( + "Partition spec update failed for {} from {} to {}", + identifier, + currentSpec, + targetSpec, + e); + PartitionSpec newSpec = cache.spec(identifier, targetSpec); + result = PartitionSpecEvolution.evolve(targetSpec, newSpec); + if (result.isEmpty()) { + LOG.info("Table {} partition spec updated concurrently to {}", identifier, newSpec); + return newSpec; + } else { + throw e; + } + } + + cache.invalidate(identifier); + return cache.spec(identifier, targetSpec); + } +} diff --git a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestCompareSchemasVisitor.java b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestCompareSchemasVisitor.java new file mode 100644 index 000000000000..6edebaecce2e --- /dev/null +++ b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestCompareSchemasVisitor.java @@ -0,0 +1,209 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.flink.sink.dynamic; + +import static org.apache.iceberg.types.Types.NestedField.optional; +import static org.apache.iceberg.types.Types.NestedField.required; +import static org.assertj.core.api.Assertions.assertThat; + +import org.apache.iceberg.Schema; +import org.apache.iceberg.types.Types.IntegerType; +import org.apache.iceberg.types.Types.ListType; +import org.apache.iceberg.types.Types.LongType; +import org.apache.iceberg.types.Types.MapType; +import org.apache.iceberg.types.Types.StringType; +import org.apache.iceberg.types.Types.StructType; +import org.junit.jupiter.api.Test; + +class TestCompareSchemasVisitor { + + @Test + void testSchema() { + assertThat( + CompareSchemasVisitor.visit( + new Schema( + optional(1, "id", IntegerType.get(), "comment"), + optional(2, "data", StringType.get()), + optional(3, "extra", StringType.get())), + new Schema( + optional(1, "id", IntegerType.get(), "comment"), + optional(2, "data", StringType.get()), + optional(3, "extra", StringType.get())))) + .isEqualTo(CompareSchemasVisitor.Result.SAME); + } + + @Test + void testSchemaDifferentId() { + assertThat( + CompareSchemasVisitor.visit( + new Schema( + optional(0, "id", IntegerType.get()), + optional(1, "data", StringType.get()), + optional(2, "extra", StringType.get())), + new Schema( + optional(1, "id", IntegerType.get()), + optional(2, "data", StringType.get()), + optional(3, "extra", StringType.get())))) + .isEqualTo(CompareSchemasVisitor.Result.SAME); + } + + @Test + void testSchemaDifferent() { + assertThat( + 
CompareSchemasVisitor.visit( + new Schema( + optional(0, "id", IntegerType.get()), + optional(1, "data", StringType.get()), + optional(2, "extra", StringType.get())), + new Schema( + optional(0, "id", IntegerType.get()), optional(1, "data", StringType.get())))) + .isEqualTo(CompareSchemasVisitor.Result.INCOMPATIBLE); + } + + @Test + void testSchemaWithMoreColumns() { + assertThat( + CompareSchemasVisitor.visit( + new Schema( + optional(0, "id", IntegerType.get()), optional(1, "data", StringType.get())), + new Schema( + optional(0, "id", IntegerType.get()), + optional(1, "data", StringType.get()), + optional(2, "extra", StringType.get())))) + .isEqualTo(CompareSchemasVisitor.Result.CONVERSION_NEEDED); + } + + @Test + void testDifferentType() { + assertThat( + CompareSchemasVisitor.visit( + new Schema( + optional(1, "id", LongType.get()), optional(2, "extra", StringType.get())), + new Schema( + optional(1, "id", IntegerType.get()), optional(2, "extra", StringType.get())))) + .isEqualTo(CompareSchemasVisitor.Result.INCOMPATIBLE); + } + + @Test + void testCompatibleType() { + assertThat( + CompareSchemasVisitor.visit( + new Schema( + optional(1, "id", IntegerType.get()), optional(2, "extra", StringType.get())), + new Schema( + optional(1, "id", LongType.get()), optional(2, "extra", StringType.get())))) + .isEqualTo(CompareSchemasVisitor.Result.CONVERSION_NEEDED); + } + + @Test + void testWithRequiredChange() { + Schema dataSchema = + new Schema(optional(1, "id", IntegerType.get()), optional(2, "extra", StringType.get())); + Schema tableSchema = + new Schema(required(1, "id", IntegerType.get()), optional(2, "extra", StringType.get())); + assertThat(CompareSchemasVisitor.visit(dataSchema, tableSchema)) + .isEqualTo(CompareSchemasVisitor.Result.INCOMPATIBLE); + assertThat(CompareSchemasVisitor.visit(tableSchema, dataSchema)) + .isEqualTo(CompareSchemasVisitor.Result.SAME); + } + + @Test + void testStructDifferentId() { + assertThat( + CompareSchemasVisitor.visit( + new 
Schema( + optional(1, "id", IntegerType.get()), + optional(2, "struct1", StructType.of(optional(3, "extra", IntegerType.get())))), + new Schema( + optional(0, "id", IntegerType.get()), + optional( + 1, "struct1", StructType.of(optional(2, "extra", IntegerType.get())))))) + .isEqualTo(CompareSchemasVisitor.Result.SAME); + } + + @Test + void testStructChanged() { + assertThat( + CompareSchemasVisitor.visit( + new Schema( + optional(0, "id", IntegerType.get()), + optional(1, "struct1", StructType.of(optional(2, "extra", LongType.get())))), + new Schema( + optional(1, "id", IntegerType.get()), + optional( + 2, "struct1", StructType.of(optional(3, "extra", IntegerType.get())))))) + .isEqualTo(CompareSchemasVisitor.Result.INCOMPATIBLE); + } + + @Test + void testMapDifferentId() { + assertThat( + CompareSchemasVisitor.visit( + new Schema( + optional(1, "id", IntegerType.get()), + optional( + 2, "map1", MapType.ofOptional(3, 4, IntegerType.get(), StringType.get()))), + new Schema( + optional(0, "id", IntegerType.get()), + optional( + 1, "map1", MapType.ofOptional(2, 3, IntegerType.get(), StringType.get()))))) + .isEqualTo(CompareSchemasVisitor.Result.SAME); + } + + @Test + void testMapChanged() { + assertThat( + CompareSchemasVisitor.visit( + new Schema( + optional(1, "id", IntegerType.get()), + optional( + 2, "map1", MapType.ofOptional(3, 4, LongType.get(), StringType.get()))), + new Schema( + optional(1, "id", IntegerType.get()), + optional( + 2, "map1", MapType.ofOptional(3, 4, IntegerType.get(), StringType.get()))))) + .isEqualTo(CompareSchemasVisitor.Result.INCOMPATIBLE); + } + + @Test + void testListDifferentId() { + assertThat( + CompareSchemasVisitor.visit( + new Schema( + optional(1, "id", IntegerType.get()), + optional(2, "list1", ListType.ofOptional(3, IntegerType.get()))), + new Schema( + optional(0, "id", IntegerType.get()), + optional(1, "list1", ListType.ofOptional(2, IntegerType.get()))))) + .isEqualTo(CompareSchemasVisitor.Result.SAME); + } + + @Test + 
void testListChanged() { + assertThat( + CompareSchemasVisitor.visit( + new Schema( + optional(0, "id", IntegerType.get()), + optional(1, "list1", ListType.ofOptional(2, LongType.get()))), + new Schema( + optional(1, "id", IntegerType.get()), + optional(2, "list1", ListType.ofOptional(3, IntegerType.get()))))) + .isEqualTo(CompareSchemasVisitor.Result.INCOMPATIBLE); + } +} diff --git a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestEvolveSchemaVisitor.java b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestEvolveSchemaVisitor.java new file mode 100644 index 000000000000..d9bcaff182a7 --- /dev/null +++ b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestEvolveSchemaVisitor.java @@ -0,0 +1,608 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.iceberg.flink.sink.dynamic; + +import static org.apache.iceberg.types.Types.NestedField.of; +import static org.apache.iceberg.types.Types.NestedField.optional; +import static org.apache.iceberg.types.Types.NestedField.required; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.atomic.AtomicInteger; +import org.apache.iceberg.Schema; +import org.apache.iceberg.SchemaUpdate; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.types.Type.PrimitiveType; +import org.apache.iceberg.types.Types; +import org.apache.iceberg.types.Types.DecimalType; +import org.apache.iceberg.types.Types.DoubleType; +import org.apache.iceberg.types.Types.FloatType; +import org.apache.iceberg.types.Types.IntegerType; +import org.apache.iceberg.types.Types.ListType; +import org.apache.iceberg.types.Types.LongType; +import org.apache.iceberg.types.Types.MapType; +import org.apache.iceberg.types.Types.StringType; +import org.apache.iceberg.types.Types.StructType; +import org.apache.iceberg.types.Types.TimeType; +import org.apache.iceberg.types.Types.UUIDType; +import org.junit.jupiter.api.Test; + +public class TestEvolveSchemaVisitor { + + private static List primitiveTypes() { + return Lists.newArrayList( + StringType.get(), + TimeType.get(), + Types.TimestampType.withoutZone(), + Types.TimestampType.withZone(), + UUIDType.get(), + Types.DateType.get(), + Types.BooleanType.get(), + Types.BinaryType.get(), + DoubleType.get(), + IntegerType.get(), + Types.FixedType.ofLength(10), + DecimalType.of(10, 2), + LongType.get(), + FloatType.get()); + } + + private static Types.NestedField[] primitiveFields( + Integer initialValue, List primitiveTypes) { + return primitiveFields(initialValue, primitiveTypes, true); + } + + private static Types.NestedField[] primitiveFields( + 
Integer initialValue, List primitiveTypes, boolean optional) { + AtomicInteger atomicInteger = new AtomicInteger(initialValue); + return primitiveTypes.stream() + .map( + type -> + of( + atomicInteger.incrementAndGet(), + optional, + type.toString(), + Types.fromPrimitiveString(type.toString()))) + .toArray(Types.NestedField[]::new); + } + + @Test + public void testAddTopLevelPrimitives() { + Schema targetSchema = new Schema(primitiveFields(0, primitiveTypes())); + SchemaUpdate updateApi = new SchemaUpdate(new Schema(), 0); + EvolveSchemaVisitor.visit(updateApi, new Schema(), targetSchema); + assertThat(targetSchema.asStruct()).isEqualTo(updateApi.apply().asStruct()); + } + + @Test + public void testMakeTopLevelPrimitivesOptional() { + Schema existingSchema = new Schema(primitiveFields(0, primitiveTypes(), false)); + assertThat(existingSchema.columns().stream().allMatch(Types.NestedField::isRequired)).isTrue(); + + SchemaUpdate updateApi = new SchemaUpdate(existingSchema, 0); + EvolveSchemaVisitor.visit(updateApi, existingSchema, new Schema()); + Schema newSchema = updateApi.apply(); + assertThat(newSchema.asStruct().fields().size()).isEqualTo(14); + assertThat(newSchema.columns().stream().allMatch(Types.NestedField::isOptional)).isTrue(); + } + + @Test + public void testIdentifyFieldsByName() { + Schema existingSchema = + new Schema(Types.NestedField.optional(42, "myField", Types.LongType.get())); + SchemaUpdate updateApi = new SchemaUpdate(existingSchema, 0); + Schema newSchema = + new Schema(Arrays.asList(Types.NestedField.optional(-1, "myField", Types.LongType.get()))); + EvolveSchemaVisitor.visit(updateApi, existingSchema, newSchema); + assertThat(updateApi.apply().sameSchema(existingSchema)).isTrue(); + } + + @Test + public void testChangeOrderTopLevelPrimitives() { + Schema existingSchema = + new Schema( + Arrays.asList(optional(1, "a", StringType.get()), optional(2, "b", StringType.get()))); + Schema targetSchema = + new Schema( + Arrays.asList(optional(2, 
"b", StringType.get()), optional(1, "a", StringType.get()))); + SchemaUpdate updateApi = new SchemaUpdate(existingSchema, 0); + EvolveSchemaVisitor.visit(updateApi, existingSchema, targetSchema); + assertThat(updateApi.apply().asStruct()).isEqualTo(targetSchema.asStruct()); + } + + @Test + public void testAddTopLevelListOfPrimitives() { + for (PrimitiveType primitiveType : primitiveTypes()) { + Schema targetSchema = new Schema(optional(1, "aList", ListType.ofOptional(2, primitiveType))); + SchemaUpdate updateApi = new SchemaUpdate(new Schema(), 0); + EvolveSchemaVisitor.visit(updateApi, new Schema(), targetSchema); + assertThat(updateApi.apply().asStruct()).isEqualTo(targetSchema.asStruct()); + } + } + + @Test + public void testMakeTopLevelListOfPrimitivesOptional() { + for (PrimitiveType primitiveType : primitiveTypes()) { + Schema existingSchema = + new Schema(optional(1, "aList", ListType.ofRequired(2, primitiveType))); + Schema targetSchema = new Schema(); + SchemaUpdate updateApi = new SchemaUpdate(existingSchema, 0); + EvolveSchemaVisitor.visit(updateApi, existingSchema, targetSchema); + Schema expectedSchema = + new Schema(optional(1, "aList", ListType.ofRequired(2, primitiveType))); + assertThat(updateApi.apply().asStruct()).isEqualTo(expectedSchema.asStruct()); + } + } + + @Test + public void testAddTopLevelMapOfPrimitives() { + for (PrimitiveType primitiveType : primitiveTypes()) { + Schema targetSchema = + new Schema(optional(1, "aMap", MapType.ofOptional(2, 3, primitiveType, primitiveType))); + SchemaUpdate updateApi = new SchemaUpdate(new Schema(), 0); + EvolveSchemaVisitor.visit(updateApi, new Schema(), targetSchema); + assertThat(updateApi.apply().asStruct()).isEqualTo(targetSchema.asStruct()); + } + } + + @Test + public void testAddTopLevelStructOfPrimitives() { + for (PrimitiveType primitiveType : primitiveTypes()) { + Schema currentSchema = + new Schema( + optional(1, "aStruct", StructType.of(optional(2, "primitive", primitiveType)))); + 
SchemaUpdate updateApi = new SchemaUpdate(new Schema(), 0); + EvolveSchemaVisitor.visit(updateApi, new Schema(), currentSchema); + assertThat(updateApi.apply().asStruct()).isEqualTo(currentSchema.asStruct()); + } + } + + @Test + public void testAddNestedPrimitive() { + for (PrimitiveType primitiveType : primitiveTypes()) { + Schema currentSchema = new Schema(optional(1, "aStruct", StructType.of())); + Schema targetSchema = + new Schema( + optional(1, "aStruct", StructType.of(optional(2, "primitive", primitiveType)))); + SchemaUpdate updateApi = new SchemaUpdate(currentSchema, 1); + EvolveSchemaVisitor.visit(updateApi, currentSchema, targetSchema); + assertThat(updateApi.apply().asStruct()).isEqualTo(targetSchema.asStruct()); + } + } + + @Test + public void testMakeNestedPrimitiveOptional() { + for (PrimitiveType primitiveType : primitiveTypes()) { + Schema currentSchema = + new Schema( + optional(1, "aStruct", StructType.of(required(2, "primitive", primitiveType)))); + Schema targetSchema = + new Schema( + optional(1, "aStruct", StructType.of(optional(2, "primitive", primitiveType)))); + SchemaUpdate updateApi = new SchemaUpdate(currentSchema, 1); + EvolveSchemaVisitor.visit(updateApi, currentSchema, targetSchema); + assertThat(updateApi.apply().asStruct()).isEqualTo(targetSchema.asStruct()); + } + } + + @Test + public void testAddNestedPrimitives() { + Schema currentSchema = new Schema(optional(1, "aStruct", StructType.of())); + Schema targetSchema = + new Schema(optional(1, "aStruct", StructType.of(primitiveFields(1, primitiveTypes())))); + SchemaUpdate updateApi = new SchemaUpdate(currentSchema, 1); + EvolveSchemaVisitor.visit(updateApi, currentSchema, targetSchema); + assertThat(updateApi.apply().asStruct()).isEqualTo(targetSchema.asStruct()); + } + + @Test + public void testAddNestedLists() { + Schema targetSchema = + new Schema( + optional( + 1, + "aList", + ListType.ofOptional( + 2, + ListType.ofOptional( + 3, + ListType.ofOptional( + 4, + 
ListType.ofOptional( + 5, + ListType.ofOptional( + 6, + ListType.ofOptional( + 7, + ListType.ofOptional( + 8, + ListType.ofOptional( + 9, + ListType.ofOptional( + 10, DecimalType.of(11, 20)))))))))))); + SchemaUpdate updateApi = new SchemaUpdate(new Schema(), 0); + EvolveSchemaVisitor.visit(updateApi, new Schema(), targetSchema); + assertThat(updateApi.apply().asStruct()).isEqualTo(targetSchema.asStruct()); + } + + @Test + public void testAddNestedStruct() { + Schema targetSchema = + new Schema( + optional( + 1, + "struct1", + StructType.of( + optional( + 2, + "struct2", + StructType.of( + optional( + 3, + "struct3", + StructType.of( + optional( + 4, + "struct4", + StructType.of( + optional( + 5, + "struct5", + StructType.of( + optional( + 6, + "struct6", + StructType.of( + optional( + 7, + "aString", + StringType.get())))))))))))))); + SchemaUpdate updateApi = new SchemaUpdate(new Schema(), 0); + EvolveSchemaVisitor.visit(updateApi, new Schema(), targetSchema); + assertThat(updateApi.apply().asStruct()).isEqualTo(targetSchema.asStruct()); + } + + @Test + public void testAddNestedMaps() { + Schema targetSchema = + new Schema( + optional( + 1, + "struct", + MapType.ofOptional( + 2, + 3, + StringType.get(), + MapType.ofOptional( + 4, + 5, + StringType.get(), + MapType.ofOptional( + 6, + 7, + StringType.get(), + MapType.ofOptional( + 8, + 9, + StringType.get(), + MapType.ofOptional( + 10, + 11, + StringType.get(), + MapType.ofOptional( + 12, 13, StringType.get(), StringType.get())))))))); + SchemaUpdate updateApi = new SchemaUpdate(new Schema(), 0); + EvolveSchemaVisitor.visit(updateApi, new Schema(), targetSchema); + assertThat(updateApi.apply().asStruct()).isEqualTo(targetSchema.asStruct()); + } + + @Test + public void testDetectInvalidTopLevelList() { + Schema currentSchema = + new Schema(optional(1, "aList", ListType.ofOptional(2, StringType.get()))); + Schema targetSchema = new Schema(optional(1, "aList", ListType.ofOptional(2, LongType.get()))); + 
assertThatThrownBy( + () -> + EvolveSchemaVisitor.visit( + new SchemaUpdate(currentSchema, 2), currentSchema, targetSchema)) + .hasMessage("Cannot change column type: aList.element: string -> long") + .isInstanceOf(IllegalArgumentException.class); + } + + @Test + public void testDetectInvalidTopLevelMapValue() { + + Schema currentSchema = + new Schema( + optional(1, "aMap", MapType.ofOptional(2, 3, StringType.get(), StringType.get()))); + Schema targetSchema = + new Schema(optional(1, "aMap", MapType.ofOptional(2, 3, StringType.get(), LongType.get()))); + + assertThatThrownBy( + () -> + EvolveSchemaVisitor.visit( + new SchemaUpdate(currentSchema, 3), currentSchema, targetSchema)) + .hasMessage("Cannot change column type: aMap.value: string -> long") + .isInstanceOf(IllegalArgumentException.class); + } + + @Test + public void testDetectInvalidTopLevelMapKey() { + Schema currentSchema = + new Schema( + optional(1, "aMap", MapType.ofOptional(2, 3, StringType.get(), StringType.get()))); + Schema targetSchema = + new Schema(optional(1, "aMap", MapType.ofOptional(2, 3, UUIDType.get(), StringType.get()))); + assertThatThrownBy( + () -> + EvolveSchemaVisitor.visit( + new SchemaUpdate(currentSchema, 3), currentSchema, targetSchema)) + .hasMessage("Cannot change column type: aMap.key: string -> uuid") + .isInstanceOf(IllegalArgumentException.class); + } + + @Test + // int 32-bit signed integers -> Can promote to long + public void testTypePromoteIntegerToLong() { + Schema currentSchema = new Schema(required(1, "aCol", IntegerType.get())); + Schema targetSchema = new Schema(required(1, "aCol", LongType.get())); + + SchemaUpdate updateApi = new SchemaUpdate(currentSchema, 0); + EvolveSchemaVisitor.visit(updateApi, currentSchema, targetSchema); + Schema applied = updateApi.apply(); + assertThat(applied.asStruct().fields().size()).isEqualTo(1); + assertThat(applied.asStruct().fields().get(0).type()).isEqualTo(LongType.get()); + } + + @Test + // float 32-bit IEEE 754 floating 
point -> Can promote to double + public void testTypePromoteFloatToDouble() { + Schema currentSchema = new Schema(required(1, "aCol", FloatType.get())); + Schema targetSchema = new Schema(required(1, "aCol", DoubleType.get())); + + SchemaUpdate updateApi = new SchemaUpdate(currentSchema, 0); + EvolveSchemaVisitor.visit(updateApi, currentSchema, targetSchema); + Schema applied = updateApi.apply(); + assertThat(applied.asStruct().fields().size()).isEqualTo(1); + assertThat(applied.asStruct().fields().get(0).type()).isEqualTo(DoubleType.get()); + } + + @Test + public void testInvalidTypePromoteDoubleToFloat() { + Schema currentSchema = new Schema(required(1, "aCol", DoubleType.get())); + Schema targetSchema = new Schema(required(1, "aCol", FloatType.get())); + assertThatThrownBy( + () -> + EvolveSchemaVisitor.visit( + new SchemaUpdate(currentSchema, 3), currentSchema, targetSchema)) + .hasMessage("Cannot change column type: aCol: double -> float") + .isInstanceOf(IllegalArgumentException.class); + } + + @Test + // decimal(P,S) Fixed-point decimal; precision P, scale S -> Scale is fixed [1], precision must be + // 38 or less + public void testTypePromoteDecimalToFixedScaleWithWiderPrecision() { + Schema currentSchema = new Schema(required(1, "aCol", DecimalType.of(20, 1))); + Schema targetSchema = new Schema(required(1, "aCol", DecimalType.of(22, 1))); + + SchemaUpdate updateApi = new SchemaUpdate(currentSchema, 1); + EvolveSchemaVisitor.visit(updateApi, currentSchema, targetSchema); + assertThat(updateApi.apply().asStruct()).isEqualTo(targetSchema.asStruct()); + } + + @Test + public void testAddPrimitiveToNestedStruct() { + Schema existingSchema = + new Schema( + required( + 1, + "struct1", + StructType.of( + optional( + 2, + "struct2", + StructType.of( + optional( + 3, + "list", + ListType.ofOptional( + 4, + StructType.of(optional(5, "number", IntegerType.get()))))))))); + + Schema targetSchema = + new Schema( + required( + 1, + "struct1", + StructType.of( + 
optional( + 2, + "struct2", + StructType.of( + optional( + 3, + "list", + ListType.ofOptional( + 4, + StructType.of( + optional(5, "number", LongType.get()), + optional(6, "time", TimeType.get()))))))))); + + SchemaUpdate updateApi = new SchemaUpdate(existingSchema, 5); + EvolveSchemaVisitor.visit(updateApi, existingSchema, targetSchema); + assertThat(updateApi.apply().asStruct()).isEqualTo(targetSchema.asStruct()); + } + + @Test + public void testReplaceListWithPrimitive() { + Schema currentSchema = + new Schema(optional(1, "aColumn", ListType.ofOptional(2, StringType.get()))); + Schema targetSchema = new Schema(optional(1, "aColumn", StringType.get())); + assertThatThrownBy( + () -> + EvolveSchemaVisitor.visit( + new SchemaUpdate(currentSchema, 3), currentSchema, targetSchema)) + .hasMessage("Cannot change column type: aColumn: list -> string") + .isInstanceOf(IllegalArgumentException.class); + } + + @Test + public void addNewTopLevelStruct() { + Schema currentSchema = + new Schema( + optional( + 1, + "map1", + MapType.ofOptional( + 2, + 3, + StringType.get(), + ListType.ofOptional( + 4, StructType.of(optional(5, "string1", StringType.get())))))); + + Schema targetSchema = + new Schema( + optional( + 1, + "map1", + MapType.ofOptional( + 2, + 3, + StringType.get(), + ListType.ofOptional( + 4, StructType.of(optional(5, "string1", StringType.get()))))), + optional( + 6, + "struct1", + StructType.of( + optional(7, "d1", StructType.of(optional(8, "d2", StringType.get())))))); + + SchemaUpdate updateApi = new SchemaUpdate(currentSchema, 5); + EvolveSchemaVisitor.visit(updateApi, currentSchema, targetSchema); + assertThat(updateApi.apply().asStruct()).isEqualTo(targetSchema.asStruct()); + } + + @Test + public void testAppendNestedStruct() { + Schema currentSchema = + new Schema( + required( + 1, + "s1", + StructType.of( + optional( + 2, + "s2", + StructType.of( + optional( + 3, "s3", StructType.of(optional(4, "s4", StringType.get())))))))); + + Schema targetSchema = + 
new Schema( + required( + 1, + "s1", + StructType.of( + optional( + 2, + "s2", + StructType.of( + optional(3, "s3", StructType.of(optional(4, "s4", StringType.get()))), + optional( + 5, + "repeat", + StructType.of( + optional( + 6, + "s1", + StructType.of( + optional( + 7, + "s2", + StructType.of( + optional( + 8, + "s3", + StructType.of( + optional( + 9, + "s4", + StringType.get())))))))))))))); + + SchemaUpdate updateApi = new SchemaUpdate(currentSchema, 4); + EvolveSchemaVisitor.visit(updateApi, currentSchema, targetSchema); + assertThat(updateApi.apply().asStruct()).isEqualTo(targetSchema.asStruct()); + } + + @Test + public void testMakeNestedStructOptional() { + Schema currentSchema = getNestedSchemaWithOptionalModifier(false); + Schema targetSchema = + new Schema( + required( + 1, + "s1", + StructType.of( + optional( + 2, + "s2", + StructType.of( + optional( + 3, "s3", StructType.of(optional(4, "s4", StringType.get())))))))); + SchemaUpdate updateApi = new SchemaUpdate(currentSchema, 9); + EvolveSchemaVisitor.visit(updateApi, currentSchema, targetSchema); + assertThat(getNestedSchemaWithOptionalModifier(true).asStruct()) + .isEqualTo(updateApi.apply().asStruct()); + } + + private static Schema getNestedSchemaWithOptionalModifier(boolean nestedIsOptional) { + return new Schema( + required( + 1, + "s1", + StructType.of( + optional( + 2, + "s2", + StructType.of( + optional(3, "s3", StructType.of(optional(4, "s4", StringType.get()))), + of( + 5, + nestedIsOptional, + "repeat", + StructType.of( + optional( + 6, + "s1", + StructType.of( + optional( + 7, + "s2", + StructType.of( + optional( + 8, + "s3", + StructType.of( + optional( + 9, "s4", StringType.get())))))))))))))); + } +} diff --git a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestPartitionSpecEvolution.java b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestPartitionSpecEvolution.java new file mode 100644 index 000000000000..3e7025de6f91 --- /dev/null +++ 
b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestPartitionSpecEvolution.java @@ -0,0 +1,188 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.flink.sink.dynamic; + +import static org.assertj.core.api.Assertions.assertThat; + +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.Schema; +import org.apache.iceberg.types.Types; +import org.junit.jupiter.api.Test; + +public class TestPartitionSpecEvolution { + + @Test + void testCompatible() { + Schema schema = + new Schema( + Types.NestedField.required(0, "id", Types.IntegerType.get()), + Types.NestedField.required(1, "data", Types.StringType.get())); + + PartitionSpec spec1 = PartitionSpec.builderFor(schema).bucket("id", 10).build(); + PartitionSpec spec2 = PartitionSpec.builderFor(schema).bucket("id", 10).build(); + + // Happy case, source ids and names match + assertThat(PartitionSpecEvolution.checkCompatibility(spec1, spec2)).isTrue(); + } + + @Test + void testNotCompatibleDifferentTransform() { + Schema schema = + new Schema( + Types.NestedField.required(0, "id", Types.IntegerType.get()), + Types.NestedField.required(1, "data", Types.StringType.get())); + + PartitionSpec spec1 = 
    PartitionSpec.builderFor(schema).bucket("id", 10).build(); + // Same spec as spec1 but different number of buckets + PartitionSpec spec2 = PartitionSpec.builderFor(schema).bucket("id", 23).build(); + + assertThat(PartitionSpecEvolution.checkCompatibility(spec1, spec2)).isFalse(); + } + + @Test + void testNotCompatibleMoreFields() { + Schema schema = + new Schema( + Types.NestedField.required(0, "id", Types.IntegerType.get()), + Types.NestedField.required(1, "data", Types.StringType.get())); + + PartitionSpec spec1 = PartitionSpec.builderFor(schema).bucket("id", 10).build(); + // Additional field + PartitionSpec spec2 = + PartitionSpec.builderFor(schema).bucket("id", 10).truncate("data", 1).build(); + + assertThat(PartitionSpecEvolution.checkCompatibility(spec1, spec2)).isFalse(); + } + + @Test + void testCompatibleWithNonMatchingSourceIds() { + Schema schema1 = + new Schema( + // Use zero-based field ids + Types.NestedField.required(0, "id", Types.IntegerType.get()), + Types.NestedField.required(1, "data", Types.StringType.get())); + + PartitionSpec spec1 = PartitionSpec.builderFor(schema1).bucket("id", 10).build(); + + Schema schema2 = + new Schema( + Types.NestedField.required(1, "id", Types.IntegerType.get()), + Types.NestedField.required(2, "data", Types.StringType.get())); + + // Same spec as spec1 but bound to a different schema + PartitionSpec spec2 = PartitionSpec.builderFor(schema2).bucket("id", 10).build(); + + // Compatible because the source names match + assertThat(PartitionSpecEvolution.checkCompatibility(spec1, spec2)).isTrue(); + } + + @Test + void testPartitionSpecEvolution() { + Schema schema1 = + new Schema( + Types.NestedField.required(0, "id", Types.IntegerType.get()), + Types.NestedField.required(1, "data", Types.StringType.get())); + + PartitionSpec spec1 = PartitionSpec.builderFor(schema1).bucket("id", 10).build(); + + Schema schema2 = + new Schema( + Types.NestedField.required(1, "id", Types.IntegerType.get()), + 
    
Types.NestedField.required(2, "data", Types.StringType.get())); + + // Change num buckets + PartitionSpec spec2 = PartitionSpec.builderFor(schema2).bucket("id", 23).build(); + + assertThat(PartitionSpecEvolution.checkCompatibility(spec1, spec2)).isFalse(); + PartitionSpecEvolution.PartitionSpecChanges result = + PartitionSpecEvolution.evolve(spec1, spec2); + + assertThat(result.termsToAdd().toString()).isEqualTo("[bucket[23](ref(name=\"id\"))]"); + assertThat(result.termsToRemove().toString()).isEqualTo("[bucket[10](ref(name=\"id\"))]"); + } + + @Test + void testPartitionSpecEvolutionAddField() { + Schema schema = + new Schema( + Types.NestedField.required(0, "id", Types.IntegerType.get()), + Types.NestedField.required(1, "data", Types.StringType.get())); + + PartitionSpec spec1 = PartitionSpec.builderFor(schema).build(); + // Add field + PartitionSpec spec2 = PartitionSpec.builderFor(schema).bucket("id", 23).build(); + + assertThat(PartitionSpecEvolution.checkCompatibility(spec1, spec2)).isFalse(); + PartitionSpecEvolution.PartitionSpecChanges result = + PartitionSpecEvolution.evolve(spec1, spec2); + + assertThat(result.termsToAdd().toString()).isEqualTo("[bucket[23](ref(name=\"id\"))]"); + assertThat(result.termsToRemove().toString()).isEqualTo("[]"); + } + + @Test + void testPartitionSpecEvolutionRemoveField() { + Schema schema = + new Schema( + Types.NestedField.required(0, "id", Types.IntegerType.get()), + Types.NestedField.required(1, "data", Types.StringType.get())); + + PartitionSpec spec1 = PartitionSpec.builderFor(schema).bucket("id", 23).build(); + // Remove field + PartitionSpec spec2 = PartitionSpec.builderFor(schema).build(); + + assertThat(PartitionSpecEvolution.checkCompatibility(spec1, spec2)).isFalse(); + PartitionSpecEvolution.PartitionSpecChanges result = + PartitionSpecEvolution.evolve(spec1, spec2); + + assertThat(result.termsToAdd().toString()).isEqualTo("[]"); + 
assertThat(result.termsToRemove().toString()).isEqualTo("[bucket[23](ref(name=\"id\"))]"); + } + + @Test + void testPartitionSpecEvolutionWithNestedFields() { + Schema schema1 = + new Schema( + Types.NestedField.required(0, "id", Types.IntegerType.get()), + Types.NestedField.required( + 1, + "data", + Types.StructType.of(Types.NestedField.required(2, "str", Types.StringType.get())))); + + PartitionSpec spec1 = PartitionSpec.builderFor(schema1).bucket("data.str", 10).build(); + + Schema schema2 = + new Schema( + Types.NestedField.required(1, "id", Types.IntegerType.get()), + Types.NestedField.required( + 2, + "data", + Types.StructType.of(Types.NestedField.required(3, "str", Types.StringType.get())))); + + // Change num buckets + PartitionSpec spec2 = PartitionSpec.builderFor(schema2).bucket("data.str", 23).build(); + + assertThat(PartitionSpecEvolution.checkCompatibility(spec1, spec2)).isFalse(); + PartitionSpecEvolution.PartitionSpecChanges result = + PartitionSpecEvolution.evolve(spec1, spec2); + + assertThat(result.termsToAdd().toString()).isEqualTo("[bucket[23](ref(name=\"data.str\"))]"); + assertThat(result.termsToRemove().toString()).isEqualTo("[bucket[10](ref(name=\"data.str\"))]"); + } +} diff --git a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestTableDataCache.java b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestTableDataCache.java new file mode 100644 index 000000000000..3a55a9ca763a --- /dev/null +++ b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestTableDataCache.java @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.flink.sink.dynamic; + +import static org.assertj.core.api.Assertions.assertThat; + +import org.apache.commons.lang3.SerializationUtils; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.Schema; +import org.apache.iceberg.catalog.Catalog; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.flink.sink.TestFlinkIcebergSinkBase; +import org.apache.iceberg.types.Types; +import org.junit.jupiter.api.Test; + +public class TestTableDataCache extends TestFlinkIcebergSinkBase { + + static final Schema SCHEMA = + new Schema( + Types.NestedField.optional(1, "id", Types.IntegerType.get()), + Types.NestedField.optional(2, "data", Types.StringType.get())); + + static final Schema SCHEMA2 = + new Schema( + Types.NestedField.optional(1, "id", Types.IntegerType.get()), + Types.NestedField.optional(2, "data", Types.StringType.get()), + Types.NestedField.optional(3, "extra", Types.StringType.get())); + + @Test + void testCaching() { + Catalog catalog = CATALOG_EXTENSION.catalog(); + TableIdentifier tableIdentifier = TableIdentifier.parse("default.myTable"); + catalog.createTable(tableIdentifier, SCHEMA); + TableDataCache cache = new TableDataCache(catalog, 10, Long.MAX_VALUE); + + Schema schema1 = cache.schema(tableIdentifier, SCHEMA).f0; + assertThat(schema1.sameSchema(SCHEMA)).isTrue(); + assertThat(cache.schema(tableIdentifier, SerializationUtils.clone(SCHEMA)).f0) + .isEqualTo(schema1); + + assertThat(cache.schema(tableIdentifier, SCHEMA2)).isEqualTo(TableDataCache.NOT_FOUND); + + 
schema1 = cache.schema(tableIdentifier, SCHEMA).f0; + assertThat(cache.schema(tableIdentifier, SerializationUtils.clone(SCHEMA)).f0) + .isEqualTo(schema1); + } + + @Test + void testCacheInvalidationAfterSchemaChange() { + Catalog catalog = CATALOG_EXTENSION.catalog(); + TableIdentifier tableIdentifier = TableIdentifier.parse("default.myTable"); + catalog.createTable(tableIdentifier, SCHEMA); + TableDataCache cache = new TableDataCache(catalog, 10, Long.MAX_VALUE); + TableUpdater tableUpdater = new TableUpdater(cache, catalog); + + Schema schema1 = cache.schema(tableIdentifier, SCHEMA).f0; + assertThat(schema1.sameSchema(SCHEMA)).isTrue(); + + catalog.dropTable(tableIdentifier); + catalog.createTable(tableIdentifier, SCHEMA2); + tableUpdater.update(tableIdentifier, "main", SCHEMA2, PartitionSpec.unpartitioned()); + + Schema schema2 = cache.schema(tableIdentifier, SCHEMA2).f0; + assertThat(schema2.sameSchema(SCHEMA2)).isTrue(); + } + + @Test + void testCachingDisabled() { + Catalog catalog = CATALOG_EXTENSION.catalog(); + TableIdentifier tableIdentifier = TableIdentifier.parse("default.myTable"); + catalog.createTable(tableIdentifier, SCHEMA); + TableDataCache cache = new TableDataCache(catalog, 0, Long.MAX_VALUE); + + // Cleanup routine doesn't run after every write + cache.getInternalCache().cleanUp(); + assertThat(cache.getInternalCache().estimatedSize()).isEqualTo(0); + } +} diff --git a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestTableUpdater.java b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestTableUpdater.java new file mode 100644 index 000000000000..d880dbb003a1 --- /dev/null +++ b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestTableUpdater.java @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.flink.sink.dynamic; + +import static org.assertj.core.api.Assertions.assertThat; + +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.Schema; +import org.apache.iceberg.catalog.Catalog; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.flink.sink.TestFlinkIcebergSinkBase; +import org.apache.iceberg.types.Types; +import org.junit.jupiter.api.Test; + +public class TestTableUpdater extends TestFlinkIcebergSinkBase { + + static final Schema SCHEMA = + new Schema( + Types.NestedField.optional(1, "id", Types.IntegerType.get()), + Types.NestedField.optional(2, "data", Types.StringType.get())); + + static final Schema SCHEMA2 = + new Schema( + Types.NestedField.optional(1, "id", Types.IntegerType.get()), + Types.NestedField.optional(2, "data", Types.StringType.get()), + Types.NestedField.optional(3, "extra", Types.StringType.get())); + + @Test + void testInvalidateOldCacheEntryOnUpdate() { + Catalog catalog = CATALOG_EXTENSION.catalog(); + TableIdentifier tableIdentifier = TableIdentifier.parse("default.myTable"); + catalog.createTable(tableIdentifier, SCHEMA); + TableDataCache cache = new TableDataCache(catalog, 10, Long.MAX_VALUE); + cache.schema(tableIdentifier, SCHEMA); + TableUpdater tableUpdater = 
new TableUpdater(cache, catalog); + + Schema updated = + tableUpdater.update(tableIdentifier, "main", SCHEMA2, PartitionSpec.unpartitioned()).f0; + assertThat(updated.sameSchema(SCHEMA2)); + assertThat(cache.schema(tableIdentifier, SCHEMA2).f0.sameSchema(SCHEMA2)).isTrue(); + } + + @Test + void testLastResultInvalidation() { + Catalog catalog = CATALOG_EXTENSION.catalog(); + TableIdentifier tableIdentifier = TableIdentifier.parse("default.myTable"); + catalog.createTable(tableIdentifier, SCHEMA); + TableDataCache cache = new TableDataCache(catalog, 10, Long.MAX_VALUE); + TableUpdater tableUpdater = new TableUpdater(cache, catalog); + + // Initialize cache + tableUpdater.update(tableIdentifier, "main", SCHEMA, PartitionSpec.unpartitioned()); + + // Update table behind the scenes + catalog.dropTable(tableIdentifier); + catalog.createTable(tableIdentifier, SCHEMA2); + + // Cache still stores the old information + assertThat(cache.schema(tableIdentifier, SCHEMA2).f1) + .isEqualTo(CompareSchemasVisitor.Result.INCOMPATIBLE); + + assertThat( + tableUpdater.update(tableIdentifier, "main", SCHEMA2, PartitionSpec.unpartitioned()).f1) + .isEqualTo(CompareSchemasVisitor.Result.SAME); + + // Last result cache should be cleared + assertThat( + cache + .getInternalCache() + .getIfPresent(tableIdentifier) + .getSchemaInfo() + .getLastResult(SCHEMA2)) + .isNull(); + } +} From 5a7f0850dfcbeae357d4843c5503857b95f0c117 Mon Sep 17 00:00:00 2001 From: Max Michels Date: Tue, 20 May 2025 15:04:25 +0200 Subject: [PATCH 03/16] Address PR comments --- .../sink/dynamic/CompareSchemasVisitor.java | 14 ++++++++++++- .../sink/dynamic/EvolveSchemaVisitor.java | 20 +++++++++++++++++++ .../sink/dynamic/PartitionSpecEvolution.java | 11 +++++----- ...DataCache.java => TableMetadataCache.java} | 11 +++++++--- .../flink/sink/dynamic/TableUpdater.java | 19 +++++++++--------- ...Cache.java => TestTableMetadataCache.java} | 10 +++++----- .../flink/sink/dynamic/TestTableUpdater.java | 4 ++-- 7 files 
changed, 63 insertions(+), 26 deletions(-) rename flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/{TableDataCache.java => TableMetadataCache.java} (95%) rename flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/{TestTableDataCache.java => TestTableMetadataCache.java} (90%) diff --git a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/CompareSchemasVisitor.java b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/CompareSchemasVisitor.java index 99abc95d7102..b1b37e451d43 100644 --- a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/CompareSchemasVisitor.java +++ b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/CompareSchemasVisitor.java @@ -26,7 +26,19 @@ import org.apache.iceberg.types.Type; import org.apache.iceberg.types.Types; -/** Visitor class which compares two schemas and decides whether they are compatible. */ +/** + * Visitor class which compares an input schema to a table schema and emits a compatibility {@link + * Result}. + * + *

    + *
  • SAME: The two schemas are semantically identical + *
  • CONVERSION_NEEDED: We can evolve the data associated with the input schema to match the + * table schema. + *
  • INCOMPATIBLE: We need to migrate the table schema to match the input schema. + *
+ * + * The input schema fields are compared to the table schema via their names. + */ public class CompareSchemasVisitor extends SchemaWithPartnerVisitor { diff --git a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/EvolveSchemaVisitor.java b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/EvolveSchemaVisitor.java index 6b020a5bda73..85fd5753919e 100644 --- a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/EvolveSchemaVisitor.java +++ b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/EvolveSchemaVisitor.java @@ -29,6 +29,26 @@ /** * Visitor class that accumulates the set of changes needed to evolve an existing schema into the * target schema. Changes are applied to an {@link UpdateSchema} operation. + * + *

We support: + * + *

    + *
  • Adding new columns + *
      • Widening the type of existing columns + *
    
  • Reordering columns + *
+ * + * We don't support: + * + *
    + *
  • Dropping columns + *
  • Renaming columns + *
+ * + * The reason is that dropping columns would create issues with late / out of order data. Once we + * drop fields, we wouldn't be able to easily add them back later without losing the associated + * data. Renaming columns is not supported because we compare schemas by name, which doesn't allow + * for renaming without additional hints. */ public class EvolveSchemaVisitor extends SchemaWithPartnerVisitor { diff --git a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/PartitionSpecEvolution.java b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/PartitionSpecEvolution.java index 760340686a8c..9cb4a99a11e1 100644 --- a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/PartitionSpecEvolution.java +++ b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/PartitionSpecEvolution.java @@ -25,6 +25,7 @@ import org.apache.iceberg.expressions.NamedReference; import org.apache.iceberg.expressions.Term; import org.apache.iceberg.expressions.UnboundTransform; +import org.apache.iceberg.relocated.com.google.common.base.MoreObjects; import org.apache.iceberg.relocated.com.google.common.collect.Iterables; import org.apache.iceberg.relocated.com.google.common.collect.Lists; @@ -112,12 +113,10 @@ public boolean isEmpty() { @Override public String toString() { - return "PartitionSpecChanges{" - + "termsToAdd=" - + termsToAdd - + ", termsToRemove=" - + termsToRemove - + '}'; + return MoreObjects.toStringHelper(PartitionSpecEvolution.class) + .add("termsToAdd", termsToAdd) + .add("termsToRemove", termsToRemove) + .toString(); } } diff --git a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/TableDataCache.java b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/TableMetadataCache.java similarity index 95% rename from flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/TableDataCache.java rename to 
flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/TableMetadataCache.java index fa494635efff..76d4cfde354d 100644 --- a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/TableDataCache.java +++ b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/TableMetadataCache.java @@ -35,10 +35,15 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +/** + * TableMetadataCache is responsible for caching table metadata to avoid hitting the catalog too + * frequently. We store table identifier, schema, partition spec, and a set of past schema + * comparison results of the active table schema against the last input schemas. + */ @Internal -class TableDataCache { +class TableMetadataCache { - private static final Logger LOG = LoggerFactory.getLogger(TableDataCache.class); + private static final Logger LOG = LoggerFactory.getLogger(TableMetadataCache.class); private static final int MAX_SIZE = 10; private static final Tuple2 EXISTS = Tuple2.of(true, null); private static final Tuple2 NOT_EXISTS = Tuple2.of(false, null); @@ -49,7 +54,7 @@ class TableDataCache { private final long refreshMs; private final Cache cache; - TableDataCache(Catalog catalog, int maximumSize, long refreshMs) { + TableMetadataCache(Catalog catalog, int maximumSize, long refreshMs) { this.catalog = catalog; this.refreshMs = refreshMs; this.cache = Caffeine.newBuilder().maximumSize(maximumSize).build(); diff --git a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/TableUpdater.java b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/TableUpdater.java index e95f4be0a2ae..488dac617259 100644 --- a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/TableUpdater.java +++ b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/TableUpdater.java @@ -39,10 +39,10 @@ class TableUpdater { private static final Logger LOG = LoggerFactory.getLogger(TableUpdater.class); - private final 
TableDataCache cache; + private final TableMetadataCache cache; private final Catalog catalog; - TableUpdater(TableDataCache cache, Catalog catalog) { + TableUpdater(TableMetadataCache cache, Catalog catalog) { this.cache = cache; this.catalog = catalog; } @@ -183,18 +183,19 @@ private PartitionSpec findOrCreateSpec(TableIdentifier identifier, PartitionSpec try { updater.commit(); } catch (CommitFailedException e) { - LOG.info( - "Partition spec update failed for {} from {} to {}", - identifier, - currentSpec, - targetSpec, - e); + cache.invalidate(identifier); PartitionSpec newSpec = cache.spec(identifier, targetSpec); result = PartitionSpecEvolution.evolve(targetSpec, newSpec); if (result.isEmpty()) { - LOG.info("Table {} partition spec updated concurrently to {}", identifier, newSpec); + LOG.debug("Table {} partition spec updated concurrently to {}", identifier, newSpec); return newSpec; } else { + LOG.error( + "Partition spec update failed for {} from {} to {}", + identifier, + currentSpec, + targetSpec, + e); throw e; } } diff --git a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestTableDataCache.java b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestTableMetadataCache.java similarity index 90% rename from flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestTableDataCache.java rename to flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestTableMetadataCache.java index 3a55a9ca763a..cedae887041e 100644 --- a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestTableDataCache.java +++ b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestTableMetadataCache.java @@ -29,7 +29,7 @@ import org.apache.iceberg.types.Types; import org.junit.jupiter.api.Test; -public class TestTableDataCache extends TestFlinkIcebergSinkBase { +public class TestTableMetadataCache extends TestFlinkIcebergSinkBase { static final Schema SCHEMA = new Schema( @@ 
-47,14 +47,14 @@ void testCaching() { Catalog catalog = CATALOG_EXTENSION.catalog(); TableIdentifier tableIdentifier = TableIdentifier.parse("default.myTable"); catalog.createTable(tableIdentifier, SCHEMA); - TableDataCache cache = new TableDataCache(catalog, 10, Long.MAX_VALUE); + TableMetadataCache cache = new TableMetadataCache(catalog, 10, Long.MAX_VALUE); Schema schema1 = cache.schema(tableIdentifier, SCHEMA).f0; assertThat(schema1.sameSchema(SCHEMA)).isTrue(); assertThat(cache.schema(tableIdentifier, SerializationUtils.clone(SCHEMA)).f0) .isEqualTo(schema1); - assertThat(cache.schema(tableIdentifier, SCHEMA2)).isEqualTo(TableDataCache.NOT_FOUND); + assertThat(cache.schema(tableIdentifier, SCHEMA2)).isEqualTo(TableMetadataCache.NOT_FOUND); schema1 = cache.schema(tableIdentifier, SCHEMA).f0; assertThat(cache.schema(tableIdentifier, SerializationUtils.clone(SCHEMA)).f0) @@ -66,7 +66,7 @@ void testCacheInvalidationAfterSchemaChange() { Catalog catalog = CATALOG_EXTENSION.catalog(); TableIdentifier tableIdentifier = TableIdentifier.parse("default.myTable"); catalog.createTable(tableIdentifier, SCHEMA); - TableDataCache cache = new TableDataCache(catalog, 10, Long.MAX_VALUE); + TableMetadataCache cache = new TableMetadataCache(catalog, 10, Long.MAX_VALUE); TableUpdater tableUpdater = new TableUpdater(cache, catalog); Schema schema1 = cache.schema(tableIdentifier, SCHEMA).f0; @@ -85,7 +85,7 @@ void testCachingDisabled() { Catalog catalog = CATALOG_EXTENSION.catalog(); TableIdentifier tableIdentifier = TableIdentifier.parse("default.myTable"); catalog.createTable(tableIdentifier, SCHEMA); - TableDataCache cache = new TableDataCache(catalog, 0, Long.MAX_VALUE); + TableMetadataCache cache = new TableMetadataCache(catalog, 0, Long.MAX_VALUE); // Cleanup routine doesn't run after every write cache.getInternalCache().cleanUp(); diff --git a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestTableUpdater.java 
b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestTableUpdater.java index d880dbb003a1..2837a022c970 100644 --- a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestTableUpdater.java +++ b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestTableUpdater.java @@ -46,7 +46,7 @@ void testInvalidateOldCacheEntryOnUpdate() { Catalog catalog = CATALOG_EXTENSION.catalog(); TableIdentifier tableIdentifier = TableIdentifier.parse("default.myTable"); catalog.createTable(tableIdentifier, SCHEMA); - TableDataCache cache = new TableDataCache(catalog, 10, Long.MAX_VALUE); + TableMetadataCache cache = new TableMetadataCache(catalog, 10, Long.MAX_VALUE); cache.schema(tableIdentifier, SCHEMA); TableUpdater tableUpdater = new TableUpdater(cache, catalog); @@ -61,7 +61,7 @@ void testLastResultInvalidation() { Catalog catalog = CATALOG_EXTENSION.catalog(); TableIdentifier tableIdentifier = TableIdentifier.parse("default.myTable"); catalog.createTable(tableIdentifier, SCHEMA); - TableDataCache cache = new TableDataCache(catalog, 10, Long.MAX_VALUE); + TableMetadataCache cache = new TableMetadataCache(catalog, 10, Long.MAX_VALUE); TableUpdater tableUpdater = new TableUpdater(cache, catalog); // Initialize cache From 3ccaf8648ddb5f8bf951f114063f56bcde19c48d Mon Sep 17 00:00:00 2001 From: Max Michels Date: Wed, 21 May 2025 13:06:42 +0200 Subject: [PATCH 04/16] Avoid core class visibility changes --- .../iceberg/expressions/NamedReference.java | 2 +- .../iceberg/expressions/UnboundTransform.java | 2 +- .../java/org/apache/iceberg/SchemaUpdate.java | 4 +- .../sink/dynamic/PartitionSpecEvolution.java | 5 +- .../sink/dynamic/TestEvolveSchemaVisitor.java | 69 +++++++++++-------- 5 files changed, 48 insertions(+), 34 deletions(-) diff --git a/api/src/main/java/org/apache/iceberg/expressions/NamedReference.java b/api/src/main/java/org/apache/iceberg/expressions/NamedReference.java index a05370f347dc..cc5ba3ceaf4f 100644 --- 
a/api/src/main/java/org/apache/iceberg/expressions/NamedReference.java +++ b/api/src/main/java/org/apache/iceberg/expressions/NamedReference.java @@ -26,7 +26,7 @@ public class NamedReference implements UnboundTerm, Reference { private final String name; - public NamedReference(String name) { + NamedReference(String name) { Preconditions.checkNotNull(name, "Name cannot be null"); this.name = name; } diff --git a/api/src/main/java/org/apache/iceberg/expressions/UnboundTransform.java b/api/src/main/java/org/apache/iceberg/expressions/UnboundTransform.java index 05f52cfff31e..cae84733c8d5 100644 --- a/api/src/main/java/org/apache/iceberg/expressions/UnboundTransform.java +++ b/api/src/main/java/org/apache/iceberg/expressions/UnboundTransform.java @@ -26,7 +26,7 @@ public class UnboundTransform implements UnboundTerm, Term { private final NamedReference ref; private final Transform transform; - public UnboundTransform(NamedReference ref, Transform transform) { + UnboundTransform(NamedReference ref, Transform transform) { this.ref = ref; this.transform = transform; } diff --git a/core/src/main/java/org/apache/iceberg/SchemaUpdate.java b/core/src/main/java/org/apache/iceberg/SchemaUpdate.java index 17a9979f3064..8f2bfe184cab 100644 --- a/core/src/main/java/org/apache/iceberg/SchemaUpdate.java +++ b/core/src/main/java/org/apache/iceberg/SchemaUpdate.java @@ -46,7 +46,7 @@ import org.slf4j.LoggerFactory; /** Schema evolution API implementation. */ -public class SchemaUpdate implements UpdateSchema { +class SchemaUpdate implements UpdateSchema { private static final Logger LOG = LoggerFactory.getLogger(SchemaUpdate.class); private static final int TABLE_ROOT_ID = -1; @@ -71,7 +71,7 @@ public class SchemaUpdate implements UpdateSchema { } /** For testing only. 
*/ - public SchemaUpdate(Schema schema, int lastColumnId) { + SchemaUpdate(Schema schema, int lastColumnId) { this(null, null, schema, lastColumnId); } diff --git a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/PartitionSpecEvolution.java b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/PartitionSpecEvolution.java index 9cb4a99a11e1..90b6c7295cb7 100644 --- a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/PartitionSpecEvolution.java +++ b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/PartitionSpecEvolution.java @@ -22,9 +22,8 @@ import org.apache.iceberg.PartitionField; import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.Schema; -import org.apache.iceberg.expressions.NamedReference; +import org.apache.iceberg.expressions.Expressions; import org.apache.iceberg.expressions.Term; -import org.apache.iceberg.expressions.UnboundTransform; import org.apache.iceberg.relocated.com.google.common.base.MoreObjects; import org.apache.iceberg.relocated.com.google.common.collect.Iterables; import org.apache.iceberg.relocated.com.google.common.collect.Lists; @@ -122,7 +121,7 @@ public String toString() { private static Term toTerm(PartitionField field, Schema schema) { String sourceName = schema.idToName().get(field.sourceId()); - return new UnboundTransform<>(new NamedReference<>(sourceName), field.transform()); + return Expressions.transform(sourceName, field.transform()); } private static boolean specFieldsAreCompatible( diff --git a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestEvolveSchemaVisitor.java b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestEvolveSchemaVisitor.java index d9bcaff182a7..58ee84529193 100644 --- a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestEvolveSchemaVisitor.java +++ b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestEvolveSchemaVisitor.java @@ 
-24,11 +24,12 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; +import java.lang.reflect.Constructor; import java.util.Arrays; import java.util.List; import java.util.concurrent.atomic.AtomicInteger; import org.apache.iceberg.Schema; -import org.apache.iceberg.SchemaUpdate; +import org.apache.iceberg.UpdateSchema; import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.types.Type.PrimitiveType; import org.apache.iceberg.types.Types; @@ -87,7 +88,7 @@ private static Types.NestedField[] primitiveFields( @Test public void testAddTopLevelPrimitives() { Schema targetSchema = new Schema(primitiveFields(0, primitiveTypes())); - SchemaUpdate updateApi = new SchemaUpdate(new Schema(), 0); + UpdateSchema updateApi = loadUpdateApi(new Schema(), 0); EvolveSchemaVisitor.visit(updateApi, new Schema(), targetSchema); assertThat(targetSchema.asStruct()).isEqualTo(updateApi.apply().asStruct()); } @@ -97,7 +98,7 @@ public void testMakeTopLevelPrimitivesOptional() { Schema existingSchema = new Schema(primitiveFields(0, primitiveTypes(), false)); assertThat(existingSchema.columns().stream().allMatch(Types.NestedField::isRequired)).isTrue(); - SchemaUpdate updateApi = new SchemaUpdate(existingSchema, 0); + UpdateSchema updateApi = loadUpdateApi(existingSchema, 0); EvolveSchemaVisitor.visit(updateApi, existingSchema, new Schema()); Schema newSchema = updateApi.apply(); assertThat(newSchema.asStruct().fields().size()).isEqualTo(14); @@ -108,7 +109,7 @@ public void testMakeTopLevelPrimitivesOptional() { public void testIdentifyFieldsByName() { Schema existingSchema = new Schema(Types.NestedField.optional(42, "myField", Types.LongType.get())); - SchemaUpdate updateApi = new SchemaUpdate(existingSchema, 0); + UpdateSchema updateApi = loadUpdateApi(existingSchema, 0); Schema newSchema = new Schema(Arrays.asList(Types.NestedField.optional(-1, "myField", Types.LongType.get()))); 
EvolveSchemaVisitor.visit(updateApi, existingSchema, newSchema); @@ -123,7 +124,7 @@ public void testChangeOrderTopLevelPrimitives() { Schema targetSchema = new Schema( Arrays.asList(optional(2, "b", StringType.get()), optional(1, "a", StringType.get()))); - SchemaUpdate updateApi = new SchemaUpdate(existingSchema, 0); + UpdateSchema updateApi = loadUpdateApi(existingSchema, 0); EvolveSchemaVisitor.visit(updateApi, existingSchema, targetSchema); assertThat(updateApi.apply().asStruct()).isEqualTo(targetSchema.asStruct()); } @@ -132,7 +133,7 @@ public void testChangeOrderTopLevelPrimitives() { public void testAddTopLevelListOfPrimitives() { for (PrimitiveType primitiveType : primitiveTypes()) { Schema targetSchema = new Schema(optional(1, "aList", ListType.ofOptional(2, primitiveType))); - SchemaUpdate updateApi = new SchemaUpdate(new Schema(), 0); + UpdateSchema updateApi = loadUpdateApi(new Schema(), 0); EvolveSchemaVisitor.visit(updateApi, new Schema(), targetSchema); assertThat(updateApi.apply().asStruct()).isEqualTo(targetSchema.asStruct()); } @@ -144,7 +145,7 @@ public void testMakeTopLevelListOfPrimitivesOptional() { Schema existingSchema = new Schema(optional(1, "aList", ListType.ofRequired(2, primitiveType))); Schema targetSchema = new Schema(); - SchemaUpdate updateApi = new SchemaUpdate(existingSchema, 0); + UpdateSchema updateApi = loadUpdateApi(existingSchema, 0); EvolveSchemaVisitor.visit(updateApi, existingSchema, targetSchema); Schema expectedSchema = new Schema(optional(1, "aList", ListType.ofRequired(2, primitiveType))); @@ -157,7 +158,7 @@ public void testAddTopLevelMapOfPrimitives() { for (PrimitiveType primitiveType : primitiveTypes()) { Schema targetSchema = new Schema(optional(1, "aMap", MapType.ofOptional(2, 3, primitiveType, primitiveType))); - SchemaUpdate updateApi = new SchemaUpdate(new Schema(), 0); + UpdateSchema updateApi = loadUpdateApi(new Schema(), 0); EvolveSchemaVisitor.visit(updateApi, new Schema(), targetSchema); 
assertThat(updateApi.apply().asStruct()).isEqualTo(targetSchema.asStruct()); } @@ -169,7 +170,7 @@ public void testAddTopLevelStructOfPrimitives() { Schema currentSchema = new Schema( optional(1, "aStruct", StructType.of(optional(2, "primitive", primitiveType)))); - SchemaUpdate updateApi = new SchemaUpdate(new Schema(), 0); + UpdateSchema updateApi = loadUpdateApi(new Schema(), 0); EvolveSchemaVisitor.visit(updateApi, new Schema(), currentSchema); assertThat(updateApi.apply().asStruct()).isEqualTo(currentSchema.asStruct()); } @@ -182,7 +183,7 @@ public void testAddNestedPrimitive() { Schema targetSchema = new Schema( optional(1, "aStruct", StructType.of(optional(2, "primitive", primitiveType)))); - SchemaUpdate updateApi = new SchemaUpdate(currentSchema, 1); + UpdateSchema updateApi = loadUpdateApi(currentSchema, 1); EvolveSchemaVisitor.visit(updateApi, currentSchema, targetSchema); assertThat(updateApi.apply().asStruct()).isEqualTo(targetSchema.asStruct()); } @@ -197,7 +198,7 @@ public void testMakeNestedPrimitiveOptional() { Schema targetSchema = new Schema( optional(1, "aStruct", StructType.of(optional(2, "primitive", primitiveType)))); - SchemaUpdate updateApi = new SchemaUpdate(currentSchema, 1); + UpdateSchema updateApi = loadUpdateApi(currentSchema, 1); EvolveSchemaVisitor.visit(updateApi, currentSchema, targetSchema); assertThat(updateApi.apply().asStruct()).isEqualTo(targetSchema.asStruct()); } @@ -208,7 +209,7 @@ public void testAddNestedPrimitives() { Schema currentSchema = new Schema(optional(1, "aStruct", StructType.of())); Schema targetSchema = new Schema(optional(1, "aStruct", StructType.of(primitiveFields(1, primitiveTypes())))); - SchemaUpdate updateApi = new SchemaUpdate(currentSchema, 1); + UpdateSchema updateApi = loadUpdateApi(currentSchema, 1); EvolveSchemaVisitor.visit(updateApi, currentSchema, targetSchema); assertThat(updateApi.apply().asStruct()).isEqualTo(targetSchema.asStruct()); } @@ -238,7 +239,7 @@ public void testAddNestedLists() { 
9, ListType.ofOptional( 10, DecimalType.of(11, 20)))))))))))); - SchemaUpdate updateApi = new SchemaUpdate(new Schema(), 0); + UpdateSchema updateApi = loadUpdateApi(new Schema(), 0); EvolveSchemaVisitor.visit(updateApi, new Schema(), targetSchema); assertThat(updateApi.apply().asStruct()).isEqualTo(targetSchema.asStruct()); } @@ -275,7 +276,7 @@ public void testAddNestedStruct() { 7, "aString", StringType.get())))))))))))))); - SchemaUpdate updateApi = new SchemaUpdate(new Schema(), 0); + UpdateSchema updateApi = loadUpdateApi(new Schema(), 0); EvolveSchemaVisitor.visit(updateApi, new Schema(), targetSchema); assertThat(updateApi.apply().asStruct()).isEqualTo(targetSchema.asStruct()); } @@ -309,7 +310,7 @@ public void testAddNestedMaps() { StringType.get(), MapType.ofOptional( 12, 13, StringType.get(), StringType.get())))))))); - SchemaUpdate updateApi = new SchemaUpdate(new Schema(), 0); + UpdateSchema updateApi = loadUpdateApi(new Schema(), 0); EvolveSchemaVisitor.visit(updateApi, new Schema(), targetSchema); assertThat(updateApi.apply().asStruct()).isEqualTo(targetSchema.asStruct()); } @@ -322,7 +323,7 @@ public void testDetectInvalidTopLevelList() { assertThatThrownBy( () -> EvolveSchemaVisitor.visit( - new SchemaUpdate(currentSchema, 2), currentSchema, targetSchema)) + loadUpdateApi(currentSchema, 2), currentSchema, targetSchema)) .hasMessage("Cannot change column type: aList.element: string -> long") .isInstanceOf(IllegalArgumentException.class); } @@ -339,7 +340,7 @@ public void testDetectInvalidTopLevelMapValue() { assertThatThrownBy( () -> EvolveSchemaVisitor.visit( - new SchemaUpdate(currentSchema, 3), currentSchema, targetSchema)) + loadUpdateApi(currentSchema, 3), currentSchema, targetSchema)) .hasMessage("Cannot change column type: aMap.value: string -> long") .isInstanceOf(IllegalArgumentException.class); } @@ -354,7 +355,7 @@ public void testDetectInvalidTopLevelMapKey() { assertThatThrownBy( () -> EvolveSchemaVisitor.visit( - new 
SchemaUpdate(currentSchema, 3), currentSchema, targetSchema)) + loadUpdateApi(currentSchema, 3), currentSchema, targetSchema)) .hasMessage("Cannot change column type: aMap.key: string -> uuid") .isInstanceOf(IllegalArgumentException.class); } @@ -365,7 +366,7 @@ public void testTypePromoteIntegerToLong() { Schema currentSchema = new Schema(required(1, "aCol", IntegerType.get())); Schema targetSchema = new Schema(required(1, "aCol", LongType.get())); - SchemaUpdate updateApi = new SchemaUpdate(currentSchema, 0); + UpdateSchema updateApi = loadUpdateApi(currentSchema, 0); EvolveSchemaVisitor.visit(updateApi, currentSchema, targetSchema); Schema applied = updateApi.apply(); assertThat(applied.asStruct().fields().size()).isEqualTo(1); @@ -378,7 +379,7 @@ public void testTypePromoteFloatToDouble() { Schema currentSchema = new Schema(required(1, "aCol", FloatType.get())); Schema targetSchema = new Schema(required(1, "aCol", DoubleType.get())); - SchemaUpdate updateApi = new SchemaUpdate(currentSchema, 0); + UpdateSchema updateApi = loadUpdateApi(currentSchema, 0); EvolveSchemaVisitor.visit(updateApi, currentSchema, targetSchema); Schema applied = updateApi.apply(); assertThat(applied.asStruct().fields().size()).isEqualTo(1); @@ -392,7 +393,7 @@ public void testInvalidTypePromoteDoubleToFloat() { assertThatThrownBy( () -> EvolveSchemaVisitor.visit( - new SchemaUpdate(currentSchema, 3), currentSchema, targetSchema)) + loadUpdateApi(currentSchema, 3), currentSchema, targetSchema)) .hasMessage("Cannot change column type: aCol: double -> float") .isInstanceOf(IllegalArgumentException.class); } @@ -404,7 +405,7 @@ public void testTypePromoteDecimalToFixedScaleWithWiderPrecision() { Schema currentSchema = new Schema(required(1, "aCol", DecimalType.of(20, 1))); Schema targetSchema = new Schema(required(1, "aCol", DecimalType.of(22, 1))); - SchemaUpdate updateApi = new SchemaUpdate(currentSchema, 1); + UpdateSchema updateApi = loadUpdateApi(currentSchema, 1); 
EvolveSchemaVisitor.visit(updateApi, currentSchema, targetSchema); assertThat(updateApi.apply().asStruct()).isEqualTo(targetSchema.asStruct()); } @@ -447,7 +448,7 @@ public void testAddPrimitiveToNestedStruct() { optional(5, "number", LongType.get()), optional(6, "time", TimeType.get()))))))))); - SchemaUpdate updateApi = new SchemaUpdate(existingSchema, 5); + UpdateSchema updateApi = loadUpdateApi(existingSchema, 5); EvolveSchemaVisitor.visit(updateApi, existingSchema, targetSchema); assertThat(updateApi.apply().asStruct()).isEqualTo(targetSchema.asStruct()); } @@ -460,7 +461,7 @@ public void testReplaceListWithPrimitive() { assertThatThrownBy( () -> EvolveSchemaVisitor.visit( - new SchemaUpdate(currentSchema, 3), currentSchema, targetSchema)) + loadUpdateApi(currentSchema, 3), currentSchema, targetSchema)) .hasMessage("Cannot change column type: aColumn: list -> string") .isInstanceOf(IllegalArgumentException.class); } @@ -496,7 +497,7 @@ public void addNewTopLevelStruct() { StructType.of( optional(7, "d1", StructType.of(optional(8, "d2", StringType.get())))))); - SchemaUpdate updateApi = new SchemaUpdate(currentSchema, 5); + UpdateSchema updateApi = loadUpdateApi(currentSchema, 5); EvolveSchemaVisitor.visit(updateApi, currentSchema, targetSchema); assertThat(updateApi.apply().asStruct()).isEqualTo(targetSchema.asStruct()); } @@ -548,7 +549,7 @@ public void testAppendNestedStruct() { "s4", StringType.get())))))))))))))); - SchemaUpdate updateApi = new SchemaUpdate(currentSchema, 4); + UpdateSchema updateApi = loadUpdateApi(currentSchema, 4); EvolveSchemaVisitor.visit(updateApi, currentSchema, targetSchema); assertThat(updateApi.apply().asStruct()).isEqualTo(targetSchema.asStruct()); } @@ -568,7 +569,7 @@ public void testMakeNestedStructOptional() { StructType.of( optional( 3, "s3", StructType.of(optional(4, "s4", StringType.get())))))))); - SchemaUpdate updateApi = new SchemaUpdate(currentSchema, 9); + UpdateSchema updateApi = loadUpdateApi(currentSchema, 9); 
EvolveSchemaVisitor.visit(updateApi, currentSchema, targetSchema); assertThat(getNestedSchemaWithOptionalModifier(true).asStruct()) .isEqualTo(updateApi.apply().asStruct()); @@ -605,4 +606,18 @@ private static Schema getNestedSchemaWithOptionalModifier(boolean nestedIsOption optional( 9, "s4", StringType.get())))))))))))))); } + + private static UpdateSchema loadUpdateApi(Schema schema, int lastColumnId) { + try { + Constructor constructor = + TestEvolveSchemaVisitor.class + .getClassLoader() + .loadClass("org.apache.iceberg.SchemaUpdate") + .getDeclaredConstructor(Schema.class, int.class); + constructor.setAccessible(true); + return (UpdateSchema) constructor.newInstance(schema, lastColumnId); + } catch (Exception e) { + throw new RuntimeException("Failed to instantiate SchemaUpdate class", e); + } + } } From 3fc14f187471e749b8814e91c769bf9e4cfaebe7 Mon Sep 17 00:00:00 2001 From: Max Michels Date: Wed, 21 May 2025 15:28:49 +0200 Subject: [PATCH 05/16] Use clearer terminology --- .../sink/dynamic/CompareSchemasVisitor.java | 44 +++++++++---------- .../sink/dynamic/TableMetadataCache.java | 7 +-- .../flink/sink/dynamic/TableUpdater.java | 8 ++-- .../dynamic/TestCompareSchemasVisitor.java | 16 +++---- .../flink/sink/dynamic/TestTableUpdater.java | 2 +- 5 files changed, 39 insertions(+), 38 deletions(-) diff --git a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/CompareSchemasVisitor.java b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/CompareSchemasVisitor.java index b1b37e451d43..4334cd7d60c2 100644 --- a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/CompareSchemasVisitor.java +++ b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/CompareSchemasVisitor.java @@ -63,7 +63,7 @@ public static Result visit(Schema dataSchema, Schema tableSchema, boolean caseSe @Override public Result schema(Schema dataSchema, Integer tableSchemaId, Result downstream) { if (tableSchemaId == null) { - 
return Result.INCOMPATIBLE; + return Result.SCHEMA_UPDATE_NEEDED; } return downstream; @@ -72,23 +72,23 @@ public Result schema(Schema dataSchema, Integer tableSchemaId, Result downstream @Override public Result struct(Types.StructType struct, Integer tableSchemaId, List fields) { if (tableSchemaId == null) { - return Result.INCOMPATIBLE; + return Result.SCHEMA_UPDATE_NEEDED; } - Result result = fields.stream().reduce(Result::merge).orElse(Result.INCOMPATIBLE); + Result result = fields.stream().reduce(Result::merge).orElse(Result.SCHEMA_UPDATE_NEEDED); - if (result == Result.INCOMPATIBLE) { - return Result.INCOMPATIBLE; + if (result == Result.SCHEMA_UPDATE_NEEDED) { + return Result.SCHEMA_UPDATE_NEEDED; } Type tableSchemaType = tableSchemaId == -1 ? tableSchema.asStruct() : tableSchema.findField(tableSchemaId).type(); if (!tableSchemaType.isStructType()) { - return Result.INCOMPATIBLE; + return Result.SCHEMA_UPDATE_NEEDED; } if (struct.fields().size() != tableSchemaType.asStructType().fields().size()) { - return Result.CONVERSION_NEEDED; + return Result.DATA_ADAPTION_NEEDED; } for (int i = 0; i < struct.fields().size(); ++i) { @@ -97,7 +97,7 @@ public Result struct(Types.StructType struct, Integer tableSchemaId, List BY_ID = Maps.newHashMap(); diff --git a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/TableMetadataCache.java b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/TableMetadataCache.java index 76d4cfde354d..c42d4f8ee8ea 100644 --- a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/TableMetadataCache.java +++ b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/TableMetadataCache.java @@ -48,7 +48,7 @@ class TableMetadataCache { private static final Tuple2 EXISTS = Tuple2.of(true, null); private static final Tuple2 NOT_EXISTS = Tuple2.of(false, null); static final Tuple2 NOT_FOUND = - Tuple2.of(null, CompareSchemasVisitor.Result.INCOMPATIBLE); + Tuple2.of(null, 
CompareSchemasVisitor.Result.SCHEMA_UPDATE_NEEDED); private final Catalog catalog; private final long refreshMs; @@ -126,7 +126,8 @@ private Tuple2 schema( Tuple2.of(tableSchema.getValue(), CompareSchemasVisitor.Result.SAME); cached.schema.update(input, newResult); return newResult; - } else if (compatible == null && result == CompareSchemasVisitor.Result.CONVERSION_NEEDED) { + } else if (compatible == null + && result == CompareSchemasVisitor.Result.DATA_ADAPTION_NEEDED) { compatible = tableSchema.getValue(); } } @@ -137,7 +138,7 @@ private Tuple2 schema( return schema(identifier, input, false); } else if (compatible != null) { Tuple2 newResult = - Tuple2.of(compatible, CompareSchemasVisitor.Result.CONVERSION_NEEDED); + Tuple2.of(compatible, CompareSchemasVisitor.Result.DATA_ADAPTION_NEEDED); cached.schema.update(input, newResult); return newResult; } else if (cached != null && cached.tableExists) { diff --git a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/TableUpdater.java b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/TableUpdater.java index 488dac617259..b71db8ab7dde 100644 --- a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/TableUpdater.java +++ b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/TableUpdater.java @@ -112,7 +112,7 @@ private void findOrCreateBranch(TableIdentifier identifier, String branch) { private Tuple2 findOrCreateSchema( TableIdentifier identifier, Schema schema) { Tuple2 fromCache = cache.schema(identifier, schema); - if (fromCache.f1 != CompareSchemasVisitor.Result.INCOMPATIBLE) { + if (fromCache.f1 != CompareSchemasVisitor.Result.SCHEMA_UPDATE_NEEDED) { return fromCache; } else { Table table = catalog.loadTable(identifier); @@ -120,10 +120,10 @@ private Tuple2 findOrCreateSchema( CompareSchemasVisitor.Result result = CompareSchemasVisitor.visit(schema, tableSchema, true); switch (result) { case SAME: - case CONVERSION_NEEDED: + case 
DATA_ADAPTION_NEEDED: cache.update(identifier, table); return Tuple2.of(tableSchema, result); - case INCOMPATIBLE: + case SCHEMA_UPDATE_NEEDED: LOG.info( "Triggering schema update for table {} {} to {}", identifier, tableSchema, schema); UpdateSchema updateApi = table.updateSchema(); @@ -142,7 +142,7 @@ private Tuple2 findOrCreateSchema( "Schema update failed for {} from {} to {}", identifier, tableSchema, schema, e); Tuple2 newSchema = cache.schema(identifier, schema); - if (newSchema.f1 != CompareSchemasVisitor.Result.INCOMPATIBLE) { + if (newSchema.f1 != CompareSchemasVisitor.Result.SCHEMA_UPDATE_NEEDED) { LOG.info("Table {} schema updated concurrently to {}", identifier, schema); return newSchema; } else { diff --git a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestCompareSchemasVisitor.java b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestCompareSchemasVisitor.java index 6edebaecce2e..81979d330afd 100644 --- a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestCompareSchemasVisitor.java +++ b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestCompareSchemasVisitor.java @@ -73,7 +73,7 @@ void testSchemaDifferent() { optional(2, "extra", StringType.get())), new Schema( optional(0, "id", IntegerType.get()), optional(1, "data", StringType.get())))) - .isEqualTo(CompareSchemasVisitor.Result.INCOMPATIBLE); + .isEqualTo(CompareSchemasVisitor.Result.SCHEMA_UPDATE_NEEDED); } @Test @@ -86,7 +86,7 @@ void testSchemaWithMoreColumns() { optional(0, "id", IntegerType.get()), optional(1, "data", StringType.get()), optional(2, "extra", StringType.get())))) - .isEqualTo(CompareSchemasVisitor.Result.CONVERSION_NEEDED); + .isEqualTo(CompareSchemasVisitor.Result.DATA_ADAPTION_NEEDED); } @Test @@ -97,7 +97,7 @@ void testDifferentType() { optional(1, "id", LongType.get()), optional(2, "extra", StringType.get())), new Schema( optional(1, "id", IntegerType.get()), optional(2, "extra", 
StringType.get())))) - .isEqualTo(CompareSchemasVisitor.Result.INCOMPATIBLE); + .isEqualTo(CompareSchemasVisitor.Result.SCHEMA_UPDATE_NEEDED); } @Test @@ -108,7 +108,7 @@ void testCompatibleType() { optional(1, "id", IntegerType.get()), optional(2, "extra", StringType.get())), new Schema( optional(1, "id", LongType.get()), optional(2, "extra", StringType.get())))) - .isEqualTo(CompareSchemasVisitor.Result.CONVERSION_NEEDED); + .isEqualTo(CompareSchemasVisitor.Result.DATA_ADAPTION_NEEDED); } @Test @@ -118,7 +118,7 @@ void testWithRequiredChange() { Schema tableSchema = new Schema(required(1, "id", IntegerType.get()), optional(2, "extra", StringType.get())); assertThat(CompareSchemasVisitor.visit(dataSchema, tableSchema)) - .isEqualTo(CompareSchemasVisitor.Result.INCOMPATIBLE); + .isEqualTo(CompareSchemasVisitor.Result.SCHEMA_UPDATE_NEEDED); assertThat(CompareSchemasVisitor.visit(tableSchema, dataSchema)) .isEqualTo(CompareSchemasVisitor.Result.SAME); } @@ -148,7 +148,7 @@ void testStructChanged() { optional(1, "id", IntegerType.get()), optional( 2, "struct1", StructType.of(optional(3, "extra", IntegerType.get())))))) - .isEqualTo(CompareSchemasVisitor.Result.INCOMPATIBLE); + .isEqualTo(CompareSchemasVisitor.Result.SCHEMA_UPDATE_NEEDED); } @Test @@ -178,7 +178,7 @@ void testMapChanged() { optional(1, "id", IntegerType.get()), optional( 2, "map1", MapType.ofOptional(3, 4, IntegerType.get(), StringType.get()))))) - .isEqualTo(CompareSchemasVisitor.Result.INCOMPATIBLE); + .isEqualTo(CompareSchemasVisitor.Result.SCHEMA_UPDATE_NEEDED); } @Test @@ -204,6 +204,6 @@ void testListChanged() { new Schema( optional(1, "id", IntegerType.get()), optional(2, "list1", ListType.ofOptional(3, IntegerType.get()))))) - .isEqualTo(CompareSchemasVisitor.Result.INCOMPATIBLE); + .isEqualTo(CompareSchemasVisitor.Result.SCHEMA_UPDATE_NEEDED); } } diff --git a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestTableUpdater.java 
b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestTableUpdater.java index 2837a022c970..0572256da82a 100644 --- a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestTableUpdater.java +++ b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestTableUpdater.java @@ -73,7 +73,7 @@ void testLastResultInvalidation() { // Cache still stores the old information assertThat(cache.schema(tableIdentifier, SCHEMA2).f1) - .isEqualTo(CompareSchemasVisitor.Result.INCOMPATIBLE); + .isEqualTo(CompareSchemasVisitor.Result.SCHEMA_UPDATE_NEEDED); assertThat( tableUpdater.update(tableIdentifier, "main", SCHEMA2, PartitionSpec.unpartitioned()).f1) From 372a9eac6e22d7de550e4b71ddcad4dc04de1e3f Mon Sep 17 00:00:00 2001 From: Max Michels Date: Wed, 21 May 2025 15:30:53 +0200 Subject: [PATCH 06/16] Update JavaDoc --- .../iceberg/flink/sink/dynamic/CompareSchemasVisitor.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/CompareSchemasVisitor.java b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/CompareSchemasVisitor.java index 4334cd7d60c2..f21c3069adc8 100644 --- a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/CompareSchemasVisitor.java +++ b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/CompareSchemasVisitor.java @@ -32,9 +32,9 @@ * *
    *
  • SAME: The two schemas are semantically identical - *
  • CONVERSION_NEEDED: We can evolve the data associated with the input schema to match the + *
  • DATA_ADAPTION_NEEDED: We can evolve the data associated with the input schema to match the * table schema. - *
  • INCOMPATIBLE: We need to migrate the table schema to match the input schema. + *
  • SCHEMA_UPDATE_NEEDED: We need to migrate the table schema to match the input schema. *
* * The input schema fields are compared to the table schema via their names. From f19b627e48a1365a2871ce370d2088d90e058fe6 Mon Sep 17 00:00:00 2001 From: Max Michels Date: Fri, 30 May 2025 17:12:39 +0200 Subject: [PATCH 07/16] more JavaDoc --- .../org/apache/iceberg/flink/sink/dynamic/TableUpdater.java | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/TableUpdater.java b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/TableUpdater.java index b71db8ab7dde..9ac23257c45e 100644 --- a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/TableUpdater.java +++ b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/TableUpdater.java @@ -35,6 +35,10 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +/** + * Creates and maintains a view on Iceberg tables. Updates the Iceberg tables in case of schema, + * branch, or partition changes. + */ @Internal class TableUpdater { From 5ea47aee3844828febd1395cad5bc5d7debc44c4 Mon Sep 17 00:00:00 2001 From: Max Michels Date: Fri, 30 May 2025 17:31:20 +0200 Subject: [PATCH 08/16] Simplify code and add test for AlreadyExists path --- .../flink/sink/dynamic/TableUpdater.java | 23 ++++++++----------- .../flink/sink/dynamic/TestTableUpdater.java | 20 ++++++++++++++++ 2 files changed, 29 insertions(+), 14 deletions(-) diff --git a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/TableUpdater.java b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/TableUpdater.java index 9ac23257c45e..aebdb8683bff 100644 --- a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/TableUpdater.java +++ b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/TableUpdater.java @@ -78,21 +78,16 @@ private void findOrCreateTable(TableIdentifier identifier, Schema schema, Partit } catch (AlreadyExistsException e) { LOG.debug("Namespace {} created concurrently", 
identifier.namespace(), e); } - - createTable(identifier, schema, spec); - } else { - LOG.info("Table {} not found during table search. Creating table.", identifier); - createTable(identifier, schema, spec); } - } - } - - private void createTable(TableIdentifier identifier, Schema schema, PartitionSpec spec) { - try { - Table table = catalog.createTable(identifier, schema, spec); - cache.update(identifier, table); - } catch (AlreadyExistsException e) { - LOG.info("Table {} created concurrently. Skipping creation.", identifier, e); + LOG.info("Table {} not found during table search. Creating table.", identifier); + try { + Table table = catalog.createTable(identifier, schema, spec); + cache.update(identifier, table); + } catch (AlreadyExistsException e) { + LOG.debug("Table {} created concurrently. Skipping creation.", identifier, e); + cache.invalidate(identifier); + findOrCreateTable(identifier, schema, spec); + } } } diff --git a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestTableUpdater.java b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestTableUpdater.java index 0572256da82a..c7e07693e093 100644 --- a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestTableUpdater.java +++ b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestTableUpdater.java @@ -20,6 +20,7 @@ import static org.assertj.core.api.Assertions.assertThat; +import org.apache.flink.api.java.tuple.Tuple3; import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.Schema; import org.apache.iceberg.catalog.Catalog; @@ -88,4 +89,23 @@ void testLastResultInvalidation() { .getLastResult(SCHEMA2)) .isNull(); } + + @Test + void testTableAlreadyExists() { + Catalog catalog = CATALOG_EXTENSION.catalog(); + TableIdentifier tableIdentifier = TableIdentifier.parse("myTable"); + TableMetadataCache cache = new TableMetadataCache(catalog, 10, Long.MAX_VALUE); + TableUpdater tableUpdater = new TableUpdater(cache, 
catalog); + + // Make the table non-existent in cache + cache.exists(tableIdentifier); + // Create the table + catalog.createTable(tableIdentifier, SCHEMA); + // Make sure that the cache is invalidated and the table refreshed without an error + Tuple3 result = + tableUpdater.update(tableIdentifier, null, SCHEMA, PartitionSpec.unpartitioned()); + assertThat(result.f0.sameSchema(SCHEMA)).isTrue(); + assertThat(result.f1).isEqualTo(CompareSchemasVisitor.Result.SAME); + assertThat(result.f2).isEqualTo(PartitionSpec.unpartitioned()); + } } From fa064fee620d6f1047eb28cc91675fb2be3fbad3 Mon Sep 17 00:00:00 2001 From: Max Michels Date: Tue, 3 Jun 2025 15:05:12 +0200 Subject: [PATCH 09/16] Rename DATA_ADAPTION_NEEDED to DATA_CONVERSION_NEEDED --- .../flink/sink/dynamic/CompareSchemasVisitor.java | 14 +++++++------- .../flink/sink/dynamic/TableMetadataCache.java | 4 ++-- .../iceberg/flink/sink/dynamic/TableUpdater.java | 2 +- .../sink/dynamic/TestCompareSchemasVisitor.java | 4 ++-- 4 files changed, 12 insertions(+), 12 deletions(-) diff --git a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/CompareSchemasVisitor.java b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/CompareSchemasVisitor.java index f21c3069adc8..bb3a1490ffa2 100644 --- a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/CompareSchemasVisitor.java +++ b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/CompareSchemasVisitor.java @@ -88,7 +88,7 @@ public Result struct(Types.StructType struct, Integer tableSchemaId, List BY_ID = Maps.newHashMap(); diff --git a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/TableMetadataCache.java b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/TableMetadataCache.java index c42d4f8ee8ea..46037bcfa530 100644 --- a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/TableMetadataCache.java +++ 
b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/TableMetadataCache.java @@ -127,7 +127,7 @@ private Tuple2 schema( cached.schema.update(input, newResult); return newResult; } else if (compatible == null - && result == CompareSchemasVisitor.Result.DATA_ADAPTION_NEEDED) { + && result == CompareSchemasVisitor.Result.DATA_CONVERSION_NEEDED) { compatible = tableSchema.getValue(); } } @@ -138,7 +138,7 @@ private Tuple2 schema( return schema(identifier, input, false); } else if (compatible != null) { Tuple2 newResult = - Tuple2.of(compatible, CompareSchemasVisitor.Result.DATA_ADAPTION_NEEDED); + Tuple2.of(compatible, CompareSchemasVisitor.Result.DATA_CONVERSION_NEEDED); cached.schema.update(input, newResult); return newResult; } else if (cached != null && cached.tableExists) { diff --git a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/TableUpdater.java b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/TableUpdater.java index aebdb8683bff..22ed90fffdf6 100644 --- a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/TableUpdater.java +++ b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/TableUpdater.java @@ -119,7 +119,7 @@ private Tuple2 findOrCreateSchema( CompareSchemasVisitor.Result result = CompareSchemasVisitor.visit(schema, tableSchema, true); switch (result) { case SAME: - case DATA_ADAPTION_NEEDED: + case DATA_CONVERSION_NEEDED: cache.update(identifier, table); return Tuple2.of(tableSchema, result); case SCHEMA_UPDATE_NEEDED: diff --git a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestCompareSchemasVisitor.java b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestCompareSchemasVisitor.java index 81979d330afd..487b0ee7d94a 100644 --- a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestCompareSchemasVisitor.java +++ 
b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestCompareSchemasVisitor.java @@ -86,7 +86,7 @@ void testSchemaWithMoreColumns() { optional(0, "id", IntegerType.get()), optional(1, "data", StringType.get()), optional(2, "extra", StringType.get())))) - .isEqualTo(CompareSchemasVisitor.Result.DATA_ADAPTION_NEEDED); + .isEqualTo(CompareSchemasVisitor.Result.DATA_CONVERSION_NEEDED); } @Test @@ -108,7 +108,7 @@ void testCompatibleType() { optional(1, "id", IntegerType.get()), optional(2, "extra", StringType.get())), new Schema( optional(1, "id", LongType.get()), optional(2, "extra", StringType.get())))) - .isEqualTo(CompareSchemasVisitor.Result.DATA_ADAPTION_NEEDED); + .isEqualTo(CompareSchemasVisitor.Result.DATA_CONVERSION_NEEDED); } @Test From ef97a6e239eaadf89e6da8dff71fc47f42daed43 Mon Sep 17 00:00:00 2001 From: Max Michels Date: Tue, 3 Jun 2025 15:05:56 +0200 Subject: [PATCH 10/16] Rename MAX_SIZE to MAX_SCHEMA_COMPARISON_RESULTS_TO_CACHE --- .../apache/iceberg/flink/sink/dynamic/TableMetadataCache.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/TableMetadataCache.java b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/TableMetadataCache.java index 46037bcfa530..3375d4ac6f78 100644 --- a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/TableMetadataCache.java +++ b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/TableMetadataCache.java @@ -44,7 +44,7 @@ class TableMetadataCache { private static final Logger LOG = LoggerFactory.getLogger(TableMetadataCache.class); - private static final int MAX_SIZE = 10; + private static final int MAX_SCHEMA_COMPARISON_RESULTS_TO_CACHE = 10; private static final Tuple2 EXISTS = Tuple2.of(true, null); private static final Tuple2 NOT_EXISTS = Tuple2.of(false, null); static final Tuple2 NOT_FOUND = @@ -245,7 +245,7 @@ Tuple2 getLastResult(Schema schema) { 
private static class LimitedLinkedHashMap extends LinkedHashMap { @Override protected boolean removeEldestEntry(Map.Entry eldest) { - boolean remove = size() > MAX_SIZE; + boolean remove = size() > MAX_SCHEMA_COMPARISON_RESULTS_TO_CACHE; if (remove) { LOG.warn( "Performance degraded as records with different schema is generated for the same table. " From 3e64f759784faddfe0e4b141a684942b790a56ab Mon Sep 17 00:00:00 2001 From: Max Michels Date: Tue, 3 Jun 2025 15:08:51 +0200 Subject: [PATCH 11/16] JavaDoc / newline --- .../org/apache/iceberg/flink/sink/dynamic/TableUpdater.java | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/TableUpdater.java b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/TableUpdater.java index 22ed90fffdf6..49540f03db25 100644 --- a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/TableUpdater.java +++ b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/TableUpdater.java @@ -35,10 +35,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -/** - * Creates and maintains a view on Iceberg tables. Updates the Iceberg tables in case of schema, - * branch, or partition changes. - */ +/** Updates the Iceberg tables in case of schema, branch, or partition changes. */ @Internal class TableUpdater { @@ -79,6 +76,7 @@ private void findOrCreateTable(TableIdentifier identifier, Schema schema, Partit LOG.debug("Namespace {} created concurrently", identifier.namespace(), e); } } + LOG.info("Table {} not found during table search. 
Creating table.", identifier); try { Table table = catalog.createTable(identifier, schema, spec); From a0a67f61092c96f5d274c3ff6400ad3e3c9b7ce0 Mon Sep 17 00:00:00 2001 From: Max Michels Date: Tue, 3 Jun 2025 15:52:35 +0200 Subject: [PATCH 12/16] Rework branch creation and error handling --- .../flink/sink/dynamic/TableUpdater.java | 19 +++++++++++-------- .../flink/sink/dynamic/TestTableUpdater.java | 19 ++++++++++++++++++- 2 files changed, 29 insertions(+), 9 deletions(-) diff --git a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/TableUpdater.java b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/TableUpdater.java index 49540f03db25..99a93704be34 100644 --- a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/TableUpdater.java +++ b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/TableUpdater.java @@ -92,17 +92,20 @@ private void findOrCreateTable(TableIdentifier identifier, Schema schema, Partit private void findOrCreateBranch(TableIdentifier identifier, String branch) { String fromCache = cache.branch(identifier, branch); if (fromCache == null) { + Table table = catalog.loadTable(identifier); try { - // TODO: Which snapshot should be used to create the branch? - catalog.loadTable(identifier).manageSnapshots().createBranch(branch).commit(); + table.manageSnapshots().createBranch(branch).commit(); LOG.info("Branch {} for {} created", branch, identifier); - } catch (Exception e) { - LOG.info( - "Failed to create branch {} for {}. 
Maybe created concurrently?", - branch, - identifier, - e); + } catch (CommitFailedException e) { + table.refresh(); + if (table.refs().containsKey(branch)) { + LOG.debug("Branch {} concurrently created for {}.", branch, identifier); + } else { + LOG.error("Failed to create branch {} for {}.", branch, identifier, e); + throw e; + } } + cache.update(identifier, table); } } diff --git a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestTableUpdater.java b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestTableUpdater.java index c7e07693e093..006be1b8bcc2 100644 --- a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestTableUpdater.java +++ b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestTableUpdater.java @@ -42,6 +42,23 @@ public class TestTableUpdater extends TestFlinkIcebergSinkBase { Types.NestedField.optional(2, "data", Types.StringType.get()), Types.NestedField.optional(3, "extra", Types.StringType.get())); + + @Test + void testBranchCreationAndCaching() { + Catalog catalog = CATALOG_EXTENSION.catalog(); + TableIdentifier tableIdentifier = TableIdentifier.parse("myTable"); + TableMetadataCache cache = new TableMetadataCache(catalog, 10, Long.MAX_VALUE); + TableUpdater tableUpdater = new TableUpdater(cache, catalog); + + catalog.createTable(tableIdentifier, SCHEMA); + tableUpdater.update(tableIdentifier, "myBranch", SCHEMA, PartitionSpec.unpartitioned()); + TableMetadataCache.CacheItem cacheItem = cache.getInternalCache().getIfPresent(tableIdentifier); + assertThat(cacheItem).isNotNull(); + + tableUpdater.update(tableIdentifier, "myBranch", SCHEMA, PartitionSpec.unpartitioned()); + assertThat(cache.getInternalCache().getIfPresent(tableIdentifier)).isEqualTo(cacheItem); + } + @Test void testInvalidateOldCacheEntryOnUpdate() { Catalog catalog = CATALOG_EXTENSION.catalog(); @@ -103,7 +120,7 @@ void testTableAlreadyExists() { catalog.createTable(tableIdentifier, SCHEMA); // 
Make sure that the cache is invalidated and the table refreshed without an error Tuple3 result = - tableUpdater.update(tableIdentifier, null, SCHEMA, PartitionSpec.unpartitioned()); + tableUpdater.update(tableIdentifier, "main", SCHEMA, PartitionSpec.unpartitioned()); assertThat(result.f0.sameSchema(SCHEMA)).isTrue(); assertThat(result.f1).isEqualTo(CompareSchemasVisitor.Result.SAME); assertThat(result.f2).isEqualTo(PartitionSpec.unpartitioned()); From 3229d05e2abb5b6c774001050b3ceab1ed43306d Mon Sep 17 00:00:00 2001 From: Max Michels Date: Tue, 3 Jun 2025 16:56:54 +0200 Subject: [PATCH 13/16] Reuse duplicated code --- .../iceberg/flink/sink/dynamic/TableMetadataCache.java | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/TableMetadataCache.java b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/TableMetadataCache.java index 3375d4ac6f78..e9c77ea8c809 100644 --- a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/TableMetadataCache.java +++ b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/TableMetadataCache.java @@ -170,10 +170,7 @@ private PartitionSpec spec(TableIdentifier identifier, PartitionSpec spec, boole private Tuple2 refreshTable(TableIdentifier identifier) { try { Table table = catalog.loadTable(identifier); - cache.put( - identifier, - new CacheItem( - true, table.refs().keySet(), new SchemaInfo(table.schemas()), table.specs())); + update(identifier, table); return EXISTS; } catch (NoSuchTableException e) { LOG.debug("Table doesn't exist {}", identifier, e); From dd8d47717b38fa0b8531aee747aaa2a983a856a9 Mon Sep 17 00:00:00 2001 From: Max Michels Date: Tue, 3 Jun 2025 16:58:00 +0200 Subject: [PATCH 14/16] Revise logging and cache invalidation --- .../iceberg/flink/sink/dynamic/TableUpdater.java | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git 
a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/TableUpdater.java b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/TableUpdater.java index 99a93704be34..0edd2d6ec8cd 100644 --- a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/TableUpdater.java +++ b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/TableUpdater.java @@ -131,21 +131,22 @@ private Tuple2 findOrCreateSchema( try { updateApi.commit(); - cache.invalidate(identifier); + cache.update(identifier, table); Tuple2 comparisonAfterMigration = cache.schema(identifier, schema); Schema newSchema = comparisonAfterMigration.f0; LOG.info("Table {} schema updated from {} to {}", identifier, tableSchema, newSchema); return comparisonAfterMigration; } catch (CommitFailedException e) { - LOG.info( - "Schema update failed for {} from {} to {}", identifier, tableSchema, schema, e); + cache.invalidate(identifier); Tuple2 newSchema = cache.schema(identifier, schema); if (newSchema.f1 != CompareSchemasVisitor.Result.SCHEMA_UPDATE_NEEDED) { - LOG.info("Table {} schema updated concurrently to {}", identifier, schema); + LOG.debug("Table {} schema updated concurrently to {}", identifier, schema); return newSchema; } else { + LOG.error( + "Schema update failed for {} from {} to {}", identifier, tableSchema, schema, e); throw e; } } @@ -182,6 +183,7 @@ private PartitionSpec findOrCreateSpec(TableIdentifier identifier, PartitionSpec try { updater.commit(); + cache.update(identifier, table); } catch (CommitFailedException e) { cache.invalidate(identifier); PartitionSpec newSpec = cache.spec(identifier, targetSpec); @@ -199,8 +201,6 @@ private PartitionSpec findOrCreateSpec(TableIdentifier identifier, PartitionSpec throw e; } } - - cache.invalidate(identifier); return cache.spec(identifier, targetSpec); } } From bdd4b21e12e6012944606837e058f9b5875000e3 Mon Sep 17 00:00:00 2001 From: Max Michels Date: Tue, 3 Jun 2025 16:58:12 +0200 Subject: [PATCH 
15/16] Add more tests --- .../flink/sink/dynamic/TestTableUpdater.java | 70 ++++++++++++++----- 1 file changed, 51 insertions(+), 19 deletions(-) diff --git a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestTableUpdater.java b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestTableUpdater.java index 006be1b8bcc2..f7a3596462f1 100644 --- a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestTableUpdater.java +++ b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestTableUpdater.java @@ -20,9 +20,11 @@ import static org.assertj.core.api.Assertions.assertThat; +import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.tuple.Tuple3; import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.Schema; +import org.apache.iceberg.Table; import org.apache.iceberg.catalog.Catalog; import org.apache.iceberg.catalog.TableIdentifier; import org.apache.iceberg.flink.sink.TestFlinkIcebergSinkBase; @@ -42,6 +44,39 @@ public class TestTableUpdater extends TestFlinkIcebergSinkBase { Types.NestedField.optional(2, "data", Types.StringType.get()), Types.NestedField.optional(3, "extra", Types.StringType.get())); + @Test + void testTableCreation() { + Catalog catalog = CATALOG_EXTENSION.catalog(); + TableIdentifier tableIdentifier = TableIdentifier.parse("myTable"); + TableMetadataCache cache = new TableMetadataCache(catalog, 10, Long.MAX_VALUE); + TableUpdater tableUpdater = new TableUpdater(cache, catalog); + + tableUpdater.update(tableIdentifier, "main", SCHEMA, PartitionSpec.unpartitioned()); + assertThat(catalog.tableExists(tableIdentifier)).isTrue(); + + Tuple2 cachedSchema = + cache.schema(tableIdentifier, SCHEMA); + assertThat(cachedSchema.f0.sameSchema(SCHEMA)).isTrue(); + } + + @Test + void testTableAlreadyExists() { + Catalog catalog = CATALOG_EXTENSION.catalog(); + TableIdentifier tableIdentifier = TableIdentifier.parse("myTable"); + TableMetadataCache cache 
= new TableMetadataCache(catalog, 10, Long.MAX_VALUE); + TableUpdater tableUpdater = new TableUpdater(cache, catalog); + + // Make the table non-existent in cache + cache.exists(tableIdentifier); + // Create the table + catalog.createTable(tableIdentifier, SCHEMA); + // Make sure that the cache is invalidated and the table refreshed without an error + Tuple3 result = + tableUpdater.update(tableIdentifier, "main", SCHEMA, PartitionSpec.unpartitioned()); + assertThat(result.f0.sameSchema(SCHEMA)).isTrue(); + assertThat(result.f1).isEqualTo(CompareSchemasVisitor.Result.SAME); + assertThat(result.f2).isEqualTo(PartitionSpec.unpartitioned()); + } @Test void testBranchCreationAndCaching() { @@ -59,6 +94,22 @@ void testBranchCreationAndCaching() { assertThat(cache.getInternalCache().getIfPresent(tableIdentifier)).isEqualTo(cacheItem); } + @Test + void testSpecCreation() { + Catalog catalog = CATALOG_EXTENSION.catalog(); + TableIdentifier tableIdentifier = TableIdentifier.parse("myTable"); + TableMetadataCache cache = new TableMetadataCache(catalog, 10, Long.MAX_VALUE); + TableUpdater tableUpdater = new TableUpdater(cache, catalog); + + PartitionSpec spec = PartitionSpec.builderFor(SCHEMA).bucket("data", 10).build(); + Tuple3 result = + tableUpdater.update(tableIdentifier, "main", SCHEMA, spec); + + Table table = catalog.loadTable(tableIdentifier); + assertThat(table).isNotNull(); + assertThat(table.spec()).isEqualTo(spec); + } + @Test void testInvalidateOldCacheEntryOnUpdate() { Catalog catalog = CATALOG_EXTENSION.catalog(); @@ -106,23 +157,4 @@ void testLastResultInvalidation() { .getLastResult(SCHEMA2)) .isNull(); } - - @Test - void testTableAlreadyExists() { - Catalog catalog = CATALOG_EXTENSION.catalog(); - TableIdentifier tableIdentifier = TableIdentifier.parse("myTable"); - TableMetadataCache cache = new TableMetadataCache(catalog, 10, Long.MAX_VALUE); - TableUpdater tableUpdater = new TableUpdater(cache, catalog); - - // Make the table non-existent in cache - 
cache.exists(tableIdentifier); - // Create the table - catalog.createTable(tableIdentifier, SCHEMA); - // Make sure that the cache is invalidated and the table refreshed without an error - Tuple3 result = - tableUpdater.update(tableIdentifier, "main", SCHEMA, PartitionSpec.unpartitioned()); - assertThat(result.f0.sameSchema(SCHEMA)).isTrue(); - assertThat(result.f1).isEqualTo(CompareSchemasVisitor.Result.SAME); - assertThat(result.f2).isEqualTo(PartitionSpec.unpartitioned()); - } } From 2dfe5e3a3b843974a725495701dfa654283e0197 Mon Sep 17 00:00:00 2001 From: Max Michels Date: Tue, 3 Jun 2025 17:48:32 +0200 Subject: [PATCH 16/16] Address comments --- .../iceberg/flink/sink/dynamic/CompareSchemasVisitor.java | 4 ++-- .../org/apache/iceberg/flink/sink/dynamic/TableUpdater.java | 1 + 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/CompareSchemasVisitor.java b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/CompareSchemasVisitor.java index bb3a1490ffa2..bb0d32f8f644 100644 --- a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/CompareSchemasVisitor.java +++ b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/CompareSchemasVisitor.java @@ -32,8 +32,8 @@ * *
    *
  • SAME: The two schemas are semantically identical - *
  • DATA_ADAPTION_NEEDED: We can evolve the data associated with the input schema to match the - * table schema. + *
  • DATA_CONVERSION_NEEDED: We can evolve the data associated with the input schema to match + * the table schema. *
  • SCHEMA_UPDATE_NEEDED: We need to migrate the table schema to match the input schema. *
* diff --git a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/TableUpdater.java b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/TableUpdater.java index 0edd2d6ec8cd..40bb66f65125 100644 --- a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/TableUpdater.java +++ b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/TableUpdater.java @@ -105,6 +105,7 @@ private void findOrCreateBranch(TableIdentifier identifier, String branch) { throw e; } } + cache.update(identifier, table); } }