From b7bf30ef136a5758f2da5ded3ebb8e612bca3ab5 Mon Sep 17 00:00:00 2001 From: linyanghao Date: Mon, 22 May 2023 14:47:34 +0800 Subject: [PATCH 01/15] update --- .../apache/iceberg/flink/FlinkCatalog.java | 138 +++++++++++-- .../apache/iceberg/flink/FlinkSchemaUtil.java | 10 + .../apache/iceberg/flink/FlinkTypeToType.java | 4 + .../flink/util/FlinkAlterTableUtil.java | 168 ++++++++++++++++ .../flink/util/FlinkCompatibilityUtil.java | 5 + .../iceberg/flink/TestFlinkCatalogTable.java | 182 ++++++++++++++++++ 6 files changed, 488 insertions(+), 19 deletions(-) create mode 100644 flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/util/FlinkAlterTableUtil.java diff --git a/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java b/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java index 825816fdf416..33ff8497544a 100644 --- a/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java +++ b/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java @@ -18,6 +18,9 @@ */ package org.apache.iceberg.flink; +import static org.apache.iceberg.flink.util.FlinkAlterTableUtil.applyPropertyChanges; +import static org.apache.iceberg.flink.util.FlinkAlterTableUtil.applySchemaChanges; + import java.io.Closeable; import java.io.IOException; import java.util.Collections; @@ -38,6 +41,7 @@ import org.apache.flink.table.catalog.CatalogTable; import org.apache.flink.table.catalog.CatalogTableImpl; import org.apache.flink.table.catalog.ObjectPath; +import org.apache.flink.table.catalog.TableChange; import org.apache.flink.table.catalog.exceptions.CatalogException; import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException; import org.apache.flink.table.catalog.exceptions.DatabaseNotEmptyException; @@ -62,6 +66,7 @@ import org.apache.iceberg.Table; import org.apache.iceberg.Transaction; import org.apache.iceberg.UpdateProperties; +import org.apache.iceberg.UpdateSchema; import 
org.apache.iceberg.catalog.Catalog; import org.apache.iceberg.catalog.Namespace; import org.apache.iceberg.catalog.SupportsNamespaces; @@ -442,6 +447,10 @@ private static void validateTableSchemaAndPartition(CatalogTable ct1, CatalogTab throw new UnsupportedOperationException("Altering schema is not supported yet."); } + validateTablePartition(ct1, ct2); + } + + private static void validateTablePartition(CatalogTable ct1, CatalogTable ct2) { if (!ct1.getPartitionKeys().equals(ct2.getPartitionKeys())) { throw new UnsupportedOperationException("Altering partition keys is not supported yet."); } @@ -465,11 +474,13 @@ public void alterTable(ObjectPath tablePath, CatalogBaseTable newTable, boolean CatalogTable table = toCatalogTable(icebergTable); - // Currently, Flink SQL only support altering table properties. + // This alterTable API only supports altering table properties. - // For current Flink Catalog API, support for adding/removing/renaming columns cannot be done by + // Support for adding/removing/renaming columns cannot be done by // comparing // CatalogTable instances, unless the Flink schema contains Iceberg column IDs. + + // To alter columns, use the other alterTable API and provide a list of TableChange's. validateTableSchemaAndPartition(table, (CatalogTable) newTable); Map oldProperties = table.getOptions(); @@ -510,6 +521,61 @@ public void alterTable(ObjectPath tablePath, CatalogBaseTable newTable, boolean commitChanges(icebergTable, setLocation, setSnapshotId, pickSnapshotId, setProperties); } + @Override + public void alterTable( + ObjectPath tablePath, + CatalogBaseTable newTable, + List tableChanges, + boolean ignoreIfNotExists) + throws TableNotExistException, CatalogException { + validateFlinkTable(newTable); + + Table icebergTable; + try { + icebergTable = loadIcebergTable(tablePath); + } catch (TableNotExistException e) { + if (!ignoreIfNotExists) { + throw e; + } else { + return; + } + } + + // Does not support altering partition yet. 
+ validateTablePartition(toCatalogTable(icebergTable), (CatalogTable) newTable); + + String setLocation = null; + String setSnapshotId = null; + String pickSnapshotId = null; + + List propertyChanges = Lists.newArrayList(); + List schemaChanges = Lists.newArrayList(); + for (TableChange change : tableChanges) { + if (change instanceof TableChange.SetOption) { + TableChange.SetOption set = (TableChange.SetOption) change; + + if ("location".equalsIgnoreCase(set.getKey())) { + setLocation = set.getValue(); + } else if ("current-snapshot-id".equalsIgnoreCase(set.getKey())) { + setSnapshotId = set.getValue(); + } else if ("cherry-pick-snapshot-id".equalsIgnoreCase(set.getKey())) { + pickSnapshotId = set.getValue(); + } else { + propertyChanges.add(change); + } + + } else if (change instanceof TableChange.ResetOption) { + propertyChanges.add(change); + + } else { + schemaChanges.add(change); + } + } + + commitChanges( + icebergTable, setLocation, setSnapshotId, pickSnapshotId, schemaChanges, propertyChanges); + } + private static void validateFlinkTable(CatalogBaseTable table) { Preconditions.checkArgument( table instanceof CatalogTable, "The Table should be a CatalogTable."); @@ -558,23 +624,7 @@ private static void commitChanges( String setSnapshotId, String pickSnapshotId, Map setProperties) { - // don't allow setting the snapshot and picking a commit at the same time because order is - // ambiguous and choosing - // one order leads to different results - Preconditions.checkArgument( - setSnapshotId == null || pickSnapshotId == null, - "Cannot set the current snapshot ID and cherry-pick snapshot changes"); - - if (setSnapshotId != null) { - long newSnapshotId = Long.parseLong(setSnapshotId); - table.manageSnapshots().setCurrentSnapshot(newSnapshotId).commit(); - } - - // if updating the table snapshot, perform that update first in case it fails - if (pickSnapshotId != null) { - long newSnapshotId = Long.parseLong(pickSnapshotId); - 
table.manageSnapshots().cherrypick(newSnapshotId).commit(); - } + commitChanges(table, setSnapshotId, pickSnapshotId); Transaction transaction = table.newTransaction(); @@ -598,6 +648,56 @@ private static void commitChanges( transaction.commitTransaction(); } + private static void commitChanges( + Table table, + String setLocation, + String setSnapshotId, + String pickSnapshotId, + List schemaChanges, + List propertyChanges) { + commitChanges(table, setSnapshotId, pickSnapshotId); + + Transaction transaction = table.newTransaction(); + + if (setLocation != null) { + transaction.updateLocation().setLocation(setLocation).commit(); + } + + if (!schemaChanges.isEmpty()) { + UpdateSchema updateSchema = transaction.updateSchema(); + applySchemaChanges(updateSchema, schemaChanges); + updateSchema.commit(); + } + + if (!propertyChanges.isEmpty()) { + UpdateProperties updateProperties = transaction.updateProperties(); + applyPropertyChanges(updateProperties, propertyChanges); + updateProperties.commit(); + } + + transaction.commitTransaction(); + } + + private static void commitChanges(Table table, String setSnapshotId, String pickSnapshotId) { + // don't allow setting the snapshot and picking a commit at the same time because order is + // ambiguous and choosing + // one order leads to different results + Preconditions.checkArgument( + setSnapshotId == null || pickSnapshotId == null, + "Cannot set the current snapshot ID and cherry-pick snapshot changes"); + + if (setSnapshotId != null) { + long newSnapshotId = Long.parseLong(setSnapshotId); + table.manageSnapshots().setCurrentSnapshot(newSnapshotId).commit(); + } + + // if updating the table snapshot, perform that update first in case it fails + if (pickSnapshotId != null) { + long newSnapshotId = Long.parseLong(pickSnapshotId); + table.manageSnapshots().cherrypick(newSnapshotId).commit(); + } + } + static CatalogTable toCatalogTable(Table table) { TableSchema schema = FlinkSchemaUtil.toSchema(table.schema()); List 
partitionKeys = toPartitionKeys(table.spec(), table.schema()); diff --git a/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/FlinkSchemaUtil.java b/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/FlinkSchemaUtil.java index 25725639c330..a6b53879ad80 100644 --- a/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/FlinkSchemaUtil.java +++ b/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/FlinkSchemaUtil.java @@ -134,6 +134,16 @@ public static LogicalType convert(Type type) { return TypeUtil.visit(type, new TypeToFlinkType()); } + /** + * Convert a {@link LogicalType Flink type} to a {@link Type}. + * + * @param flinkType a FlinkType + * @return the equivalent Iceberg type + */ + public static Type convert(LogicalType flinkType) { + return flinkType.accept(new FlinkTypeToType()); + } + /** * Convert a {@link RowType} to a {@link TableSchema}. * diff --git a/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/FlinkTypeToType.java b/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/FlinkTypeToType.java index 6f8bfef2ef44..408065f06057 100644 --- a/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/FlinkTypeToType.java +++ b/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/FlinkTypeToType.java @@ -49,6 +49,10 @@ class FlinkTypeToType extends FlinkTypeVisitor { private final RowType root; private int nextId; + FlinkTypeToType() { + this.root = null; + } + FlinkTypeToType(RowType root) { this.root = root; // the root struct's fields use the first ids diff --git a/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/util/FlinkAlterTableUtil.java b/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/util/FlinkAlterTableUtil.java new file mode 100644 index 000000000000..3170bababc9e --- /dev/null +++ b/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/util/FlinkAlterTableUtil.java @@ -0,0 +1,168 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license 
agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.flink.util; + +import java.util.List; +import org.apache.flink.table.catalog.Column; +import org.apache.flink.table.catalog.TableChange; +import org.apache.flink.table.catalog.UniqueConstraint; +import org.apache.iceberg.UpdateProperties; +import org.apache.iceberg.UpdateSchema; +import org.apache.iceberg.flink.FlinkSchemaUtil; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.types.Type; + +public class FlinkAlterTableUtil { + /** + * Applies a list of Flink table changes to an {@link UpdateSchema} operation. 
+ * + * @param pendingUpdate an uncommitted UpdateSchema operation to configure + * @param schemaChanges a list of Flink table changes + */ + public static void applySchemaChanges( + UpdateSchema pendingUpdate, List schemaChanges) { + for (TableChange change : schemaChanges) { + if (change instanceof TableChange.AddColumn) { + TableChange.AddColumn addColumn = (TableChange.AddColumn) change; + Column flinkColumn = addColumn.getColumn(); + Preconditions.checkArgument( + FlinkCompatibilityUtil.isPhysicalColumn(flinkColumn), + "Adding computed columns is not supported yet: %s", + flinkColumn.getName()); + Type icebergType = FlinkSchemaUtil.convert(flinkColumn.getDataType().getLogicalType()); + pendingUpdate.addColumn(flinkColumn.getName(), icebergType); + + } else if (change instanceof TableChange.ModifyColumn) { + TableChange.ModifyColumn modifyColumn = (TableChange.ModifyColumn) change; + applyModifyColumn(pendingUpdate, modifyColumn); + + } else if (change instanceof TableChange.DropColumn) { + TableChange.DropColumn dropColumn = (TableChange.DropColumn) change; + pendingUpdate.deleteColumn(dropColumn.getColumnName()); + + } else if (change instanceof TableChange.AddWatermark) { + throw new UnsupportedOperationException("Adding watermark specs is not supported yet. "); + + } else if (change instanceof TableChange.ModifyWatermark) { + throw new UnsupportedOperationException("Modifying watermark specs is not supported yet. "); + + } else if (change instanceof TableChange.DropWatermark) { + throw new UnsupportedOperationException("Watermark specs is not supported yet. 
"); + + } else if (change instanceof TableChange.AddUniqueConstraint) { + TableChange.AddUniqueConstraint addPk = (TableChange.AddUniqueConstraint) change; + applyUniqueConstraint(pendingUpdate, addPk.getConstraint()); + + } else if (change instanceof TableChange.ModifyUniqueConstraint) { + TableChange.ModifyUniqueConstraint modifyPk = (TableChange.ModifyUniqueConstraint) change; + applyUniqueConstraint(pendingUpdate, modifyPk.getNewConstraint()); + + } else if (change instanceof TableChange.DropConstraint) { + throw new UnsupportedOperationException("Dropping constraints is not supported yet. "); + + } else { + throw new UnsupportedOperationException("Cannot apply unknown table change: " + change); + } + } + } + + /** + * Applies a list of Flink table changes to an {@link UpdateProperties} operation. + * + * @param pendingUpdate an uncommitted UpdateProperty operation to configure + * @param propertyChanges a list of Flink table changes + */ + public static void applyPropertyChanges( + UpdateProperties pendingUpdate, List propertyChanges) { + for (TableChange change : propertyChanges) { + if (change instanceof TableChange.SetOption) { + TableChange.SetOption setOption = (TableChange.SetOption) change; + pendingUpdate.set(setOption.getKey(), setOption.getValue()); + + } else if (change instanceof TableChange.ResetOption) { + TableChange.ResetOption resetOption = (TableChange.ResetOption) change; + pendingUpdate.remove(resetOption.getKey()); + + } else { + throw new UnsupportedOperationException("Cannot apply unknown table change: " + change); + } + } + } + + private static void applyModifyColumn( + UpdateSchema pendingUpdate, TableChange.ModifyColumn modifyColumn) { + if (modifyColumn instanceof TableChange.ModifyColumnName) { + TableChange.ModifyColumnName modifyName = (TableChange.ModifyColumnName) modifyColumn; + pendingUpdate.renameColumn(modifyName.getOldColumnName(), modifyName.getNewColumnName()); + + } else if (modifyColumn instanceof 
TableChange.ModifyColumnPosition) { + TableChange.ModifyColumnPosition modifyPosition = + (TableChange.ModifyColumnPosition) modifyColumn; + applyModifyColumnPosition(pendingUpdate, modifyPosition); + + } else if (modifyColumn instanceof TableChange.ModifyPhysicalColumnType) { + TableChange.ModifyPhysicalColumnType modifyType = + (TableChange.ModifyPhysicalColumnType) modifyColumn; + Type type = FlinkSchemaUtil.convert(modifyType.getNewType().getLogicalType()); + pendingUpdate.updateColumn(modifyType.getOldColumn().getName(), type.asPrimitiveType()); + + } else if (modifyColumn instanceof TableChange.ModifyColumnComment) { + TableChange.ModifyColumnComment modifyComment = + (TableChange.ModifyColumnComment) modifyColumn; + pendingUpdate.updateColumnDoc( + modifyComment.getOldColumn().getName(), modifyComment.getNewComment()); + + } else { + throw new UnsupportedOperationException("Cannot apply unknown table change: " + modifyColumn); + } + } + + private static void applyModifyColumnPosition( + UpdateSchema pendingUpdate, TableChange.ModifyColumnPosition modifyColumnPosition) { + TableChange.ColumnPosition newPosition = modifyColumnPosition.getNewPosition(); + if (newPosition instanceof TableChange.First) { + pendingUpdate.moveFirst(modifyColumnPosition.getOldColumn().getName()); + + } else if (newPosition instanceof TableChange.After) { + TableChange.After after = (TableChange.After) newPosition; + pendingUpdate.moveAfter(modifyColumnPosition.getOldColumn().getName(), after.column()); + + } else { + throw new UnsupportedOperationException( + "Cannot apply unknown table change: " + modifyColumnPosition); + } + } + + private static void applyUniqueConstraint( + UpdateSchema pendingUpdate, UniqueConstraint constraint) { + switch (constraint.getType()) { + case PRIMARY_KEY: + pendingUpdate.setIdentifierFields(constraint.getColumns()); + break; + + case UNIQUE_KEY: + throw new UnsupportedOperationException( + "Setting unique key constraint is not supported yet."); + + 
default: + throw new UnsupportedOperationException( + "Cannot apply unknown unique constraint: " + constraint.getType().name()); + } + } +} diff --git a/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/util/FlinkCompatibilityUtil.java b/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/util/FlinkCompatibilityUtil.java index 2c5c587f4ebf..f02af894e82b 100644 --- a/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/util/FlinkCompatibilityUtil.java +++ b/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/util/FlinkCompatibilityUtil.java @@ -20,6 +20,7 @@ import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.table.api.TableColumn; +import org.apache.flink.table.catalog.Column; import org.apache.flink.table.data.RowData; import org.apache.flink.table.runtime.typeutils.InternalTypeInfo; import org.apache.flink.table.types.logical.RowType; @@ -39,4 +40,8 @@ public static TypeInformation toTypeInfo(RowType rowType) { public static boolean isPhysicalColumn(TableColumn column) { return column.isPhysical(); } + + public static boolean isPhysicalColumn(Column column) { + return column.isPhysical(); + } } diff --git a/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogTable.java b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogTable.java index 16dcf4a9f4ce..9ae95f21cd70 100644 --- a/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogTable.java +++ b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogTable.java @@ -28,6 +28,7 @@ import java.util.stream.Collectors; import java.util.stream.StreamSupport; import org.apache.flink.table.api.DataTypes; +import org.apache.flink.table.api.TableException; import org.apache.flink.table.api.TableSchema; import org.apache.flink.table.api.ValidationException; import org.apache.flink.table.api.constraints.UniqueConstraint; @@ -348,6 +349,187 @@ public void testAlterTableWithPrimaryKey() throws 
TableNotExistException { assertThat(table("tl").properties()).containsAllEntriesOf(properties); } + @Test + public void testAlterTableProperties() throws TableNotExistException { + sql("CREATE TABLE tl(id BIGINT) WITH ('oldK'='oldV')"); + Map properties = Maps.newHashMap(); + properties.put("oldK", "oldV"); + + // new + sql("ALTER TABLE tl SET('newK'='newV')"); + properties.put("newK", "newV"); + Assert.assertEquals(properties, table("tl").properties()); + + // update old + sql("ALTER TABLE tl SET('oldK'='oldV2')"); + properties.put("oldK", "oldV2"); + Assert.assertEquals(properties, table("tl").properties()); + + // remove property + sql("ALTER TABLE tl RESET('oldK')"); + properties.remove("oldK"); + Assert.assertEquals(properties, table("tl").properties()); + } + + @Test + public void testAlterTableAddColumn() { + sql("CREATE TABLE tl(id BIGINT)"); + Schema schemaBefore = table("tl").schema(); + Assert.assertEquals( + new Schema(Types.NestedField.optional(1, "id", Types.LongType.get())).asStruct(), + schemaBefore.asStruct()); + + sql("ALTER TABLE tl ADD (dt STRING)"); + Schema schemaAfter = table("tl").schema(); + Assert.assertEquals( + new Schema( + Types.NestedField.optional(1, "id", Types.LongType.get()), + Types.NestedField.optional(2, "dt", Types.StringType.get())) + .asStruct(), + schemaAfter.asStruct()); + } + + @Test + public void testAlterTableDropColumn() { + sql("CREATE TABLE tl(id BIGINT, dt STRING)"); + Schema schemaBefore = table("tl").schema(); + Assert.assertEquals( + new Schema( + Types.NestedField.optional(1, "id", Types.LongType.get()), + Types.NestedField.optional(2, "dt", Types.StringType.get())) + .asStruct(), + schemaBefore.asStruct()); + + sql("ALTER TABLE tl DROP (dt)"); + Schema schemaAfter = table("tl").schema(); + Assert.assertEquals( + new Schema(Types.NestedField.optional(1, "id", Types.LongType.get())).asStruct(), + schemaAfter.asStruct()); + } + + @Test + public void testAlterTableModifyColumnName() { + sql("CREATE TABLE tl(id 
BIGINT, dt STRING)"); + Schema schemaBefore = table("tl").schema(); + Assert.assertEquals( + new Schema( + Types.NestedField.optional(1, "id", Types.LongType.get()), + Types.NestedField.optional(2, "dt", Types.StringType.get())) + .asStruct(), + schemaBefore.asStruct()); + + sql("ALTER TABLE tl RENAME dt TO data"); + Schema schemaAfter = table("tl").schema(); + Assert.assertEquals( + new Schema( + Types.NestedField.optional(1, "id", Types.LongType.get()), + Types.NestedField.optional(2, "data", Types.StringType.get())) + .asStruct(), + schemaAfter.asStruct()); + } + + @Test + public void testAlterTableModifyColumnType() { + sql("CREATE TABLE tl(id INTEGER, dt STRING)"); + Schema schemaBefore = table("tl").schema(); + Assert.assertEquals( + new Schema( + Types.NestedField.optional(1, "id", Types.IntegerType.get()), + Types.NestedField.optional(2, "dt", Types.StringType.get())) + .asStruct(), + schemaBefore.asStruct()); + + sql("ALTER TABLE tl MODIFY (id BIGINT)"); + Schema schemaAfter = table("tl").schema(); + Assert.assertEquals( + new Schema( + Types.NestedField.optional(1, "id", Types.LongType.get()), + Types.NestedField.optional(2, "dt", Types.StringType.get())) + .asStruct(), + schemaAfter.asStruct()); + } + + @Test + public void testAlterTableModifyColumnPosition() { + sql("CREATE TABLE tl(id BIGINT, dt STRING)"); + Schema schemaBefore = table("tl").schema(); + Assert.assertEquals( + new Schema( + Types.NestedField.optional(1, "id", Types.LongType.get()), + Types.NestedField.optional(2, "dt", Types.StringType.get())) + .asStruct(), + schemaBefore.asStruct()); + + sql("ALTER TABLE tl MODIFY (dt STRING FIRST)"); + Schema schemaAfter = table("tl").schema(); + Assert.assertEquals( + new Schema( + Types.NestedField.optional(2, "dt", Types.StringType.get()), + Types.NestedField.optional(1, "id", Types.LongType.get())) + .asStruct(), + schemaAfter.asStruct()); + + sql("ALTER TABLE tl MODIFY (dt STRING AFTER id)"); + Schema schemaAfterAfter = table("tl").schema(); + 
Assert.assertEquals( + new Schema( + Types.NestedField.optional(1, "id", Types.LongType.get()), + Types.NestedField.optional(2, "dt", Types.StringType.get())) + .asStruct(), + schemaAfterAfter.asStruct()); + } + + @Test + public void testAlterTableModifyColumnComment() { + sql("CREATE TABLE tl(id BIGINT, dt STRING)"); + Schema schemaBefore = table("tl").schema(); + Assert.assertEquals( + new Schema( + Types.NestedField.optional(1, "id", Types.LongType.get()), + Types.NestedField.optional(2, "dt", Types.StringType.get())) + .asStruct(), + schemaBefore.asStruct()); + + sql("ALTER TABLE tl MODIFY (dt STRING COMMENT 'some data')"); + Schema schemaAfter = table("tl").schema(); + Assert.assertEquals( + new Schema( + Types.NestedField.optional(1, "id", Types.LongType.get()), + Types.NestedField.optional(2, "dt", Types.StringType.get(), "some data")) + .asStruct(), + schemaAfter.asStruct()); + } + + @Test + public void testAlterTableConstraint() { + sql("CREATE TABLE tl(id BIGINT NOT NULL, dt STRING NOT NULL)"); + Schema schemaBefore = table("tl").schema(); + Assert.assertEquals( + new Schema( + Types.NestedField.required(1, "id", Types.LongType.get()), + Types.NestedField.required(2, "dt", Types.StringType.get())) + .asStruct(), + schemaBefore.asStruct()); + Assert.assertEquals(ImmutableSet.of(), schemaBefore.identifierFieldNames()); + + sql("ALTER TABLE tl ADD (PRIMARY KEY (id) NOT ENFORCED)"); + Schema schemaAfterAdd = table("tl").schema(); + Assert.assertEquals(ImmutableSet.of("id"), schemaAfterAdd.identifierFieldNames()); + + sql("ALTER TABLE tl MODIFY (PRIMARY KEY (dt) NOT ENFORCED)"); + Schema schemaAfterModify = table("tl").schema(); + Assert.assertEquals( + new Schema( + Types.NestedField.required(1, "id", Types.LongType.get()), + Types.NestedField.required(2, "dt", Types.StringType.get())) + .asStruct(), + schemaAfterModify.asStruct()); + Assert.assertEquals(ImmutableSet.of("dt"), schemaAfterModify.identifierFieldNames()); + + Assertions.assertThatThrownBy(() -> 
sql("ALTER TABLE tl DROP PRIMARY KEY")) + .isInstanceOf(TableException.class); + } + @Test public void testRelocateTable() { Assume.assumeFalse("HadoopCatalog does not support relocate table", isHadoopCatalog); From 831b707fcc3ab8eaecb8f05f06bf8b93a0d6823c Mon Sep 17 00:00:00 2001 From: linyanghao Date: Mon, 22 May 2023 15:05:59 +0800 Subject: [PATCH 02/15] checkstyle --- .../main/java/org/apache/iceberg/flink/FlinkCatalog.java | 8 +++----- .../apache/iceberg/flink/util/FlinkAlterTableUtil.java | 2 ++ 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java b/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java index 33ff8497544a..1361611190be 100644 --- a/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java +++ b/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java @@ -18,9 +18,6 @@ */ package org.apache.iceberg.flink; -import static org.apache.iceberg.flink.util.FlinkAlterTableUtil.applyPropertyChanges; -import static org.apache.iceberg.flink.util.FlinkAlterTableUtil.applySchemaChanges; - import java.io.Closeable; import java.io.IOException; import java.util.Collections; @@ -74,6 +71,7 @@ import org.apache.iceberg.exceptions.AlreadyExistsException; import org.apache.iceberg.exceptions.NamespaceNotEmptyException; import org.apache.iceberg.exceptions.NoSuchNamespaceException; +import org.apache.iceberg.flink.util.FlinkAlterTableUtil; import org.apache.iceberg.flink.util.FlinkCompatibilityUtil; import org.apache.iceberg.io.CloseableIterable; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; @@ -665,13 +663,13 @@ private static void commitChanges( if (!schemaChanges.isEmpty()) { UpdateSchema updateSchema = transaction.updateSchema(); - applySchemaChanges(updateSchema, schemaChanges); + FlinkAlterTableUtil.applySchemaChanges(updateSchema, schemaChanges); updateSchema.commit(); } if 
(!propertyChanges.isEmpty()) { UpdateProperties updateProperties = transaction.updateProperties(); - applyPropertyChanges(updateProperties, propertyChanges); + FlinkAlterTableUtil.applyPropertyChanges(updateProperties, propertyChanges); updateProperties.commit(); } diff --git a/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/util/FlinkAlterTableUtil.java b/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/util/FlinkAlterTableUtil.java index 3170bababc9e..c538b2b09197 100644 --- a/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/util/FlinkAlterTableUtil.java +++ b/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/util/FlinkAlterTableUtil.java @@ -29,6 +29,8 @@ import org.apache.iceberg.types.Type; public class FlinkAlterTableUtil { + private FlinkAlterTableUtil() {} + /** * Applies a list of Flink table changes to an {@link UpdateSchema} operation. * From 3b8dcdaca95cd5a1d96fc50ffcf256d1a2a6a34c Mon Sep 17 00:00:00 2001 From: linyanghao Date: Wed, 24 May 2023 23:24:39 +0800 Subject: [PATCH 03/15] log deprecation --- .../main/java/org/apache/iceberg/flink/FlinkCatalog.java | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java b/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java index 1361611190be..47aea8a531c2 100644 --- a/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java +++ b/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java @@ -81,6 +81,8 @@ import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.relocated.com.google.common.collect.Maps; import org.apache.iceberg.relocated.com.google.common.collect.Sets; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * A Flink Catalog implementation that wraps an Iceberg {@link Catalog}. 
@@ -95,6 +97,7 @@ */ public class FlinkCatalog extends AbstractCatalog { + private static final Logger LOG = LoggerFactory.getLogger(FlinkCatalog.class); private final CatalogLoader catalogLoader; private final Catalog icebergCatalog; private final Namespace baseNamespace; @@ -479,6 +482,9 @@ public void alterTable(ObjectPath tablePath, CatalogBaseTable newTable, boolean // CatalogTable instances, unless the Flink schema contains Iceberg column IDs. // To alter columns, use the other alterTable API and provide a list of TableChange's. + LOG.warn( + "This alterTable API only supports altering table properties. " + + "To alter columns, use the other alterTable API and provide a list of TableChange's."); validateTableSchemaAndPartition(table, (CatalogTable) newTable); Map oldProperties = table.getOptions(); From 60fff6f721d7204973cf59d2ac30fcf47fba8060 Mon Sep 17 00:00:00 2001 From: linyanghao Date: Wed, 31 May 2023 14:19:05 +0800 Subject: [PATCH 04/15] fix: modify nullability --- .../flink/util/FlinkAlterTableUtil.java | 6 +++++ .../iceberg/flink/TestFlinkCatalogTable.java | 26 +++++++++++++++++++ 2 files changed, 32 insertions(+) diff --git a/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/util/FlinkAlterTableUtil.java b/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/util/FlinkAlterTableUtil.java index c538b2b09197..6813c29a0b40 100644 --- a/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/util/FlinkAlterTableUtil.java +++ b/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/util/FlinkAlterTableUtil.java @@ -122,7 +122,13 @@ private static void applyModifyColumn( TableChange.ModifyPhysicalColumnType modifyType = (TableChange.ModifyPhysicalColumnType) modifyColumn; Type type = FlinkSchemaUtil.convert(modifyType.getNewType().getLogicalType()); + String columnName = modifyType.getOldColumn().getName(); pendingUpdate.updateColumn(modifyType.getOldColumn().getName(), type.asPrimitiveType()); + if 
(modifyType.getNewColumn().getDataType().getLogicalType().isNullable()) { + pendingUpdate.makeColumnOptional(columnName); + } else { + pendingUpdate.requireColumn(columnName); + } } else if (modifyColumn instanceof TableChange.ModifyColumnComment) { TableChange.ModifyColumnComment modifyComment = diff --git a/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogTable.java b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogTable.java index 9ae95f21cd70..f581c711e5c7 100644 --- a/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogTable.java +++ b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogTable.java @@ -449,6 +449,32 @@ public void testAlterTableModifyColumnType() { schemaAfter.asStruct()); } + @Test + public void testAlterTableModifyColumnNullability() { + sql("CREATE TABLE tl(id INTEGER NOT NULL, dt STRING)"); + Schema schemaBefore = table("tl").schema(); + Assert.assertEquals( + new Schema( + Types.NestedField.required(1, "id", Types.IntegerType.get()), + Types.NestedField.optional(2, "dt", Types.StringType.get())) + .asStruct(), + schemaBefore.asStruct()); + + // Cannot change nullability from optional to required + Assertions.assertThatThrownBy(() -> sql("ALTER TABLE tl MODIFY (dt STRING NOT NULL)")) + .isInstanceOf(TableException.class); + + // Set nullability from required to optional + sql("ALTER TABLE tl MODIFY (id INTEGER)"); + Schema schemaAfter = table("tl").schema(); + Assert.assertEquals( + new Schema( + Types.NestedField.optional(1, "id", Types.IntegerType.get()), + Types.NestedField.optional(2, "dt", Types.StringType.get())) + .asStruct(), + schemaAfter.asStruct()); + } + @Test public void testAlterTableModifyColumnPosition() { sql("CREATE TABLE tl(id BIGINT, dt STRING)"); From 14f57981306cb2a813c85ba19dfb8eeeedfca759 Mon Sep 17 00:00:00 2001 From: linyanghao Date: Wed, 31 May 2023 16:37:16 +0800 Subject: [PATCH 05/15] spotless apply --- 
.../iceberg/flink/TestFlinkCatalogTable.java | 22 +++++++++---------- 1 file changed, 11 insertions(+), 11 deletions(-) diff --git a/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogTable.java b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogTable.java index f581c711e5c7..546d7b5eea71 100644 --- a/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogTable.java +++ b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogTable.java @@ -454,25 +454,25 @@ public void testAlterTableModifyColumnNullability() { sql("CREATE TABLE tl(id INTEGER NOT NULL, dt STRING)"); Schema schemaBefore = table("tl").schema(); Assert.assertEquals( - new Schema( - Types.NestedField.required(1, "id", Types.IntegerType.get()), - Types.NestedField.optional(2, "dt", Types.StringType.get())) - .asStruct(), - schemaBefore.asStruct()); + new Schema( + Types.NestedField.required(1, "id", Types.IntegerType.get()), + Types.NestedField.optional(2, "dt", Types.StringType.get())) + .asStruct(), + schemaBefore.asStruct()); // Cannot change nullability from optional to required Assertions.assertThatThrownBy(() -> sql("ALTER TABLE tl MODIFY (dt STRING NOT NULL)")) - .isInstanceOf(TableException.class); + .isInstanceOf(TableException.class); // Set nullability from required to optional sql("ALTER TABLE tl MODIFY (id INTEGER)"); Schema schemaAfter = table("tl").schema(); Assert.assertEquals( - new Schema( - Types.NestedField.optional(1, "id", Types.IntegerType.get()), - Types.NestedField.optional(2, "dt", Types.StringType.get())) - .asStruct(), - schemaAfter.asStruct()); + new Schema( + Types.NestedField.optional(1, "id", Types.IntegerType.get()), + Types.NestedField.optional(2, "dt", Types.StringType.get())) + .asStruct(), + schemaAfter.asStruct()); } @Test From 2a23fc33211a8bdfafced18b05517e728c6712d7 Mon Sep 17 00:00:00 2001 From: linyanghao Date: Wed, 31 May 2023 17:51:28 +0800 Subject: [PATCH 06/15] style --- 
.../java/org/apache/iceberg/flink/util/FlinkAlterTableUtil.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/util/FlinkAlterTableUtil.java b/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/util/FlinkAlterTableUtil.java index 6813c29a0b40..35c2c8f2ce31 100644 --- a/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/util/FlinkAlterTableUtil.java +++ b/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/util/FlinkAlterTableUtil.java @@ -123,7 +123,7 @@ private static void applyModifyColumn( (TableChange.ModifyPhysicalColumnType) modifyColumn; Type type = FlinkSchemaUtil.convert(modifyType.getNewType().getLogicalType()); String columnName = modifyType.getOldColumn().getName(); - pendingUpdate.updateColumn(modifyType.getOldColumn().getName(), type.asPrimitiveType()); + pendingUpdate.updateColumn(columnName, type.asPrimitiveType()); if (modifyType.getNewColumn().getDataType().getLogicalType().isNullable()) { pendingUpdate.makeColumnOptional(columnName); } else { From 8b6ae2af8fd599a74cd9fe84f62c80362a6ca6e1 Mon Sep 17 00:00:00 2001 From: linyanghao Date: Sun, 23 Jul 2023 23:41:43 +0800 Subject: [PATCH 07/15] tests --- .../iceberg/flink/TestFlinkCatalogTable.java | 64 +++---------------- 1 file changed, 10 insertions(+), 54 deletions(-) diff --git a/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogTable.java b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogTable.java index 546d7b5eea71..ed41b4decd61 100644 --- a/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogTable.java +++ b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogTable.java @@ -298,7 +298,7 @@ public void testLoadTransformPartitionTable() throws TableNotExistException { } @Test - public void testAlterTable() throws TableNotExistException { + public void testAlterTableProperties() throws TableNotExistException { 
sql("CREATE TABLE tl(id BIGINT) WITH ('oldK'='oldV')"); Map properties = Maps.newHashMap(); properties.put("oldK", "oldV"); @@ -313,62 +313,10 @@ public void testAlterTable() throws TableNotExistException { properties.put("oldK", "oldV2"); assertThat(table("tl").properties()).containsAllEntriesOf(properties); - // remove property - CatalogTable catalogTable = catalogTable("tl"); - properties.remove("oldK"); - getTableEnv() - .getCatalog(getTableEnv().getCurrentCatalog()) - .get() - .alterTable(new ObjectPath(DATABASE, "tl"), catalogTable.copy(properties), false); - assertThat(table("tl").properties()).containsAllEntriesOf(properties); - } - - @Test - public void testAlterTableWithPrimaryKey() throws TableNotExistException { - sql("CREATE TABLE tl(id BIGINT, PRIMARY KEY(id) NOT ENFORCED) WITH ('oldK'='oldV')"); - Map properties = Maps.newHashMap(); - properties.put("oldK", "oldV"); - - // new - sql("ALTER TABLE tl SET('newK'='newV')"); - properties.put("newK", "newV"); - assertThat(table("tl").properties()).containsAllEntriesOf(properties); - - // update old - sql("ALTER TABLE tl SET('oldK'='oldV2')"); - properties.put("oldK", "oldV2"); - assertThat(table("tl").properties()).containsAllEntriesOf(properties); - - // remove property - CatalogTable catalogTable = catalogTable("tl"); - properties.remove("oldK"); - getTableEnv() - .getCatalog(getTableEnv().getCurrentCatalog()) - .get() - .alterTable(new ObjectPath(DATABASE, "tl"), catalogTable.copy(properties), false); - assertThat(table("tl").properties()).containsAllEntriesOf(properties); - } - - @Test - public void testAlterTableProperties() throws TableNotExistException { - sql("CREATE TABLE tl(id BIGINT) WITH ('oldK'='oldV')"); - Map properties = Maps.newHashMap(); - properties.put("oldK", "oldV"); - - // new - sql("ALTER TABLE tl SET('newK'='newV')"); - properties.put("newK", "newV"); - Assert.assertEquals(properties, table("tl").properties()); - - // update old - sql("ALTER TABLE tl SET('oldK'='oldV2')"); - 
properties.put("oldK", "oldV2"); - Assert.assertEquals(properties, table("tl").properties()); - // remove property sql("ALTER TABLE tl RESET('oldK')"); properties.remove("oldK"); - Assert.assertEquals(properties, table("tl").properties()); + assertThat(table("tl").properties()).containsAllEntriesOf(properties); } @Test @@ -387,6 +335,10 @@ public void testAlterTableAddColumn() { Types.NestedField.optional(2, "dt", Types.StringType.get())) .asStruct(), schemaAfter.asStruct()); + + // Try adding an existing field + Assertions.assertThatThrownBy(() -> sql("ALTER TABLE tl ADD (id STRING)")) + .isInstanceOf(ValidationException.class); } @Test @@ -405,6 +357,10 @@ public void testAlterTableDropColumn() { Assert.assertEquals( new Schema(Types.NestedField.optional(1, "id", Types.LongType.get())).asStruct(), schemaAfter.asStruct()); + + // Try adding an existing field + Assertions.assertThatThrownBy(() -> sql("ALTER TABLE tl DROP (foo)")) + .isInstanceOf(ValidationException.class); } @Test From 551064aa30362f3711973eb81ed5db6fae2150d5 Mon Sep 17 00:00:00 2001 From: linyanghao Date: Sun, 23 Jul 2023 23:42:35 +0800 Subject: [PATCH 08/15] refactors and styles --- .../apache/iceberg/flink/FlinkCatalog.java | 121 ++++-------------- .../flink/util/FlinkAlterTableUtil.java | 115 +++++++++++++---- 2 files changed, 117 insertions(+), 119 deletions(-) diff --git a/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java b/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java index 47aea8a531c2..84dc9eed2c29 100644 --- a/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java +++ b/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java @@ -61,9 +61,6 @@ import org.apache.iceberg.Schema; import org.apache.iceberg.StructLike; import org.apache.iceberg.Table; -import org.apache.iceberg.Transaction; -import org.apache.iceberg.UpdateProperties; -import org.apache.iceberg.UpdateSchema; import 
org.apache.iceberg.catalog.Catalog; import org.apache.iceberg.catalog.Namespace; import org.apache.iceberg.catalog.SupportsNamespaces; @@ -457,6 +454,21 @@ private static void validateTablePartition(CatalogTable ct1, CatalogTable ct2) { } } + /** + * This alterTable API only supports altering table properties. + * + *

<p>Support for adding/removing/renaming columns cannot be done by comparing CatalogTable + * instances, unless the Flink schema contains Iceberg column IDs. + * + *
<p>
To alter columns, use the other alterTable API and provide a list of TableChange's. + * + * @param tablePath path of the table or view to be modified + * @param newTable the new table definition + * @param ignoreIfNotExists flag to specify behavior when the table or view does not exist: if set + * to false, throw an exception, if set to true, do nothing. + * @throws CatalogException + * @throws TableNotExistException + */ @Override public void alterTable(ObjectPath tablePath, CatalogBaseTable newTable, boolean ignoreIfNotExists) throws CatalogException, TableNotExistException { @@ -475,13 +487,6 @@ public void alterTable(ObjectPath tablePath, CatalogBaseTable newTable, boolean CatalogTable table = toCatalogTable(icebergTable); - // This alterTable API only supports altering table properties. - - // Support for adding/removing/renaming columns cannot be done by - // comparing - // CatalogTable instances, unless the Flink schema contains Iceberg column IDs. - - // To alter columns, use the other alterTable API and provide a list of TableChange's. LOG.warn( "This alterTable API only supports altering table properties. 
" + "To alter columns, use the other alterTable API and provide a list of TableChange's."); @@ -522,7 +527,8 @@ public void alterTable(ObjectPath tablePath, CatalogBaseTable newTable, boolean } }); - commitChanges(icebergTable, setLocation, setSnapshotId, pickSnapshotId, setProperties); + FlinkAlterTableUtil.commitChanges( + icebergTable, setLocation, setSnapshotId, pickSnapshotId, setProperties); } @Override @@ -550,7 +556,7 @@ public void alterTable( String setLocation = null; String setSnapshotId = null; - String pickSnapshotId = null; + String cherrypickSnapshotId = null; List propertyChanges = Lists.newArrayList(); List schemaChanges = Lists.newArrayList(); @@ -563,7 +569,7 @@ public void alterTable( } else if ("current-snapshot-id".equalsIgnoreCase(set.getKey())) { setSnapshotId = set.getValue(); } else if ("cherry-pick-snapshot-id".equalsIgnoreCase(set.getKey())) { - pickSnapshotId = set.getValue(); + cherrypickSnapshotId = set.getValue(); } else { propertyChanges.add(change); } @@ -576,8 +582,13 @@ public void alterTable( } } - commitChanges( - icebergTable, setLocation, setSnapshotId, pickSnapshotId, schemaChanges, propertyChanges); + FlinkAlterTableUtil.commitChanges( + icebergTable, + setLocation, + setSnapshotId, + cherrypickSnapshotId, + schemaChanges, + propertyChanges); } private static void validateFlinkTable(CatalogBaseTable table) { @@ -622,86 +633,6 @@ private static List toPartitionKeys(PartitionSpec spec, Schema icebergSc return partitionKeysBuilder.build(); } - private static void commitChanges( - Table table, - String setLocation, - String setSnapshotId, - String pickSnapshotId, - Map setProperties) { - commitChanges(table, setSnapshotId, pickSnapshotId); - - Transaction transaction = table.newTransaction(); - - if (setLocation != null) { - transaction.updateLocation().setLocation(setLocation).commit(); - } - - if (!setProperties.isEmpty()) { - UpdateProperties updateProperties = transaction.updateProperties(); - setProperties.forEach( - (k, 
v) -> { - if (v == null) { - updateProperties.remove(k); - } else { - updateProperties.set(k, v); - } - }); - updateProperties.commit(); - } - - transaction.commitTransaction(); - } - - private static void commitChanges( - Table table, - String setLocation, - String setSnapshotId, - String pickSnapshotId, - List schemaChanges, - List propertyChanges) { - commitChanges(table, setSnapshotId, pickSnapshotId); - - Transaction transaction = table.newTransaction(); - - if (setLocation != null) { - transaction.updateLocation().setLocation(setLocation).commit(); - } - - if (!schemaChanges.isEmpty()) { - UpdateSchema updateSchema = transaction.updateSchema(); - FlinkAlterTableUtil.applySchemaChanges(updateSchema, schemaChanges); - updateSchema.commit(); - } - - if (!propertyChanges.isEmpty()) { - UpdateProperties updateProperties = transaction.updateProperties(); - FlinkAlterTableUtil.applyPropertyChanges(updateProperties, propertyChanges); - updateProperties.commit(); - } - - transaction.commitTransaction(); - } - - private static void commitChanges(Table table, String setSnapshotId, String pickSnapshotId) { - // don't allow setting the snapshot and picking a commit at the same time because order is - // ambiguous and choosing - // one order leads to different results - Preconditions.checkArgument( - setSnapshotId == null || pickSnapshotId == null, - "Cannot set the current snapshot ID and cherry-pick snapshot changes"); - - if (setSnapshotId != null) { - long newSnapshotId = Long.parseLong(setSnapshotId); - table.manageSnapshots().setCurrentSnapshot(newSnapshotId).commit(); - } - - // if updating the table snapshot, perform that update first in case it fails - if (pickSnapshotId != null) { - long newSnapshotId = Long.parseLong(pickSnapshotId); - table.manageSnapshots().cherrypick(newSnapshotId).commit(); - } - } - static CatalogTable toCatalogTable(Table table) { TableSchema schema = FlinkSchemaUtil.toSchema(table.schema()); List partitionKeys = 
toPartitionKeys(table.spec(), table.schema()); diff --git a/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/util/FlinkAlterTableUtil.java b/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/util/FlinkAlterTableUtil.java index 35c2c8f2ce31..0868e2050dc1 100644 --- a/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/util/FlinkAlterTableUtil.java +++ b/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/util/FlinkAlterTableUtil.java @@ -19,9 +19,12 @@ package org.apache.iceberg.flink.util; import java.util.List; +import java.util.Map; import org.apache.flink.table.catalog.Column; import org.apache.flink.table.catalog.TableChange; import org.apache.flink.table.catalog.UniqueConstraint; +import org.apache.iceberg.Table; +import org.apache.iceberg.Transaction; import org.apache.iceberg.UpdateProperties; import org.apache.iceberg.UpdateSchema; import org.apache.iceberg.flink.FlinkSchemaUtil; @@ -31,6 +34,87 @@ public class FlinkAlterTableUtil { private FlinkAlterTableUtil() {} + public static void commitChanges( + Table table, + String setLocation, + String setSnapshotId, + String pickSnapshotId, + Map setProperties) { + commitManageSnapshots(table, setSnapshotId, pickSnapshotId); + + Transaction transaction = table.newTransaction(); + + if (setLocation != null) { + transaction.updateLocation().setLocation(setLocation).commit(); + } + + if (!setProperties.isEmpty()) { + UpdateProperties updateProperties = transaction.updateProperties(); + setProperties.forEach( + (k, v) -> { + if (v == null) { + updateProperties.remove(k); + } else { + updateProperties.set(k, v); + } + }); + updateProperties.commit(); + } + + transaction.commitTransaction(); + } + + public static void commitChanges( + Table table, + String setLocation, + String setSnapshotId, + String pickSnapshotId, + List schemaChanges, + List propertyChanges) { + commitManageSnapshots(table, setSnapshotId, pickSnapshotId); + + Transaction transaction = table.newTransaction(); + + if 
(setLocation != null) { + transaction.updateLocation().setLocation(setLocation).commit(); + } + + if (!schemaChanges.isEmpty()) { + UpdateSchema updateSchema = transaction.updateSchema(); + FlinkAlterTableUtil.applySchemaChanges(updateSchema, schemaChanges); + updateSchema.commit(); + } + + if (!propertyChanges.isEmpty()) { + UpdateProperties updateProperties = transaction.updateProperties(); + FlinkAlterTableUtil.applyPropertyChanges(updateProperties, propertyChanges); + updateProperties.commit(); + } + + transaction.commitTransaction(); + } + + public static void commitManageSnapshots( + Table table, String setSnapshotId, String cherrypickSnapshotId) { + // don't allow setting the snapshot and picking a commit at the same time because order is + // ambiguous and choosing + // one order leads to different results + Preconditions.checkArgument( + setSnapshotId == null || cherrypickSnapshotId == null, + "Cannot set the current snapshot ID and cherry-pick snapshot changes"); + + if (setSnapshotId != null) { + long newSnapshotId = Long.parseLong(setSnapshotId); + table.manageSnapshots().setCurrentSnapshot(newSnapshotId).commit(); + } + + // if updating the table snapshot, perform that update first in case it fails + if (cherrypickSnapshotId != null) { + long newSnapshotId = Long.parseLong(cherrypickSnapshotId); + table.manageSnapshots().cherrypick(newSnapshotId).commit(); + } + } + /** * Applies a list of Flink table changes to an {@link UpdateSchema} operation. 
* @@ -49,35 +133,26 @@ public static void applySchemaChanges( flinkColumn.getName()); Type icebergType = FlinkSchemaUtil.convert(flinkColumn.getDataType().getLogicalType()); pendingUpdate.addColumn(flinkColumn.getName(), icebergType); - } else if (change instanceof TableChange.ModifyColumn) { TableChange.ModifyColumn modifyColumn = (TableChange.ModifyColumn) change; applyModifyColumn(pendingUpdate, modifyColumn); - } else if (change instanceof TableChange.DropColumn) { TableChange.DropColumn dropColumn = (TableChange.DropColumn) change; pendingUpdate.deleteColumn(dropColumn.getColumnName()); - } else if (change instanceof TableChange.AddWatermark) { throw new UnsupportedOperationException("Adding watermark specs is not supported yet. "); - } else if (change instanceof TableChange.ModifyWatermark) { throw new UnsupportedOperationException("Modifying watermark specs is not supported yet. "); - } else if (change instanceof TableChange.DropWatermark) { - throw new UnsupportedOperationException("Watermark specs is not supported yet. "); - + throw new UnsupportedOperationException("Dropping watermark specs is not supported yet. "); } else if (change instanceof TableChange.AddUniqueConstraint) { TableChange.AddUniqueConstraint addPk = (TableChange.AddUniqueConstraint) change; applyUniqueConstraint(pendingUpdate, addPk.getConstraint()); - } else if (change instanceof TableChange.ModifyUniqueConstraint) { TableChange.ModifyUniqueConstraint modifyPk = (TableChange.ModifyUniqueConstraint) change; applyUniqueConstraint(pendingUpdate, modifyPk.getNewConstraint()); - } else if (change instanceof TableChange.DropConstraint) { throw new UnsupportedOperationException("Dropping constraints is not supported yet. "); - } else { throw new UnsupportedOperationException("Cannot apply unknown table change: " + change); } @@ -85,7 +160,7 @@ public static void applySchemaChanges( } /** - * Applies a list of Flink table changes to an {@link UpdateProperties} operation. 
+ * Applies a list of Flink table property changes to an {@link UpdateProperties} operation. * * @param pendingUpdate an uncommitted UpdateProperty operation to configure * @param propertyChanges a list of Flink table changes @@ -96,13 +171,12 @@ public static void applyPropertyChanges( if (change instanceof TableChange.SetOption) { TableChange.SetOption setOption = (TableChange.SetOption) change; pendingUpdate.set(setOption.getKey(), setOption.getValue()); - } else if (change instanceof TableChange.ResetOption) { TableChange.ResetOption resetOption = (TableChange.ResetOption) change; pendingUpdate.remove(resetOption.getKey()); - } else { - throw new UnsupportedOperationException("Cannot apply unknown table change: " + change); + throw new UnsupportedOperationException( + "The given table change is not a property change: " + change); } } } @@ -112,12 +186,10 @@ private static void applyModifyColumn( if (modifyColumn instanceof TableChange.ModifyColumnName) { TableChange.ModifyColumnName modifyName = (TableChange.ModifyColumnName) modifyColumn; pendingUpdate.renameColumn(modifyName.getOldColumnName(), modifyName.getNewColumnName()); - } else if (modifyColumn instanceof TableChange.ModifyColumnPosition) { TableChange.ModifyColumnPosition modifyPosition = (TableChange.ModifyColumnPosition) modifyColumn; applyModifyColumnPosition(pendingUpdate, modifyPosition); - } else if (modifyColumn instanceof TableChange.ModifyPhysicalColumnType) { TableChange.ModifyPhysicalColumnType modifyType = (TableChange.ModifyPhysicalColumnType) modifyColumn; @@ -129,15 +201,14 @@ private static void applyModifyColumn( } else { pendingUpdate.requireColumn(columnName); } - } else if (modifyColumn instanceof TableChange.ModifyColumnComment) { TableChange.ModifyColumnComment modifyComment = (TableChange.ModifyColumnComment) modifyColumn; pendingUpdate.updateColumnDoc( modifyComment.getOldColumn().getName(), modifyComment.getNewComment()); - } else { - throw new 
UnsupportedOperationException("Cannot apply unknown table change: " + modifyColumn); + throw new UnsupportedOperationException( + "Cannot apply unknown modify-column change: " + modifyColumn); } } @@ -146,14 +217,12 @@ private static void applyModifyColumnPosition( TableChange.ColumnPosition newPosition = modifyColumnPosition.getNewPosition(); if (newPosition instanceof TableChange.First) { pendingUpdate.moveFirst(modifyColumnPosition.getOldColumn().getName()); - } else if (newPosition instanceof TableChange.After) { TableChange.After after = (TableChange.After) newPosition; pendingUpdate.moveAfter(modifyColumnPosition.getOldColumn().getName(), after.column()); - } else { throw new UnsupportedOperationException( - "Cannot apply unknown table change: " + modifyColumnPosition); + "Cannot apply unknown modify-column-position change: " + modifyColumnPosition); } } @@ -163,11 +232,9 @@ private static void applyUniqueConstraint( case PRIMARY_KEY: pendingUpdate.setIdentifierFields(constraint.getColumns()); break; - case UNIQUE_KEY: throw new UnsupportedOperationException( "Setting unique key constraint is not supported yet."); - default: throw new UnsupportedOperationException( "Cannot apply unknown unique constraint: " + constraint.getType().name()); From 7585fa453887c8a87fc083aa815ba74f079a24a1 Mon Sep 17 00:00:00 2001 From: linyanghao Date: Mon, 24 Jul 2023 02:34:54 +0800 Subject: [PATCH 09/15] tests and fixes --- .../flink/util/FlinkAlterTableUtil.java | 6 +- .../iceberg/flink/TestFlinkCatalogTable.java | 85 ++++++++++++++++--- 2 files changed, 79 insertions(+), 12 deletions(-) diff --git a/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/util/FlinkAlterTableUtil.java b/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/util/FlinkAlterTableUtil.java index 0868e2050dc1..477638f8bf07 100644 --- a/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/util/FlinkAlterTableUtil.java +++ 
b/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/util/FlinkAlterTableUtil.java @@ -132,7 +132,11 @@ public static void applySchemaChanges( "Adding computed columns is not supported yet: %s", flinkColumn.getName()); Type icebergType = FlinkSchemaUtil.convert(flinkColumn.getDataType().getLogicalType()); - pendingUpdate.addColumn(flinkColumn.getName(), icebergType); + if (flinkColumn.getDataType().getLogicalType().isNullable()) { + pendingUpdate.addColumn(flinkColumn.getName(), icebergType); + } else { + pendingUpdate.addRequiredColumn(flinkColumn.getName(), icebergType); + } } else if (change instanceof TableChange.ModifyColumn) { TableChange.ModifyColumn modifyColumn = (TableChange.ModifyColumn) change; applyModifyColumn(pendingUpdate, modifyColumn); diff --git a/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogTable.java b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogTable.java index ed41b4decd61..1d82aa8cd2d5 100644 --- a/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogTable.java +++ b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogTable.java @@ -328,39 +328,72 @@ public void testAlterTableAddColumn() { schemaBefore.asStruct()); sql("ALTER TABLE tl ADD (dt STRING)"); - Schema schemaAfter = table("tl").schema(); + Schema schemaAfter1 = table("tl").schema(); Assert.assertEquals( new Schema( Types.NestedField.optional(1, "id", Types.LongType.get()), Types.NestedField.optional(2, "dt", Types.StringType.get())) .asStruct(), - schemaAfter.asStruct()); + schemaAfter1.asStruct()); - // Try adding an existing field + // Add multiple columns + sql("ALTER TABLE tl ADD (col1 STRING, col2 BIGINT)"); + Schema schemaAfter2 = table("tl").schema(); + Assert.assertEquals( + new Schema( + Types.NestedField.optional(1, "id", Types.LongType.get()), + Types.NestedField.optional(2, "dt", Types.StringType.get()), + Types.NestedField.optional(3, "col1", Types.StringType.get()), 
+ Types.NestedField.optional(4, "col2", Types.LongType.get())) + .asStruct(), + schemaAfter2.asStruct()); + + // Adding a required field should fail + Assertions.assertThatThrownBy(() -> sql("ALTER TABLE tl ADD (pk STRING NOT NULL)")) + .isInstanceOf(TableException.class); + + // Adding an existing field should fail Assertions.assertThatThrownBy(() -> sql("ALTER TABLE tl ADD (id STRING)")) .isInstanceOf(ValidationException.class); } @Test public void testAlterTableDropColumn() { - sql("CREATE TABLE tl(id BIGINT, dt STRING)"); + sql("CREATE TABLE tl(id BIGINT, dt STRING, col1 STRING, col2 BIGINT)"); Schema schemaBefore = table("tl").schema(); Assert.assertEquals( new Schema( Types.NestedField.optional(1, "id", Types.LongType.get()), - Types.NestedField.optional(2, "dt", Types.StringType.get())) + Types.NestedField.optional(2, "dt", Types.StringType.get()), + Types.NestedField.optional(3, "col1", Types.StringType.get()), + Types.NestedField.optional(4, "col2", Types.LongType.get())) .asStruct(), schemaBefore.asStruct()); sql("ALTER TABLE tl DROP (dt)"); - Schema schemaAfter = table("tl").schema(); + Schema schemaAfter1 = table("tl").schema(); + Assert.assertEquals( + new Schema( + Types.NestedField.optional(1, "id", Types.LongType.get()), + Types.NestedField.optional(3, "col1", Types.StringType.get()), + Types.NestedField.optional(4, "col2", Types.LongType.get())) + .asStruct(), + schemaAfter1.asStruct()); + + // Drop multiple columns + sql("ALTER TABLE tl DROP (col1, col2)"); + Schema schemaAfter2 = table("tl").schema(); Assert.assertEquals( new Schema(Types.NestedField.optional(1, "id", Types.LongType.get())).asStruct(), - schemaAfter.asStruct()); + schemaAfter2.asStruct()); - // Try adding an existing field + // Dropping an non-existing field should fail Assertions.assertThatThrownBy(() -> sql("ALTER TABLE tl DROP (foo)")) .isInstanceOf(ValidationException.class); + + // Dropping an already-deleted field should fail + Assertions.assertThatThrownBy(() -> sql("ALTER 
TABLE tl DROP (dt)")) + .isInstanceOf(ValidationException.class); } @Test @@ -395,6 +428,7 @@ public void testAlterTableModifyColumnType() { .asStruct(), schemaBefore.asStruct()); + // Promote type from Integer to Long sql("ALTER TABLE tl MODIFY (id BIGINT)"); Schema schemaAfter = table("tl").schema(); Assert.assertEquals( @@ -403,6 +437,10 @@ public void testAlterTableModifyColumnType() { Types.NestedField.optional(2, "dt", Types.StringType.get())) .asStruct(), schemaAfter.asStruct()); + + // Type change that doesn't follow the type-promotion rule should fail + Assertions.assertThatThrownBy(() -> sql("ALTER TABLE tl MODIFY (dt INTEGER)")) + .isInstanceOf(TableException.class); } @Test @@ -484,12 +522,13 @@ public void testAlterTableModifyColumnComment() { @Test public void testAlterTableConstraint() { - sql("CREATE TABLE tl(id BIGINT NOT NULL, dt STRING NOT NULL)"); + sql("CREATE TABLE tl(id BIGINT NOT NULL, dt STRING NOT NULL, col1 STRING)"); Schema schemaBefore = table("tl").schema(); Assert.assertEquals( new Schema( Types.NestedField.required(1, "id", Types.LongType.get()), - Types.NestedField.required(2, "dt", Types.StringType.get())) + Types.NestedField.required(2, "dt", Types.StringType.get()), + Types.NestedField.optional(3, "col1", Types.StringType.get())) .asStruct(), schemaBefore.asStruct()); Assert.assertEquals(ImmutableSet.of(), schemaBefore.identifierFieldNames()); @@ -503,11 +542,35 @@ public void testAlterTableConstraint() { Assert.assertEquals( new Schema( Types.NestedField.required(1, "id", Types.LongType.get()), - Types.NestedField.required(2, "dt", Types.StringType.get())) + Types.NestedField.required(2, "dt", Types.StringType.get()), + Types.NestedField.optional(3, "col1", Types.StringType.get())) .asStruct(), schemaAfterModify.asStruct()); Assert.assertEquals(ImmutableSet.of("dt"), schemaAfterModify.identifierFieldNames()); + // Composite primary key + sql("ALTER TABLE tl MODIFY (PRIMARY KEY (id, dt) NOT ENFORCED)"); + Schema 
schemaAfterComposite = table("tl").schema(); + Assert.assertEquals( + new Schema( + Types.NestedField.required(1, "id", Types.LongType.get()), + Types.NestedField.required(2, "dt", Types.StringType.get()), + Types.NestedField.optional(3, "col1", Types.StringType.get())) + .asStruct(), + schemaAfterComposite.asStruct()); + Assert.assertEquals(ImmutableSet.of("id", "dt"), schemaAfterComposite.identifierFieldNames()); + + // Setting an optional field as primary key should fail + Assertions.assertThatThrownBy( + () -> sql("ALTER TABLE tl MODIFY (PRIMARY KEY (col1) NOT ENFORCED)")) + .isInstanceOf(TableException.class); + + // Setting a composite key containing an optional field should fail + Assertions.assertThatThrownBy( + () -> sql("ALTER TABLE tl MODIFY (PRIMARY KEY (id, col1) NOT ENFORCED)")) + .isInstanceOf(TableException.class); + + // Dropping constraints is not supported yet Assertions.assertThatThrownBy(() -> sql("ALTER TABLE tl DROP PRIMARY KEY")) .isInstanceOf(TableException.class); } From 70d5c7b9b2a9896bd527a33440827b1c8a42d19c Mon Sep 17 00:00:00 2001 From: linyanghao Date: Mon, 24 Jul 2023 11:31:01 +0800 Subject: [PATCH 10/15] fix doc --- .../src/main/java/org/apache/iceberg/flink/FlinkCatalog.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java b/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java index 84dc9eed2c29..8d07ec78ea36 100644 --- a/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java +++ b/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java @@ -466,8 +466,8 @@ private static void validateTablePartition(CatalogTable ct1, CatalogTable ct2) { * @param newTable the new table definition * @param ignoreIfNotExists flag to specify behavior when the table or view does not exist: if set * to false, throw an exception, if set to true, do nothing. 
- * @throws CatalogException - * @throws TableNotExistException + * @throws CatalogException in case of any runtime exception + * @throws TableNotExistException if the table does not exist */ @Override public void alterTable(ObjectPath tablePath, CatalogBaseTable newTable, boolean ignoreIfNotExists) From adfa4e46e5bc354dd7e4b3a616fec698b5b30be7 Mon Sep 17 00:00:00 2001 From: linyanghao Date: Tue, 12 Sep 2023 16:29:21 +0800 Subject: [PATCH 11/15] style --- .../src/main/java/org/apache/iceberg/flink/FlinkCatalog.java | 2 -- .../org/apache/iceberg/flink/util/FlinkAlterTableUtil.java | 3 +-- 2 files changed, 1 insertion(+), 4 deletions(-) diff --git a/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java b/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java index 8d07ec78ea36..4c5b5f58642f 100644 --- a/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java +++ b/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java @@ -573,10 +573,8 @@ public void alterTable( } else { propertyChanges.add(change); } - } else if (change instanceof TableChange.ResetOption) { propertyChanges.add(change); - } else { schemaChanges.add(change); } diff --git a/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/util/FlinkAlterTableUtil.java b/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/util/FlinkAlterTableUtil.java index 477638f8bf07..efea4e1bf350 100644 --- a/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/util/FlinkAlterTableUtil.java +++ b/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/util/FlinkAlterTableUtil.java @@ -97,8 +97,7 @@ public static void commitChanges( public static void commitManageSnapshots( Table table, String setSnapshotId, String cherrypickSnapshotId) { // don't allow setting the snapshot and picking a commit at the same time because order is - // ambiguous and choosing - // one order leads to different results + // ambiguous and choosing one order 
leads to different results Preconditions.checkArgument( setSnapshotId == null || cherrypickSnapshotId == null, "Cannot set the current snapshot ID and cherry-pick snapshot changes"); From d22b117eb40c36ff66e0fd1af81b65a235a47cc9 Mon Sep 17 00:00:00 2001 From: linyanghao Date: Tue, 12 Sep 2023 16:53:23 +0800 Subject: [PATCH 12/15] better assertions & add tests --- .../iceberg/flink/TestFlinkCatalogTable.java | 66 +++++++++++++------ 1 file changed, 47 insertions(+), 19 deletions(-) diff --git a/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogTable.java b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogTable.java index 1d82aa8cd2d5..290333c7e236 100644 --- a/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogTable.java +++ b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogTable.java @@ -348,13 +348,15 @@ public void testAlterTableAddColumn() { .asStruct(), schemaAfter2.asStruct()); - // Adding a required field should fail + // Adding a required field should fail due to Iceberg's validation. Assertions.assertThatThrownBy(() -> sql("ALTER TABLE tl ADD (pk STRING NOT NULL)")) - .isInstanceOf(TableException.class); + .hasRootCauseInstanceOf(IllegalArgumentException.class) + .hasRootCauseMessage("Incompatible change: cannot add required column: pk"); - // Adding an existing field should fail + // Adding an existing field should fail due to Flink's internal validation. 
Assertions.assertThatThrownBy(() -> sql("ALTER TABLE tl ADD (id STRING)")) - .isInstanceOf(ValidationException.class); + .isInstanceOf(ValidationException.class) + .hasMessageContaining("Try to add a column `id` which already exists in the table."); } @Test @@ -387,13 +389,15 @@ public void testAlterTableDropColumn() { new Schema(Types.NestedField.optional(1, "id", Types.LongType.get())).asStruct(), schemaAfter2.asStruct()); - // Dropping an non-existing field should fail + // Dropping a non-existing field should fail due to Flink's internal validation. Assertions.assertThatThrownBy(() -> sql("ALTER TABLE tl DROP (foo)")) - .isInstanceOf(ValidationException.class); + .isInstanceOf(ValidationException.class) + .hasMessageContaining("The column `foo` does not exist in the base table."); - // Dropping an already-deleted field should fail + // Dropping an already-deleted field should fail due to Flink's internal validation. Assertions.assertThatThrownBy(() -> sql("ALTER TABLE tl DROP (dt)")) - .isInstanceOf(ValidationException.class); + .isInstanceOf(ValidationException.class) + .hasMessageContaining("The column `dt` does not exist in the base table."); } @Test @@ -438,9 +442,12 @@ public void testAlterTableModifyColumnType() { .asStruct(), schemaAfter.asStruct()); - // Type change that doesn't follow the type-promotion rule should fail + // Type change that doesn't follow the type-promotion rule should fail due to Iceberg's + // validation. 
Assertions.assertThatThrownBy(() -> sql("ALTER TABLE tl MODIFY (dt INTEGER)")) - .isInstanceOf(TableException.class); + .isInstanceOf(TableException.class) + .hasRootCauseInstanceOf(IllegalArgumentException.class) + .hasRootCauseMessage("Cannot change column type: dt: string -> int"); } @Test @@ -454,9 +461,11 @@ public void testAlterTableModifyColumnNullability() { .asStruct(), schemaBefore.asStruct()); - // Cannot change nullability from optional to required + // Changing nullability from optional to required should fail due to Iceberg's validation. Assertions.assertThatThrownBy(() -> sql("ALTER TABLE tl MODIFY (dt STRING NOT NULL)")) - .isInstanceOf(TableException.class); + .isInstanceOf(TableException.class) + .hasRootCauseInstanceOf(IllegalArgumentException.class) + .hasRootCauseMessage("Cannot change column nullability: dt: optional -> required"); // Set nullability from required to optional sql("ALTER TABLE tl MODIFY (id INTEGER)"); @@ -497,6 +506,19 @@ public void testAlterTableModifyColumnPosition() { Types.NestedField.optional(2, "dt", Types.StringType.get())) .asStruct(), schemaAfterAfter.asStruct()); + + // Modifying the position of a non-existing column should fail due to Flink's internal + // validation. + Assertions.assertThatThrownBy(() -> sql("ALTER TABLE tl MODIFY (non_existing STRING FIRST)")) + .isInstanceOf(ValidationException.class) + .hasMessageContaining( + "Try to modify a column `non_existing` which does not exist in the table."); + + // Moving a column after a non-existing column should fail due to Flink's internal validation. 
+ Assertions.assertThatThrownBy(() -> sql("ALTER TABLE tl MODIFY (dt STRING AFTER non_existing)")) + .isInstanceOf(ValidationException.class) + .hasMessageContaining( + "Referenced column `non_existing` by 'AFTER' does not exist in the table."); } @Test @@ -510,12 +532,12 @@ public void testAlterTableModifyColumnComment() { .asStruct(), schemaBefore.asStruct()); - sql("ALTER TABLE tl MODIFY (dt STRING COMMENT 'some data')"); + sql("ALTER TABLE tl MODIFY (dt STRING COMMENT 'comment for dt field')"); Schema schemaAfter = table("tl").schema(); Assert.assertEquals( new Schema( Types.NestedField.optional(1, "id", Types.LongType.get()), - Types.NestedField.optional(2, "dt", Types.StringType.get(), "some data")) + Types.NestedField.optional(2, "dt", Types.StringType.get(), "comment for dt field")) .asStruct(), schemaAfter.asStruct()); } @@ -560,19 +582,25 @@ public void testAlterTableConstraint() { schemaAfterComposite.asStruct()); Assert.assertEquals(ImmutableSet.of("id", "dt"), schemaAfterComposite.identifierFieldNames()); - // Setting an optional field as primary key should fail + // Setting an optional field as primary key should fail due to Iceberg's validation. Assertions.assertThatThrownBy( () -> sql("ALTER TABLE tl MODIFY (PRIMARY KEY (col1) NOT ENFORCED)")) - .isInstanceOf(TableException.class); + .isInstanceOf(TableException.class) + .hasRootCauseInstanceOf(IllegalArgumentException.class) + .hasRootCauseMessage("Cannot add field col1 as an identifier field: not a required field"); - // Setting a composite key containing an optional field should fail + // Setting a composite key containing an optional field should fail due to Iceberg's validation. 
Assertions.assertThatThrownBy( () -> sql("ALTER TABLE tl MODIFY (PRIMARY KEY (id, col1) NOT ENFORCED)")) - .isInstanceOf(TableException.class); + .isInstanceOf(TableException.class) + .hasRootCauseInstanceOf(IllegalArgumentException.class) + .hasRootCauseMessage("Cannot add field col1 as an identifier field: not a required field"); // Dropping constraints is not supported yet Assertions.assertThatThrownBy(() -> sql("ALTER TABLE tl DROP PRIMARY KEY")) - .isInstanceOf(TableException.class); + .isInstanceOf(TableException.class) + .hasRootCauseInstanceOf(UnsupportedOperationException.class) + .hasRootCauseMessage("Dropping constraints is not supported yet. "); } @Test From dcc71094714458d21689eead430d1c6bbd07c71d Mon Sep 17 00:00:00 2001 From: linyanghao Date: Tue, 12 Sep 2023 17:46:49 +0800 Subject: [PATCH 13/15] move deprecation message --- .../main/java/org/apache/iceberg/flink/FlinkCatalog.java | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java b/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java index 4c5b5f58642f..7e9baad9b647 100644 --- a/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java +++ b/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java @@ -442,7 +442,9 @@ private static void validateTableSchemaAndPartition(CatalogTable ct1, CatalogTab if (!(Objects.equals(ts1.getTableColumns(), ts2.getTableColumns()) && Objects.equals(ts1.getWatermarkSpecs(), ts2.getWatermarkSpecs()) && equalsPrimary)) { - throw new UnsupportedOperationException("Altering schema is not supported yet."); + throw new UnsupportedOperationException( + "Altering schema is not supported in the old alterTable API. 
" + + "To alter schema, use the other alterTable API and provide a list of TableChange's."); } validateTablePartition(ct1, ct2); @@ -486,10 +488,6 @@ public void alterTable(ObjectPath tablePath, CatalogBaseTable newTable, boolean } CatalogTable table = toCatalogTable(icebergTable); - - LOG.warn( - "This alterTable API only supports altering table properties. " - + "To alter columns, use the other alterTable API and provide a list of TableChange's."); validateTableSchemaAndPartition(table, (CatalogTable) newTable); Map oldProperties = table.getOptions(); From bbbe086001f56522d457a2c39894c798a4e4f334 Mon Sep 17 00:00:00 2001 From: linyanghao Date: Tue, 12 Sep 2023 17:51:15 +0800 Subject: [PATCH 14/15] remove unused logger --- .../src/main/java/org/apache/iceberg/flink/FlinkCatalog.java | 4 ---- 1 file changed, 4 deletions(-) diff --git a/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java b/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java index 7e9baad9b647..f022c8abcb00 100644 --- a/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java +++ b/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java @@ -78,8 +78,6 @@ import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.relocated.com.google.common.collect.Maps; import org.apache.iceberg.relocated.com.google.common.collect.Sets; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; /** * A Flink Catalog implementation that wraps an Iceberg {@link Catalog}. @@ -93,8 +91,6 @@ * independent of the partition of Flink. 
*/ public class FlinkCatalog extends AbstractCatalog { - - private static final Logger LOG = LoggerFactory.getLogger(FlinkCatalog.class); private final CatalogLoader catalogLoader; private final Catalog icebergCatalog; private final Namespace baseNamespace; From bac300dd78b79807da1643d811a85f382e9bc700 Mon Sep 17 00:00:00 2001 From: linyanghao Date: Tue, 26 Sep 2023 17:23:08 +0800 Subject: [PATCH 15/15] style --- .../iceberg/flink/util/FlinkAlterTableUtil.java | 12 ++++++------ .../iceberg/flink/TestFlinkCatalogTable.java | 14 +++++++++----- 2 files changed, 15 insertions(+), 11 deletions(-) diff --git a/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/util/FlinkAlterTableUtil.java b/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/util/FlinkAlterTableUtil.java index efea4e1bf350..f0b9bf64fb1a 100644 --- a/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/util/FlinkAlterTableUtil.java +++ b/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/util/FlinkAlterTableUtil.java @@ -128,7 +128,7 @@ public static void applySchemaChanges( Column flinkColumn = addColumn.getColumn(); Preconditions.checkArgument( FlinkCompatibilityUtil.isPhysicalColumn(flinkColumn), - "Adding computed columns is not supported yet: %s", + "Unsupported table change: Adding computed column %s.", flinkColumn.getName()); Type icebergType = FlinkSchemaUtil.convert(flinkColumn.getDataType().getLogicalType()); if (flinkColumn.getDataType().getLogicalType().isNullable()) { @@ -143,11 +143,11 @@ public static void applySchemaChanges( TableChange.DropColumn dropColumn = (TableChange.DropColumn) change; pendingUpdate.deleteColumn(dropColumn.getColumnName()); } else if (change instanceof TableChange.AddWatermark) { - throw new UnsupportedOperationException("Adding watermark specs is not supported yet. 
"); + throw new UnsupportedOperationException("Unsupported table change: AddWatermark."); } else if (change instanceof TableChange.ModifyWatermark) { - throw new UnsupportedOperationException("Modifying watermark specs is not supported yet. "); + throw new UnsupportedOperationException("Unsupported table change: ModifyWatermark."); } else if (change instanceof TableChange.DropWatermark) { - throw new UnsupportedOperationException("Dropping watermark specs is not supported yet. "); + throw new UnsupportedOperationException("Unsupported table change: DropWatermark."); } else if (change instanceof TableChange.AddUniqueConstraint) { TableChange.AddUniqueConstraint addPk = (TableChange.AddUniqueConstraint) change; applyUniqueConstraint(pendingUpdate, addPk.getConstraint()); @@ -155,7 +155,7 @@ public static void applySchemaChanges( TableChange.ModifyUniqueConstraint modifyPk = (TableChange.ModifyUniqueConstraint) change; applyUniqueConstraint(pendingUpdate, modifyPk.getNewConstraint()); } else if (change instanceof TableChange.DropConstraint) { - throw new UnsupportedOperationException("Dropping constraints is not supported yet. 
"); + throw new UnsupportedOperationException("Unsupported table change: DropConstraint."); } else { throw new UnsupportedOperationException("Cannot apply unknown table change: " + change); } @@ -237,7 +237,7 @@ private static void applyUniqueConstraint( break; case UNIQUE_KEY: throw new UnsupportedOperationException( - "Setting unique key constraint is not supported yet."); + "Unsupported table change: setting unique key constraints."); default: throw new UnsupportedOperationException( "Cannot apply unknown unique constraint: " + constraint.getType().name()); diff --git a/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogTable.java b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogTable.java index 290333c7e236..8f5ddde91851 100644 --- a/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogTable.java +++ b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogTable.java @@ -348,7 +348,8 @@ public void testAlterTableAddColumn() { .asStruct(), schemaAfter2.asStruct()); - // Adding a required field should fail due to Iceberg's validation. + // Adding a required field should fail because Iceberg's SchemaUpdate does not allow + // incompatible changes. Assertions.assertThatThrownBy(() -> sql("ALTER TABLE tl ADD (pk STRING NOT NULL)")) .hasRootCauseInstanceOf(IllegalArgumentException.class) .hasRootCauseMessage("Incompatible change: cannot add required column: pk"); @@ -461,7 +462,8 @@ public void testAlterTableModifyColumnNullability() { .asStruct(), schemaBefore.asStruct()); - // Changing nullability from optional to required should fail due to Iceberg's validation. + // Changing nullability from optional to required should fail + // because Iceberg's SchemaUpdate does not allow incompatible changes. 
Assertions.assertThatThrownBy(() -> sql("ALTER TABLE tl MODIFY (dt STRING NOT NULL)")) .isInstanceOf(TableException.class) .hasRootCauseInstanceOf(IllegalArgumentException.class) @@ -582,14 +584,16 @@ public void testAlterTableConstraint() { schemaAfterComposite.asStruct()); Assert.assertEquals(ImmutableSet.of("id", "dt"), schemaAfterComposite.identifierFieldNames()); - // Setting an optional field as primary key should fail due to Iceberg's validation. + // Setting an optional field as primary key should fail + // because Iceberg's SchemaUpdate does not allow incompatible changes. Assertions.assertThatThrownBy( () -> sql("ALTER TABLE tl MODIFY (PRIMARY KEY (col1) NOT ENFORCED)")) .isInstanceOf(TableException.class) .hasRootCauseInstanceOf(IllegalArgumentException.class) .hasRootCauseMessage("Cannot add field col1 as an identifier field: not a required field"); - // Setting a composite key containing an optional field should fail due to Iceberg's validation. + // Setting a composite key containing an optional field should fail + // because Iceberg's SchemaUpdate does not allow incompatible changes. Assertions.assertThatThrownBy( () -> sql("ALTER TABLE tl MODIFY (PRIMARY KEY (id, col1) NOT ENFORCED)")) .isInstanceOf(TableException.class) @@ -600,7 +604,7 @@ public void testAlterTableConstraint() { Assertions.assertThatThrownBy(() -> sql("ALTER TABLE tl DROP PRIMARY KEY")) .isInstanceOf(TableException.class) .hasRootCauseInstanceOf(UnsupportedOperationException.class) - .hasRootCauseMessage("Dropping constraints is not supported yet. "); + .hasRootCauseMessage("Unsupported table change: DropConstraint."); } @Test