@@ -33,6 +33,7 @@
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.ArrayUtil;
import org.apache.iceberg.util.CharSequenceSet;
import org.apache.iceberg.util.Pair;
import org.apache.iceberg.util.StructLikeSet;
import org.apache.iceberg.util.StructProjection;
@@ -193,7 +194,7 @@ public void testPositionDeletes() throws IOException {
Pair.of(dataFile.path(), 6L) // id = 122
);

Pair<DeleteFile, Set<CharSequence>> posDeletes = FileHelpers.writeDeleteFile(
Pair<DeleteFile, CharSequenceSet> posDeletes = FileHelpers.writeDeleteFile(
table, Files.localOutput(temp.newFile()), Row.of(0), deletes);

table.newRowDelta()
@@ -225,7 +226,7 @@ public void testMixedPositionAndEqualityDeletes() throws IOException {
Pair.of(dataFile.path(), 5L) // id = 121
);

Pair<DeleteFile, Set<CharSequence>> posDeletes = FileHelpers.writeDeleteFile(
Pair<DeleteFile, CharSequenceSet> posDeletes = FileHelpers.writeDeleteFile(
table, Files.localOutput(temp.newFile()), Row.of(0), deletes);

table.newRowDelta()
@@ -22,48 +22,53 @@
import java.io.Closeable;
import java.io.IOException;
import java.util.List;
import java.util.Set;
import java.util.Locale;
import java.util.Map;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DataFiles;
import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.MetricsConfig;
import org.apache.iceberg.Schema;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.Table;
import org.apache.iceberg.data.parquet.GenericParquetWriter;
import org.apache.iceberg.deletes.EqualityDeleteWriter;
import org.apache.iceberg.deletes.PositionDelete;
import org.apache.iceberg.deletes.PositionDeleteWriter;
import org.apache.iceberg.encryption.EncryptedFiles;
import org.apache.iceberg.encryption.EncryptedOutputFile;
import org.apache.iceberg.encryption.EncryptionKeyMetadata;
import org.apache.iceberg.io.FileAppender;
import org.apache.iceberg.io.FileAppenderFactory;
import org.apache.iceberg.io.OutputFile;
import org.apache.iceberg.parquet.Parquet;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.CharSequenceSet;
import org.apache.iceberg.util.Pair;

import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT;
import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT_DEFAULT;

public class FileHelpers {
private FileHelpers() {
}

public static Pair<DeleteFile, Set<CharSequence>> writeDeleteFile(Table table, OutputFile out,
List<Pair<CharSequence, Long>> deletes)
public static Pair<DeleteFile, CharSequenceSet> writeDeleteFile(Table table, OutputFile out,
List<Pair<CharSequence, Long>> deletes)
throws IOException {
return writeDeleteFile(table, out, null, deletes);
}

public static Pair<DeleteFile, Set<CharSequence>> writeDeleteFile(Table table, OutputFile out, StructLike partition,
List<Pair<CharSequence, Long>> deletes)
public static Pair<DeleteFile, CharSequenceSet> writeDeleteFile(Table table, OutputFile out, StructLike partition,
List<Pair<CharSequence, Long>> deletes)
throws IOException {
PositionDeleteWriter<?> writer = Parquet.writeDeletes(out)
.withSpec(table.spec())
.setAll(table.properties())
.metricsConfig(MetricsConfig.forTable(table))
.withPartition(partition)
.overwrite()
.buildPositionWriter();
FileFormat format = defaultFormat(table.properties());
FileAppenderFactory<Record> factory = new GenericAppenderFactory(table.schema(), table.spec());

PositionDeleteWriter<Record> writer =
factory.newPosDeleteWriter(encrypt(out), format, partition);
try (Closeable toClose = writer) {
for (Pair<CharSequence, Long> delete : deletes) {
writer.delete(delete.first(), delete.second());
PositionDelete<Record> posDelete = PositionDelete.create();
writer.write(posDelete.set(delete.first(), delete.second(), null));
}
}

@@ -76,36 +81,32 @@ public static DeleteFile writeDeleteFile(Table table, OutputFile out, List<Recor
}

public static DeleteFile writeDeleteFile(Table table, OutputFile out, StructLike partition,
List<Record> deletes, Schema deleteRowSchema) throws IOException {
EqualityDeleteWriter<Record> writer = Parquet.writeDeletes(out)
.forTable(table)
.withPartition(partition)
.rowSchema(deleteRowSchema)
.createWriterFunc(GenericParquetWriter::buildWriter)
.overwrite()
.equalityFieldIds(deleteRowSchema.columns().stream().mapToInt(Types.NestedField::fieldId).toArray())
.buildEqualityWriter();
List<Record> deletes, Schema deleteRowSchema)
throws IOException {
FileFormat format = defaultFormat(table.properties());
int[] equalityFieldIds = deleteRowSchema.columns().stream().mapToInt(Types.NestedField::fieldId).toArray();
FileAppenderFactory<Record> factory = new GenericAppenderFactory(table.schema(), table.spec(),
equalityFieldIds, deleteRowSchema, null);

EqualityDeleteWriter<Record> writer = factory.newEqDeleteWriter(encrypt(out), format, partition);
try (Closeable toClose = writer) {
writer.deleteAll(deletes);
writer.write(deletes);
}

return writer.toDeleteFile();
}

public static DataFile writeDataFile(Table table, OutputFile out, List<Record> rows) throws IOException {
FileAppender<Record> writer = Parquet.write(out)
.createWriterFunc(GenericParquetWriter::buildWriter)
.schema(table.schema())
.overwrite()
.build();
FileFormat format = defaultFormat(table.properties());
GenericAppenderFactory factory = new GenericAppenderFactory(table.schema());

FileAppender<Record> writer = factory.newAppender(out, format);
try (Closeable toClose = writer) {
writer.addAll(rows);
}

return DataFiles.builder(table.spec())
.withFormat(FileFormat.PARQUET)
.withFormat(format)
.withPath(out.location())
.withFileSizeInBytes(writer.length())
.withSplitOffsets(writer.splitOffsets())
@@ -115,23 +116,30 @@ public static DataFile writeDataFile(Table table, OutputFile out, List<Record> r

public static DataFile writeDataFile(Table table, OutputFile out, StructLike partition, List<Record> rows)
throws IOException {
FileAppender<Record> writer = Parquet.write(out)
.createWriterFunc(GenericParquetWriter::buildWriter)
.schema(table.schema())
.overwrite()
.build();
FileFormat format = defaultFormat(table.properties());
GenericAppenderFactory factory = new GenericAppenderFactory(table.schema(), table.spec());

FileAppender<Record> writer = factory.newAppender(out, format);
try (Closeable toClose = writer) {
writer.addAll(rows);
}

return DataFiles.builder(table.spec())
.withFormat(FileFormat.PARQUET)
.withFormat(format)
.withPath(out.location())
.withPartition(partition)
.withFileSizeInBytes(writer.length())
.withSplitOffsets(writer.splitOffsets())
.withMetrics(writer.metrics())
.build();
}

private static EncryptedOutputFile encrypt(OutputFile out) {
return EncryptedFiles.encryptedOutput(out, EncryptionKeyMetadata.EMPTY);
}

private static FileFormat defaultFormat(Map<String, String> properties) {
String formatString = properties.getOrDefault(DEFAULT_FILE_FORMAT, DEFAULT_FILE_FORMAT_DEFAULT);
return FileFormat.valueOf(formatString.toUpperCase(Locale.ENGLISH));
}
}
@@ -353,7 +353,7 @@ public static DeleteFile createEqualityDeleteFile(Table table, String deleteFile
part.partition(rowsToDelete.get(0));
EqualityDeleteWriter<Record> eqWriter = appenderFactory.newEqDeleteWriter(outputFile, fileFormat, part);
try (EqualityDeleteWriter<Record> writer = eqWriter) {
writer.deleteAll(rowsToDelete);
writer.write(rowsToDelete);
}
return eqWriter.toDeleteFile();
}
@@ -386,7 +386,11 @@ public static DeleteFile createPositionalDeleteFile(Table table, String deleteFi

PositionDeleteWriter<Record> posWriter = appenderFactory.newPosDeleteWriter(outputFile, fileFormat, partitionKey);
try (PositionDeleteWriter<Record> writer = posWriter) {
deletes.forEach(del -> writer.delete(del.path(), del.pos(), del.row()));
deletes.forEach(del -> {
PositionDelete<Record> positionDelete = PositionDelete.create();
positionDelete.set(del.path(), del.pos(), del.row());
writer.write(positionDelete);
});
}
return posWriter.toDeleteFile();
}
@@ -247,6 +247,7 @@ public void testSetPartitionTransform() {
.build();

Table table = testTables.loadTable(identifier);
Assert.assertEquals(spec.specId(), table.spec().specId());
Assert.assertEquals(spec, table.spec());

shell.executeStatement("ALTER TABLE default.part_test SET PARTITION SPEC(year(year_field), month(month_field), " +
@@ -264,7 +265,13 @@ public void testSetPartitionTransform() {
.build();

table.refresh();
Assert.assertEquals(spec, table.spec());
Assert.assertEquals(spec.specId(), table.spec().specId());

for (PartitionField field :
spec.fields()) {
Assert.assertTrue(field.name(), table.spec().fields().stream().anyMatch(
tableField -> tableField.name().equals(field.name())));
}
}

@Test
@@ -80,6 +80,7 @@ POSTHOOK: Input: default@ice_meta_desc
status int
snapshot_id bigint
sequence_number bigint
file_sequence_number bigint
data_file struct<content:int,file_path:string,file_format:string,spec_id:int,record_count:bigint,file_size_in_bytes:bigint,column_sizes:map<int,bigint>,value_counts:map<int,bigint>,null_value_counts:map<int,bigint>,nan_value_counts:map<int,bigint>,lower_bounds:map<int,binary>,upper_bounds:map<int,binary>,key_metadata:binary,split_offsets:array<bigint>,equality_ids:array<int>,sort_order_id:int>
PREHOOK: query: describe default.ice_meta_desc.history
PREHOOK: type: DESCTABLE
@@ -178,6 +179,7 @@ POSTHOOK: Input: default@ice_meta_desc
status int
snapshot_id bigint
sequence_number bigint
file_sequence_number bigint
data_file struct<content:int,file_path:string,file_format:string,spec_id:int,record_count:bigint,file_size_in_bytes:bigint,column_sizes:map<int,bigint>,value_counts:map<int,bigint>,null_value_counts:map<int,bigint>,nan_value_counts:map<int,bigint>,lower_bounds:map<int,binary>,upper_bounds:map<int,binary>,key_metadata:binary,split_offsets:array<bigint>,equality_ids:array<int>,sort_order_id:int>
PREHOOK: query: describe formatted default.ice_meta_desc.files
PREHOOK: type: DESCTABLE
@@ -211,6 +213,7 @@ POSTHOOK: Input: default@ice_meta_desc
status int
snapshot_id bigint
sequence_number bigint
file_sequence_number bigint
data_file struct<content:int,file_path:string,file_format:string,spec_id:int,record_count:bigint,file_size_in_bytes:bigint,column_sizes:map<int,bigint>,value_counts:map<int,bigint>,null_value_counts:map<int,bigint>,nan_value_counts:map<int,bigint>,lower_bounds:map<int,binary>,upper_bounds:map<int,binary>,key_metadata:binary,split_offsets:array<bigint>,equality_ids:array<int>,sort_order_id:int>
PREHOOK: query: describe formatted default.ice_meta_desc.history
PREHOOK: type: DESCTABLE
@@ -316,6 +319,7 @@ POSTHOOK: Input: default@ice_meta_desc
status int
snapshot_id bigint
sequence_number bigint
file_sequence_number bigint
data_file struct<content:int,file_path:string,file_format:string,spec_id:int,record_count:bigint,file_size_in_bytes:bigint,column_sizes:map<int,bigint>,value_counts:map<int,bigint>,null_value_counts:map<int,bigint>,nan_value_counts:map<int,bigint>,lower_bounds:map<int,binary>,upper_bounds:map<int,binary>,key_metadata:binary,split_offsets:array<bigint>,equality_ids:array<int>,sort_order_id:int>
PREHOOK: query: describe extended default.ice_meta_desc.files
PREHOOK: type: DESCTABLE
@@ -347,6 +351,7 @@ POSTHOOK: Input: default@ice_meta_desc
status int
snapshot_id bigint
sequence_number bigint
file_sequence_number bigint
data_file struct<content:int,file_path:string,file_format:string,spec_id:int,record_count:bigint,file_size_in_bytes:bigint,column_sizes:map<int,bigint>,value_counts:map<int,bigint>,null_value_counts:map<int,bigint>,nan_value_counts:map<int,bigint>,lower_bounds:map<int,binary>,upper_bounds:map<int,binary>,key_metadata:binary,split_offsets:array<bigint>,equality_ids:array<int>,sort_order_id:int>
PREHOOK: query: describe extended default.ice_meta_desc.history
PREHOOK: type: DESCTABLE
@@ -445,6 +450,7 @@ POSTHOOK: Input: default@ice_meta_desc
status int
snapshot_id bigint
sequence_number bigint
file_sequence_number bigint
data_file struct<content:int,file_path:string,file_format:string,spec_id:int,record_count:bigint,file_size_in_bytes:bigint,column_sizes:map<int,bigint>,value_counts:map<int,bigint>,null_value_counts:map<int,bigint>,nan_value_counts:map<int,bigint>,lower_bounds:map<int,binary>,upper_bounds:map<int,binary>,key_metadata:binary,split_offsets:array<bigint>,equality_ids:array<int>,sort_order_id:int>
PREHOOK: query: drop table ice_meta_desc
PREHOOK: type: DROPTABLE
1 change: 1 addition & 0 deletions iceberg/patched-iceberg-core/pom.xml
@@ -76,6 +76,7 @@
<outputDirectory>${project.build.directory}/classes</outputDirectory>
<excludes>
**/HadoopInputFile.class
**/BaseUpdatePartitionSpec.class
</excludes>
</artifactItem>
</artifactItems>