From 639555c5d7a8fb0af47b247356ee851fa3fe1a02 Mon Sep 17 00:00:00 2001 From: Zsolt Miskolczi Date: Thu, 19 Jan 2023 21:09:11 +0100 Subject: [PATCH 01/12] Update iceberg version --- iceberg/pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/iceberg/pom.xml b/iceberg/pom.xml index 348bf5b64f4d..f6426e605989 100644 --- a/iceberg/pom.xml +++ b/iceberg/pom.xml @@ -25,7 +25,7 @@ .. . - 1.0.0 + 1.1.0 4.0.2 3.4.4 1.9.2 From 994de25e08a1240f57fefe9238e3e7db1319c7d1 Mon Sep 17 00:00:00 2001 From: Zsolt Miskolczi Date: Thu, 19 Jan 2023 21:35:21 +0100 Subject: [PATCH 02/12] Port: Core: Use CharSequenceSet instead of Set (apache#2712) --- .../test/java/org/apache/iceberg/data/DeleteReadTests.java | 5 +++-- .../src/test/java/org/apache/iceberg/data/FileHelpers.java | 6 +++--- 2 files changed, 6 insertions(+), 5 deletions(-) diff --git a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/data/DeleteReadTests.java b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/data/DeleteReadTests.java index ae42382f9723..d7f8fe84b527 100644 --- a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/data/DeleteReadTests.java +++ b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/data/DeleteReadTests.java @@ -33,6 +33,7 @@ import org.apache.iceberg.relocated.com.google.common.collect.Sets; import org.apache.iceberg.types.Types; import org.apache.iceberg.util.ArrayUtil; +import org.apache.iceberg.util.CharSequenceSet; import org.apache.iceberg.util.Pair; import org.apache.iceberg.util.StructLikeSet; import org.apache.iceberg.util.StructProjection; @@ -193,7 +194,7 @@ public void testPositionDeletes() throws IOException { Pair.of(dataFile.path(), 6L) // id = 122 ); - Pair> posDeletes = FileHelpers.writeDeleteFile( + Pair posDeletes = FileHelpers.writeDeleteFile( table, Files.localOutput(temp.newFile()), Row.of(0), deletes); table.newRowDelta() @@ -225,7 +226,7 @@ public void testMixedPositionAndEqualityDeletes() throws IOException { Pair.of(dataFile.path(), 5L) // id = 121 ); - Pair> posDeletes = FileHelpers.writeDeleteFile( + Pair posDeletes = FileHelpers.writeDeleteFile( table, Files.localOutput(temp.newFile()), Row.of(0), deletes); table.newRowDelta() diff --git a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/data/FileHelpers.java b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/data/FileHelpers.java index 4b82fc234c5e..5b58c8d027c2 100644 --- a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/data/FileHelpers.java +++ b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/data/FileHelpers.java @@ -22,7 +22,6 @@ import java.io.Closeable; import java.io.IOException; import java.util.List; -import java.util.Set; import org.apache.iceberg.DataFile; import org.apache.iceberg.DataFiles; import org.apache.iceberg.DeleteFile; @@ -38,19 +37,20 @@ import org.apache.iceberg.io.OutputFile; import org.apache.iceberg.parquet.Parquet; import org.apache.iceberg.types.Types; +import org.apache.iceberg.util.CharSequenceSet; import org.apache.iceberg.util.Pair; public class FileHelpers { private FileHelpers() { } - public static Pair> writeDeleteFile(Table table, OutputFile out, + public static Pair writeDeleteFile(Table table, OutputFile out, List> deletes) throws IOException { return writeDeleteFile(table, out, null, deletes); } - public static Pair> writeDeleteFile(Table table, OutputFile out, StructLike partition, + public static Pair writeDeleteFile(Table table, OutputFile out, StructLike partition, List> deletes) throws IOException { 
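Background for this port: CharSequence has no cross-implementation equals/hashCode contract, so a plain Set<CharSequence> can miss a path that was added as one CharSequence type and probed as another. CharSequenceSet compares by character content instead. A minimal sketch of the difference (hypothetical path; assumes the org.apache.iceberg.util.CharSequenceSet API as of Iceberg 1.1):

    Set<CharSequence> plain = new HashSet<>();
    plain.add(new StringBuilder("s3://bucket/data/00000.parquet"));
    plain.contains("s3://bucket/data/00000.parquet");   // false: a StringBuilder never equals a String

    CharSequenceSet paths = CharSequenceSet.empty();
    paths.add(new StringBuilder("s3://bucket/data/00000.parquet"));
    paths.contains("s3://bucket/data/00000.parquet");   // true: compared character by character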
PositionDeleteWriter writer = Parquet.writeDeletes(out) From b5606cce944f18e8675d3d2924f59bc6e2c1fd3c Mon Sep 17 00:00:00 2001 From: openinx Date: Mon, 15 Nov 2021 02:43:46 +0800 Subject: [PATCH 03/12] Port: Flink: Add SupportsRowPosition to Avro reader to fix position deletes --- .../org/apache/iceberg/data/FileHelpers.java | 71 ++++++++++--------- 1 file changed, 38 insertions(+), 33 deletions(-) diff --git a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/data/FileHelpers.java b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/data/FileHelpers.java index 5b58c8d027c2..dfcb75b04b01 100644 --- a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/data/FileHelpers.java +++ b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/data/FileHelpers.java @@ -22,45 +22,47 @@ import java.io.Closeable; import java.io.IOException; import java.util.List; +import java.util.Locale; +import java.util.Map; import org.apache.iceberg.DataFile; import org.apache.iceberg.DataFiles; import org.apache.iceberg.DeleteFile; import org.apache.iceberg.FileFormat; -import org.apache.iceberg.MetricsConfig; import org.apache.iceberg.Schema; import org.apache.iceberg.StructLike; import org.apache.iceberg.Table; -import org.apache.iceberg.data.parquet.GenericParquetWriter; import org.apache.iceberg.deletes.EqualityDeleteWriter; import org.apache.iceberg.deletes.PositionDeleteWriter; +import org.apache.iceberg.encryption.EncryptedFiles; +import org.apache.iceberg.encryption.EncryptedOutputFile; +import org.apache.iceberg.encryption.EncryptionKeyMetadata; import org.apache.iceberg.io.FileAppender; +import org.apache.iceberg.io.FileAppenderFactory; import org.apache.iceberg.io.OutputFile; -import org.apache.iceberg.parquet.Parquet; import org.apache.iceberg.types.Types; import org.apache.iceberg.util.CharSequenceSet; import org.apache.iceberg.util.Pair; +import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT; +import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT_DEFAULT; + public class FileHelpers { private FileHelpers() { } public static Pair writeDeleteFile(Table table, OutputFile out, - List> deletes) + List> deletes) throws IOException { return writeDeleteFile(table, out, null, deletes); } public static Pair writeDeleteFile(Table table, OutputFile out, StructLike partition, - List> deletes) + List> deletes) throws IOException { - PositionDeleteWriter writer = Parquet.writeDeletes(out) - .withSpec(table.spec()) - .setAll(table.properties()) - .metricsConfig(MetricsConfig.forTable(table)) - .withPartition(partition) - .overwrite() - .buildPositionWriter(); + FileFormat format = defaultFormat(table.properties()); + FileAppenderFactory factory = new GenericAppenderFactory(table.schema(), table.spec()); + PositionDeleteWriter writer = factory.newPosDeleteWriter(encrypt(out), format, partition); try (Closeable toClose = writer) { for (Pair delete : deletes) { writer.delete(delete.first(), delete.second()); @@ -76,16 +78,14 @@ public static DeleteFile writeDeleteFile(Table table, OutputFile out, List deletes, Schema deleteRowSchema) throws IOException { - EqualityDeleteWriter writer = Parquet.writeDeletes(out) - .forTable(table) - .withPartition(partition) - .rowSchema(deleteRowSchema) - .createWriterFunc(GenericParquetWriter::buildWriter) - .overwrite() - .equalityFieldIds(deleteRowSchema.columns().stream().mapToInt(Types.NestedField::fieldId).toArray()) - .buildEqualityWriter(); + List deletes, Schema deleteRowSchema) + throws IOException { + FileFormat format = 
defaultFormat(table.properties()); + int[] equalityFieldIds = deleteRowSchema.columns().stream().mapToInt(Types.NestedField::fieldId).toArray(); + FileAppenderFactory factory = new GenericAppenderFactory(table.schema(), table.spec(), + equalityFieldIds, deleteRowSchema, null); + EqualityDeleteWriter writer = factory.newEqDeleteWriter(encrypt(out), format, partition); try (Closeable toClose = writer) { writer.deleteAll(deletes); } @@ -94,18 +94,16 @@ public static DeleteFile writeDeleteFile(Table table, OutputFile out, StructLike } public static DataFile writeDataFile(Table table, OutputFile out, List rows) throws IOException { - FileAppender writer = Parquet.write(out) - .createWriterFunc(GenericParquetWriter::buildWriter) - .schema(table.schema()) - .overwrite() - .build(); + FileFormat format = defaultFormat(table.properties()); + GenericAppenderFactory factory = new GenericAppenderFactory(table.schema()); + FileAppender writer = factory.newAppender(out, format); try (Closeable toClose = writer) { writer.addAll(rows); } return DataFiles.builder(table.spec()) - .withFormat(FileFormat.PARQUET) + .withFormat(format) .withPath(out.location()) .withFileSizeInBytes(writer.length()) .withSplitOffsets(writer.splitOffsets()) @@ -115,18 +113,16 @@ public static DataFile writeDataFile(Table table, OutputFile out, List r public static DataFile writeDataFile(Table table, OutputFile out, StructLike partition, List rows) throws IOException { - FileAppender writer = Parquet.write(out) - .createWriterFunc(GenericParquetWriter::buildWriter) - .schema(table.schema()) - .overwrite() - .build(); + FileFormat format = defaultFormat(table.properties()); + GenericAppenderFactory factory = new GenericAppenderFactory(table.schema(), table.spec()); + FileAppender writer = factory.newAppender(out, format); try (Closeable toClose = writer) { writer.addAll(rows); } return DataFiles.builder(table.spec()) - .withFormat(FileFormat.PARQUET) + .withFormat(format) .withPath(out.location()) .withPartition(partition) .withFileSizeInBytes(writer.length()) @@ -134,4 +130,13 @@ public static DataFile writeDataFile(Table table, OutputFile out, StructLike par .withMetrics(writer.metrics()) .build(); } + + private static EncryptedOutputFile encrypt(OutputFile out) { + return EncryptedFiles.encryptedOutput(out, EncryptionKeyMetadata.EMPTY); + } + + private static FileFormat defaultFormat(Map properties) { + String formatString = properties.getOrDefault(DEFAULT_FILE_FORMAT, DEFAULT_FILE_FORMAT_DEFAULT); + return FileFormat.valueOf(formatString.toUpperCase(Locale.ENGLISH)); + } } From 752c0f44f721cfde79e5dadbe33c1af5acf23e62 Mon Sep 17 00:00:00 2001 From: gaborkaszab Date: Wed, 28 Sep 2022 17:16:03 +0200 Subject: [PATCH 04/12] Port: Core: Deprecate functions in DeleteWriters (apache#5771) --- .../test/java/org/apache/iceberg/data/FileHelpers.java | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/data/FileHelpers.java b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/data/FileHelpers.java index dfcb75b04b01..4919e6b060ea 100644 --- a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/data/FileHelpers.java +++ b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/data/FileHelpers.java @@ -32,6 +32,7 @@ import org.apache.iceberg.StructLike; import org.apache.iceberg.Table; import org.apache.iceberg.deletes.EqualityDeleteWriter; +import org.apache.iceberg.deletes.PositionDelete; import org.apache.iceberg.deletes.PositionDeleteWriter; 
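The hunks below adopt the replacement API from apache#5771: instead of calling writer.delete(path, pos) directly, callers now build a PositionDelete and hand it to write(), and EqualityDeleteWriter.deleteAll(rows) becomes write(rows). The new call shape, sketched with a hypothetical file path (the row argument is null because these helpers delete by position only):

    PositionDelete<Record> posDelete = PositionDelete.create();
    posDelete.set("/warehouse/tbl/data/00000-0.parquet", 6L, null);  // (file, position, deleted row)
    writer.write(posDelete);

set() returns the PositionDelete itself, which is why the hunk below can chain writer.write(posDelete.set(...)).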
import org.apache.iceberg.encryption.EncryptedFiles; import org.apache.iceberg.encryption.EncryptedOutputFile; @@ -62,10 +63,12 @@ public static Pair writeDeleteFile(Table table, Out FileFormat format = defaultFormat(table.properties()); FileAppenderFactory factory = new GenericAppenderFactory(table.schema(), table.spec()); - PositionDeleteWriter writer = factory.newPosDeleteWriter(encrypt(out), format, partition); + PositionDeleteWriter writer = + factory.newPosDeleteWriter(encrypt(out), format, partition); try (Closeable toClose = writer) { for (Pair delete : deletes) { - writer.delete(delete.first(), delete.second()); + PositionDelete posDelete = PositionDelete.create(); + writer.write(posDelete.set(delete.first(), delete.second(), null)); } } @@ -87,7 +90,7 @@ public static DeleteFile writeDeleteFile(Table table, OutputFile out, StructLike EqualityDeleteWriter writer = factory.newEqDeleteWriter(encrypt(out), format, partition); try (Closeable toClose = writer) { - writer.deleteAll(deletes); + writer.write(deletes); } return writer.toDeleteFile(); From 6a2535e0f64107580915efd98ad48e7ef3d39eb1 Mon Sep 17 00:00:00 2001 From: Zsolt Miskolczi Date: Thu, 19 Jan 2023 22:05:17 +0100 Subject: [PATCH 05/12] Fix compile error after PositionDeleteWriter changes --- .../org/apache/iceberg/mr/hive/HiveIcebergTestUtils.java | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/HiveIcebergTestUtils.java b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/HiveIcebergTestUtils.java index dc68ce9a80be..1bb12189dc1d 100644 --- a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/HiveIcebergTestUtils.java +++ b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/HiveIcebergTestUtils.java @@ -353,7 +353,7 @@ public static DeleteFile createEqualityDeleteFile(Table table, String deleteFile part.partition(rowsToDelete.get(0)); EqualityDeleteWriter eqWriter = appenderFactory.newEqDeleteWriter(outputFile, fileFormat, part); try (EqualityDeleteWriter writer = eqWriter) { - writer.deleteAll(rowsToDelete); + writer.write(rowsToDelete); } return eqWriter.toDeleteFile(); } @@ -386,7 +386,11 @@ public static DeleteFile createPositionalDeleteFile(Table table, String deleteFi PositionDeleteWriter posWriter = appenderFactory.newPosDeleteWriter(outputFile, fileFormat, partitionKey); try (PositionDeleteWriter writer = posWriter) { - deletes.forEach(del -> writer.delete(del.path(), del.pos(), del.row())); + deletes.forEach(del -> { + PositionDelete positionDelete = PositionDelete.create(); + positionDelete.set(del.path(), del.pos(), del.row()); + writer.write(positionDelete); + }); } return posWriter.toDeleteFile(); } From 9c777848b843befedf4da635975c5279c2670511 Mon Sep 17 00:00:00 2001 From: Zsolt Miskolczi Date: Fri, 20 Jan 2023 01:05:05 +0100 Subject: [PATCH 06/12] Fix failing test --- .../TestHiveIcebergStorageHandlerNoScan.java | 9 +- iceberg/patched-iceberg-core/pom.xml | 1 + .../iceberg/BaseUpdatePartitionSpec.java | 552 ++++++++++++++++++ 3 files changed, 561 insertions(+), 1 deletion(-) create mode 100644 iceberg/patched-iceberg-core/src/main/java/org/apache/iceberg/BaseUpdatePartitionSpec.java diff --git a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerNoScan.java b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerNoScan.java index b00017726b16..97a4678bae5c 100644 --- 
a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerNoScan.java +++ b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerNoScan.java @@ -247,6 +247,7 @@ public void testSetPartitionTransform() { .build(); Table table = testTables.loadTable(identifier); + Assert.assertEquals(spec.specId(), table.spec().specId()); Assert.assertEquals(spec, table.spec()); shell.executeStatement("ALTER TABLE default.part_test SET PARTITION SPEC(year(year_field), month(month_field), " + @@ -264,7 +265,13 @@ public void testSetPartitionTransform() { .build(); table.refresh(); - Assert.assertEquals(spec, table.spec()); + Assert.assertEquals(spec.specId(), table.spec().specId()); + + for (PartitionField field : + spec.fields()) { + Assert.assertTrue(field.name(), table.spec().fields().stream().anyMatch( + tableField -> tableField.name().equals(field.name()))); + } } @Test diff --git a/iceberg/patched-iceberg-core/pom.xml b/iceberg/patched-iceberg-core/pom.xml index ec1bdc39afc4..82f44a7ee4fc 100644 --- a/iceberg/patched-iceberg-core/pom.xml +++ b/iceberg/patched-iceberg-core/pom.xml @@ -76,6 +76,7 @@ ${project.build.directory}/classes **/HadoopInputFile.class + **/BaseUpdatePartitionSpec.class diff --git a/iceberg/patched-iceberg-core/src/main/java/org/apache/iceberg/BaseUpdatePartitionSpec.java b/iceberg/patched-iceberg-core/src/main/java/org/apache/iceberg/BaseUpdatePartitionSpec.java new file mode 100644 index 000000000000..aac7b403af0d --- /dev/null +++ b/iceberg/patched-iceberg-core/src/main/java/org/apache/iceberg/BaseUpdatePartitionSpec.java @@ -0,0 +1,552 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iceberg; + +import java.util.List; +import java.util.Map; +import java.util.Set; +import org.apache.iceberg.exceptions.ValidationException; +import org.apache.iceberg.expressions.BoundReference; +import org.apache.iceberg.expressions.BoundTerm; +import org.apache.iceberg.expressions.BoundTransform; +import org.apache.iceberg.expressions.Expressions; +import org.apache.iceberg.expressions.Term; +import org.apache.iceberg.expressions.UnboundTerm; +import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.apache.iceberg.relocated.com.google.common.collect.Sets; +import org.apache.iceberg.transforms.PartitionSpecVisitor; +import org.apache.iceberg.transforms.Transform; +import org.apache.iceberg.transforms.Transforms; +import org.apache.iceberg.transforms.UnknownTransform; +import org.apache.iceberg.util.Pair; + +public class BaseUpdatePartitionSpec implements UpdatePartitionSpec { + private final TableOperations ops; + private final TableMetadata base; + private final int formatVersion; + private final PartitionSpec spec; + private final Schema schema; + private final Map nameToField; + private final Map, PartitionField> transformToField; + + private final List adds = Lists.newArrayList(); + private final Map addedTimeFields = Maps.newHashMap(); + private final Map, PartitionField> transformToAddedField = + Maps.newHashMap(); + private final Map nameToAddedField = Maps.newHashMap(); + private final Set deletes = Sets.newHashSet(); + private final Map renames = Maps.newHashMap(); + + private boolean caseSensitive; + private int lastAssignedPartitionId; + + BaseUpdatePartitionSpec(TableOperations ops) { + this.ops = ops; + this.caseSensitive = true; + this.base = ops.current(); + this.formatVersion = base.formatVersion(); + this.spec = base.spec(); + this.schema = spec.schema(); + this.nameToField = indexSpecByName(spec); + this.transformToField = indexSpecByTransform(spec); + this.lastAssignedPartitionId = base.lastAssignedPartitionId(); + + spec.fields().stream() + .filter(field -> field.transform() instanceof UnknownTransform) + .findAny() + .ifPresent( + field -> { + throw new IllegalArgumentException( + "Cannot update partition spec with unknown transform: " + field); + }); + } + + /** + * For testing only. + */ + @VisibleForTesting + BaseUpdatePartitionSpec(int formatVersion, PartitionSpec spec) { + this(formatVersion, spec, spec.lastAssignedFieldId()); + } + + /** + * For testing only. + */ + @VisibleForTesting + BaseUpdatePartitionSpec(int formatVersion, PartitionSpec spec, int lastAssignedPartitionId) { + this.ops = null; + this.base = null; + this.formatVersion = formatVersion; + this.caseSensitive = true; + this.spec = spec; + this.schema = spec.schema(); + this.nameToField = indexSpecByName(spec); + this.transformToField = indexSpecByTransform(spec); + this.lastAssignedPartitionId = lastAssignedPartitionId; + } + + private int assignFieldId() { + this.lastAssignedPartitionId += 1; + return lastAssignedPartitionId; + } + + /** + * In V2 it searches for a similar partition field in historical partition specs. Tries to match + * on source field ID, transform type and target name (optional). 
If not found, or for V1 tables, it
+   * creates a new PartitionField.
+   *
+   * @param sourceTransform pair of source ID and transform for this PartitionField addition
+   * @param name target partition field name, if specified
+   * @return the recycled or newly created partition field
+   */
+  private PartitionField recycleOrCreatePartitionField(
+      Pair<Integer, Transform<?, ?>> sourceTransform, String name) {
+    if (formatVersion == 2 && base != null) {
+      int sourceId = sourceTransform.first();
+      Transform<?, ?> transform = sourceTransform.second();
+
+      Set<PartitionField> allHistoricalFields = Sets.newHashSet();
+      for (PartitionSpec partitionSpec : base.specs()) {
+        allHistoricalFields.addAll(partitionSpec.fields());
+      }
+
+      for (PartitionField field : allHistoricalFields) {
+        if (field.sourceId() == sourceId && field.transform().equals(transform)) {
+          // if a target name is specified then it must match as well
+          if (name == null || field.name().equals(name)) {
+            return field;
+          }
+        }
+      }
+    }
+    return new PartitionField(
+        sourceTransform.first(), assignFieldId(), name, sourceTransform.second());
+  }
+
+  @Override
+  public UpdatePartitionSpec caseSensitive(boolean isCaseSensitive) {
+    this.caseSensitive = isCaseSensitive;
+    return this;
+  }
+
+  @Override
+  public BaseUpdatePartitionSpec addField(String sourceName) {
+    return addField(Expressions.ref(sourceName));
+  }
+
+  @Override
+  public BaseUpdatePartitionSpec addField(Term term) {
+    return addField(null, term);
+  }
+
+  private BaseUpdatePartitionSpec rewriteDeleteAndAddField(
+      PartitionField existing, String name, Pair<Integer, Transform<?, ?>> sourceTransform) {
+    deletes.remove(existing.fieldId());
+    if (name == null || existing.name().equals(name)) {
+      return this;
+    } else {
+      return renameField(existing.name(), name);
+    }
+  }
+
+  @Override
+  public BaseUpdatePartitionSpec addField(String name, Term term) {
+    PartitionField alreadyAdded = nameToAddedField.get(name);
+    Preconditions.checkArgument(
+        alreadyAdded == null, "Cannot add duplicate partition field: %s", alreadyAdded);
+
+    Pair<Integer, Transform<?, ?>> sourceTransform = resolve(term);
+    Pair<Integer, String> validationKey =
+        Pair.of(sourceTransform.first(), sourceTransform.second().toString());
+
+    PartitionField existing = transformToField.get(validationKey);
+    if (existing != null &&
+        deletes.contains(existing.fieldId()) &&
+        existing.transform().equals(sourceTransform.second())) {
+      return rewriteDeleteAndAddField(existing, name, sourceTransform);
+    }
+
+    Preconditions.checkArgument(
+        existing == null ||
+            (deletes.contains(existing.fieldId()) &&
+                !existing.name().equals(name)),
+        "Cannot add duplicate partition field %s=%s, conflicts with %s",
+        name,
+        term,
+        existing);
+
+    PartitionField added = transformToAddedField.get(validationKey);
+    Preconditions.checkArgument(
+        added == null,
+        "Cannot add duplicate partition field %s=%s, already added: %s",
+        name,
+        term,
+        added);
+
+    PartitionField newField = recycleOrCreatePartitionField(sourceTransform, name);
+    if (newField.name() == null) {
+      String partitionName =
+          PartitionSpecVisitor.visit(schema, newField, PartitionNameGenerator.INSTANCE);
+      newField =
+          new PartitionField(
+              newField.sourceId(), newField.fieldId(), partitionName, newField.transform());
+    }
+
+    checkForRedundantAddedPartitions(newField);
+    transformToAddedField.put(validationKey, newField);
+
+    PartitionField existingField = nameToField.get(newField.name());
+    if (existingField != null && !deletes.contains(existingField.fieldId())) {
+      if
(isVoidTransform(existingField)) { + // rename the old deleted field that is being replaced by the new field + renameField(existingField.name(), existingField.name() + "_" + existingField.fieldId()); + } else { + throw new IllegalArgumentException( + String.format("Cannot add duplicate partition field name: %s", name)); + } + } else if (existingField != null && deletes.contains(existingField.fieldId())) { + renames.put(existingField.name(), existingField.name() + "_" + existingField.fieldId()); + } + + nameToAddedField.put(newField.name(), newField); + + adds.add(newField); + + return this; + } + + @Override + public BaseUpdatePartitionSpec removeField(String name) { + PartitionField alreadyAdded = nameToAddedField.get(name); + Preconditions.checkArgument( + alreadyAdded == null, "Cannot delete newly added field: %s", alreadyAdded); + + Preconditions.checkArgument( + renames.get(name) == null, "Cannot rename and delete partition field: %s", name); + + PartitionField field = nameToField.get(name); + Preconditions.checkArgument(field != null, "Cannot find partition field to remove: %s", name); + + deletes.add(field.fieldId()); + + return this; + } + + @Override + public BaseUpdatePartitionSpec removeField(Term term) { + Pair> sourceTransform = resolve(term); + Pair key = + Pair.of(sourceTransform.first(), sourceTransform.second().toString()); + + PartitionField added = transformToAddedField.get(key); + Preconditions.checkArgument(added == null, "Cannot delete newly added field: %s", added); + + PartitionField field = transformToField.get(key); + Preconditions.checkArgument(field != null, "Cannot find partition field to remove: %s", term); + Preconditions.checkArgument( + renames.get(field.name()) == null, + "Cannot rename and delete partition field: %s", + field.name()); + + deletes.add(field.fieldId()); + + return this; + } + + @Override + public BaseUpdatePartitionSpec renameField(String name, String newName) { + PartitionField existingField = nameToField.get(newName); + if (existingField != null && isVoidTransform(existingField)) { + // rename the old deleted field that is being replaced by the new field + renameField(existingField.name(), existingField.name() + "_" + existingField.fieldId()); + } + + PartitionField added = nameToAddedField.get(name); + Preconditions.checkArgument( + added == null, "Cannot rename newly added partition field: %s", name); + + PartitionField field = nameToField.get(name); + Preconditions.checkArgument(field != null, "Cannot find partition field to rename: %s", name); + Preconditions.checkArgument( + !deletes.contains(field.fieldId()), "Cannot delete and rename partition field: %s", name); + + renames.put(name, newName); + + return this; + } + + @Override + public PartitionSpec apply() { + PartitionSpec.Builder builder = PartitionSpec.builderFor(schema); + + for (PartitionField field : spec.fields()) { + if (!deletes.contains(field.fieldId())) { + String newName = renames.get(field.name()); + if (newName != null) { + builder.add(field.sourceId(), field.fieldId(), newName, field.transform()); + } else { + builder.add(field.sourceId(), field.fieldId(), field.name(), field.transform()); + } + } else if (formatVersion < 2) { + // field IDs were not required for v1 and were assigned sequentially in each partition spec + // starting at 1,000. + // to maintain consistent field ids across partition specs in v1 tables, any partition field + // that is removed + // must be replaced with a null transform. null values are always allowed in partition data. 
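Concretely, this branch is why the expected specs in the tests of this series use alwaysNull(): in a v1 table a removed field keeps its slot (and field ID) but its transform becomes void, e.g.

    PartitionSpec expected = PartitionSpec.builderFor(schema)
        .withSpecId(1)
        .year("year_field")                                  // kept
        .alwaysNull("hour_field", "hour_field_hour")         // removed -> void transform, slot preserved
        .alwaysNull("truncate_field", "truncate_field_trunc")
        .alwaysNull("bucket_field", "bucket_field_bucket")
        .month("month_field")                                // newly added
        .build();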
+ String newName = renames.get(field.name()); + if (newName != null) { + builder.add(field.sourceId(), field.fieldId(), newName, Transforms.alwaysNull()); + } else { + builder.add(field.sourceId(), field.fieldId(), field.name(), Transforms.alwaysNull()); + } + } + } + + for (PartitionField newField : adds) { + builder.add(newField.sourceId(), newField.fieldId(), newField.name(), newField.transform()); + } + + return builder.build(); + } + + @Override + public void commit() { + TableMetadata update = base.updatePartitionSpec(apply()); + ops.commit(base, update); + } + + private Pair> resolve(Term term) { + Preconditions.checkArgument(term instanceof UnboundTerm, "Term must be unbound"); + + BoundTerm boundTerm = ((UnboundTerm) term).bind(schema.asStruct(), caseSensitive); + int sourceId = boundTerm.ref().fieldId(); + Transform transform = toTransform(boundTerm); + + return Pair.of(sourceId, transform); + } + + private Transform toTransform(BoundTerm term) { + if (term instanceof BoundReference) { + return Transforms.identity(term.type()); + } else if (term instanceof BoundTransform) { + return ((BoundTransform) term).transform(); + } else { + throw new ValidationException( + "Invalid term: %s, expected either a bound reference or transform", term); + } + } + + private void checkForRedundantAddedPartitions(PartitionField field) { + if (isTimeTransform(field)) { + PartitionField timeField = addedTimeFields.get(field.sourceId()); + Preconditions.checkArgument( + timeField == null, + "Cannot add redundant partition field: %s conflicts with %s", + timeField, + field); + addedTimeFields.put(field.sourceId(), field); + } + } + + private static Map indexSpecByName(PartitionSpec spec) { + ImmutableMap.Builder builder = ImmutableMap.builder(); + List fields = spec.fields(); + for (PartitionField field : fields) { + builder.put(field.name(), field); + } + + return builder.build(); + } + + private static Map, PartitionField> indexSpecByTransform( + PartitionSpec spec) { + Map, PartitionField> indexSpecs = Maps.newHashMap(); + List fields = spec.fields(); + for (PartitionField field : fields) { + indexSpecs.put(Pair.of(field.sourceId(), field.transform().toString()), field); + } + + return indexSpecs; + } + + private boolean isTimeTransform(PartitionField field) { + return PartitionSpecVisitor.visit(schema, field, IsTimeTransform.INSTANCE); + } + + private static class IsTimeTransform implements PartitionSpecVisitor { + private static final IsTimeTransform INSTANCE = new IsTimeTransform(); + + private IsTimeTransform() { + } + + @Override + public Boolean identity(int fieldId, String sourceName, int sourceId) { + return false; + } + + @Override + public Boolean bucket(int fieldId, String sourceName, int sourceId, int numBuckets) { + return false; + } + + @Override + public Boolean truncate(int fieldId, String sourceName, int sourceId, int width) { + return false; + } + + @Override + public Boolean year(int fieldId, String sourceName, int sourceId) { + return true; + } + + @Override + public Boolean month(int fieldId, String sourceName, int sourceId) { + return true; + } + + @Override + public Boolean day(int fieldId, String sourceName, int sourceId) { + return true; + } + + @Override + public Boolean hour(int fieldId, String sourceName, int sourceId) { + return true; + } + + @Override + public Boolean alwaysNull(int fieldId, String sourceName, int sourceId) { + return false; + } + + @Override + public Boolean unknown(int fieldId, String sourceName, int sourceId, String transform) { + return false; + } + 
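PartitionSpecVisitor gives per-transform dispatch without instanceof chains: PartitionSpecVisitor.visit(schema, field, visitor) invokes exactly the one method matching the field's transform. Combined with checkForRedundantAddedPartitions above, this is what rejects two time-based fields over the same source column, e.g. (hypothetical column name):

    table.updateSpec()
        .addField(Expressions.month("event_ts"))
        .addField(Expressions.day("event_ts"))   // fails: "Cannot add redundant partition field"
        .commit();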
} + + private boolean isVoidTransform(PartitionField field) { + return PartitionSpecVisitor.visit(schema, field, IsVoidTransform.INSTANCE); + } + + private static class IsVoidTransform implements PartitionSpecVisitor { + private static final IsVoidTransform INSTANCE = new IsVoidTransform(); + + private IsVoidTransform() { + } + + @Override + public Boolean identity(int fieldId, String sourceName, int sourceId) { + return false; + } + + @Override + public Boolean bucket(int fieldId, String sourceName, int sourceId, int numBuckets) { + return false; + } + + @Override + public Boolean truncate(int fieldId, String sourceName, int sourceId, int width) { + return false; + } + + @Override + public Boolean year(int fieldId, String sourceName, int sourceId) { + return false; + } + + @Override + public Boolean month(int fieldId, String sourceName, int sourceId) { + return false; + } + + @Override + public Boolean day(int fieldId, String sourceName, int sourceId) { + return false; + } + + @Override + public Boolean hour(int fieldId, String sourceName, int sourceId) { + return false; + } + + @Override + public Boolean alwaysNull(int fieldId, String sourceName, int sourceId) { + return true; + } + + @Override + public Boolean unknown(int fieldId, String sourceName, int sourceId, String transform) { + return false; + } + } + + private static class PartitionNameGenerator implements PartitionSpecVisitor { + private static final PartitionNameGenerator INSTANCE = new PartitionNameGenerator(); + + private PartitionNameGenerator() { + } + + @Override + public String identity(int fieldId, String sourceName, int sourceId) { + return sourceName; + } + + @Override + public String bucket(int fieldId, String sourceName, int sourceId, int numBuckets) { + return sourceName + "_bucket_" + numBuckets; + } + + @Override + public String truncate(int fieldId, String sourceName, int sourceId, int width) { + return sourceName + "_trunc_" + width; + } + + @Override + public String year(int fieldId, String sourceName, int sourceId) { + return sourceName + "_year"; + } + + @Override + public String month(int fieldId, String sourceName, int sourceId) { + return sourceName + "_month"; + } + + @Override + public String day(int fieldId, String sourceName, int sourceId) { + return sourceName + "_day"; + } + + @Override + public String hour(int fieldId, String sourceName, int sourceId) { + return sourceName + "_hour"; + } + + @Override + public String alwaysNull(int fieldId, String sourceName, int sourceId) { + return sourceName + "_null"; + } + } +} From 67506d6808738cdd875a52964ac39a7175977959 Mon Sep 17 00:00:00 2001 From: Zsolt Miskolczi Date: Mon, 23 Jan 2023 16:09:59 +0100 Subject: [PATCH 07/12] Fix failing test - alter partition spec --- .../iceberg/mr/hive/IcebergTableUtil.java | 72 ++- .../TestHiveIcebergStorageHandlerNoScan.java | 120 ++-- iceberg/patched-iceberg-core/pom.xml | 1 - .../iceberg/BaseUpdatePartitionSpec.java | 552 ------------------ 4 files changed, 150 insertions(+), 595 deletions(-) delete mode 100644 iceberg/patched-iceberg-core/src/main/java/org/apache/iceberg/BaseUpdatePartitionSpec.java diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/IcebergTableUtil.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/IcebergTableUtil.java index 23244fc1adb0..015ad6c2c841 100644 --- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/IcebergTableUtil.java +++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/IcebergTableUtil.java @@ -20,6 
+20,7 @@ package org.apache.iceberg.mr.hive; import java.util.List; +import java.util.Map; import java.util.Properties; import java.util.function.Function; import org.apache.hadoop.conf.Configuration; @@ -29,14 +30,25 @@ import org.apache.hadoop.hive.ql.parse.TransformSpec; import org.apache.hadoop.hive.ql.session.SessionStateUtil; import org.apache.iceberg.ManageSnapshots; +import org.apache.iceberg.PartitionField; import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.Schema; import org.apache.iceberg.Table; import org.apache.iceberg.UpdatePartitionSpec; import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.exceptions.ValidationException; +import org.apache.iceberg.expressions.BoundReference; +import org.apache.iceberg.expressions.BoundTerm; +import org.apache.iceberg.expressions.BoundTransform; import org.apache.iceberg.expressions.Expressions; +import org.apache.iceberg.expressions.Term; +import org.apache.iceberg.expressions.UnboundTerm; import org.apache.iceberg.mr.Catalogs; import org.apache.iceberg.mr.InputFormatConfig; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.apache.iceberg.transforms.Transform; +import org.apache.iceberg.transforms.Transforms; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -163,6 +175,25 @@ public static PartitionSpec spec(Configuration configuration, Schema schema) { return builder.build(); } + private static Transform resolve(Term term, Schema schema) { + Preconditions.checkArgument(term instanceof UnboundTerm, "Term must be unbound"); + + BoundTerm boundTerm = ((UnboundTerm) term).bind(schema.asStruct(), false); + Transform transform = toTransform(boundTerm); + + return transform; + } + + private static Transform toTransform(BoundTerm term) { + if (term instanceof BoundReference) { + return Transforms.identity(term.type()); + } else if (term instanceof BoundTransform) { + return ((BoundTransform) term).transform(); + } else { + throw new ValidationException("Invalid term: %s, expected either a bound reference or transform", term); + } + } + public static void updateSpec(Configuration configuration, Table table) { // get the new partition transform spec PartitionSpec newPartitionSpec = spec(configuration, table.schema()); @@ -171,10 +202,15 @@ public static void updateSpec(Configuration configuration, Table table) { return; } - // delete every field from the old partition spec UpdatePartitionSpec updatePartitionSpec = table.updateSpec().caseSensitive(false); - table.spec().fields().forEach(field -> updatePartitionSpec.removeField(field.name())); + // Maintain a map of existing fields to avoid removing and adding the same field again + Map fields = Maps.newHashMap(); + for (PartitionField existingField : table.spec().fields()) { + fields.put(existingField.transform().toString(), existingField.name()); + } + + Schema schema = table.spec().schema(); List partitionTransformSpecList = SessionStateUtil .getResource(configuration, hive_metastoreConstants.PARTITION_TRANSFORM_SPEC) .map(o -> (List) o).orElseGet(() -> null); @@ -182,29 +218,59 @@ public static void updateSpec(Configuration configuration, Table table) { partitionTransformSpecList.forEach(spec -> { switch (spec.getTransformType()) { case IDENTITY: - updatePartitionSpec.addField(spec.getColumnName()); + if (fields.remove(resolve(Expressions.ref(spec.getColumnName()), schema).toString()) != null) { + break; + } + 
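Note for this and each case below: the fields map built above keys the current spec's fields by transform string, so a successful remove() means the requested transform already exists and addField() can be skipped; whatever remains in the map after the switch was not re-declared in the new spec and is removed at the end.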
updatePartitionSpec.addField(Expressions.ref(spec.getColumnName())); break; case YEAR: + if (fields.remove(resolve(Expressions.year(spec.getColumnName()), schema).toString()) != null) { + break; + } updatePartitionSpec.addField(Expressions.year(spec.getColumnName())); break; case MONTH: + if (fields.remove(resolve(Expressions.month(spec.getColumnName()), schema).toString()) != null) { + break; + } updatePartitionSpec.addField(Expressions.month(spec.getColumnName())); break; case DAY: + if (fields.remove(resolve(Expressions.day(spec.getColumnName()), schema).toString()) != null) { + break; + } updatePartitionSpec.addField(Expressions.day(spec.getColumnName())); break; case HOUR: + if (fields.remove(resolve(Expressions.hour(spec.getColumnName()), schema).toString()) != null) { + break; + } updatePartitionSpec.addField(Expressions.hour(spec.getColumnName())); break; case TRUNCATE: + if (fields.remove( + resolve(Expressions.truncate(spec.getColumnName(), spec.getTransformParam().get()), schema).toString()) != + null) { + break; + } updatePartitionSpec.addField(Expressions.truncate(spec.getColumnName(), spec.getTransformParam().get())); break; case BUCKET: + if (fields.remove( + resolve(Expressions.bucket(spec.getColumnName(), spec.getTransformParam().get()), schema).toString()) != + null) { + break; + } updatePartitionSpec.addField(Expressions.bucket(spec.getColumnName(), spec.getTransformParam().get())); break; } }); + // remove the remaining fields where weren't added + for (Map.Entry entry : fields.entrySet()) { + updatePartitionSpec.removeField(entry.getValue()); + } + updatePartitionSpec.commit(); } diff --git a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerNoScan.java b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerNoScan.java index 97a4678bae5c..99b53cbf2ac8 100644 --- a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerNoScan.java +++ b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerNoScan.java @@ -215,63 +215,105 @@ public void testPartitionEvolution() { Assert.assertEquals(spec, table.spec()); } + @Test + public void testSetPartitionTransformId() { + Schema schema = new Schema( + optional(1, "id", Types.LongType.get()), + optional(2, "year_field", Types.DateType.get()), + optional(3, "month_field", Types.TimestampType.withZone()), + optional(4, "day_field", Types.TimestampType.withoutZone()), + optional(5, "hour_field", Types.TimestampType.withoutZone()), + optional(6, "truncate_field", Types.StringType.get()), + optional(7, "bucket_field", Types.StringType.get()), + optional(8, "identity_field", Types.StringType.get()) + ); + + TableIdentifier identifier = TableIdentifier.of("default", "part_test"); + shell.executeStatement("CREATE EXTERNAL TABLE " + identifier + + " PARTITIONED BY SPEC (year(year_field), hour(hour_field), " + + "truncate(2, truncate_field), bucket(2, bucket_field), identity_field)" + + " STORED BY ICEBERG " + + testTables.locationForCreateTableSQL(identifier) + + "TBLPROPERTIES ('" + InputFormatConfig.TABLE_SCHEMA + "'='" + + SchemaParser.toJson(schema) + "', " + + "'" + InputFormatConfig.CATALOG_NAME + "'='" + testTables.catalogName() + "')"); + + PartitionSpec spec = PartitionSpec.builderFor(schema) + .year("year_field") + .hour("hour_field") + .truncate("truncate_field", 2) + .bucket("bucket_field", 2) + .identity("identity_field") + .build(); + + Table table = 
testTables.loadTable(identifier); + Assert.assertEquals(spec, table.spec()); + + shell.executeStatement("ALTER TABLE default.part_test SET PARTITION SPEC(identity_field)"); + + spec = PartitionSpec.builderFor(schema) + .withSpecId(1) + .alwaysNull("year_field", "year_field_year") + .alwaysNull("hour_field", "hour_field_hour") + .alwaysNull("truncate_field", "truncate_field_trunc") + .alwaysNull("bucket_field", "bucket_field_bucket") + .identity("identity_field") + .build(); + + table.refresh(); + Assert.assertEquals(spec, table.spec()); + } + @Test public void testSetPartitionTransform() { Schema schema = new Schema( - optional(1, "id", Types.LongType.get()), - optional(2, "year_field", Types.DateType.get()), - optional(3, "month_field", Types.TimestampType.withZone()), - optional(4, "day_field", Types.TimestampType.withoutZone()), - optional(5, "hour_field", Types.TimestampType.withoutZone()), - optional(6, "truncate_field", Types.StringType.get()), - optional(7, "bucket_field", Types.StringType.get()), - optional(8, "identity_field", Types.StringType.get()) + optional(1, "id", Types.LongType.get()), + optional(2, "year_field", Types.DateType.get()), + optional(3, "month_field", Types.TimestampType.withZone()), + optional(4, "day_field", Types.TimestampType.withoutZone()), + optional(5, "hour_field", Types.TimestampType.withoutZone()), + optional(6, "truncate_field", Types.StringType.get()), + optional(7, "bucket_field", Types.StringType.get()), + optional(8, "identity_field", Types.StringType.get()) ); TableIdentifier identifier = TableIdentifier.of("default", "part_test"); shell.executeStatement("CREATE EXTERNAL TABLE " + identifier + - " PARTITIONED BY SPEC (year(year_field), hour(hour_field), " + - "truncate(2, truncate_field), bucket(2, bucket_field), identity_field)" + - " STORED BY ICEBERG " + - testTables.locationForCreateTableSQL(identifier) + - "TBLPROPERTIES ('" + InputFormatConfig.TABLE_SCHEMA + "'='" + - SchemaParser.toJson(schema) + "', " + - "'" + InputFormatConfig.CATALOG_NAME + "'='" + testTables.catalogName() + "')"); + " PARTITIONED BY SPEC (year(year_field), hour(hour_field), " + + "truncate(2, truncate_field), bucket(2, bucket_field), identity_field)" + + " STORED BY ICEBERG " + + testTables.locationForCreateTableSQL(identifier) + + "TBLPROPERTIES ('" + InputFormatConfig.TABLE_SCHEMA + "'='" + + SchemaParser.toJson(schema) + "', " + + "'" + InputFormatConfig.CATALOG_NAME + "'='" + testTables.catalogName() + "')"); PartitionSpec spec = PartitionSpec.builderFor(schema) - .year("year_field") - .hour("hour_field") - .truncate("truncate_field", 2) - .bucket("bucket_field", 2) - .identity("identity_field") - .build(); + .year("year_field") + .hour("hour_field") + .truncate("truncate_field", 2) + .bucket("bucket_field", 2) + .identity("identity_field") + .build(); Table table = testTables.loadTable(identifier); - Assert.assertEquals(spec.specId(), table.spec().specId()); Assert.assertEquals(spec, table.spec()); shell.executeStatement("ALTER TABLE default.part_test SET PARTITION SPEC(year(year_field), month(month_field), " + - "day(day_field))"); + "day(day_field))"); spec = PartitionSpec.builderFor(schema) - .withSpecId(1) - .year("year_field") - .alwaysNull("hour_field", "hour_field_hour") - .alwaysNull("truncate_field", "truncate_field_trunc") - .alwaysNull("bucket_field", "bucket_field_bucket") - .alwaysNull("identity_field", "identity_field") - .month("month_field") - .day("day_field") - .build(); + .withSpecId(1) + .year("year_field") + .alwaysNull("hour_field", 
"hour_field_hour") + .alwaysNull("truncate_field", "truncate_field_trunc") + .alwaysNull("bucket_field", "bucket_field_bucket") + .alwaysNull("identity_field", "identity_field") + .month("month_field") + .day("day_field") + .build(); table.refresh(); - Assert.assertEquals(spec.specId(), table.spec().specId()); - - for (PartitionField field : - spec.fields()) { - Assert.assertTrue(field.name(), table.spec().fields().stream().anyMatch( - tableField -> tableField.name().equals(field.name()))); - } + Assert.assertEquals(spec, table.spec()); } @Test diff --git a/iceberg/patched-iceberg-core/pom.xml b/iceberg/patched-iceberg-core/pom.xml index 82f44a7ee4fc..ec1bdc39afc4 100644 --- a/iceberg/patched-iceberg-core/pom.xml +++ b/iceberg/patched-iceberg-core/pom.xml @@ -76,7 +76,6 @@ ${project.build.directory}/classes **/HadoopInputFile.class - **/BaseUpdatePartitionSpec.class diff --git a/iceberg/patched-iceberg-core/src/main/java/org/apache/iceberg/BaseUpdatePartitionSpec.java b/iceberg/patched-iceberg-core/src/main/java/org/apache/iceberg/BaseUpdatePartitionSpec.java deleted file mode 100644 index aac7b403af0d..000000000000 --- a/iceberg/patched-iceberg-core/src/main/java/org/apache/iceberg/BaseUpdatePartitionSpec.java +++ /dev/null @@ -1,552 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ - -package org.apache.iceberg; - -import java.util.List; -import java.util.Map; -import java.util.Set; -import org.apache.iceberg.exceptions.ValidationException; -import org.apache.iceberg.expressions.BoundReference; -import org.apache.iceberg.expressions.BoundTerm; -import org.apache.iceberg.expressions.BoundTransform; -import org.apache.iceberg.expressions.Expressions; -import org.apache.iceberg.expressions.Term; -import org.apache.iceberg.expressions.UnboundTerm; -import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting; -import org.apache.iceberg.relocated.com.google.common.base.Preconditions; -import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; -import org.apache.iceberg.relocated.com.google.common.collect.Lists; -import org.apache.iceberg.relocated.com.google.common.collect.Maps; -import org.apache.iceberg.relocated.com.google.common.collect.Sets; -import org.apache.iceberg.transforms.PartitionSpecVisitor; -import org.apache.iceberg.transforms.Transform; -import org.apache.iceberg.transforms.Transforms; -import org.apache.iceberg.transforms.UnknownTransform; -import org.apache.iceberg.util.Pair; - -public class BaseUpdatePartitionSpec implements UpdatePartitionSpec { - private final TableOperations ops; - private final TableMetadata base; - private final int formatVersion; - private final PartitionSpec spec; - private final Schema schema; - private final Map nameToField; - private final Map, PartitionField> transformToField; - - private final List adds = Lists.newArrayList(); - private final Map addedTimeFields = Maps.newHashMap(); - private final Map, PartitionField> transformToAddedField = - Maps.newHashMap(); - private final Map nameToAddedField = Maps.newHashMap(); - private final Set deletes = Sets.newHashSet(); - private final Map renames = Maps.newHashMap(); - - private boolean caseSensitive; - private int lastAssignedPartitionId; - - BaseUpdatePartitionSpec(TableOperations ops) { - this.ops = ops; - this.caseSensitive = true; - this.base = ops.current(); - this.formatVersion = base.formatVersion(); - this.spec = base.spec(); - this.schema = spec.schema(); - this.nameToField = indexSpecByName(spec); - this.transformToField = indexSpecByTransform(spec); - this.lastAssignedPartitionId = base.lastAssignedPartitionId(); - - spec.fields().stream() - .filter(field -> field.transform() instanceof UnknownTransform) - .findAny() - .ifPresent( - field -> { - throw new IllegalArgumentException( - "Cannot update partition spec with unknown transform: " + field); - }); - } - - /** - * For testing only. - */ - @VisibleForTesting - BaseUpdatePartitionSpec(int formatVersion, PartitionSpec spec) { - this(formatVersion, spec, spec.lastAssignedFieldId()); - } - - /** - * For testing only. - */ - @VisibleForTesting - BaseUpdatePartitionSpec(int formatVersion, PartitionSpec spec, int lastAssignedPartitionId) { - this.ops = null; - this.base = null; - this.formatVersion = formatVersion; - this.caseSensitive = true; - this.spec = spec; - this.schema = spec.schema(); - this.nameToField = indexSpecByName(spec); - this.transformToField = indexSpecByTransform(spec); - this.lastAssignedPartitionId = lastAssignedPartitionId; - } - - private int assignFieldId() { - this.lastAssignedPartitionId += 1; - return lastAssignedPartitionId; - } - - /** - * In V2 it searches for a similar partition field in historical partition specs. Tries to match - * on source field ID, transform type and target name (optional). 
If not found, or for V1 tables, it
-   * creates a new PartitionField.
-   *
-   * @param sourceTransform pair of source ID and transform for this PartitionField addition
-   * @param name target partition field name, if specified
-   * @return the recycled or newly created partition field
-   */
-  private PartitionField recycleOrCreatePartitionField(
-      Pair<Integer, Transform<?, ?>> sourceTransform, String name) {
-    if (formatVersion == 2 && base != null) {
-      int sourceId = sourceTransform.first();
-      Transform<?, ?> transform = sourceTransform.second();
-
-      Set<PartitionField> allHistoricalFields = Sets.newHashSet();
-      for (PartitionSpec partitionSpec : base.specs()) {
-        allHistoricalFields.addAll(partitionSpec.fields());
-      }
-
-      for (PartitionField field : allHistoricalFields) {
-        if (field.sourceId() == sourceId && field.transform().equals(transform)) {
-          // if a target name is specified then it must match as well
-          if (name == null || field.name().equals(name)) {
-            return field;
-          }
-        }
-      }
-    }
-    return new PartitionField(
-        sourceTransform.first(), assignFieldId(), name, sourceTransform.second());
-  }
-
-  @Override
-  public UpdatePartitionSpec caseSensitive(boolean isCaseSensitive) {
-    this.caseSensitive = isCaseSensitive;
-    return this;
-  }
-
-  @Override
-  public BaseUpdatePartitionSpec addField(String sourceName) {
-    return addField(Expressions.ref(sourceName));
-  }
-
-  @Override
-  public BaseUpdatePartitionSpec addField(Term term) {
-    return addField(null, term);
-  }
-
-  private BaseUpdatePartitionSpec rewriteDeleteAndAddField(
-      PartitionField existing, String name, Pair<Integer, Transform<?, ?>> sourceTransform) {
-    deletes.remove(existing.fieldId());
-    if (name == null || existing.name().equals(name)) {
-      return this;
-    } else {
-      return renameField(existing.name(), name);
-    }
-  }
-
-  @Override
-  public BaseUpdatePartitionSpec addField(String name, Term term) {
-    PartitionField alreadyAdded = nameToAddedField.get(name);
-    Preconditions.checkArgument(
-        alreadyAdded == null, "Cannot add duplicate partition field: %s", alreadyAdded);
-
-    Pair<Integer, Transform<?, ?>> sourceTransform = resolve(term);
-    Pair<Integer, String> validationKey =
-        Pair.of(sourceTransform.first(), sourceTransform.second().toString());
-
-    PartitionField existing = transformToField.get(validationKey);
-    if (existing != null &&
-        deletes.contains(existing.fieldId()) &&
-        existing.transform().equals(sourceTransform.second())) {
-      return rewriteDeleteAndAddField(existing, name, sourceTransform);
-    }
-
-    Preconditions.checkArgument(
-        existing == null ||
-            (deletes.contains(existing.fieldId()) &&
-                !existing.name().equals(name)),
-        "Cannot add duplicate partition field %s=%s, conflicts with %s",
-        name,
-        term,
-        existing);
-
-    PartitionField added = transformToAddedField.get(validationKey);
-    Preconditions.checkArgument(
-        added == null,
-        "Cannot add duplicate partition field %s=%s, already added: %s",
-        name,
-        term,
-        added);
-
-    PartitionField newField = recycleOrCreatePartitionField(sourceTransform, name);
-    if (newField.name() == null) {
-      String partitionName =
-          PartitionSpecVisitor.visit(schema, newField, PartitionNameGenerator.INSTANCE);
-      newField =
-          new PartitionField(
-              newField.sourceId(), newField.fieldId(), partitionName, newField.transform());
-    }
-
-    checkForRedundantAddedPartitions(newField);
-    transformToAddedField.put(validationKey, newField);
-
-    PartitionField existingField = nameToField.get(newField.name());
-    if (existingField != null && !deletes.contains(existingField.fieldId())) {
-      if
(isVoidTransform(existingField)) { - // rename the old deleted field that is being replaced by the new field - renameField(existingField.name(), existingField.name() + "_" + existingField.fieldId()); - } else { - throw new IllegalArgumentException( - String.format("Cannot add duplicate partition field name: %s", name)); - } - } else if (existingField != null && deletes.contains(existingField.fieldId())) { - renames.put(existingField.name(), existingField.name() + "_" + existingField.fieldId()); - } - - nameToAddedField.put(newField.name(), newField); - - adds.add(newField); - - return this; - } - - @Override - public BaseUpdatePartitionSpec removeField(String name) { - PartitionField alreadyAdded = nameToAddedField.get(name); - Preconditions.checkArgument( - alreadyAdded == null, "Cannot delete newly added field: %s", alreadyAdded); - - Preconditions.checkArgument( - renames.get(name) == null, "Cannot rename and delete partition field: %s", name); - - PartitionField field = nameToField.get(name); - Preconditions.checkArgument(field != null, "Cannot find partition field to remove: %s", name); - - deletes.add(field.fieldId()); - - return this; - } - - @Override - public BaseUpdatePartitionSpec removeField(Term term) { - Pair> sourceTransform = resolve(term); - Pair key = - Pair.of(sourceTransform.first(), sourceTransform.second().toString()); - - PartitionField added = transformToAddedField.get(key); - Preconditions.checkArgument(added == null, "Cannot delete newly added field: %s", added); - - PartitionField field = transformToField.get(key); - Preconditions.checkArgument(field != null, "Cannot find partition field to remove: %s", term); - Preconditions.checkArgument( - renames.get(field.name()) == null, - "Cannot rename and delete partition field: %s", - field.name()); - - deletes.add(field.fieldId()); - - return this; - } - - @Override - public BaseUpdatePartitionSpec renameField(String name, String newName) { - PartitionField existingField = nameToField.get(newName); - if (existingField != null && isVoidTransform(existingField)) { - // rename the old deleted field that is being replaced by the new field - renameField(existingField.name(), existingField.name() + "_" + existingField.fieldId()); - } - - PartitionField added = nameToAddedField.get(name); - Preconditions.checkArgument( - added == null, "Cannot rename newly added partition field: %s", name); - - PartitionField field = nameToField.get(name); - Preconditions.checkArgument(field != null, "Cannot find partition field to rename: %s", name); - Preconditions.checkArgument( - !deletes.contains(field.fieldId()), "Cannot delete and rename partition field: %s", name); - - renames.put(name, newName); - - return this; - } - - @Override - public PartitionSpec apply() { - PartitionSpec.Builder builder = PartitionSpec.builderFor(schema); - - for (PartitionField field : spec.fields()) { - if (!deletes.contains(field.fieldId())) { - String newName = renames.get(field.name()); - if (newName != null) { - builder.add(field.sourceId(), field.fieldId(), newName, field.transform()); - } else { - builder.add(field.sourceId(), field.fieldId(), field.name(), field.transform()); - } - } else if (formatVersion < 2) { - // field IDs were not required for v1 and were assigned sequentially in each partition spec - // starting at 1,000. - // to maintain consistent field ids across partition specs in v1 tables, any partition field - // that is removed - // must be replaced with a null transform. null values are always allowed in partition data. 
- String newName = renames.get(field.name()); - if (newName != null) { - builder.add(field.sourceId(), field.fieldId(), newName, Transforms.alwaysNull()); - } else { - builder.add(field.sourceId(), field.fieldId(), field.name(), Transforms.alwaysNull()); - } - } - } - - for (PartitionField newField : adds) { - builder.add(newField.sourceId(), newField.fieldId(), newField.name(), newField.transform()); - } - - return builder.build(); - } - - @Override - public void commit() { - TableMetadata update = base.updatePartitionSpec(apply()); - ops.commit(base, update); - } - - private Pair> resolve(Term term) { - Preconditions.checkArgument(term instanceof UnboundTerm, "Term must be unbound"); - - BoundTerm boundTerm = ((UnboundTerm) term).bind(schema.asStruct(), caseSensitive); - int sourceId = boundTerm.ref().fieldId(); - Transform transform = toTransform(boundTerm); - - return Pair.of(sourceId, transform); - } - - private Transform toTransform(BoundTerm term) { - if (term instanceof BoundReference) { - return Transforms.identity(term.type()); - } else if (term instanceof BoundTransform) { - return ((BoundTransform) term).transform(); - } else { - throw new ValidationException( - "Invalid term: %s, expected either a bound reference or transform", term); - } - } - - private void checkForRedundantAddedPartitions(PartitionField field) { - if (isTimeTransform(field)) { - PartitionField timeField = addedTimeFields.get(field.sourceId()); - Preconditions.checkArgument( - timeField == null, - "Cannot add redundant partition field: %s conflicts with %s", - timeField, - field); - addedTimeFields.put(field.sourceId(), field); - } - } - - private static Map indexSpecByName(PartitionSpec spec) { - ImmutableMap.Builder builder = ImmutableMap.builder(); - List fields = spec.fields(); - for (PartitionField field : fields) { - builder.put(field.name(), field); - } - - return builder.build(); - } - - private static Map, PartitionField> indexSpecByTransform( - PartitionSpec spec) { - Map, PartitionField> indexSpecs = Maps.newHashMap(); - List fields = spec.fields(); - for (PartitionField field : fields) { - indexSpecs.put(Pair.of(field.sourceId(), field.transform().toString()), field); - } - - return indexSpecs; - } - - private boolean isTimeTransform(PartitionField field) { - return PartitionSpecVisitor.visit(schema, field, IsTimeTransform.INSTANCE); - } - - private static class IsTimeTransform implements PartitionSpecVisitor { - private static final IsTimeTransform INSTANCE = new IsTimeTransform(); - - private IsTimeTransform() { - } - - @Override - public Boolean identity(int fieldId, String sourceName, int sourceId) { - return false; - } - - @Override - public Boolean bucket(int fieldId, String sourceName, int sourceId, int numBuckets) { - return false; - } - - @Override - public Boolean truncate(int fieldId, String sourceName, int sourceId, int width) { - return false; - } - - @Override - public Boolean year(int fieldId, String sourceName, int sourceId) { - return true; - } - - @Override - public Boolean month(int fieldId, String sourceName, int sourceId) { - return true; - } - - @Override - public Boolean day(int fieldId, String sourceName, int sourceId) { - return true; - } - - @Override - public Boolean hour(int fieldId, String sourceName, int sourceId) { - return true; - } - - @Override - public Boolean alwaysNull(int fieldId, String sourceName, int sourceId) { - return false; - } - - @Override - public Boolean unknown(int fieldId, String sourceName, int sourceId, String transform) { - return false; - } - 
} - - private boolean isVoidTransform(PartitionField field) { - return PartitionSpecVisitor.visit(schema, field, IsVoidTransform.INSTANCE); - } - - private static class IsVoidTransform implements PartitionSpecVisitor { - private static final IsVoidTransform INSTANCE = new IsVoidTransform(); - - private IsVoidTransform() { - } - - @Override - public Boolean identity(int fieldId, String sourceName, int sourceId) { - return false; - } - - @Override - public Boolean bucket(int fieldId, String sourceName, int sourceId, int numBuckets) { - return false; - } - - @Override - public Boolean truncate(int fieldId, String sourceName, int sourceId, int width) { - return false; - } - - @Override - public Boolean year(int fieldId, String sourceName, int sourceId) { - return false; - } - - @Override - public Boolean month(int fieldId, String sourceName, int sourceId) { - return false; - } - - @Override - public Boolean day(int fieldId, String sourceName, int sourceId) { - return false; - } - - @Override - public Boolean hour(int fieldId, String sourceName, int sourceId) { - return false; - } - - @Override - public Boolean alwaysNull(int fieldId, String sourceName, int sourceId) { - return true; - } - - @Override - public Boolean unknown(int fieldId, String sourceName, int sourceId, String transform) { - return false; - } - } - - private static class PartitionNameGenerator implements PartitionSpecVisitor { - private static final PartitionNameGenerator INSTANCE = new PartitionNameGenerator(); - - private PartitionNameGenerator() { - } - - @Override - public String identity(int fieldId, String sourceName, int sourceId) { - return sourceName; - } - - @Override - public String bucket(int fieldId, String sourceName, int sourceId, int numBuckets) { - return sourceName + "_bucket_" + numBuckets; - } - - @Override - public String truncate(int fieldId, String sourceName, int sourceId, int width) { - return sourceName + "_trunc_" + width; - } - - @Override - public String year(int fieldId, String sourceName, int sourceId) { - return sourceName + "_year"; - } - - @Override - public String month(int fieldId, String sourceName, int sourceId) { - return sourceName + "_month"; - } - - @Override - public String day(int fieldId, String sourceName, int sourceId) { - return sourceName + "_day"; - } - - @Override - public String hour(int fieldId, String sourceName, int sourceId) { - return sourceName + "_hour"; - } - - @Override - public String alwaysNull(int fieldId, String sourceName, int sourceId) { - return sourceName + "_null"; - } - } -} From 8cd1269d5f26271ef6f263da7b67e56df6b11f85 Mon Sep 17 00:00:00 2001 From: Zsolt Miskolczi Date: Tue, 24 Jan 2023 14:20:04 +0100 Subject: [PATCH 08/12] Revert "Fix failing test - alter partition spec" This reverts commit 67506d6808738cdd875a52964ac39a7175977959. 
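Note for reviewers: the commit being reverted kept partition fields that already matched the
requested transforms; this revert returns IcebergTableUtil.updateSpec() to the simpler
remove-everything-then-re-add flow. A minimal sketch of the restored pattern, using only the
UpdatePartitionSpec calls visible in this series (the table handle and the column names are
illustrative placeholders, not taken from the tests):

    // Remove-all-then-add flow restored by this revert; `table` is an
    // org.apache.iceberg.Table, the column names are placeholders.
    UpdatePartitionSpec update = table.updateSpec().caseSensitive(false);

    // drop every field of the current partition spec ...
    table.spec().fields().forEach(field -> update.removeField(field.name()));

    // ... then add each transform requested by ALTER TABLE ... SET PARTITION SPEC
    update.addField(Expressions.year("year_field"));
    update.addField(Expressions.bucket("bucket_field", 2));
    update.commit();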
--- .../iceberg/mr/hive/IcebergTableUtil.java | 72 +-- .../TestHiveIcebergStorageHandlerNoScan.java | 120 ++-- iceberg/patched-iceberg-core/pom.xml | 1 + .../iceberg/BaseUpdatePartitionSpec.java | 552 ++++++++++++++++++ 4 files changed, 595 insertions(+), 150 deletions(-) create mode 100644 iceberg/patched-iceberg-core/src/main/java/org/apache/iceberg/BaseUpdatePartitionSpec.java diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/IcebergTableUtil.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/IcebergTableUtil.java index 015ad6c2c841..23244fc1adb0 100644 --- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/IcebergTableUtil.java +++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/IcebergTableUtil.java @@ -20,7 +20,6 @@ package org.apache.iceberg.mr.hive; import java.util.List; -import java.util.Map; import java.util.Properties; import java.util.function.Function; import org.apache.hadoop.conf.Configuration; @@ -30,25 +29,14 @@ import org.apache.hadoop.hive.ql.parse.TransformSpec; import org.apache.hadoop.hive.ql.session.SessionStateUtil; import org.apache.iceberg.ManageSnapshots; -import org.apache.iceberg.PartitionField; import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.Schema; import org.apache.iceberg.Table; import org.apache.iceberg.UpdatePartitionSpec; import org.apache.iceberg.catalog.TableIdentifier; -import org.apache.iceberg.exceptions.ValidationException; -import org.apache.iceberg.expressions.BoundReference; -import org.apache.iceberg.expressions.BoundTerm; -import org.apache.iceberg.expressions.BoundTransform; import org.apache.iceberg.expressions.Expressions; -import org.apache.iceberg.expressions.Term; -import org.apache.iceberg.expressions.UnboundTerm; import org.apache.iceberg.mr.Catalogs; import org.apache.iceberg.mr.InputFormatConfig; -import org.apache.iceberg.relocated.com.google.common.base.Preconditions; -import org.apache.iceberg.relocated.com.google.common.collect.Maps; -import org.apache.iceberg.transforms.Transform; -import org.apache.iceberg.transforms.Transforms; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -175,25 +163,6 @@ public static PartitionSpec spec(Configuration configuration, Schema schema) { return builder.build(); } - private static Transform resolve(Term term, Schema schema) { - Preconditions.checkArgument(term instanceof UnboundTerm, "Term must be unbound"); - - BoundTerm boundTerm = ((UnboundTerm) term).bind(schema.asStruct(), false); - Transform transform = toTransform(boundTerm); - - return transform; - } - - private static Transform toTransform(BoundTerm term) { - if (term instanceof BoundReference) { - return Transforms.identity(term.type()); - } else if (term instanceof BoundTransform) { - return ((BoundTransform) term).transform(); - } else { - throw new ValidationException("Invalid term: %s, expected either a bound reference or transform", term); - } - } - public static void updateSpec(Configuration configuration, Table table) { // get the new partition transform spec PartitionSpec newPartitionSpec = spec(configuration, table.schema()); @@ -202,15 +171,10 @@ public static void updateSpec(Configuration configuration, Table table) { return; } + // delete every field from the old partition spec UpdatePartitionSpec updatePartitionSpec = table.updateSpec().caseSensitive(false); + table.spec().fields().forEach(field -> updatePartitionSpec.removeField(field.name())); - // Maintain a map of existing fields to avoid removing and 
adding the same field again - Map fields = Maps.newHashMap(); - for (PartitionField existingField : table.spec().fields()) { - fields.put(existingField.transform().toString(), existingField.name()); - } - - Schema schema = table.spec().schema(); List partitionTransformSpecList = SessionStateUtil .getResource(configuration, hive_metastoreConstants.PARTITION_TRANSFORM_SPEC) .map(o -> (List) o).orElseGet(() -> null); @@ -218,59 +182,29 @@ public static void updateSpec(Configuration configuration, Table table) { partitionTransformSpecList.forEach(spec -> { switch (spec.getTransformType()) { case IDENTITY: - if (fields.remove(resolve(Expressions.ref(spec.getColumnName()), schema).toString()) != null) { - break; - } - updatePartitionSpec.addField(Expressions.ref(spec.getColumnName())); + updatePartitionSpec.addField(spec.getColumnName()); break; case YEAR: - if (fields.remove(resolve(Expressions.year(spec.getColumnName()), schema).toString()) != null) { - break; - } updatePartitionSpec.addField(Expressions.year(spec.getColumnName())); break; case MONTH: - if (fields.remove(resolve(Expressions.month(spec.getColumnName()), schema).toString()) != null) { - break; - } updatePartitionSpec.addField(Expressions.month(spec.getColumnName())); break; case DAY: - if (fields.remove(resolve(Expressions.day(spec.getColumnName()), schema).toString()) != null) { - break; - } updatePartitionSpec.addField(Expressions.day(spec.getColumnName())); break; case HOUR: - if (fields.remove(resolve(Expressions.hour(spec.getColumnName()), schema).toString()) != null) { - break; - } updatePartitionSpec.addField(Expressions.hour(spec.getColumnName())); break; case TRUNCATE: - if (fields.remove( - resolve(Expressions.truncate(spec.getColumnName(), spec.getTransformParam().get()), schema).toString()) != - null) { - break; - } updatePartitionSpec.addField(Expressions.truncate(spec.getColumnName(), spec.getTransformParam().get())); break; case BUCKET: - if (fields.remove( - resolve(Expressions.bucket(spec.getColumnName(), spec.getTransformParam().get()), schema).toString()) != - null) { - break; - } updatePartitionSpec.addField(Expressions.bucket(spec.getColumnName(), spec.getTransformParam().get())); break; } }); - // remove the remaining fields where weren't added - for (Map.Entry entry : fields.entrySet()) { - updatePartitionSpec.removeField(entry.getValue()); - } - updatePartitionSpec.commit(); } diff --git a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerNoScan.java b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerNoScan.java index 99b53cbf2ac8..97a4678bae5c 100644 --- a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerNoScan.java +++ b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerNoScan.java @@ -215,105 +215,63 @@ public void testPartitionEvolution() { Assert.assertEquals(spec, table.spec()); } - @Test - public void testSetPartitionTransformId() { - Schema schema = new Schema( - optional(1, "id", Types.LongType.get()), - optional(2, "year_field", Types.DateType.get()), - optional(3, "month_field", Types.TimestampType.withZone()), - optional(4, "day_field", Types.TimestampType.withoutZone()), - optional(5, "hour_field", Types.TimestampType.withoutZone()), - optional(6, "truncate_field", Types.StringType.get()), - optional(7, "bucket_field", Types.StringType.get()), - optional(8, "identity_field", Types.StringType.get()) - ); - - 
TableIdentifier identifier = TableIdentifier.of("default", "part_test"); - shell.executeStatement("CREATE EXTERNAL TABLE " + identifier + - " PARTITIONED BY SPEC (year(year_field), hour(hour_field), " + - "truncate(2, truncate_field), bucket(2, bucket_field), identity_field)" + - " STORED BY ICEBERG " + - testTables.locationForCreateTableSQL(identifier) + - "TBLPROPERTIES ('" + InputFormatConfig.TABLE_SCHEMA + "'='" + - SchemaParser.toJson(schema) + "', " + - "'" + InputFormatConfig.CATALOG_NAME + "'='" + testTables.catalogName() + "')"); - - PartitionSpec spec = PartitionSpec.builderFor(schema) - .year("year_field") - .hour("hour_field") - .truncate("truncate_field", 2) - .bucket("bucket_field", 2) - .identity("identity_field") - .build(); - - Table table = testTables.loadTable(identifier); - Assert.assertEquals(spec, table.spec()); - - shell.executeStatement("ALTER TABLE default.part_test SET PARTITION SPEC(identity_field)"); - - spec = PartitionSpec.builderFor(schema) - .withSpecId(1) - .alwaysNull("year_field", "year_field_year") - .alwaysNull("hour_field", "hour_field_hour") - .alwaysNull("truncate_field", "truncate_field_trunc") - .alwaysNull("bucket_field", "bucket_field_bucket") - .identity("identity_field") - .build(); - - table.refresh(); - Assert.assertEquals(spec, table.spec()); - } - @Test public void testSetPartitionTransform() { Schema schema = new Schema( - optional(1, "id", Types.LongType.get()), - optional(2, "year_field", Types.DateType.get()), - optional(3, "month_field", Types.TimestampType.withZone()), - optional(4, "day_field", Types.TimestampType.withoutZone()), - optional(5, "hour_field", Types.TimestampType.withoutZone()), - optional(6, "truncate_field", Types.StringType.get()), - optional(7, "bucket_field", Types.StringType.get()), - optional(8, "identity_field", Types.StringType.get()) + optional(1, "id", Types.LongType.get()), + optional(2, "year_field", Types.DateType.get()), + optional(3, "month_field", Types.TimestampType.withZone()), + optional(4, "day_field", Types.TimestampType.withoutZone()), + optional(5, "hour_field", Types.TimestampType.withoutZone()), + optional(6, "truncate_field", Types.StringType.get()), + optional(7, "bucket_field", Types.StringType.get()), + optional(8, "identity_field", Types.StringType.get()) ); TableIdentifier identifier = TableIdentifier.of("default", "part_test"); shell.executeStatement("CREATE EXTERNAL TABLE " + identifier + - " PARTITIONED BY SPEC (year(year_field), hour(hour_field), " + - "truncate(2, truncate_field), bucket(2, bucket_field), identity_field)" + - " STORED BY ICEBERG " + - testTables.locationForCreateTableSQL(identifier) + - "TBLPROPERTIES ('" + InputFormatConfig.TABLE_SCHEMA + "'='" + - SchemaParser.toJson(schema) + "', " + - "'" + InputFormatConfig.CATALOG_NAME + "'='" + testTables.catalogName() + "')"); + " PARTITIONED BY SPEC (year(year_field), hour(hour_field), " + + "truncate(2, truncate_field), bucket(2, bucket_field), identity_field)" + + " STORED BY ICEBERG " + + testTables.locationForCreateTableSQL(identifier) + + "TBLPROPERTIES ('" + InputFormatConfig.TABLE_SCHEMA + "'='" + + SchemaParser.toJson(schema) + "', " + + "'" + InputFormatConfig.CATALOG_NAME + "'='" + testTables.catalogName() + "')"); PartitionSpec spec = PartitionSpec.builderFor(schema) - .year("year_field") - .hour("hour_field") - .truncate("truncate_field", 2) - .bucket("bucket_field", 2) - .identity("identity_field") - .build(); + .year("year_field") + .hour("hour_field") + .truncate("truncate_field", 2) + .bucket("bucket_field", 
2) + .identity("identity_field") + .build(); Table table = testTables.loadTable(identifier); + Assert.assertEquals(spec.specId(), table.spec().specId()); Assert.assertEquals(spec, table.spec()); shell.executeStatement("ALTER TABLE default.part_test SET PARTITION SPEC(year(year_field), month(month_field), " + - "day(day_field))"); + "day(day_field))"); spec = PartitionSpec.builderFor(schema) - .withSpecId(1) - .year("year_field") - .alwaysNull("hour_field", "hour_field_hour") - .alwaysNull("truncate_field", "truncate_field_trunc") - .alwaysNull("bucket_field", "bucket_field_bucket") - .alwaysNull("identity_field", "identity_field") - .month("month_field") - .day("day_field") - .build(); + .withSpecId(1) + .year("year_field") + .alwaysNull("hour_field", "hour_field_hour") + .alwaysNull("truncate_field", "truncate_field_trunc") + .alwaysNull("bucket_field", "bucket_field_bucket") + .alwaysNull("identity_field", "identity_field") + .month("month_field") + .day("day_field") + .build(); table.refresh(); - Assert.assertEquals(spec, table.spec()); + Assert.assertEquals(spec.specId(), table.spec().specId()); + + for (PartitionField field : + spec.fields()) { + Assert.assertTrue(field.name(), table.spec().fields().stream().anyMatch( + tableField -> tableField.name().equals(field.name()))); + } } @Test diff --git a/iceberg/patched-iceberg-core/pom.xml b/iceberg/patched-iceberg-core/pom.xml index ec1bdc39afc4..82f44a7ee4fc 100644 --- a/iceberg/patched-iceberg-core/pom.xml +++ b/iceberg/patched-iceberg-core/pom.xml @@ -76,6 +76,7 @@ ${project.build.directory}/classes **/HadoopInputFile.class + **/BaseUpdatePartitionSpec.class diff --git a/iceberg/patched-iceberg-core/src/main/java/org/apache/iceberg/BaseUpdatePartitionSpec.java b/iceberg/patched-iceberg-core/src/main/java/org/apache/iceberg/BaseUpdatePartitionSpec.java new file mode 100644 index 000000000000..aac7b403af0d --- /dev/null +++ b/iceberg/patched-iceberg-core/src/main/java/org/apache/iceberg/BaseUpdatePartitionSpec.java @@ -0,0 +1,552 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iceberg; + +import java.util.List; +import java.util.Map; +import java.util.Set; +import org.apache.iceberg.exceptions.ValidationException; +import org.apache.iceberg.expressions.BoundReference; +import org.apache.iceberg.expressions.BoundTerm; +import org.apache.iceberg.expressions.BoundTransform; +import org.apache.iceberg.expressions.Expressions; +import org.apache.iceberg.expressions.Term; +import org.apache.iceberg.expressions.UnboundTerm; +import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.apache.iceberg.relocated.com.google.common.collect.Sets; +import org.apache.iceberg.transforms.PartitionSpecVisitor; +import org.apache.iceberg.transforms.Transform; +import org.apache.iceberg.transforms.Transforms; +import org.apache.iceberg.transforms.UnknownTransform; +import org.apache.iceberg.util.Pair; + +public class BaseUpdatePartitionSpec implements UpdatePartitionSpec { + private final TableOperations ops; + private final TableMetadata base; + private final int formatVersion; + private final PartitionSpec spec; + private final Schema schema; + private final Map nameToField; + private final Map, PartitionField> transformToField; + + private final List adds = Lists.newArrayList(); + private final Map addedTimeFields = Maps.newHashMap(); + private final Map, PartitionField> transformToAddedField = + Maps.newHashMap(); + private final Map nameToAddedField = Maps.newHashMap(); + private final Set deletes = Sets.newHashSet(); + private final Map renames = Maps.newHashMap(); + + private boolean caseSensitive; + private int lastAssignedPartitionId; + + BaseUpdatePartitionSpec(TableOperations ops) { + this.ops = ops; + this.caseSensitive = true; + this.base = ops.current(); + this.formatVersion = base.formatVersion(); + this.spec = base.spec(); + this.schema = spec.schema(); + this.nameToField = indexSpecByName(spec); + this.transformToField = indexSpecByTransform(spec); + this.lastAssignedPartitionId = base.lastAssignedPartitionId(); + + spec.fields().stream() + .filter(field -> field.transform() instanceof UnknownTransform) + .findAny() + .ifPresent( + field -> { + throw new IllegalArgumentException( + "Cannot update partition spec with unknown transform: " + field); + }); + } + + /** + * For testing only. + */ + @VisibleForTesting + BaseUpdatePartitionSpec(int formatVersion, PartitionSpec spec) { + this(formatVersion, spec, spec.lastAssignedFieldId()); + } + + /** + * For testing only. + */ + @VisibleForTesting + BaseUpdatePartitionSpec(int formatVersion, PartitionSpec spec, int lastAssignedPartitionId) { + this.ops = null; + this.base = null; + this.formatVersion = formatVersion; + this.caseSensitive = true; + this.spec = spec; + this.schema = spec.schema(); + this.nameToField = indexSpecByName(spec); + this.transformToField = indexSpecByTransform(spec); + this.lastAssignedPartitionId = lastAssignedPartitionId; + } + + private int assignFieldId() { + this.lastAssignedPartitionId += 1; + return lastAssignedPartitionId; + } + + /** + * In V2 it searches for a similar partition field in historical partition specs. Tries to match + * on source field ID, transform type and target name (optional). 
If not found or in V1 cases it + * creates a new PartitionField. + * + * @param sourceTransform pair of source ID and transform for this PartitionField addition + * @param name target partition field name, if specified + * @return the recycled or newly created partition field + */ + private PartitionField recycleOrCreatePartitionField( + Pair> sourceTransform, String name) { + if (formatVersion == 2 && base != null) { + int sourceId = sourceTransform.first(); + Transform transform = sourceTransform.second(); + + Set allHistoricalFields = Sets.newHashSet(); + for (PartitionSpec partitionSpec : base.specs()) { + allHistoricalFields.addAll(partitionSpec.fields()); + } + + for (PartitionField field : allHistoricalFields) { + if (field.sourceId() == sourceId && field.transform().equals(transform)) { + // if target name is specified then consider it too, otherwise not + if (name == null || field.name().equals(name)) { + return field; + } + } + } + } + return new PartitionField( + sourceTransform.first(), assignFieldId(), name, sourceTransform.second()); + } + + @Override + public UpdatePartitionSpec caseSensitive(boolean isCaseSensitive) { + this.caseSensitive = isCaseSensitive; + return this; + } + + @Override + public BaseUpdatePartitionSpec addField(String sourceName) { + return addField(Expressions.ref(sourceName)); + } + + @Override + public BaseUpdatePartitionSpec addField(Term term) { + return addField(null, term); + } + + private BaseUpdatePartitionSpec rewriteDeleteAndAddField( + PartitionField existing, String name, Pair> sourceTransform) { + deletes.remove(existing.fieldId()); + if (name == null || existing.name().equals(name)) { + return this; + } else { + return renameField(existing.name(), name); + } + } + + @Override + public BaseUpdatePartitionSpec addField(String name, Term term) { + PartitionField alreadyAdded = nameToAddedField.get(name); + Preconditions.checkArgument( + alreadyAdded == null, "Cannot add duplicate partition field: %s", alreadyAdded); + + Pair> sourceTransform = resolve(term); + Pair validationKey = + Pair.of(sourceTransform.first(), sourceTransform.second().toString()); + + PartitionField existing = transformToField.get(validationKey); + if (existing != null && + deletes.contains(existing.fieldId()) && + existing.transform().equals(sourceTransform.second())) { + return rewriteDeleteAndAddField(existing, name, sourceTransform); + } + + Preconditions.checkArgument( + existing == null || + (deletes.contains(existing.fieldId()) && + !existing.name().equals(name)), +// !existing.transform().toString().equals(sourceTransform.second().toString())), + "Cannot add duplicate partition field %s=%s, conflicts with lofasz %s", + name, + term, + existing); + + PartitionField added = transformToAddedField.get(validationKey); + Preconditions.checkArgument( + added == null, + "Cannot add duplicate partition field %s=%s, already added: %s", + name, + term, + added); + + PartitionField newField = recycleOrCreatePartitionField(sourceTransform, name); + if (newField.name() == null) { + String partitionName = + PartitionSpecVisitor.visit(schema, newField, PartitionNameGenerator.INSTANCE); + newField = + new PartitionField( + newField.sourceId(), newField.fieldId(), partitionName, newField.transform()); + } + + checkForRedundantAddedPartitions(newField); + transformToAddedField.put(validationKey, newField); + + PartitionField existingField = nameToField.get(newField.name()); + if (existingField != null && !deletes.contains(existingField.fieldId())) { + if 
(isVoidTransform(existingField)) { + // rename the old deleted field that is being replaced by the new field + renameField(existingField.name(), existingField.name() + "_" + existingField.fieldId()); + } else { + throw new IllegalArgumentException( + String.format("Cannot add duplicate partition field name: %s", name)); + } + } else if (existingField != null && deletes.contains(existingField.fieldId())) { + renames.put(existingField.name(), existingField.name() + "_" + existingField.fieldId()); + } + + nameToAddedField.put(newField.name(), newField); + + adds.add(newField); + + return this; + } + + @Override + public BaseUpdatePartitionSpec removeField(String name) { + PartitionField alreadyAdded = nameToAddedField.get(name); + Preconditions.checkArgument( + alreadyAdded == null, "Cannot delete newly added field: %s", alreadyAdded); + + Preconditions.checkArgument( + renames.get(name) == null, "Cannot rename and delete partition field: %s", name); + + PartitionField field = nameToField.get(name); + Preconditions.checkArgument(field != null, "Cannot find partition field to remove: %s", name); + + deletes.add(field.fieldId()); + + return this; + } + + @Override + public BaseUpdatePartitionSpec removeField(Term term) { + Pair> sourceTransform = resolve(term); + Pair key = + Pair.of(sourceTransform.first(), sourceTransform.second().toString()); + + PartitionField added = transformToAddedField.get(key); + Preconditions.checkArgument(added == null, "Cannot delete newly added field: %s", added); + + PartitionField field = transformToField.get(key); + Preconditions.checkArgument(field != null, "Cannot find partition field to remove: %s", term); + Preconditions.checkArgument( + renames.get(field.name()) == null, + "Cannot rename and delete partition field: %s", + field.name()); + + deletes.add(field.fieldId()); + + return this; + } + + @Override + public BaseUpdatePartitionSpec renameField(String name, String newName) { + PartitionField existingField = nameToField.get(newName); + if (existingField != null && isVoidTransform(existingField)) { + // rename the old deleted field that is being replaced by the new field + renameField(existingField.name(), existingField.name() + "_" + existingField.fieldId()); + } + + PartitionField added = nameToAddedField.get(name); + Preconditions.checkArgument( + added == null, "Cannot rename newly added partition field: %s", name); + + PartitionField field = nameToField.get(name); + Preconditions.checkArgument(field != null, "Cannot find partition field to rename: %s", name); + Preconditions.checkArgument( + !deletes.contains(field.fieldId()), "Cannot delete and rename partition field: %s", name); + + renames.put(name, newName); + + return this; + } + + @Override + public PartitionSpec apply() { + PartitionSpec.Builder builder = PartitionSpec.builderFor(schema); + + for (PartitionField field : spec.fields()) { + if (!deletes.contains(field.fieldId())) { + String newName = renames.get(field.name()); + if (newName != null) { + builder.add(field.sourceId(), field.fieldId(), newName, field.transform()); + } else { + builder.add(field.sourceId(), field.fieldId(), field.name(), field.transform()); + } + } else if (formatVersion < 2) { + // field IDs were not required for v1 and were assigned sequentially in each partition spec + // starting at 1,000. + // to maintain consistent field ids across partition specs in v1 tables, any partition field + // that is removed + // must be replaced with a null transform. null values are always allowed in partition data. 
+ String newName = renames.get(field.name()); + if (newName != null) { + builder.add(field.sourceId(), field.fieldId(), newName, Transforms.alwaysNull()); + } else { + builder.add(field.sourceId(), field.fieldId(), field.name(), Transforms.alwaysNull()); + } + } + } + + for (PartitionField newField : adds) { + builder.add(newField.sourceId(), newField.fieldId(), newField.name(), newField.transform()); + } + + return builder.build(); + } + + @Override + public void commit() { + TableMetadata update = base.updatePartitionSpec(apply()); + ops.commit(base, update); + } + + private Pair> resolve(Term term) { + Preconditions.checkArgument(term instanceof UnboundTerm, "Term must be unbound"); + + BoundTerm boundTerm = ((UnboundTerm) term).bind(schema.asStruct(), caseSensitive); + int sourceId = boundTerm.ref().fieldId(); + Transform transform = toTransform(boundTerm); + + return Pair.of(sourceId, transform); + } + + private Transform toTransform(BoundTerm term) { + if (term instanceof BoundReference) { + return Transforms.identity(term.type()); + } else if (term instanceof BoundTransform) { + return ((BoundTransform) term).transform(); + } else { + throw new ValidationException( + "Invalid term: %s, expected either a bound reference or transform", term); + } + } + + private void checkForRedundantAddedPartitions(PartitionField field) { + if (isTimeTransform(field)) { + PartitionField timeField = addedTimeFields.get(field.sourceId()); + Preconditions.checkArgument( + timeField == null, + "Cannot add redundant partition field: %s conflicts with %s", + timeField, + field); + addedTimeFields.put(field.sourceId(), field); + } + } + + private static Map indexSpecByName(PartitionSpec spec) { + ImmutableMap.Builder builder = ImmutableMap.builder(); + List fields = spec.fields(); + for (PartitionField field : fields) { + builder.put(field.name(), field); + } + + return builder.build(); + } + + private static Map, PartitionField> indexSpecByTransform( + PartitionSpec spec) { + Map, PartitionField> indexSpecs = Maps.newHashMap(); + List fields = spec.fields(); + for (PartitionField field : fields) { + indexSpecs.put(Pair.of(field.sourceId(), field.transform().toString()), field); + } + + return indexSpecs; + } + + private boolean isTimeTransform(PartitionField field) { + return PartitionSpecVisitor.visit(schema, field, IsTimeTransform.INSTANCE); + } + + private static class IsTimeTransform implements PartitionSpecVisitor { + private static final IsTimeTransform INSTANCE = new IsTimeTransform(); + + private IsTimeTransform() { + } + + @Override + public Boolean identity(int fieldId, String sourceName, int sourceId) { + return false; + } + + @Override + public Boolean bucket(int fieldId, String sourceName, int sourceId, int numBuckets) { + return false; + } + + @Override + public Boolean truncate(int fieldId, String sourceName, int sourceId, int width) { + return false; + } + + @Override + public Boolean year(int fieldId, String sourceName, int sourceId) { + return true; + } + + @Override + public Boolean month(int fieldId, String sourceName, int sourceId) { + return true; + } + + @Override + public Boolean day(int fieldId, String sourceName, int sourceId) { + return true; + } + + @Override + public Boolean hour(int fieldId, String sourceName, int sourceId) { + return true; + } + + @Override + public Boolean alwaysNull(int fieldId, String sourceName, int sourceId) { + return false; + } + + @Override + public Boolean unknown(int fieldId, String sourceName, int sourceId, String transform) { + return false; + } + 
} + + private boolean isVoidTransform(PartitionField field) { + return PartitionSpecVisitor.visit(schema, field, IsVoidTransform.INSTANCE); + } + + private static class IsVoidTransform implements PartitionSpecVisitor { + private static final IsVoidTransform INSTANCE = new IsVoidTransform(); + + private IsVoidTransform() { + } + + @Override + public Boolean identity(int fieldId, String sourceName, int sourceId) { + return false; + } + + @Override + public Boolean bucket(int fieldId, String sourceName, int sourceId, int numBuckets) { + return false; + } + + @Override + public Boolean truncate(int fieldId, String sourceName, int sourceId, int width) { + return false; + } + + @Override + public Boolean year(int fieldId, String sourceName, int sourceId) { + return false; + } + + @Override + public Boolean month(int fieldId, String sourceName, int sourceId) { + return false; + } + + @Override + public Boolean day(int fieldId, String sourceName, int sourceId) { + return false; + } + + @Override + public Boolean hour(int fieldId, String sourceName, int sourceId) { + return false; + } + + @Override + public Boolean alwaysNull(int fieldId, String sourceName, int sourceId) { + return true; + } + + @Override + public Boolean unknown(int fieldId, String sourceName, int sourceId, String transform) { + return false; + } + } + + private static class PartitionNameGenerator implements PartitionSpecVisitor { + private static final PartitionNameGenerator INSTANCE = new PartitionNameGenerator(); + + private PartitionNameGenerator() { + } + + @Override + public String identity(int fieldId, String sourceName, int sourceId) { + return sourceName; + } + + @Override + public String bucket(int fieldId, String sourceName, int sourceId, int numBuckets) { + return sourceName + "_bucket_" + numBuckets; + } + + @Override + public String truncate(int fieldId, String sourceName, int sourceId, int width) { + return sourceName + "_trunc_" + width; + } + + @Override + public String year(int fieldId, String sourceName, int sourceId) { + return sourceName + "_year"; + } + + @Override + public String month(int fieldId, String sourceName, int sourceId) { + return sourceName + "_month"; + } + + @Override + public String day(int fieldId, String sourceName, int sourceId) { + return sourceName + "_day"; + } + + @Override + public String hour(int fieldId, String sourceName, int sourceId) { + return sourceName + "_hour"; + } + + @Override + public String alwaysNull(int fieldId, String sourceName, int sourceId) { + return sourceName + "_null"; + } + } +} From 6215288bf69c6b5b6125632786822650341d4644 Mon Sep 17 00:00:00 2001 From: Denys Kuzmenko Date: Tue, 24 Jan 2023 01:08:52 +0100 Subject: [PATCH 09/12] API: Fix Transform backward compatibility in PartitionSpec --- .../java/org/apache/iceberg/BaseUpdatePartitionSpec.java | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/iceberg/patched-iceberg-core/src/main/java/org/apache/iceberg/BaseUpdatePartitionSpec.java b/iceberg/patched-iceberg-core/src/main/java/org/apache/iceberg/BaseUpdatePartitionSpec.java index aac7b403af0d..0ff41bf150de 100644 --- a/iceberg/patched-iceberg-core/src/main/java/org/apache/iceberg/BaseUpdatePartitionSpec.java +++ b/iceberg/patched-iceberg-core/src/main/java/org/apache/iceberg/BaseUpdatePartitionSpec.java @@ -39,6 +39,7 @@ import org.apache.iceberg.transforms.Transform; import org.apache.iceberg.transforms.Transforms; import org.apache.iceberg.transforms.UnknownTransform; +import org.apache.iceberg.types.Type; import org.apache.iceberg.util.Pair; public 
class BaseUpdatePartitionSpec implements UpdatePartitionSpec { @@ -344,6 +345,12 @@ public void commit() { int sourceId = boundTerm.ref().fieldId(); Transform transform = toTransform(boundTerm); + Type fieldType = schema.findType(sourceId); + if (fieldType != null) { + transform = Transforms.fromString(fieldType, transform.toString()); + } else { + transform = Transforms.fromString(transform.toString()); + } return Pair.of(sourceId, transform); } From c13294b4c63c457c5b152919c64629960f12a8ea Mon Sep 17 00:00:00 2001 From: Zsolt Miskolczi Date: Sun, 15 Jan 2023 22:23:37 +0100 Subject: [PATCH 10/12] Fix failing qtest --- .../results/positive/describe_iceberg_metadata_tables.q.out | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/iceberg/iceberg-handler/src/test/results/positive/describe_iceberg_metadata_tables.q.out b/iceberg/iceberg-handler/src/test/results/positive/describe_iceberg_metadata_tables.q.out index bb64e68cd494..656f0a0954d6 100644 --- a/iceberg/iceberg-handler/src/test/results/positive/describe_iceberg_metadata_tables.q.out +++ b/iceberg/iceberg-handler/src/test/results/positive/describe_iceberg_metadata_tables.q.out @@ -80,6 +80,7 @@ POSTHOOK: Input: default@ice_meta_desc status int snapshot_id bigint sequence_number bigint +file_sequence_number bigint data_file struct,value_counts:map,null_value_counts:map,nan_value_counts:map,lower_bounds:map,upper_bounds:map,key_metadata:binary,split_offsets:array,equality_ids:array,sort_order_id:int> PREHOOK: query: describe default.ice_meta_desc.history PREHOOK: type: DESCTABLE @@ -178,6 +179,7 @@ POSTHOOK: Input: default@ice_meta_desc status int snapshot_id bigint sequence_number bigint +file_sequence_number bigint data_file struct,value_counts:map,null_value_counts:map,nan_value_counts:map,lower_bounds:map,upper_bounds:map,key_metadata:binary,split_offsets:array,equality_ids:array,sort_order_id:int> PREHOOK: query: describe formatted default.ice_meta_desc.files PREHOOK: type: DESCTABLE @@ -211,6 +213,7 @@ POSTHOOK: Input: default@ice_meta_desc status int snapshot_id bigint sequence_number bigint +file_sequence_number bigint data_file struct,value_counts:map,null_value_counts:map,nan_value_counts:map,lower_bounds:map,upper_bounds:map,key_metadata:binary,split_offsets:array,equality_ids:array,sort_order_id:int> PREHOOK: query: describe formatted default.ice_meta_desc.history PREHOOK: type: DESCTABLE @@ -316,6 +319,7 @@ POSTHOOK: Input: default@ice_meta_desc status int snapshot_id bigint sequence_number bigint +file_sequence_number bigint data_file struct,value_counts:map,null_value_counts:map,nan_value_counts:map,lower_bounds:map,upper_bounds:map,key_metadata:binary,split_offsets:array,equality_ids:array,sort_order_id:int> PREHOOK: query: describe extended default.ice_meta_desc.files PREHOOK: type: DESCTABLE @@ -347,6 +351,7 @@ POSTHOOK: Input: default@ice_meta_desc status int snapshot_id bigint sequence_number bigint +file_sequence_number bigint data_file struct,value_counts:map,null_value_counts:map,nan_value_counts:map,lower_bounds:map,upper_bounds:map,key_metadata:binary,split_offsets:array,equality_ids:array,sort_order_id:int> PREHOOK: query: describe extended default.ice_meta_desc.history PREHOOK: type: DESCTABLE @@ -445,6 +450,7 @@ POSTHOOK: Input: default@ice_meta_desc status int snapshot_id bigint sequence_number bigint +file_sequence_number bigint data_file 
struct,value_counts:map,null_value_counts:map,nan_value_counts:map,lower_bounds:map,upper_bounds:map,key_metadata:binary,split_offsets:array,equality_ids:array,sort_order_id:int> PREHOOK: query: drop table ice_meta_desc PREHOOK: type: DROPTABLE From c3dec121d714a7605eeae49210d9f32133c797ae Mon Sep 17 00:00:00 2001 From: Zsolt Miskolczi Date: Tue, 31 Jan 2023 16:07:36 +0100 Subject: [PATCH 11/12] Update with most recent changes --- .../iceberg/BaseUpdatePartitionSpec.java | 20 ++++++++----------- 1 file changed, 8 insertions(+), 12 deletions(-) diff --git a/iceberg/patched-iceberg-core/src/main/java/org/apache/iceberg/BaseUpdatePartitionSpec.java b/iceberg/patched-iceberg-core/src/main/java/org/apache/iceberg/BaseUpdatePartitionSpec.java index 0ff41bf150de..f448f5314443 100644 --- a/iceberg/patched-iceberg-core/src/main/java/org/apache/iceberg/BaseUpdatePartitionSpec.java +++ b/iceberg/patched-iceberg-core/src/main/java/org/apache/iceberg/BaseUpdatePartitionSpec.java @@ -42,7 +42,7 @@ import org.apache.iceberg.types.Type; import org.apache.iceberg.util.Pair; -public class BaseUpdatePartitionSpec implements UpdatePartitionSpec { +class BaseUpdatePartitionSpec implements UpdatePartitionSpec { private final TableOperations ops; private final TableMetadata base; private final int formatVersion; @@ -83,17 +83,13 @@ public class BaseUpdatePartitionSpec implements UpdatePartitionSpec { }); } - /** - * For testing only. - */ + /** For testing only. */ @VisibleForTesting BaseUpdatePartitionSpec(int formatVersion, PartitionSpec spec) { this(formatVersion, spec, spec.lastAssignedFieldId()); } - /** - * For testing only. - */ + /** For testing only. */ @VisibleForTesting BaseUpdatePartitionSpec(int formatVersion, PartitionSpec spec, int lastAssignedPartitionId) { this.ops = null; @@ -118,7 +114,7 @@ private int assignFieldId() { * creates a new PartitionField. 
* * @param sourceTransform pair of source ID and transform for this PartitionField addition - * @param name target partition field name, if specified + * @param name target partition field name, if specified * @return the recycled or newly created partition field */ private PartitionField recycleOrCreatePartitionField( @@ -191,9 +187,8 @@ public BaseUpdatePartitionSpec addField(String name, Term term) { Preconditions.checkArgument( existing == null || (deletes.contains(existing.fieldId()) && - !existing.name().equals(name)), -// !existing.transform().toString().equals(sourceTransform.second().toString())), - "Cannot add duplicate partition field %s=%s, conflicts with lofasz %s", + !existing.transform().toString().equals(sourceTransform.second().toString())), + "Cannot add duplicate partition field %s=%s, conflicts with %s", name, term, existing); @@ -351,12 +346,13 @@ public void commit() { } else { transform = Transforms.fromString(transform.toString()); } + return Pair.of(sourceId, transform); } private Transform toTransform(BoundTerm term) { if (term instanceof BoundReference) { - return Transforms.identity(term.type()); + return Transforms.identity(); } else if (term instanceof BoundTransform) { return ((BoundTransform) term).transform(); } else { From 7b4e868d4fed48a5ec03d2d13e537b72b5803246 Mon Sep 17 00:00:00 2001 From: Prashant Singh <35593236+singhpk234@users.noreply.github.com> Date: Wed, 28 Sep 2022 23:01:42 +0530 Subject: [PATCH 12/12] Refresh HadoopInputFile with the most recent changes --- .../java/org/apache/iceberg/hadoop/HadoopInputFile.java | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/iceberg/patched-iceberg-core/src/main/java/org/apache/iceberg/hadoop/HadoopInputFile.java b/iceberg/patched-iceberg-core/src/main/java/org/apache/iceberg/hadoop/HadoopInputFile.java index 34d2e691497a..bd310b4c2657 100644 --- a/iceberg/patched-iceberg-core/src/main/java/org/apache/iceberg/hadoop/HadoopInputFile.java +++ b/iceberg/patched-iceberg-core/src/main/java/org/apache/iceberg/hadoop/HadoopInputFile.java @@ -167,6 +167,8 @@ private FileStatus lazyStat() { if (stat == null) { try { this.stat = fs.getFileStatus(path); + } catch (FileNotFoundException e) { + throw new NotFoundException(e, "File does not exist: %s", path); } catch (IOException e) { throw new RuntimeIOException(e, "Failed to get status for file: %s", path); } @@ -238,9 +240,9 @@ public String location() { @Override public boolean exists() { try { - return fs.exists(path); - } catch (IOException e) { - throw new RuntimeIOException(e, "Failed to check existence for file: %s", path); + return lazyStat() != null; + } catch (NotFoundException e) { + return false; } }
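Reviewer note on the HadoopInputFile refresh: after this patch a single fs.getFileStatus()
call is cached by lazyStat() and shared by exists() and the stat-based accessors, and a
missing file surfaces as Iceberg's NotFoundException instead of a generic RuntimeIOException.
A rough caller-side sketch under those assumptions (the location string is a made-up
placeholder and `conf` is a Hadoop Configuration assumed to be in scope):

    // Illustrative only; HadoopInputFile.fromLocation() is the existing factory.
    InputFile file = HadoopInputFile.fromLocation(
        "hdfs://nn/warehouse/db/tbl/data/00000-0-data.parquet", conf);

    if (file.exists()) {              // backed by the cached FileStatus, no separate fs.exists() RPC
      long length = file.getLength(); // reuses the same cached status when no length was supplied
    }

    try {
      file.getLength();               // stat of a missing file now throws NotFoundException via lazyStat()
    } catch (NotFoundException e) {
      // recover, e.g. by re-planning the scan
    }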