diff --git a/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/CompareSchemasVisitor.java b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/CompareSchemasVisitor.java new file mode 100644 index 000000000000..bb0d32f8f644 --- /dev/null +++ b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/CompareSchemasVisitor.java @@ -0,0 +1,266 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.flink.sink.dynamic; + +import java.util.List; +import java.util.Map; +import org.apache.iceberg.Schema; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.apache.iceberg.schema.SchemaWithPartnerVisitor; +import org.apache.iceberg.types.Type; +import org.apache.iceberg.types.Types; + +/** + * Visitor class which compares an input schema to a table schema and emits a compatibility {@link + * Result}. + * + *
We support: + * + *
Changes are accumulated to evolve the existingSchema into a targetSchema.
+ *
+ * @param api an UpdateSchema for adding changes
+ * @param existingSchema an existing schema
+ * @param targetSchema a new schema to compare with the existing
+ */
+ public static void visit(UpdateSchema api, Schema existingSchema, Schema targetSchema) {
+ visit(
+ targetSchema,
+ -1,
+ new EvolveSchemaVisitor(api, existingSchema, targetSchema),
+ new CompareSchemasVisitor.PartnerIdByNameAccessors(existingSchema));
+ }
+
+ @Override
+ public Boolean struct(Types.StructType struct, Integer partnerId, List We support:
+ *
+ * Changes are accumulated to evolve the existingSchema into a targetSchema.
+ *
+ * @param api an UpdateSchema for adding changes
+ * @param existingSchema an existing schema
+ * @param targetSchema a new schema to compare with the existing
+ */
+ public static void visit(UpdateSchema api, Schema existingSchema, Schema targetSchema) {
+ visit(
+ targetSchema,
+ -1,
+ new EvolveSchemaVisitor(api, existingSchema, targetSchema),
+ new CompareSchemasVisitor.PartnerIdByNameAccessors(existingSchema));
+ }
+
+ @Override
+ public Boolean struct(Types.StructType struct, Integer partnerId, List
+ *
+ *
+ * The input schema fields are compared to the table schema via their names.
+ */
+public class CompareSchemasVisitor
+ extends SchemaWithPartnerVisitor
+ *
+ *
+ * We don't support:
+ *
+ *
+ *
+ *
+ * The reason is that dropping columns would create issues with late / out of order data. Once we
+ * drop fields, we wouldn't be able to easily add them back later without losing the associated
+ * data. Renaming columns is not supported because we compare schemas by name, which doesn't allow
+ * for renaming without additional hints.
+ */
+public class EvolveSchemaVisitor extends SchemaWithPartnerVisitor