diff --git a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/data/DeleteReadTests.java b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/data/DeleteReadTests.java
index ae42382f9723..d7f8fe84b527 100644
--- a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/data/DeleteReadTests.java
+++ b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/data/DeleteReadTests.java
@@ -33,6 +33,7 @@
 import org.apache.iceberg.relocated.com.google.common.collect.Sets;
 import org.apache.iceberg.types.Types;
 import org.apache.iceberg.util.ArrayUtil;
+import org.apache.iceberg.util.CharSequenceSet;
 import org.apache.iceberg.util.Pair;
 import org.apache.iceberg.util.StructLikeSet;
 import org.apache.iceberg.util.StructProjection;
@@ -193,7 +194,7 @@ public void testPositionDeletes() throws IOException {
         Pair.of(dataFile.path(), 6L) // id = 122
     );
 
-    Pair<DeleteFile, Set<CharSequence>> posDeletes = FileHelpers.writeDeleteFile(
+    Pair<DeleteFile, CharSequenceSet> posDeletes = FileHelpers.writeDeleteFile(
         table, Files.localOutput(temp.newFile()), Row.of(0), deletes);
 
     table.newRowDelta()
@@ -225,7 +226,7 @@ public void testMixedPositionAndEqualityDeletes() throws IOException {
         Pair.of(dataFile.path(), 5L) // id = 121
     );
 
-    Pair<DeleteFile, Set<CharSequence>> posDeletes = FileHelpers.writeDeleteFile(
+    Pair<DeleteFile, CharSequenceSet> posDeletes = FileHelpers.writeDeleteFile(
         table, Files.localOutput(temp.newFile()), Row.of(0), deletes);
 
     table.newRowDelta()
diff --git a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/data/FileHelpers.java b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/data/FileHelpers.java
index 4b82fc234c5e..4919e6b060ea 100644
--- a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/data/FileHelpers.java
+++ b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/data/FileHelpers.java
@@ -22,48 +22,53 @@
 import java.io.Closeable;
 import java.io.IOException;
 import java.util.List;
-import java.util.Set;
+import java.util.Locale;
+import java.util.Map;
 import org.apache.iceberg.DataFile;
 import org.apache.iceberg.DataFiles;
 import org.apache.iceberg.DeleteFile;
 import org.apache.iceberg.FileFormat;
-import org.apache.iceberg.MetricsConfig;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.StructLike;
 import org.apache.iceberg.Table;
-import org.apache.iceberg.data.parquet.GenericParquetWriter;
 import org.apache.iceberg.deletes.EqualityDeleteWriter;
+import org.apache.iceberg.deletes.PositionDelete;
 import org.apache.iceberg.deletes.PositionDeleteWriter;
+import org.apache.iceberg.encryption.EncryptedFiles;
+import org.apache.iceberg.encryption.EncryptedOutputFile;
+import org.apache.iceberg.encryption.EncryptionKeyMetadata;
 import org.apache.iceberg.io.FileAppender;
+import org.apache.iceberg.io.FileAppenderFactory;
 import org.apache.iceberg.io.OutputFile;
-import org.apache.iceberg.parquet.Parquet;
 import org.apache.iceberg.types.Types;
+import org.apache.iceberg.util.CharSequenceSet;
 import org.apache.iceberg.util.Pair;
 
+import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT;
+import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT_DEFAULT;
+
 public class FileHelpers {
   private FileHelpers() {
   }
 
-  public static Pair<DeleteFile, Set<CharSequence>> writeDeleteFile(Table table, OutputFile out,
-                                                                    List<Pair<CharSequence, Long>> deletes)
+  public static Pair<DeleteFile, CharSequenceSet> writeDeleteFile(Table table, OutputFile out,
+                                                                  List<Pair<CharSequence, Long>> deletes)
       throws IOException {
     return writeDeleteFile(table, out, null, deletes);
   }
 
-  public static Pair<DeleteFile, Set<CharSequence>> writeDeleteFile(Table table, OutputFile out, StructLike partition,
-                                                                    List<Pair<CharSequence, Long>> deletes)
+  public static Pair<DeleteFile, CharSequenceSet> writeDeleteFile(Table table, OutputFile out, StructLike partition,
+                                                                  List<Pair<CharSequence, Long>> deletes)
       throws IOException {
-    PositionDeleteWriter writer = Parquet.writeDeletes(out)
-        .withSpec(table.spec())
-        .setAll(table.properties())
-        .metricsConfig(MetricsConfig.forTable(table))
-        .withPartition(partition)
-        .overwrite()
-        .buildPositionWriter();
+    FileFormat format = defaultFormat(table.properties());
+    FileAppenderFactory<Record> factory = new GenericAppenderFactory(table.schema(), table.spec());
+    PositionDeleteWriter<Record> writer =
+        factory.newPosDeleteWriter(encrypt(out), format, partition);
 
     try (Closeable toClose = writer) {
       for (Pair<CharSequence, Long> delete : deletes) {
-        writer.delete(delete.first(), delete.second());
+        PositionDelete<Record> posDelete = PositionDelete.create();
+        writer.write(posDelete.set(delete.first(), delete.second(), null));
       }
     }
 
@@ -76,36 +81,32 @@ public static DeleteFile writeDeleteFile(Table table, OutputFile out, List<Reco
   public static DeleteFile writeDeleteFile(Table table, OutputFile out, StructLike partition,
-                                           List<Record> deletes, Schema deleteRowSchema) throws IOException {
-    EqualityDeleteWriter writer = Parquet.writeDeletes(out)
-        .forTable(table)
-        .withPartition(partition)
-        .rowSchema(deleteRowSchema)
-        .createWriterFunc(GenericParquetWriter::buildWriter)
-        .overwrite()
-        .equalityFieldIds(deleteRowSchema.columns().stream().mapToInt(Types.NestedField::fieldId).toArray())
-        .buildEqualityWriter();
+                                           List<Record> deletes, Schema deleteRowSchema)
+      throws IOException {
+    FileFormat format = defaultFormat(table.properties());
+    int[] equalityFieldIds = deleteRowSchema.columns().stream().mapToInt(Types.NestedField::fieldId).toArray();
+    FileAppenderFactory<Record> factory = new GenericAppenderFactory(table.schema(), table.spec(),
+        equalityFieldIds, deleteRowSchema, null);
+    EqualityDeleteWriter<Record> writer = factory.newEqDeleteWriter(encrypt(out), format, partition);
 
     try (Closeable toClose = writer) {
-      writer.deleteAll(deletes);
+      writer.write(deletes);
     }
 
     return writer.toDeleteFile();
   }
 
   public static DataFile writeDataFile(Table table, OutputFile out, List<Record> rows) throws IOException {
-    FileAppender<Record> writer = Parquet.write(out)
-        .createWriterFunc(GenericParquetWriter::buildWriter)
-        .schema(table.schema())
-        .overwrite()
-        .build();
+    FileFormat format = defaultFormat(table.properties());
+    GenericAppenderFactory factory = new GenericAppenderFactory(table.schema());
+    FileAppender<Record> writer = factory.newAppender(out, format);
 
     try (Closeable toClose = writer) {
       writer.addAll(rows);
     }
 
     return DataFiles.builder(table.spec())
-        .withFormat(FileFormat.PARQUET)
+        .withFormat(format)
         .withPath(out.location())
         .withFileSizeInBytes(writer.length())
         .withSplitOffsets(writer.splitOffsets())
@@ -115,18 +116,16 @@ public static DataFile writeDataFile(Table table, OutputFile out, List<Record> r
 
   public static DataFile writeDataFile(Table table, OutputFile out, StructLike partition, List<Record> rows)
       throws IOException {
-    FileAppender<Record> writer = Parquet.write(out)
-        .createWriterFunc(GenericParquetWriter::buildWriter)
-        .schema(table.schema())
-        .overwrite()
-        .build();
+    FileFormat format = defaultFormat(table.properties());
+    GenericAppenderFactory factory = new GenericAppenderFactory(table.schema(), table.spec());
+    FileAppender<Record> writer = factory.newAppender(out, format);
 
     try (Closeable toClose = writer) {
       writer.addAll(rows);
     }
 
     return DataFiles.builder(table.spec())
-        .withFormat(FileFormat.PARQUET)
+        .withFormat(format)
         .withPath(out.location())
         .withPartition(partition)
         .withFileSizeInBytes(writer.length())
@@ -134,4 +133,13 @@ public static DataFile writeDataFile(Table table, OutputFile out, StructLike par
         .withMetrics(writer.metrics())
         .build();
   }
+
+  private static EncryptedOutputFile encrypt(OutputFile out) {
+    return EncryptedFiles.encryptedOutput(out, EncryptionKeyMetadata.EMPTY);
+  }
+
+  private static FileFormat defaultFormat(Map<String, String> properties) {
+    String formatString = properties.getOrDefault(DEFAULT_FILE_FORMAT, DEFAULT_FILE_FORMAT_DEFAULT);
+    return FileFormat.valueOf(formatString.toUpperCase(Locale.ENGLISH));
+  }
 }
diff --git a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/HiveIcebergTestUtils.java b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/HiveIcebergTestUtils.java
index dc68ce9a80be..1bb12189dc1d 100644
--- a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/HiveIcebergTestUtils.java
+++ b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/HiveIcebergTestUtils.java
@@ -353,7 +353,7 @@ public static DeleteFile createEqualityDeleteFile(Table table, String deleteFile
     part.partition(rowsToDelete.get(0));
     EqualityDeleteWriter<Record> eqWriter = appenderFactory.newEqDeleteWriter(outputFile, fileFormat, part);
     try (EqualityDeleteWriter<Record> writer = eqWriter) {
-      writer.deleteAll(rowsToDelete);
+      writer.write(rowsToDelete);
     }
     return eqWriter.toDeleteFile();
   }
@@ -386,7 +386,11 @@ public static DeleteFile createPositionalDeleteFile(Table table, String deleteFi
     PositionDeleteWriter<Record> posWriter = appenderFactory.newPosDeleteWriter(outputFile, fileFormat,
         partitionKey);
     try (PositionDeleteWriter<Record> writer = posWriter) {
-      deletes.forEach(del -> writer.delete(del.path(), del.pos(), del.row()));
+      deletes.forEach(del -> {
+        PositionDelete<Record> positionDelete = PositionDelete.create();
+        positionDelete.set(del.path(), del.pos(), del.row());
+        writer.write(positionDelete);
+      });
     }
     return posWriter.toDeleteFile();
   }
diff --git a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerNoScan.java b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerNoScan.java
index b00017726b16..97a4678bae5c 100644
--- a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerNoScan.java
+++ b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerNoScan.java
@@ -247,6 +247,7 @@ public void testSetPartitionTransform() {
         .build();
 
     Table table = testTables.loadTable(identifier);
+    Assert.assertEquals(spec.specId(), table.spec().specId());
     Assert.assertEquals(spec, table.spec());
 
     shell.executeStatement("ALTER TABLE default.part_test SET PARTITION SPEC(year(year_field), month(month_field), " +
@@ -264,7 +265,13 @@
         .build();
 
     table.refresh();
-    Assert.assertEquals(spec, table.spec());
+    Assert.assertEquals(spec.specId(), table.spec().specId());
+
+    for (PartitionField field :
+        spec.fields()) {
+      Assert.assertTrue(field.name(), table.spec().fields().stream().anyMatch(
+          tableField -> tableField.name().equals(field.name())));
+    }
   }
 
   @Test
diff --git a/iceberg/iceberg-handler/src/test/results/positive/describe_iceberg_metadata_tables.q.out b/iceberg/iceberg-handler/src/test/results/positive/describe_iceberg_metadata_tables.q.out
index bb64e68cd494..656f0a0954d6 100644
--- a/iceberg/iceberg-handler/src/test/results/positive/describe_iceberg_metadata_tables.q.out
+++ b/iceberg/iceberg-handler/src/test/results/positive/describe_iceberg_metadata_tables.q.out
@@ -80,6 +80,7 @@ POSTHOOK: Input: default@ice_meta_desc
 status               int
 snapshot_id          bigint
 sequence_number      bigint
+file_sequence_number bigint
 data_file            struct,value_counts:map,null_value_counts:map,nan_value_counts:map,lower_bounds:map,upper_bounds:map,key_metadata:binary,split_offsets:array,equality_ids:array,sort_order_id:int>
 PREHOOK: query: describe default.ice_meta_desc.history
 PREHOOK: type: DESCTABLE
@@ -178,6 +179,7 @@ POSTHOOK: Input: default@ice_meta_desc
 status               int
 snapshot_id          bigint
 sequence_number      bigint
+file_sequence_number bigint
 data_file            struct,value_counts:map,null_value_counts:map,nan_value_counts:map,lower_bounds:map,upper_bounds:map,key_metadata:binary,split_offsets:array,equality_ids:array,sort_order_id:int>
 PREHOOK: query: describe formatted default.ice_meta_desc.files
 PREHOOK: type: DESCTABLE
@@ -211,6 +213,7 @@ POSTHOOK: Input: default@ice_meta_desc
 status               int
 snapshot_id          bigint
 sequence_number      bigint
+file_sequence_number bigint
 data_file            struct,value_counts:map,null_value_counts:map,nan_value_counts:map,lower_bounds:map,upper_bounds:map,key_metadata:binary,split_offsets:array,equality_ids:array,sort_order_id:int>
 PREHOOK: query: describe formatted default.ice_meta_desc.history
 PREHOOK: type: DESCTABLE
@@ -316,6 +319,7 @@ POSTHOOK: Input: default@ice_meta_desc
 status               int
 snapshot_id          bigint
 sequence_number      bigint
+file_sequence_number bigint
 data_file            struct,value_counts:map,null_value_counts:map,nan_value_counts:map,lower_bounds:map,upper_bounds:map,key_metadata:binary,split_offsets:array,equality_ids:array,sort_order_id:int>
 PREHOOK: query: describe extended default.ice_meta_desc.files
 PREHOOK: type: DESCTABLE
@@ -347,6 +351,7 @@ POSTHOOK: Input: default@ice_meta_desc
 status               int
 snapshot_id          bigint
 sequence_number      bigint
+file_sequence_number bigint
 data_file            struct,value_counts:map,null_value_counts:map,nan_value_counts:map,lower_bounds:map,upper_bounds:map,key_metadata:binary,split_offsets:array,equality_ids:array,sort_order_id:int>
 PREHOOK: query: describe extended default.ice_meta_desc.history
 PREHOOK: type: DESCTABLE
@@ -445,6 +450,7 @@ POSTHOOK: Input: default@ice_meta_desc
 status               int
 snapshot_id          bigint
 sequence_number      bigint
+file_sequence_number bigint
 data_file            struct,value_counts:map,null_value_counts:map,nan_value_counts:map,lower_bounds:map,upper_bounds:map,key_metadata:binary,split_offsets:array,equality_ids:array,sort_order_id:int>
 PREHOOK: query: drop table ice_meta_desc
 PREHOOK: type: DROPTABLE
diff --git a/iceberg/patched-iceberg-core/pom.xml b/iceberg/patched-iceberg-core/pom.xml
index ec1bdc39afc4..82f44a7ee4fc 100644
--- a/iceberg/patched-iceberg-core/pom.xml
+++ b/iceberg/patched-iceberg-core/pom.xml
@@ -76,6 +76,7 @@
                 ${project.build.directory}/classes
                 **/HadoopInputFile.class
+                **/BaseUpdatePartitionSpec.class
diff --git a/iceberg/patched-iceberg-core/src/main/java/org/apache/iceberg/BaseUpdatePartitionSpec.java b/iceberg/patched-iceberg-core/src/main/java/org/apache/iceberg/BaseUpdatePartitionSpec.java
new file mode 100644
index 000000000000..f448f5314443
--- /dev/null
+++ b/iceberg/patched-iceberg-core/src/main/java/org/apache/iceberg/BaseUpdatePartitionSpec.java
@@ -0,0 +1,555 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.iceberg.exceptions.ValidationException;
+import org.apache.iceberg.expressions.BoundReference;
+import org.apache.iceberg.expressions.BoundTerm;
+import org.apache.iceberg.expressions.BoundTransform;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.expressions.Term;
+import org.apache.iceberg.expressions.UnboundTerm;
+import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.transforms.PartitionSpecVisitor;
+import org.apache.iceberg.transforms.Transform;
+import org.apache.iceberg.transforms.Transforms;
+import org.apache.iceberg.transforms.UnknownTransform;
+import org.apache.iceberg.types.Type;
+import org.apache.iceberg.util.Pair;
+
+class BaseUpdatePartitionSpec implements UpdatePartitionSpec {
+  private final TableOperations ops;
+  private final TableMetadata base;
+  private final int formatVersion;
+  private final PartitionSpec spec;
+  private final Schema schema;
+  private final Map<String, PartitionField> nameToField;
+  private final Map<Pair<Integer, String>, PartitionField> transformToField;
+
+  private final List<PartitionField> adds = Lists.newArrayList();
+  private final Map<Integer, PartitionField> addedTimeFields = Maps.newHashMap();
+  private final Map<Pair<Integer, String>, PartitionField> transformToAddedField =
+      Maps.newHashMap();
+  private final Map<String, PartitionField> nameToAddedField = Maps.newHashMap();
+  private final Set<Integer> deletes = Sets.newHashSet();
+  private final Map<String, String> renames = Maps.newHashMap();
+
+  private boolean caseSensitive;
+  private int lastAssignedPartitionId;
+
+  BaseUpdatePartitionSpec(TableOperations ops) {
+    this.ops = ops;
+    this.caseSensitive = true;
+    this.base = ops.current();
+    this.formatVersion = base.formatVersion();
+    this.spec = base.spec();
+    this.schema = spec.schema();
+    this.nameToField = indexSpecByName(spec);
+    this.transformToField = indexSpecByTransform(spec);
+    this.lastAssignedPartitionId = base.lastAssignedPartitionId();
+
+    spec.fields().stream()
+        .filter(field -> field.transform() instanceof UnknownTransform)
+        .findAny()
+        .ifPresent(
+            field -> {
+              throw new IllegalArgumentException(
+                  "Cannot update partition spec with unknown transform: " + field);
+            });
+  }
+
+  /** For testing only. */
+  @VisibleForTesting
+  BaseUpdatePartitionSpec(int formatVersion, PartitionSpec spec) {
+    this(formatVersion, spec, spec.lastAssignedFieldId());
+  }
+
+  /** For testing only. */
+  @VisibleForTesting
+  BaseUpdatePartitionSpec(int formatVersion, PartitionSpec spec, int lastAssignedPartitionId) {
+    this.ops = null;
+    this.base = null;
+    this.formatVersion = formatVersion;
+    this.caseSensitive = true;
+    this.spec = spec;
+    this.schema = spec.schema();
+    this.nameToField = indexSpecByName(spec);
+    this.transformToField = indexSpecByTransform(spec);
+    this.lastAssignedPartitionId = lastAssignedPartitionId;
+  }
+
+  private int assignFieldId() {
+    this.lastAssignedPartitionId += 1;
+    return lastAssignedPartitionId;
+  }
+
+  /**
+   * In V2 it searches for a similar partition field in historical partition specs. Tries to match
+   * on source field ID, transform type and target name (optional). If not found or in V1 cases it
+   * creates a new PartitionField.
+   *
+   * @param sourceTransform pair of source ID and transform for this PartitionField addition
+   * @param name target partition field name, if specified
+   * @return the recycled or newly created partition field
+   */
+  private PartitionField recycleOrCreatePartitionField(
+      Pair<Integer, Transform<?, ?>> sourceTransform, String name) {
+    if (formatVersion == 2 && base != null) {
+      int sourceId = sourceTransform.first();
+      Transform<?, ?> transform = sourceTransform.second();
+
+      Set<PartitionField> allHistoricalFields = Sets.newHashSet();
+      for (PartitionSpec partitionSpec : base.specs()) {
+        allHistoricalFields.addAll(partitionSpec.fields());
+      }
+
+      for (PartitionField field : allHistoricalFields) {
+        if (field.sourceId() == sourceId && field.transform().equals(transform)) {
+          // if target name is specified then consider it too, otherwise not
+          if (name == null || field.name().equals(name)) {
+            return field;
+          }
+        }
+      }
+    }
+    return new PartitionField(
+        sourceTransform.first(), assignFieldId(), name, sourceTransform.second());
+  }
+
+  @Override
+  public UpdatePartitionSpec caseSensitive(boolean isCaseSensitive) {
+    this.caseSensitive = isCaseSensitive;
+    return this;
+  }
+
+  @Override
+  public BaseUpdatePartitionSpec addField(String sourceName) {
+    return addField(Expressions.ref(sourceName));
+  }
+
+  @Override
+  public BaseUpdatePartitionSpec addField(Term term) {
+    return addField(null, term);
+  }
+
+  private BaseUpdatePartitionSpec rewriteDeleteAndAddField(
+      PartitionField existing, String name, Pair<Integer, Transform<?, ?>> sourceTransform) {
+    deletes.remove(existing.fieldId());
+    if (name == null || existing.name().equals(name)) {
+      return this;
+    } else {
+      return renameField(existing.name(), name);
+    }
+  }
+
+  @Override
+  public BaseUpdatePartitionSpec addField(String name, Term term) {
+    PartitionField alreadyAdded = nameToAddedField.get(name);
+    Preconditions.checkArgument(
+        alreadyAdded == null, "Cannot add duplicate partition field: %s", alreadyAdded);
+
+    Pair<Integer, Transform<?, ?>> sourceTransform = resolve(term);
+    Pair<Integer, String> validationKey =
+        Pair.of(sourceTransform.first(), sourceTransform.second().toString());
+
+    PartitionField existing = transformToField.get(validationKey);
+    if (existing != null &&
+        deletes.contains(existing.fieldId()) &&
+        existing.transform().equals(sourceTransform.second())) {
+      return rewriteDeleteAndAddField(existing, name, sourceTransform);
+    }
+
+    Preconditions.checkArgument(
+        existing == null ||
+            (deletes.contains(existing.fieldId()) &&
+                !existing.transform().toString().equals(sourceTransform.second().toString())),
+        "Cannot add duplicate partition field %s=%s, conflicts with %s",
+        name,
+        term,
+        existing);
+
+    PartitionField added = transformToAddedField.get(validationKey);
+    Preconditions.checkArgument(
+        added == null,
+        "Cannot add duplicate partition field %s=%s, already added: %s",
+        name,
+        term,
+        added);
+
+    PartitionField newField = recycleOrCreatePartitionField(sourceTransform, name);
+    if (newField.name() == null) {
+      String partitionName =
+          PartitionSpecVisitor.visit(schema, newField, PartitionNameGenerator.INSTANCE);
+      newField =
+          new PartitionField(
+              newField.sourceId(), newField.fieldId(), partitionName, newField.transform());
+    }
+
+    checkForRedundantAddedPartitions(newField);
+    transformToAddedField.put(validationKey, newField);
+
+    PartitionField existingField = nameToField.get(newField.name());
+    if (existingField != null && !deletes.contains(existingField.fieldId())) {
+      if (isVoidTransform(existingField)) {
+        // rename the old deleted field that is being replaced by the new field
+        renameField(existingField.name(), existingField.name() + "_" + existingField.fieldId());
+      } else {
+        throw new IllegalArgumentException(
+            String.format("Cannot add duplicate partition field name: %s", name));
+      }
+    } else if (existingField != null && deletes.contains(existingField.fieldId())) {
+      renames.put(existingField.name(), existingField.name() + "_" + existingField.fieldId());
+    }
+
+    nameToAddedField.put(newField.name(), newField);
+
+    adds.add(newField);
+
+    return this;
+  }
+
+  @Override
+  public BaseUpdatePartitionSpec removeField(String name) {
+    PartitionField alreadyAdded = nameToAddedField.get(name);
+    Preconditions.checkArgument(
+        alreadyAdded == null, "Cannot delete newly added field: %s", alreadyAdded);
+
+    Preconditions.checkArgument(
+        renames.get(name) == null, "Cannot rename and delete partition field: %s", name);
+
+    PartitionField field = nameToField.get(name);
+    Preconditions.checkArgument(field != null, "Cannot find partition field to remove: %s", name);
+
+    deletes.add(field.fieldId());
+
+    return this;
+  }
+
+  @Override
+  public BaseUpdatePartitionSpec removeField(Term term) {
+    Pair<Integer, Transform<?, ?>> sourceTransform = resolve(term);
+    Pair<Integer, String> key =
+        Pair.of(sourceTransform.first(), sourceTransform.second().toString());
+
+    PartitionField added = transformToAddedField.get(key);
+    Preconditions.checkArgument(added == null, "Cannot delete newly added field: %s", added);
+
+    PartitionField field = transformToField.get(key);
+    Preconditions.checkArgument(field != null, "Cannot find partition field to remove: %s", term);
+    Preconditions.checkArgument(
+        renames.get(field.name()) == null,
+        "Cannot rename and delete partition field: %s",
+        field.name());
+
+    deletes.add(field.fieldId());
+
+    return this;
+  }
+
+  @Override
+  public BaseUpdatePartitionSpec renameField(String name, String newName) {
+    PartitionField existingField = nameToField.get(newName);
+    if (existingField != null && isVoidTransform(existingField)) {
+      // rename the old deleted field that is being replaced by the new field
+      renameField(existingField.name(), existingField.name() + "_" + existingField.fieldId());
+    }
+
+    PartitionField added = nameToAddedField.get(name);
+    Preconditions.checkArgument(
+        added == null, "Cannot rename newly added partition field: %s", name);
+
+    PartitionField field = nameToField.get(name);
+    Preconditions.checkArgument(field != null, "Cannot find partition field to rename: %s", name);
+    Preconditions.checkArgument(
+        !deletes.contains(field.fieldId()), "Cannot delete and rename partition field: %s", name);
+
+    renames.put(name, newName);
+
+    return this;
+  }
+
+  @Override
+  public PartitionSpec apply() {
+    PartitionSpec.Builder builder = PartitionSpec.builderFor(schema);
+
+    for (PartitionField field : spec.fields()) {
+      if (!deletes.contains(field.fieldId())) {
+        String newName = renames.get(field.name());
+        if (newName != null) {
+          builder.add(field.sourceId(), field.fieldId(), newName, field.transform());
+        } else {
+          builder.add(field.sourceId(), field.fieldId(), field.name(), field.transform());
+        }
+      } else if (formatVersion < 2) {
+        // field IDs were not required for v1 and were assigned sequentially in each partition spec
+        // starting at 1,000.
+        // to maintain consistent field ids across partition specs in v1 tables, any partition field
+        // that is removed
+        // must be replaced with a null transform. null values are always allowed in partition data.
+        String newName = renames.get(field.name());
+        if (newName != null) {
+          builder.add(field.sourceId(), field.fieldId(), newName, Transforms.alwaysNull());
+        } else {
+          builder.add(field.sourceId(), field.fieldId(), field.name(), Transforms.alwaysNull());
+        }
+      }
+    }
+
+    for (PartitionField newField : adds) {
+      builder.add(newField.sourceId(), newField.fieldId(), newField.name(), newField.transform());
+    }
+
+    return builder.build();
+  }
+
+  @Override
+  public void commit() {
+    TableMetadata update = base.updatePartitionSpec(apply());
+    ops.commit(base, update);
+  }
+
+  private Pair<Integer, Transform<?, ?>> resolve(Term term) {
+    Preconditions.checkArgument(term instanceof UnboundTerm, "Term must be unbound");
+
+    BoundTerm<?> boundTerm = ((UnboundTerm<?>) term).bind(schema.asStruct(), caseSensitive);
+    int sourceId = boundTerm.ref().fieldId();
+    Transform<?, ?> transform = toTransform(boundTerm);
+
+    Type fieldType = schema.findType(sourceId);
+    if (fieldType != null) {
+      transform = Transforms.fromString(fieldType, transform.toString());
+    } else {
+      transform = Transforms.fromString(transform.toString());
+    }
+
+    return Pair.of(sourceId, transform);
+  }
+
+  private Transform<?, ?> toTransform(BoundTerm<?> term) {
+    if (term instanceof BoundReference) {
+      return Transforms.identity();
+    } else if (term instanceof BoundTransform) {
+      return ((BoundTransform<?>) term).transform();
+    } else {
+      throw new ValidationException(
+          "Invalid term: %s, expected either a bound reference or transform", term);
+    }
+  }
+
+  private void checkForRedundantAddedPartitions(PartitionField field) {
+    if (isTimeTransform(field)) {
+      PartitionField timeField = addedTimeFields.get(field.sourceId());
+      Preconditions.checkArgument(
+          timeField == null,
+          "Cannot add redundant partition field: %s conflicts with %s",
+          timeField,
+          field);
+      addedTimeFields.put(field.sourceId(), field);
+    }
+  }
+
+  private static Map<String, PartitionField> indexSpecByName(PartitionSpec spec) {
+    ImmutableMap.Builder<String, PartitionField> builder = ImmutableMap.builder();
+    List<PartitionField> fields = spec.fields();
+    for (PartitionField field : fields) {
+      builder.put(field.name(), field);
+    }
+
+    return builder.build();
+  }
+
+  private static Map<Pair<Integer, String>, PartitionField> indexSpecByTransform(
+      PartitionSpec spec) {
+    Map<Pair<Integer, String>, PartitionField> indexSpecs = Maps.newHashMap();
+    List<PartitionField> fields = spec.fields();
+    for (PartitionField field : fields) {
+      indexSpecs.put(Pair.of(field.sourceId(), field.transform().toString()), field);
+    }
+
+    return indexSpecs;
+  }
+
+  private boolean isTimeTransform(PartitionField field) {
+    return PartitionSpecVisitor.visit(schema, field, IsTimeTransform.INSTANCE);
+  }
+
+  private static class IsTimeTransform implements PartitionSpecVisitor<Boolean> {
+    private static final IsTimeTransform INSTANCE = new IsTimeTransform();
+
+    private IsTimeTransform() {
+    }
+
+    @Override
+    public Boolean identity(int fieldId, String sourceName, int sourceId) {
+      return false;
+    }
+
+    @Override
+    public Boolean bucket(int fieldId, String sourceName, int sourceId, int numBuckets) {
+      return false;
+    }
+
+    @Override
+    public Boolean truncate(int fieldId, String sourceName, int sourceId, int width) {
+      return false;
+    }
+
+    @Override
+    public Boolean year(int fieldId, String sourceName, int sourceId) {
+      return true;
+    }
+
+    @Override
+    public Boolean month(int fieldId, String sourceName, int sourceId) {
+      return true;
+    }
+
+    @Override
+    public Boolean day(int fieldId, String sourceName, int sourceId) {
+      return true;
+    }
+
+    @Override
+    public Boolean hour(int fieldId, String sourceName, int sourceId) {
+      return true;
+    }
+
+    @Override
+    public Boolean alwaysNull(int fieldId, String sourceName, int sourceId) {
+      return false;
+    }
+
+    @Override
+    public Boolean unknown(int fieldId, String sourceName, int sourceId, String transform) {
+      return false;
+    }
+  }
+
+  private boolean isVoidTransform(PartitionField field) {
+    return PartitionSpecVisitor.visit(schema, field, IsVoidTransform.INSTANCE);
+  }
+
+  private static class IsVoidTransform implements PartitionSpecVisitor<Boolean> {
+    private static final IsVoidTransform INSTANCE = new IsVoidTransform();
+
+    private IsVoidTransform() {
+    }
+
+    @Override
+    public Boolean identity(int fieldId, String sourceName, int sourceId) {
+      return false;
+    }
+
+    @Override
+    public Boolean bucket(int fieldId, String sourceName, int sourceId, int numBuckets) {
+      return false;
+    }
+
+    @Override
+    public Boolean truncate(int fieldId, String sourceName, int sourceId, int width) {
+      return false;
+    }
+
+    @Override
+    public Boolean year(int fieldId, String sourceName, int sourceId) {
+      return false;
+    }
+
+    @Override
+    public Boolean month(int fieldId, String sourceName, int sourceId) {
+      return false;
+    }
+
+    @Override
+    public Boolean day(int fieldId, String sourceName, int sourceId) {
+      return false;
+    }
+
+    @Override
+    public Boolean hour(int fieldId, String sourceName, int sourceId) {
+      return false;
+    }
+
+    @Override
+    public Boolean alwaysNull(int fieldId, String sourceName, int sourceId) {
+      return true;
+    }
+
+    @Override
+    public Boolean unknown(int fieldId, String sourceName, int sourceId, String transform) {
+      return false;
+    }
+  }
+
+  private static class PartitionNameGenerator implements PartitionSpecVisitor<String> {
+    private static final PartitionNameGenerator INSTANCE = new PartitionNameGenerator();
+
+    private PartitionNameGenerator() {
+    }
+
+    @Override
+    public String identity(int fieldId, String sourceName, int sourceId) {
+      return sourceName;
+    }
+
+    @Override
+    public String bucket(int fieldId, String sourceName, int sourceId, int numBuckets) {
+      return sourceName + "_bucket_" + numBuckets;
+    }
+
+    @Override
+    public String truncate(int fieldId, String sourceName, int sourceId, int width) {
+      return sourceName + "_trunc_" + width;
+    }
+
+    @Override
+    public String year(int fieldId, String sourceName, int sourceId) {
+      return sourceName + "_year";
+    }
+
+    @Override
+    public String month(int fieldId, String sourceName, int sourceId) {
+      return sourceName + "_month";
+    }
+
+    @Override
+    public String day(int fieldId, String sourceName, int sourceId) {
+      return sourceName + "_day";
+    }
+
+    @Override
+    public String hour(int fieldId, String sourceName, int sourceId) {
+      return sourceName + "_hour";
+    }
+
+    @Override
+    public String alwaysNull(int fieldId, String sourceName, int sourceId) {
+      return sourceName + "_null";
+    }
+  }
+}
diff --git a/iceberg/patched-iceberg-core/src/main/java/org/apache/iceberg/hadoop/HadoopInputFile.java b/iceberg/patched-iceberg-core/src/main/java/org/apache/iceberg/hadoop/HadoopInputFile.java
index 34d2e691497a..bd310b4c2657 100644
--- a/iceberg/patched-iceberg-core/src/main/java/org/apache/iceberg/hadoop/HadoopInputFile.java
+++ b/iceberg/patched-iceberg-core/src/main/java/org/apache/iceberg/hadoop/HadoopInputFile.java
@@ -167,6 +167,8 @@ private FileStatus lazyStat() {
     if (stat == null) {
       try {
         this.stat = fs.getFileStatus(path);
+      } catch (FileNotFoundException e) {
+        throw new NotFoundException(e, "File does not exist: %s", path);
       } catch (IOException e) {
         throw new RuntimeIOException(e, "Failed to get status for file: %s", path);
       }
@@ -238,9 +240,9 @@ public String location() {
   @Override
   public boolean exists() {
     try {
-      return fs.exists(path);
-    } catch (IOException e) {
-      throw new RuntimeIOException(e, "Failed to check existence for file: %s", path);
+      return lazyStat() != null;
+    } catch (NotFoundException e) {
+      return false;
     }
   }
diff --git a/iceberg/pom.xml b/iceberg/pom.xml
index 348bf5b64f4d..f6426e605989 100644
--- a/iceberg/pom.xml
+++ b/iceberg/pom.xml
@@ -25,7 +25,7 @@
     ..
     .
-    <iceberg.version>1.0.0</iceberg.version>
+    <iceberg.version>1.1.0</iceberg.version>
    4.0.2
    3.4.4
    1.9.2