diff --git a/build.gradle b/build.gradle index 53a2bee15c79..d6a9403074d9 100644 --- a/build.gradle +++ b/build.gradle @@ -337,6 +337,7 @@ project(':iceberg-flink') { testCompile project(path: ':iceberg-hive-metastore', configuration: 'testArtifacts') testCompile project(path: ':iceberg-api', configuration: 'testArtifacts') + testCompile project(path: ':iceberg-core', configuration: 'testArtifacts') testCompile project(path: ':iceberg-data', configuration: 'testArtifacts') // By default, hive-exec is a fat/uber jar and it exports a guava library @@ -766,6 +767,7 @@ project(':iceberg-spark') { } testCompile project(path: ':iceberg-hive-metastore', configuration: 'testArtifacts') testCompile project(path: ':iceberg-api', configuration: 'testArtifacts') + testCompile project(path: ':iceberg-core', configuration: 'testArtifacts') testCompile project(path: ':iceberg-data', configuration: 'testArtifacts') } diff --git a/core/src/main/java/org/apache/iceberg/io/DataWriter.java b/core/src/main/java/org/apache/iceberg/io/DataWriter.java new file mode 100644 index 000000000000..3b01bdcdbc94 --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/io/DataWriter.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.io; + +import java.io.Closeable; +import java.io.IOException; +import java.nio.ByteBuffer; +import org.apache.iceberg.DataFile; +import org.apache.iceberg.DataFiles; +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.StructLike; +import org.apache.iceberg.encryption.EncryptionKeyMetadata; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; + +public class DataWriter implements Closeable { + private final FileAppender appender; + private final FileFormat format; + private final String location; + private final PartitionSpec spec; + private final StructLike partition; + private final ByteBuffer keyMetadata; + private DataFile dataFile = null; + + public DataWriter(FileAppender appender, FileFormat format, String location, + PartitionSpec spec, StructLike partition, EncryptionKeyMetadata keyMetadata) { + this.appender = appender; + this.format = format; + this.location = location; + this.spec = spec; + this.partition = partition; + this.keyMetadata = keyMetadata != null ? 
keyMetadata.buffer() : null; + } + + public void add(T row) { + appender.add(row); + } + + public long length() { + return appender.length(); + } + + @Override + public void close() throws IOException { + if (dataFile == null) { + appender.close(); + this.dataFile = DataFiles.builder(spec) + .withFormat(format) + .withPath(location) + .withPartition(partition) + .withEncryptionKeyMetadata(keyMetadata) + .withFileSizeInBytes(appender.length()) + .withMetrics(appender.metrics()) + .withSplitOffsets(appender.splitOffsets()) + .build(); + } + } + + public DataFile toDataFile() { + Preconditions.checkState(dataFile != null, "Cannot create data file from unclosed writer"); + return dataFile; + } +} diff --git a/core/src/main/java/org/apache/iceberg/io/DeleteSchemaUtil.java b/core/src/main/java/org/apache/iceberg/io/DeleteSchemaUtil.java new file mode 100644 index 000000000000..466a2c01d76a --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/io/DeleteSchemaUtil.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.io; + +import org.apache.iceberg.MetadataColumns; +import org.apache.iceberg.Schema; +import org.apache.iceberg.types.Types; + +public class DeleteSchemaUtil { + private DeleteSchemaUtil() { + } + + private static Schema pathPosSchema(Schema rowSchema) { + return new Schema( + MetadataColumns.DELETE_FILE_PATH, + MetadataColumns.DELETE_FILE_POS, + Types.NestedField.required( + MetadataColumns.DELETE_FILE_ROW_FIELD_ID, "row", rowSchema.asStruct(), + MetadataColumns.DELETE_FILE_ROW_DOC)); + } + + public static Schema pathPosSchema() { + return new Schema( + MetadataColumns.DELETE_FILE_PATH, + MetadataColumns.DELETE_FILE_POS); + } + + public static Schema posDeleteSchema(Schema rowSchema) { + return rowSchema == null ? pathPosSchema() : pathPosSchema(rowSchema); + } +} diff --git a/core/src/main/java/org/apache/iceberg/io/FileAppenderFactory.java b/core/src/main/java/org/apache/iceberg/io/FileAppenderFactory.java index 9afdca460f0a..b093eab447fe 100644 --- a/core/src/main/java/org/apache/iceberg/io/FileAppenderFactory.java +++ b/core/src/main/java/org/apache/iceberg/io/FileAppenderFactory.java @@ -20,6 +20,10 @@ package org.apache.iceberg.io; import org.apache.iceberg.FileFormat; +import org.apache.iceberg.StructLike; +import org.apache.iceberg.deletes.EqualityDeleteWriter; +import org.apache.iceberg.deletes.PositionDeleteWriter; +import org.apache.iceberg.encryption.EncryptedOutputFile; /** * Factory to create a new {@link FileAppender} to write records. @@ -36,4 +40,34 @@ public interface FileAppenderFactory { * @return a newly created {@link FileAppender} */ FileAppender newAppender(OutputFile outputFile, FileFormat fileFormat); + + /** + * Create a new {@link DataWriter}. 
+ * + * @param outputFile an OutputFile used to create an output stream. + * @param format a file format + * @param partition a tuple of partition values + * @return a newly created {@link DataWriter} for rows + */ + DataWriter newDataWriter(EncryptedOutputFile outputFile, FileFormat format, StructLike partition); + + /** + * Create a new {@link EqualityDeleteWriter}. + * + * @param outputFile an OutputFile used to create an output stream. + * @param format a file format + * @param partition a tuple of partition values + * @return a newly created {@link EqualityDeleteWriter} for equality deletes + */ + EqualityDeleteWriter newEqDeleteWriter(EncryptedOutputFile outputFile, FileFormat format, StructLike partition); + + /** + * Create a new {@link PositionDeleteWriter}. + * + * @param outputFile an OutputFile used to create an output stream. + * @param format a file format + * @param partition a tuple of partition values + * @return a newly created {@link EqualityDeleteWriter} for position deletes + */ + PositionDeleteWriter newPosDeleteWriter(EncryptedOutputFile outputFile, FileFormat format, StructLike partition); } diff --git a/core/src/test/java/org/apache/iceberg/TableTestBase.java b/core/src/test/java/org/apache/iceberg/TableTestBase.java index 6009db38b858..184534fcc042 100644 --- a/core/src/test/java/org/apache/iceberg/TableTestBase.java +++ b/core/src/test/java/org/apache/iceberg/TableTestBase.java @@ -50,7 +50,7 @@ public class TableTestBase { ); // Partition spec used to create tables - static final PartitionSpec SPEC = PartitionSpec.builderFor(SCHEMA) + protected static final PartitionSpec SPEC = PartitionSpec.builderFor(SCHEMA) .bucket("data", 16) .build(); @@ -104,8 +104,8 @@ public class TableTestBase { @Rule public TemporaryFolder temp = new TemporaryFolder(); - File tableDir = null; - File metadataDir = null; + protected File tableDir = null; + protected File metadataDir = null; public TestTables.TestTable table = null; protected final int formatVersion; @@ -143,7 +143,7 @@ List listManifestFiles(File tableDirToList) { !name.startsWith("snap") && Files.getFileExtension(name).equalsIgnoreCase("avro"))); } - TestTables.TestTable create(Schema schema, PartitionSpec spec) { + protected TestTables.TestTable create(Schema schema, PartitionSpec spec) { return TestTables.create(tableDir, "test", schema, spec, formatVersion); } diff --git a/data/src/main/java/org/apache/iceberg/data/GenericAppenderFactory.java b/data/src/main/java/org/apache/iceberg/data/GenericAppenderFactory.java index e5d2652bcb11..b9b77fd571b5 100644 --- a/data/src/main/java/org/apache/iceberg/data/GenericAppenderFactory.java +++ b/data/src/main/java/org/apache/iceberg/data/GenericAppenderFactory.java @@ -24,16 +24,22 @@ import java.util.Map; import org.apache.iceberg.FileFormat; import org.apache.iceberg.MetricsConfig; +import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.Schema; +import org.apache.iceberg.StructLike; import org.apache.iceberg.avro.Avro; import org.apache.iceberg.data.avro.DataWriter; import org.apache.iceberg.data.orc.GenericOrcWriter; import org.apache.iceberg.data.parquet.GenericParquetWriter; +import org.apache.iceberg.deletes.EqualityDeleteWriter; +import org.apache.iceberg.deletes.PositionDeleteWriter; +import org.apache.iceberg.encryption.EncryptedOutputFile; import org.apache.iceberg.io.FileAppender; import org.apache.iceberg.io.FileAppenderFactory; import org.apache.iceberg.io.OutputFile; import org.apache.iceberg.orc.ORC; import org.apache.iceberg.parquet.Parquet; +import 
org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.relocated.com.google.common.collect.Maps; /** @@ -42,10 +48,29 @@ public class GenericAppenderFactory implements FileAppenderFactory { private final Schema schema; + private final PartitionSpec spec; + private final int[] equalityFieldIds; + private final Schema eqDeleteRowSchema; + private final Schema posDeleteRowSchema; private final Map config = Maps.newHashMap(); public GenericAppenderFactory(Schema schema) { + this(schema, PartitionSpec.unpartitioned(), null, null, null); + } + + public GenericAppenderFactory(Schema schema, PartitionSpec spec) { + this(schema, spec, null, null, null); + } + + public GenericAppenderFactory(Schema schema, PartitionSpec spec, + int[] equalityFieldIds, + Schema eqDeleteRowSchema, + Schema posDeleteRowSchema) { this.schema = schema; + this.spec = spec; + this.equalityFieldIds = equalityFieldIds; + this.eqDeleteRowSchema = eqDeleteRowSchema; + this.posDeleteRowSchema = posDeleteRowSchema; } public GenericAppenderFactory set(String property, String value) { @@ -89,7 +114,97 @@ public FileAppender newAppender(OutputFile outputFile, FileFormat fileFo .build(); default: - throw new UnsupportedOperationException("Cannot write format: " + fileFormat); + throw new UnsupportedOperationException("Cannot write unknown file format: " + fileFormat); + } + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } + + @Override + public org.apache.iceberg.io.DataWriter newDataWriter(EncryptedOutputFile file, FileFormat format, + StructLike partition) { + return new org.apache.iceberg.io.DataWriter<>( + newAppender(file.encryptingOutputFile(), format), format, + file.encryptingOutputFile().location(), spec, partition, file.keyMetadata()); + } + + @Override + public EqualityDeleteWriter newEqDeleteWriter(EncryptedOutputFile file, FileFormat format, + StructLike partition) { + Preconditions.checkState(equalityFieldIds != null && equalityFieldIds.length > 0, + "Equality field ids shouldn't be null or empty when creating equality-delete writer"); + Preconditions.checkNotNull(eqDeleteRowSchema, + "Equality delete row schema shouldn't be null when creating equality-delete writer"); + + MetricsConfig metricsConfig = MetricsConfig.fromProperties(config); + try { + switch (format) { + case AVRO: + return Avro.writeDeletes(file.encryptingOutputFile()) + .createWriterFunc(DataWriter::create) + .withPartition(partition) + .overwrite() + .setAll(config) + .rowSchema(eqDeleteRowSchema) + .withSpec(spec) + .withKeyMetadata(file.keyMetadata()) + .equalityFieldIds(equalityFieldIds) + .buildEqualityWriter(); + + case PARQUET: + return Parquet.writeDeletes(file.encryptingOutputFile()) + .createWriterFunc(GenericParquetWriter::buildWriter) + .withPartition(partition) + .overwrite() + .setAll(config) + .metricsConfig(metricsConfig) + .rowSchema(eqDeleteRowSchema) + .withSpec(spec) + .withKeyMetadata(file.keyMetadata()) + .equalityFieldIds(equalityFieldIds) + .buildEqualityWriter(); + + default: + throw new UnsupportedOperationException( + "Cannot write equality-deletes for unsupported file format: " + format); + } + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } + + @Override + public PositionDeleteWriter newPosDeleteWriter(EncryptedOutputFile file, FileFormat format, + StructLike partition) { + MetricsConfig metricsConfig = MetricsConfig.fromProperties(config); + try { + switch (format) { + case AVRO: + return Avro.writeDeletes(file.encryptingOutputFile()) + 
.createWriterFunc(DataWriter::create) + .withPartition(partition) + .overwrite() + .setAll(config) + .rowSchema(posDeleteRowSchema) + .withSpec(spec) + .withKeyMetadata(file.keyMetadata()) + .buildPositionWriter(); + + case PARQUET: + return Parquet.writeDeletes(file.encryptingOutputFile()) + .createWriterFunc(GenericParquetWriter::buildWriter) + .withPartition(partition) + .overwrite() + .setAll(config) + .metricsConfig(metricsConfig) + .rowSchema(posDeleteRowSchema) + .withSpec(spec) + .withKeyMetadata(file.keyMetadata()) + .buildPositionWriter(); + + default: + throw new UnsupportedOperationException("Cannot write pos-deletes for unsupported file format: " + format); } } catch (IOException e) { throw new UncheckedIOException(e); diff --git a/data/src/test/java/org/apache/iceberg/TestGenericAppenderFactory.java b/data/src/test/java/org/apache/iceberg/TestGenericAppenderFactory.java new file mode 100644 index 000000000000..60dcd633eda4 --- /dev/null +++ b/data/src/test/java/org/apache/iceberg/TestGenericAppenderFactory.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iceberg; + +import java.util.List; +import org.apache.iceberg.data.GenericAppenderFactory; +import org.apache.iceberg.data.GenericRecord; +import org.apache.iceberg.data.Record; +import org.apache.iceberg.io.FileAppenderFactory; +import org.apache.iceberg.io.TestAppenderFactory; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.apache.iceberg.util.ArrayUtil; +import org.apache.iceberg.util.StructLikeSet; + +public class TestGenericAppenderFactory extends TestAppenderFactory { + + private final GenericRecord gRecord; + + public TestGenericAppenderFactory(String fileFormat, boolean partitioned) { + super(fileFormat, partitioned); + this.gRecord = GenericRecord.create(SCHEMA); + } + + @Override + protected FileAppenderFactory createAppenderFactory(List equalityFieldIds, + Schema eqDeleteSchema, + Schema posDeleteRowSchema) { + return new GenericAppenderFactory(table.schema(), table.spec(), ArrayUtil.toIntArray(equalityFieldIds), + eqDeleteSchema, posDeleteRowSchema); + } + + @Override + protected Record createRow(Integer id, String data) { + return gRecord.copy(ImmutableMap.of("id", id, "data", data)); + } + + @Override + protected StructLikeSet expectedRowSet(Iterable records) { + StructLikeSet set = StructLikeSet.create(table.schema().asStruct()); + records.forEach(set::add); + return set; + } +} diff --git a/data/src/test/java/org/apache/iceberg/io/TestAppenderFactory.java b/data/src/test/java/org/apache/iceberg/io/TestAppenderFactory.java new file mode 100644 index 000000000000..44ac5190eeef --- /dev/null +++ b/data/src/test/java/org/apache/iceberg/io/TestAppenderFactory.java @@ -0,0 +1,334 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iceberg.io; + +import java.io.File; +import java.io.IOException; +import java.util.List; +import java.util.Locale; +import java.util.Set; +import org.apache.iceberg.DataFile; +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.PartitionKey; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.Schema; +import org.apache.iceberg.TableTestBase; +import org.apache.iceberg.avro.Avro; +import org.apache.iceberg.data.GenericRecord; +import org.apache.iceberg.data.IcebergGenerics; +import org.apache.iceberg.data.Record; +import org.apache.iceberg.data.avro.DataReader; +import org.apache.iceberg.data.parquet.GenericParquetReaders; +import org.apache.iceberg.deletes.EqualityDeleteWriter; +import org.apache.iceberg.deletes.PositionDelete; +import org.apache.iceberg.deletes.PositionDeleteWriter; +import org.apache.iceberg.encryption.EncryptedOutputFile; +import org.apache.iceberg.parquet.Parquet; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.relocated.com.google.common.collect.Sets; +import org.apache.iceberg.util.Pair; +import org.apache.iceberg.util.StructLikeSet; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +@RunWith(Parameterized.class) +public abstract class TestAppenderFactory extends TableTestBase { + private static final int FORMAT_V2 = 2; + + private final FileFormat format; + private final boolean partitioned; + + private PartitionKey partition = null; + private OutputFileFactory fileFactory = null; + + @Parameterized.Parameters(name = "FileFormat={0}, Partitioned={1}") + public static Object[] parameters() { + return new Object[][] { + new Object[] {"avro", false}, + new Object[] {"avro", true}, + new Object[] {"parquet", false}, + new Object[] {"parquet", true} + }; + } + + + public TestAppenderFactory(String fileFormat, boolean partitioned) { + super(FORMAT_V2); + this.format = FileFormat.valueOf(fileFormat.toUpperCase(Locale.ENGLISH)); + this.partitioned = partitioned; + } + + @Before + public void setupTable() throws Exception { + this.tableDir = temp.newFolder(); + Assert.assertTrue(tableDir.delete()); // created by table create + + this.metadataDir = new File(tableDir, "metadata"); + + if (partitioned) { + this.table = create(SCHEMA, SPEC); + } else { + this.table = create(SCHEMA, PartitionSpec.unpartitioned()); + } + this.partition = createPartitionKey(); + this.fileFactory = new OutputFileFactory(table.spec(), format, table.locationProvider(), table.io(), + table.encryption(), 1, 1); + + table.updateProperties() + .defaultFormat(format) + .commit(); + } + + protected abstract FileAppenderFactory createAppenderFactory(List equalityFieldIds, + Schema eqDeleteSchema, + Schema posDeleteRowSchema); + + protected abstract T createRow(Integer id, String data); + + protected abstract StructLikeSet expectedRowSet(Iterable records) throws IOException; + + private StructLikeSet actualRowSet(String... 
columns) throws IOException { + StructLikeSet set = StructLikeSet.create(table.schema().asStruct()); + try (CloseableIterable reader = IcebergGenerics.read(table).select(columns).build()) { + reader.forEach(set::add); + } + return set; + } + + private PartitionKey createPartitionKey() { + if (table.spec().isUnpartitioned()) { + return null; + } + + Record record = GenericRecord.create(table.schema()).copy(ImmutableMap.of("data", "aaa")); + + PartitionKey partitionKey = new PartitionKey(table.spec(), table.schema()); + partitionKey.partition(record); + + return partitionKey; + } + + private EncryptedOutputFile createEncryptedOutputFile() { + if (partition == null) { + return fileFactory.newOutputFile(); + } else { + return fileFactory.newOutputFile(partition); + } + } + + private List testRowSet() { + return Lists.newArrayList( + createRow(1, "aaa"), + createRow(2, "bbb"), + createRow(3, "ccc"), + createRow(4, "ddd"), + createRow(5, "eee") + ); + } + + private DataFile prepareDataFile(List rowSet, FileAppenderFactory appenderFactory) throws IOException { + DataWriter writer = appenderFactory.newDataWriter(createEncryptedOutputFile(), format, partition); + try (DataWriter closeableWriter = writer) { + for (T row : rowSet) { + closeableWriter.add(row); + } + } + + return writer.toDataFile(); + } + + @Test + public void testDataWriter() throws IOException { + FileAppenderFactory appenderFactory = createAppenderFactory(null, null, null); + + List rowSet = testRowSet(); + DataFile dataFile = prepareDataFile(rowSet, appenderFactory); + + table.newRowDelta() + .addRows(dataFile) + .commit(); + + Assert.assertEquals("Should have the expected records.", expectedRowSet(rowSet), actualRowSet("*")); + } + + @Test + public void testEqDeleteWriter() throws IOException { + List equalityFieldIds = Lists.newArrayList(table.schema().findField("id").fieldId()); + Schema eqDeleteRowSchema = table.schema().select("id"); + FileAppenderFactory appenderFactory = createAppenderFactory(equalityFieldIds, eqDeleteRowSchema, null); + + List rowSet = testRowSet(); + DataFile dataFile = prepareDataFile(rowSet, appenderFactory); + + table.newRowDelta() + .addRows(dataFile) + .commit(); + + // The equality field is 'id'. No matter what the value of 'data' field is, we should delete the 1th, 3th, 5th + // rows. + List deletes = Lists.newArrayList( + createRow(1, "aaa"), + createRow(3, "bbb"), + createRow(5, "ccc") + ); + EncryptedOutputFile out = createEncryptedOutputFile(); + EqualityDeleteWriter eqDeleteWriter = appenderFactory.newEqDeleteWriter(out, format, partition); + try (EqualityDeleteWriter closeableWriter = eqDeleteWriter) { + closeableWriter.deleteAll(deletes); + } + + // Check that the delete equality file has the expected equality deletes. + GenericRecord gRecord = GenericRecord.create(eqDeleteRowSchema); + Set expectedDeletes = Sets.newHashSet( + gRecord.copy("id", 1), + gRecord.copy("id", 3), + gRecord.copy("id", 5) + ); + Assert.assertEquals(expectedDeletes, + Sets.newHashSet(createReader(eqDeleteRowSchema, out.encryptingOutputFile().toInputFile()))); + + table.newRowDelta() + .addDeletes(eqDeleteWriter.toDeleteFile()) + .commit(); + + List expected = Lists.newArrayList( + createRow(2, "bbb"), + createRow(4, "ddd") + ); + Assert.assertEquals("Should have the expected records", expectedRowSet(expected), actualRowSet("*")); + } + + @Test + public void testPosDeleteWriter() throws IOException { + // Initialize FileAppenderFactory without pos-delete row schema. 
+ FileAppenderFactory appenderFactory = createAppenderFactory(null, null, null); + + List rowSet = testRowSet(); + DataFile dataFile = prepareDataFile(rowSet, appenderFactory); + + List> deletes = Lists.newArrayList( + Pair.of(dataFile.path(), 0L), + Pair.of(dataFile.path(), 2L), + Pair.of(dataFile.path(), 4L) + ); + + EncryptedOutputFile out = createEncryptedOutputFile(); + PositionDeleteWriter eqDeleteWriter = appenderFactory.newPosDeleteWriter(out, format, partition); + try (PositionDeleteWriter closeableWriter = eqDeleteWriter) { + for (Pair delete : deletes) { + closeableWriter.delete(delete.first(), delete.second()); + } + } + + // Check that the pos delete file has the expected pos deletes. + Schema pathPosSchema = DeleteSchemaUtil.pathPosSchema(); + GenericRecord gRecord = GenericRecord.create(pathPosSchema); + Set expectedDeletes = Sets.newHashSet( + gRecord.copy("file_path", dataFile.path(), "pos", 0L), + gRecord.copy("file_path", dataFile.path(), "pos", 2L), + gRecord.copy("file_path", dataFile.path(), "pos", 4L) + ); + Assert.assertEquals(expectedDeletes, + Sets.newHashSet(createReader(pathPosSchema, out.encryptingOutputFile().toInputFile()))); + + table.newRowDelta() + .addRows(dataFile) + .addDeletes(eqDeleteWriter.toDeleteFile()) + .validateDataFilesExist(eqDeleteWriter.referencedDataFiles()) + .validateDeletedFiles() + .commit(); + + List expected = Lists.newArrayList( + createRow(2, "bbb"), + createRow(4, "ddd") + ); + Assert.assertEquals("Should have the expected records", expectedRowSet(expected), actualRowSet("*")); + } + + @Test + public void testPosDeleteWriterWithRowSchema() throws IOException { + FileAppenderFactory appenderFactory = createAppenderFactory(null, null, table.schema()); + + List rowSet = testRowSet(); + DataFile dataFile = prepareDataFile(rowSet, appenderFactory); + + List> deletes = Lists.newArrayList( + new PositionDelete().set(dataFile.path(), 0, rowSet.get(0)), + new PositionDelete().set(dataFile.path(), 2, rowSet.get(2)), + new PositionDelete().set(dataFile.path(), 4, rowSet.get(4)) + ); + + EncryptedOutputFile out = createEncryptedOutputFile(); + PositionDeleteWriter eqDeleteWriter = appenderFactory.newPosDeleteWriter(out, format, partition); + try (PositionDeleteWriter closeableWriter = eqDeleteWriter) { + for (PositionDelete delete : deletes) { + closeableWriter.delete(delete.path(), delete.pos(), delete.row()); + } + } + + // Check that the pos delete file has the expected pos deletes. 
+ Schema pathPosRowSchema = DeleteSchemaUtil.posDeleteSchema(table.schema()); + GenericRecord gRecord = GenericRecord.create(pathPosRowSchema); + GenericRecord rowRecord = GenericRecord.create(table.schema()); + Set expectedDeletes = Sets.newHashSet( + gRecord.copy("file_path", dataFile.path(), "pos", 0L, "row", rowRecord.copy("id", 1, "data", "aaa")), + gRecord.copy("file_path", dataFile.path(), "pos", 2L, "row", rowRecord.copy("id", 3, "data", "ccc")), + gRecord.copy("file_path", dataFile.path(), "pos", 4L, "row", rowRecord.copy("id", 5, "data", "eee")) + ); + Assert.assertEquals(expectedDeletes, + Sets.newHashSet(createReader(pathPosRowSchema, out.encryptingOutputFile().toInputFile()))); + + table.newRowDelta() + .addRows(dataFile) + .addDeletes(eqDeleteWriter.toDeleteFile()) + .validateDataFilesExist(eqDeleteWriter.referencedDataFiles()) + .validateDeletedFiles() + .commit(); + + List expected = Lists.newArrayList( + createRow(2, "bbb"), + createRow(4, "ddd") + ); + Assert.assertEquals("Should have the expected records", expectedRowSet(expected), actualRowSet("*")); + } + + private CloseableIterable createReader(Schema schema, InputFile inputFile) { + switch (format) { + case PARQUET: + return Parquet.read(inputFile) + .project(schema) + .createReaderFunc(fileSchema -> GenericParquetReaders.buildReader(schema, fileSchema)) + .build(); + + case AVRO: + return Avro.read(inputFile) + .project(schema) + .createReaderFunc(DataReader::create) + .build(); + + default: + throw new UnsupportedOperationException("Unsupported file format: " + format); + } + } +} diff --git a/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkAppenderFactory.java b/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkAppenderFactory.java new file mode 100644 index 000000000000..1ddf0929fbdc --- /dev/null +++ b/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkAppenderFactory.java @@ -0,0 +1,223 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iceberg.flink.sink; + +import java.io.IOException; +import java.io.Serializable; +import java.io.UncheckedIOException; +import java.util.Map; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.data.StringData; +import org.apache.flink.table.types.logical.RowType; +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.MetricsConfig; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.Schema; +import org.apache.iceberg.StructLike; +import org.apache.iceberg.avro.Avro; +import org.apache.iceberg.deletes.EqualityDeleteWriter; +import org.apache.iceberg.deletes.PositionDeleteWriter; +import org.apache.iceberg.encryption.EncryptedOutputFile; +import org.apache.iceberg.flink.FlinkSchemaUtil; +import org.apache.iceberg.flink.data.FlinkAvroWriter; +import org.apache.iceberg.flink.data.FlinkOrcWriter; +import org.apache.iceberg.flink.data.FlinkParquetWriters; +import org.apache.iceberg.io.DataWriter; +import org.apache.iceberg.io.DeleteSchemaUtil; +import org.apache.iceberg.io.FileAppender; +import org.apache.iceberg.io.FileAppenderFactory; +import org.apache.iceberg.io.OutputFile; +import org.apache.iceberg.orc.ORC; +import org.apache.iceberg.parquet.Parquet; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; + +public class FlinkAppenderFactory implements FileAppenderFactory, Serializable { + private final Schema schema; + private final RowType flinkSchema; + private final Map props; + private final PartitionSpec spec; + private final int[] equalityFieldIds; + private final Schema eqDeleteRowSchema; + private final Schema posDeleteRowSchema; + + private RowType eqDeleteFlinkSchema = null; + private RowType posDeleteFlinkSchema = null; + + public FlinkAppenderFactory(Schema schema, RowType flinkSchema, Map props, PartitionSpec spec) { + this(schema, flinkSchema, props, spec, null, null, null); + } + + public FlinkAppenderFactory(Schema schema, RowType flinkSchema, Map props, + PartitionSpec spec, int[] equalityFieldIds, + Schema eqDeleteRowSchema, Schema posDeleteRowSchema) { + this.schema = schema; + this.flinkSchema = flinkSchema; + this.props = props; + this.spec = spec; + this.equalityFieldIds = equalityFieldIds; + this.eqDeleteRowSchema = eqDeleteRowSchema; + this.posDeleteRowSchema = posDeleteRowSchema; + } + + private RowType lazyEqDeleteFlinkSchema() { + if (eqDeleteFlinkSchema == null) { + Preconditions.checkNotNull(eqDeleteRowSchema, "Equality delete row schema shouldn't be null"); + this.eqDeleteFlinkSchema = FlinkSchemaUtil.convert(eqDeleteRowSchema); + } + return eqDeleteFlinkSchema; + } + + private RowType lazyPosDeleteFlinkSchema() { + if (posDeleteFlinkSchema == null) { + Preconditions.checkNotNull(posDeleteRowSchema, "Pos-delete row schema shouldn't be null"); + this.posDeleteFlinkSchema = FlinkSchemaUtil.convert(posDeleteRowSchema); + } + return this.posDeleteFlinkSchema; + } + + @Override + public FileAppender newAppender(OutputFile outputFile, FileFormat format) { + MetricsConfig metricsConfig = MetricsConfig.fromProperties(props); + try { + switch (format) { + case AVRO: + return Avro.write(outputFile) + .createWriterFunc(ignore -> new FlinkAvroWriter(flinkSchema)) + .setAll(props) + .schema(schema) + .overwrite() + .build(); + + case ORC: + return ORC.write(outputFile) + .createWriterFunc((iSchema, typDesc) -> FlinkOrcWriter.buildWriter(flinkSchema, iSchema)) + .setAll(props) + .schema(schema) + .overwrite() + .build(); + + case PARQUET: + return Parquet.write(outputFile) + 
.createWriterFunc(msgType -> FlinkParquetWriters.buildWriter(flinkSchema, msgType)) + .setAll(props) + .metricsConfig(metricsConfig) + .schema(schema) + .overwrite() + .build(); + + default: + throw new UnsupportedOperationException("Cannot write unknown file format: " + format); + } + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } + + @Override + public DataWriter newDataWriter(EncryptedOutputFile file, FileFormat format, StructLike partition) { + return new DataWriter<>( + newAppender(file.encryptingOutputFile(), format), format, + file.encryptingOutputFile().location(), spec, partition, file.keyMetadata()); + } + + @Override + public EqualityDeleteWriter newEqDeleteWriter(EncryptedOutputFile outputFile, FileFormat format, + StructLike partition) { + Preconditions.checkState(equalityFieldIds != null && equalityFieldIds.length > 0, + "Equality field ids shouldn't be null or empty when creating equality-delete writer"); + Preconditions.checkNotNull(eqDeleteRowSchema, + "Equality delete row schema shouldn't be null when creating equality-delete writer"); + + MetricsConfig metricsConfig = MetricsConfig.fromProperties(props); + try { + switch (format) { + case AVRO: + return Avro.writeDeletes(outputFile.encryptingOutputFile()) + .createWriterFunc(ignore -> new FlinkAvroWriter(lazyEqDeleteFlinkSchema())) + .withPartition(partition) + .overwrite() + .setAll(props) + .rowSchema(eqDeleteRowSchema) + .withSpec(spec) + .withKeyMetadata(outputFile.keyMetadata()) + .equalityFieldIds(equalityFieldIds) + .buildEqualityWriter(); + + case PARQUET: + return Parquet.writeDeletes(outputFile.encryptingOutputFile()) + .createWriterFunc(msgType -> FlinkParquetWriters.buildWriter(lazyEqDeleteFlinkSchema(), msgType)) + .withPartition(partition) + .overwrite() + .setAll(props) + .metricsConfig(metricsConfig) + .rowSchema(eqDeleteRowSchema) + .withSpec(spec) + .withKeyMetadata(outputFile.keyMetadata()) + .equalityFieldIds(equalityFieldIds) + .buildEqualityWriter(); + + default: + throw new UnsupportedOperationException( + "Cannot write equality-deletes for unsupported file format: " + format); + } + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } + + @Override + public PositionDeleteWriter newPosDeleteWriter(EncryptedOutputFile outputFile, FileFormat format, + StructLike partition) { + MetricsConfig metricsConfig = MetricsConfig.fromProperties(props); + try { + switch (format) { + case AVRO: + return Avro.writeDeletes(outputFile.encryptingOutputFile()) + .createWriterFunc(ignore -> new FlinkAvroWriter(lazyPosDeleteFlinkSchema())) + .withPartition(partition) + .overwrite() + .setAll(props) + .rowSchema(posDeleteRowSchema) + .withSpec(spec) + .withKeyMetadata(outputFile.keyMetadata()) + .buildPositionWriter(); + + case PARQUET: + RowType flinkPosDeleteSchema = FlinkSchemaUtil.convert(DeleteSchemaUtil.posDeleteSchema(posDeleteRowSchema)); + return Parquet.writeDeletes(outputFile.encryptingOutputFile()) + .createWriterFunc(msgType -> FlinkParquetWriters.buildWriter(flinkPosDeleteSchema, msgType)) + .withPartition(partition) + .overwrite() + .setAll(props) + .metricsConfig(metricsConfig) + .rowSchema(posDeleteRowSchema) + .withSpec(spec) + .withKeyMetadata(outputFile.keyMetadata()) + .transformPaths(path -> StringData.fromString(path.toString())) + .buildPositionWriter(); + + default: + throw new UnsupportedOperationException("Cannot write pos-deletes for unsupported file format: " + format); + } + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } +} 
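Note on the writer API introduced above (not part of the patch): every FileAppenderFactory implementation in this change (GenericAppenderFactory, FlinkAppenderFactory, and SparkAppenderFactory below) now exposes newDataWriter, newEqDeleteWriter, and newPosDeleteWriter, all returning closeable writers whose toDataFile()/toDeleteFile() output can be committed through a RowDelta. The following is a minimal usage sketch under stated assumptions: it assumes an existing, unpartitioned, format-v2 Table named `table` with the two-column test schema (id int, data string) used by the tests in this patch, and it uses the generic Record-based factory for brevity; the class and method names are illustrative only.

import java.io.IOException;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.Table;
import org.apache.iceberg.data.GenericAppenderFactory;
import org.apache.iceberg.data.GenericRecord;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.deletes.PositionDeleteWriter;
import org.apache.iceberg.io.DataWriter;
import org.apache.iceberg.io.FileAppenderFactory;
import org.apache.iceberg.io.OutputFileFactory;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;

public class AppenderFactoryUsageSketch {  // illustrative name, not in the patch

  public static void writeAndCommit(Table table) throws IOException {
    FileFormat format = FileFormat.PARQUET;
    FileAppenderFactory<Record> factory = new GenericAppenderFactory(table.schema(), table.spec());
    OutputFileFactory fileFactory = new OutputFileFactory(
        table.spec(), format, table.locationProvider(), table.io(), table.encryption(), 1, 1);

    // 1. Data file: add rows, close the writer, then turn it into a DataFile.
    //    The partition argument is null because the table is assumed to be unpartitioned.
    DataWriter<Record> dataWriter = factory.newDataWriter(fileFactory.newOutputFile(), format, null);
    Record record = GenericRecord.create(table.schema());
    try (DataWriter<Record> writer = dataWriter) {
      writer.add(record.copy(ImmutableMap.of("id", 1, "data", "aaa")));
      writer.add(record.copy(ImmutableMap.of("id", 2, "data", "bbb")));
    }
    DataFile dataFile = dataWriter.toDataFile();

    // 2. Position deletes: mark row 0 of the data file just written as deleted.
    PositionDeleteWriter<Record> posWriter =
        factory.newPosDeleteWriter(fileFactory.newOutputFile(), format, null);
    try (PositionDeleteWriter<Record> writer = posWriter) {
      writer.delete(dataFile.path(), 0L);
    }

    // 3. Commit the data file and the delete file together in one row delta.
    table.newRowDelta()
        .addRows(dataFile)
        .addDeletes(posWriter.toDeleteFile())
        .commit();
  }
}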
diff --git a/flink/src/main/java/org/apache/iceberg/flink/sink/RowDataTaskWriterFactory.java b/flink/src/main/java/org/apache/iceberg/flink/sink/RowDataTaskWriterFactory.java index 72e7a7941b77..c409e81fbc13 100644 --- a/flink/src/main/java/org/apache/iceberg/flink/sink/RowDataTaskWriterFactory.java +++ b/flink/src/main/java/org/apache/iceberg/flink/sink/RowDataTaskWriterFactory.java @@ -19,33 +19,21 @@ package org.apache.iceberg.flink.sink; -import java.io.IOException; -import java.io.Serializable; -import java.io.UncheckedIOException; import java.util.Map; import org.apache.flink.table.data.RowData; import org.apache.flink.table.types.logical.RowType; import org.apache.iceberg.FileFormat; -import org.apache.iceberg.MetricsConfig; import org.apache.iceberg.PartitionKey; import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.Schema; -import org.apache.iceberg.avro.Avro; import org.apache.iceberg.encryption.EncryptionManager; import org.apache.iceberg.flink.RowDataWrapper; -import org.apache.iceberg.flink.data.FlinkAvroWriter; -import org.apache.iceberg.flink.data.FlinkOrcWriter; -import org.apache.iceberg.flink.data.FlinkParquetWriters; -import org.apache.iceberg.io.FileAppender; import org.apache.iceberg.io.FileAppenderFactory; import org.apache.iceberg.io.FileIO; import org.apache.iceberg.io.LocationProvider; -import org.apache.iceberg.io.OutputFile; import org.apache.iceberg.io.OutputFileFactory; import org.apache.iceberg.io.TaskWriter; import org.apache.iceberg.io.UnpartitionedWriter; -import org.apache.iceberg.orc.ORC; -import org.apache.iceberg.parquet.Parquet; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; public class RowDataTaskWriterFactory implements TaskWriterFactory { @@ -78,7 +66,7 @@ public RowDataTaskWriterFactory(Schema schema, this.encryptionManager = encryptionManager; this.targetFileSizeBytes = targetFileSizeBytes; this.format = format; - this.appenderFactory = new FlinkFileAppenderFactory(schema, flinkSchema, tableProperties); + this.appenderFactory = new FlinkAppenderFactory(schema, flinkSchema, tableProperties, spec); } @Override @@ -118,54 +106,4 @@ protected PartitionKey partition(RowData row) { return partitionKey; } } - - public static class FlinkFileAppenderFactory implements FileAppenderFactory, Serializable { - private final Schema schema; - private final RowType flinkSchema; - private final Map props; - - public FlinkFileAppenderFactory(Schema schema, RowType flinkSchema, Map props) { - this.schema = schema; - this.flinkSchema = flinkSchema; - this.props = props; - } - - @Override - public FileAppender newAppender(OutputFile outputFile, FileFormat format) { - MetricsConfig metricsConfig = MetricsConfig.fromProperties(props); - try { - switch (format) { - case AVRO: - return Avro.write(outputFile) - .createWriterFunc(ignore -> new FlinkAvroWriter(flinkSchema)) - .setAll(props) - .schema(schema) - .overwrite() - .build(); - - case ORC: - return ORC.write(outputFile) - .createWriterFunc((iSchema, typDesc) -> FlinkOrcWriter.buildWriter(flinkSchema, iSchema)) - .setAll(props) - .schema(schema) - .overwrite() - .build(); - - case PARQUET: - return Parquet.write(outputFile) - .createWriterFunc(msgType -> FlinkParquetWriters.buildWriter(flinkSchema, msgType)) - .setAll(props) - .metricsConfig(metricsConfig) - .schema(schema) - .overwrite() - .build(); - - default: - throw new UnsupportedOperationException("Cannot write unknown file format: " + format); - } - } catch (IOException e) { - throw new UncheckedIOException(e); - } - } 
- } } diff --git a/flink/src/test/java/org/apache/iceberg/flink/SimpleDataUtil.java b/flink/src/test/java/org/apache/iceberg/flink/SimpleDataUtil.java index dc14fb683a91..c47e3e30850b 100644 --- a/flink/src/test/java/org/apache/iceberg/flink/SimpleDataUtil.java +++ b/flink/src/test/java/org/apache/iceberg/flink/SimpleDataUtil.java @@ -39,7 +39,7 @@ import org.apache.iceberg.data.GenericRecord; import org.apache.iceberg.data.IcebergGenerics; import org.apache.iceberg.data.Record; -import org.apache.iceberg.flink.sink.RowDataTaskWriterFactory; +import org.apache.iceberg.flink.sink.FlinkAppenderFactory; import org.apache.iceberg.hadoop.HadoopInputFile; import org.apache.iceberg.hadoop.HadoopTables; import org.apache.iceberg.io.CloseableIterable; @@ -102,8 +102,8 @@ public static DataFile writeFile(Schema schema, PartitionSpec spec, Configuratio Preconditions.checkNotNull(fileFormat, "Cannot determine format for file: %s", filename); RowType flinkSchema = FlinkSchemaUtil.convert(schema); - FileAppenderFactory appenderFactory = new RowDataTaskWriterFactory.FlinkFileAppenderFactory(schema, - flinkSchema, ImmutableMap.of()); + FileAppenderFactory appenderFactory = + new FlinkAppenderFactory(schema, flinkSchema, ImmutableMap.of(), spec); FileAppender appender = appenderFactory.newAppender(fromPath(path, conf), fileFormat); try (FileAppender closeableAppender = appender) { diff --git a/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkAppenderFactory.java b/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkAppenderFactory.java new file mode 100644 index 000000000000..8d7fa86eac50 --- /dev/null +++ b/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkAppenderFactory.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iceberg.flink.sink; + +import java.util.List; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.types.logical.RowType; +import org.apache.iceberg.Schema; +import org.apache.iceberg.flink.FlinkSchemaUtil; +import org.apache.iceberg.flink.RowDataWrapper; +import org.apache.iceberg.flink.SimpleDataUtil; +import org.apache.iceberg.io.FileAppenderFactory; +import org.apache.iceberg.io.TestAppenderFactory; +import org.apache.iceberg.util.ArrayUtil; +import org.apache.iceberg.util.StructLikeSet; + +public class TestFlinkAppenderFactory extends TestAppenderFactory { + + private final RowType rowType; + + public TestFlinkAppenderFactory(String fileFormat, boolean partitioned) { + super(fileFormat, partitioned); + this.rowType = FlinkSchemaUtil.convert(SCHEMA); + } + + @Override + protected FileAppenderFactory createAppenderFactory(List equalityFieldIds, + Schema eqDeleteSchema, + Schema posDeleteRowSchema) { + return new FlinkAppenderFactory(table.schema(), rowType, table.properties(), table.spec(), + ArrayUtil.toIntArray(equalityFieldIds), eqDeleteSchema, posDeleteRowSchema); + } + + @Override + protected RowData createRow(Integer id, String data) { + return SimpleDataUtil.createRowData(id, data); + } + + @Override + protected StructLikeSet expectedRowSet(Iterable rows) { + StructLikeSet set = StructLikeSet.create(table.schema().asStruct()); + for (RowData row : rows) { + RowDataWrapper wrapper = new RowDataWrapper(rowType, table.schema().asStruct()); + set.add(wrapper.wrap(row)); + } + return set; + } +} diff --git a/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkMergingMetrics.java b/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkMergingMetrics.java index b116faff23d1..cd0a89ba7c76 100644 --- a/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkMergingMetrics.java +++ b/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkMergingMetrics.java @@ -20,17 +20,18 @@ package org.apache.iceberg.flink.source; import java.io.IOException; -import java.util.HashMap; import java.util.List; import org.apache.flink.table.data.RowData; import org.apache.flink.table.types.logical.RowType; import org.apache.iceberg.FileFormat; +import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.TestMergingMetrics; import org.apache.iceberg.data.Record; import org.apache.iceberg.flink.FlinkSchemaUtil; import org.apache.iceberg.flink.RowDataConverter; -import org.apache.iceberg.flink.sink.RowDataTaskWriterFactory; +import org.apache.iceberg.flink.sink.FlinkAppenderFactory; import org.apache.iceberg.io.FileAppender; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; public class TestFlinkMergingMetrics extends TestMergingMetrics { @@ -39,8 +40,8 @@ protected FileAppender writeAndGetAppender(List records) throws RowType flinkSchema = FlinkSchemaUtil.convert(SCHEMA); FileAppender appender = - new RowDataTaskWriterFactory.FlinkFileAppenderFactory(SCHEMA, flinkSchema, new HashMap<>()).newAppender( - org.apache.iceberg.Files.localOutput(temp.newFile()), FileFormat.PARQUET); + new FlinkAppenderFactory(SCHEMA, flinkSchema, ImmutableMap.of(), PartitionSpec.unpartitioned()) + .newAppender(org.apache.iceberg.Files.localOutput(temp.newFile()), FileFormat.PARQUET); try (FileAppender fileAppender = appender) { records.stream().map(r -> RowDataConverter.convert(SCHEMA, r)).forEach(fileAppender::add); } diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java 
b/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java index fc35adc63f18..17e1275216e0 100644 --- a/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java +++ b/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java @@ -32,7 +32,6 @@ import org.apache.hadoop.conf.Configuration; import org.apache.iceberg.FileFormat; import org.apache.iceberg.Files; -import org.apache.iceberg.MetadataColumns; import org.apache.iceberg.MetricsConfig; import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.Schema; @@ -49,6 +48,7 @@ import org.apache.iceberg.hadoop.HadoopInputFile; import org.apache.iceberg.hadoop.HadoopOutputFile; import org.apache.iceberg.io.CloseableIterable; +import org.apache.iceberg.io.DeleteSchemaUtil; import org.apache.iceberg.io.FileAppender; import org.apache.iceberg.io.InputFile; import org.apache.iceberg.io.OutputFile; @@ -58,7 +58,6 @@ import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.relocated.com.google.common.collect.Maps; import org.apache.iceberg.relocated.com.google.common.collect.Sets; -import org.apache.iceberg.types.Types.NestedField; import org.apache.iceberg.util.ArrayUtil; import org.apache.parquet.HadoopReadOptions; import org.apache.parquet.ParquetReadOptions; @@ -281,6 +280,7 @@ public static class DeleteWriteBuilder { private StructLike partition = null; private EncryptionKeyMetadata keyMetadata = null; private int[] equalityFieldIds = null; + private Function pathTransformFunc = Function.identity(); private DeleteWriteBuilder(OutputFile file) { this.appenderBuilder = write(file); @@ -360,6 +360,11 @@ public DeleteWriteBuilder equalityFieldIds(int... fieldIds) { return this; } + public DeleteWriteBuilder transformPaths(Function newPathTransformFunc) { + this.pathTransformFunc = newPathTransformFunc; + return this; + } + public EqualityDeleteWriter buildEqualityWriter() throws IOException { Preconditions.checkState(rowSchema != null, "Cannot create equality delete file without a schema`"); Preconditions.checkState(equalityFieldIds != null, "Cannot create equality delete file without delete field ids"); @@ -379,7 +384,6 @@ public EqualityDeleteWriter buildEqualityWriter() throws IOException { appenderBuilder.build(), FileFormat.PARQUET, location, spec, partition, keyMetadata, equalityFieldIds); } - public PositionDeleteWriter buildPositionWriter() throws IOException { Preconditions.checkState(equalityFieldIds == null, "Cannot create position delete file using delete field ids"); @@ -387,29 +391,23 @@ public PositionDeleteWriter buildPositionWriter() throws IOException { if (rowSchema != null && createWriterFunc != null) { // the appender uses the row schema wrapped with position fields - appenderBuilder.schema(new org.apache.iceberg.Schema( - MetadataColumns.DELETE_FILE_PATH, - MetadataColumns.DELETE_FILE_POS, - NestedField.optional( - MetadataColumns.DELETE_FILE_ROW_FIELD_ID, "row", rowSchema.asStruct(), - MetadataColumns.DELETE_FILE_ROW_DOC))); + appenderBuilder.schema(DeleteSchemaUtil.posDeleteSchema(rowSchema)); appenderBuilder.createWriterFunc(parquetSchema -> { ParquetValueWriter writer = createWriterFunc.apply(parquetSchema); if (writer instanceof StructWriter) { - return new PositionDeleteStructWriter((StructWriter) writer); + return new PositionDeleteStructWriter((StructWriter) writer, pathTransformFunc); } else { throw new UnsupportedOperationException("Cannot wrap writer for position deletes: " + writer.getClass()); } }); } else { - appenderBuilder.schema(new 
org.apache.iceberg.Schema( - MetadataColumns.DELETE_FILE_PATH, - MetadataColumns.DELETE_FILE_POS)); + appenderBuilder.schema(DeleteSchemaUtil.pathPosSchema()); appenderBuilder.createWriterFunc(parquetSchema -> - new PositionDeleteStructWriter((StructWriter) GenericParquetWriter.buildWriter(parquetSchema))); + new PositionDeleteStructWriter((StructWriter) GenericParquetWriter.buildWriter(parquetSchema), + Function.identity())); } return new PositionDeleteWriter<>( diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetValueWriters.java b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetValueWriters.java index c17a2ffa173d..7d77996e9968 100644 --- a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetValueWriters.java +++ b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetValueWriters.java @@ -27,6 +27,7 @@ import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.function.Function; import java.util.stream.Stream; import org.apache.avro.util.Utf8; import org.apache.iceberg.FieldMetrics; @@ -562,15 +563,18 @@ public Stream metrics() { } public static class PositionDeleteStructWriter extends StructWriter> { - public PositionDeleteStructWriter(StructWriter replacedWriter) { + private final Function pathTransformFunc; + + public PositionDeleteStructWriter(StructWriter replacedWriter, Function pathTransformFunc) { super(Arrays.asList(replacedWriter.writers)); + this.pathTransformFunc = pathTransformFunc; } @Override protected Object get(PositionDelete delete, int index) { switch (index) { case 0: - return delete.path(); + return pathTransformFunc.apply(delete.path()); case 1: return delete.pos(); case 2: diff --git a/spark/src/main/java/org/apache/iceberg/spark/source/RowDataRewriter.java b/spark/src/main/java/org/apache/iceberg/spark/source/RowDataRewriter.java index 37ca56c700a4..6393061a6cd8 100644 --- a/spark/src/main/java/org/apache/iceberg/spark/source/RowDataRewriter.java +++ b/spark/src/main/java/org/apache/iceberg/spark/source/RowDataRewriter.java @@ -98,7 +98,7 @@ private List rewriteDataForTask(CombinedScanTask task) throws Exceptio task, schema, schema, nameMapping, io.value(), encryptionManager.value(), caseSensitive); StructType structType = SparkSchemaUtil.convert(schema); - SparkAppenderFactory appenderFactory = new SparkAppenderFactory(properties, schema, structType); + SparkAppenderFactory appenderFactory = new SparkAppenderFactory(properties, schema, structType, spec); OutputFileFactory fileFactory = new OutputFileFactory( spec, format, locations, io.value(), encryptionManager.value(), partitionId, taskId); diff --git a/spark/src/main/java/org/apache/iceberg/spark/source/SparkAppenderFactory.java b/spark/src/main/java/org/apache/iceberg/spark/source/SparkAppenderFactory.java index 3383fe7c29f7..3fe7513db6cd 100644 --- a/spark/src/main/java/org/apache/iceberg/spark/source/SparkAppenderFactory.java +++ b/spark/src/main/java/org/apache/iceberg/spark/source/SparkAppenderFactory.java @@ -20,32 +20,79 @@ package org.apache.iceberg.spark.source; import java.io.IOException; +import java.io.UncheckedIOException; import java.util.Map; import org.apache.iceberg.FileFormat; import org.apache.iceberg.MetricsConfig; +import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.Schema; +import org.apache.iceberg.StructLike; import org.apache.iceberg.avro.Avro; +import org.apache.iceberg.deletes.EqualityDeleteWriter; +import org.apache.iceberg.deletes.PositionDeleteWriter; +import 
org.apache.iceberg.encryption.EncryptedOutputFile; import org.apache.iceberg.exceptions.RuntimeIOException; +import org.apache.iceberg.io.DataWriter; +import org.apache.iceberg.io.DeleteSchemaUtil; import org.apache.iceberg.io.FileAppender; import org.apache.iceberg.io.FileAppenderFactory; import org.apache.iceberg.io.OutputFile; import org.apache.iceberg.orc.ORC; import org.apache.iceberg.parquet.Parquet; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.spark.SparkSchemaUtil; import org.apache.iceberg.spark.data.SparkAvroWriter; import org.apache.iceberg.spark.data.SparkOrcWriter; import org.apache.iceberg.spark.data.SparkParquetWriters; import org.apache.spark.sql.catalyst.InternalRow; import org.apache.spark.sql.types.StructType; +import org.apache.spark.unsafe.types.UTF8String; class SparkAppenderFactory implements FileAppenderFactory { private final Map properties; private final Schema writeSchema; private final StructType dsSchema; + private final PartitionSpec spec; + private final int[] equalityFieldIds; + private final Schema eqDeleteRowSchema; + private final Schema posDeleteRowSchema; + + private StructType eqDeleteSparkType = null; + private StructType posDeleteSparkType = null; SparkAppenderFactory(Map properties, Schema writeSchema, StructType dsSchema) { + this(properties, writeSchema, dsSchema, PartitionSpec.unpartitioned(), null, null, null); + } + + SparkAppenderFactory(Map properties, Schema writeSchema, StructType dsSchema, PartitionSpec spec) { + this(properties, writeSchema, dsSchema, spec, null, null, null); + } + + SparkAppenderFactory(Map properties, Schema writeSchema, StructType dsSchema, PartitionSpec spec, + int[] equalityFieldIds, Schema eqDeleteRowSchema, Schema posDeleteRowSchema) { this.properties = properties; this.writeSchema = writeSchema; this.dsSchema = dsSchema; + this.spec = spec; + this.equalityFieldIds = equalityFieldIds; + this.eqDeleteRowSchema = eqDeleteRowSchema; + this.posDeleteRowSchema = posDeleteRowSchema; + } + + private StructType lazyEqDeleteSparkType() { + if (eqDeleteSparkType == null) { + Preconditions.checkNotNull(eqDeleteRowSchema, "Equality delete row schema shouldn't be null"); + this.eqDeleteSparkType = SparkSchemaUtil.convert(eqDeleteRowSchema); + } + return eqDeleteSparkType; + } + + private StructType lazyPosDeleteSparkType() { + if (posDeleteSparkType == null) { + Preconditions.checkNotNull(posDeleteRowSchema, "Position delete row schema shouldn't be null"); + this.posDeleteSparkType = SparkSchemaUtil.convert(posDeleteRowSchema); + } + return posDeleteSparkType; } @Override @@ -86,4 +133,89 @@ public FileAppender newAppender(OutputFile file, FileFormat fileFor throw new RuntimeIOException(e); } } + + @Override + public DataWriter newDataWriter(EncryptedOutputFile file, FileFormat format, StructLike partition) { + return new DataWriter<>(newAppender(file.encryptingOutputFile(), format), format, + file.encryptingOutputFile().location(), spec, partition, file.keyMetadata()); + } + + @Override + public EqualityDeleteWriter newEqDeleteWriter(EncryptedOutputFile file, FileFormat format, + StructLike partition) { + Preconditions.checkState(equalityFieldIds != null && equalityFieldIds.length > 0, + "Equality field ids shouldn't be null or empty when creating equality-delete writer"); + Preconditions.checkNotNull(eqDeleteRowSchema, + "Equality delete row schema shouldn't be null when creating equality-delete writer"); + + try { + switch (format) { + case PARQUET: + return 
+              .createWriterFunc(msgType -> SparkParquetWriters.buildWriter(lazyEqDeleteSparkType(), msgType))
+              .overwrite()
+              .rowSchema(eqDeleteRowSchema)
+              .withSpec(spec)
+              .withPartition(partition)
+              .equalityFieldIds(equalityFieldIds)
+              .withKeyMetadata(file.keyMetadata())
+              .buildEqualityWriter();
+
+        case AVRO:
+          return Avro.writeDeletes(file.encryptingOutputFile())
+              .createWriterFunc(ignored -> new SparkAvroWriter(lazyEqDeleteSparkType()))
+              .overwrite()
+              .rowSchema(eqDeleteRowSchema)
+              .withSpec(spec)
+              .withPartition(partition)
+              .equalityFieldIds(equalityFieldIds)
+              .withKeyMetadata(file.keyMetadata())
+              .buildEqualityWriter();
+
+        default:
+          throw new UnsupportedOperationException(
+              "Cannot write equality-deletes for unsupported file format: " + format);
+      }
+    } catch (IOException e) {
+      throw new UncheckedIOException("Failed to create new equality delete writer", e);
+    }
+  }
+
+  @Override
+  public PositionDeleteWriter<InternalRow> newPosDeleteWriter(EncryptedOutputFile file, FileFormat format,
+                                                              StructLike partition) {
+    try {
+      switch (format) {
+        case PARQUET:
+          StructType sparkPosDeleteSchema =
+              SparkSchemaUtil.convert(DeleteSchemaUtil.posDeleteSchema(posDeleteRowSchema));
+          return Parquet.writeDeletes(file.encryptingOutputFile())
+              .createWriterFunc(msgType -> SparkParquetWriters.buildWriter(sparkPosDeleteSchema, msgType))
+              .overwrite()
+              .rowSchema(posDeleteRowSchema)
+              .withSpec(spec)
+              .withPartition(partition)
+              .withKeyMetadata(file.keyMetadata())
+              .transformPaths(path -> UTF8String.fromString(path.toString()))
+              .buildPositionWriter();
+
+        case AVRO:
+          return Avro.writeDeletes(file.encryptingOutputFile())
+              .createWriterFunc(ignored -> new SparkAvroWriter(lazyPosDeleteSparkType()))
+              .overwrite()
+              .rowSchema(posDeleteRowSchema)
+              .withSpec(spec)
+              .withPartition(partition)
+              .withKeyMetadata(file.keyMetadata())
+              .buildPositionWriter();
+
+        default:
+          throw new UnsupportedOperationException(
+              "Cannot write pos-deletes for unsupported file format: " + format);
+      }
+
+    } catch (IOException e) {
+      throw new UncheckedIOException("Failed to create new position delete writer", e);
+    }
+  }
 }
diff --git a/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkAppenderFactory.java b/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkAppenderFactory.java
new file mode 100644
index 000000000000..cac021d57feb
--- /dev/null
+++ b/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkAppenderFactory.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.source;
+
+import java.util.List;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.io.FileAppenderFactory;
+import org.apache.iceberg.io.TestAppenderFactory;
+import org.apache.iceberg.spark.SparkSchemaUtil;
+import org.apache.iceberg.util.ArrayUtil;
+import org.apache.iceberg.util.StructLikeSet;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.catalyst.expressions.GenericInternalRow;
+import org.apache.spark.sql.types.StructType;
+import org.apache.spark.unsafe.types.UTF8String;
+
+public class TestSparkAppenderFactory extends TestAppenderFactory<InternalRow> {
+
+  private final StructType sparkType;
+
+  public TestSparkAppenderFactory(String fileFormat, boolean partitioned) {
+    super(fileFormat, partitioned);
+    this.sparkType = SparkSchemaUtil.convert(SCHEMA);
+  }
+
+  @Override
+  protected FileAppenderFactory<InternalRow> createAppenderFactory(List<Integer> equalityFieldIds,
+                                                                   Schema eqDeleteSchema,
+                                                                   Schema posDeleteRowSchema) {
+    return new SparkAppenderFactory(table.properties(), table.schema(), sparkType, table.spec(),
+        ArrayUtil.toIntArray(equalityFieldIds), eqDeleteSchema, posDeleteRowSchema);
+  }
+
+  @Override
+  protected InternalRow createRow(Integer id, String data) {
+    InternalRow row = new GenericInternalRow(2);
+    row.update(0, id);
+    row.update(1, UTF8String.fromString(data));
+    return row;
+  }
+
+  @Override
+  protected StructLikeSet expectedRowSet(Iterable<InternalRow> rows) {
+    StructLikeSet set = StructLikeSet.create(table.schema().asStruct());
+    for (InternalRow row : rows) {
+      InternalRowWrapper wrapper = new InternalRowWrapper(sparkType);
+      set.add(wrapper.wrap(row));
+    }
+    return set;
+  }
+}
diff --git a/spark2/src/main/java/org/apache/iceberg/spark/source/Writer.java b/spark2/src/main/java/org/apache/iceberg/spark/source/Writer.java
index 6b6739a06a25..65b123b1e831 100644
--- a/spark2/src/main/java/org/apache/iceberg/spark/source/Writer.java
+++ b/spark2/src/main/java/org/apache/iceberg/spark/source/Writer.java
@@ -266,7 +266,7 @@ static class WriterFactory implements DataWriterFactory<InternalRow> {
     public DataWriter<InternalRow> createDataWriter(int partitionId, long taskId, long epochId) {
       OutputFileFactory fileFactory = new OutputFileFactory(
           spec, format, locations, io.value(), encryptionManager.value(), partitionId, taskId);
-      SparkAppenderFactory appenderFactory = new SparkAppenderFactory(properties, writeSchema, dsSchema);
+      SparkAppenderFactory appenderFactory = new SparkAppenderFactory(properties, writeSchema, dsSchema, spec);
 
       if (spec.fields().isEmpty()) {
         return new Unpartitioned24Writer(spec, format, appenderFactory, fileFactory, io.value(), targetFileSize);
diff --git a/spark3/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java b/spark3/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java
index efbc319197f0..fd6b611aefbd 100644
--- a/spark3/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java
+++ b/spark3/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java
@@ -416,7 +416,7 @@ public DataWriter<InternalRow> createWriter(int partitionId, long taskId) {
     public DataWriter<InternalRow> createWriter(int partitionId, long taskId, long epochId) {
       OutputFileFactory fileFactory = new OutputFileFactory(
           spec, format, locations, io.value(), encryptionManager.value(), partitionId, taskId);
-      SparkAppenderFactory appenderFactory = new SparkAppenderFactory(properties, writeSchema, dsSchema);
+      SparkAppenderFactory appenderFactory = new SparkAppenderFactory(properties, writeSchema, dsSchema, spec);
 
       if (spec.fields().isEmpty()) {
         return new Unpartitioned3Writer(spec, format, appenderFactory, fileFactory, io.value(), targetFileSize);
       } else {
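Usage note (not part of the patch): a minimal sketch of how the delete-writer methods added to SparkAppenderFactory could be driven to produce a position-delete file. Only the factory constructors, newPosDeleteWriter, and the PositionDeleteWriter delete/toDeleteFile calls come from the patch and the existing Iceberg API; the class name PositionDeleteExample, the method deleteOneRow, and the caller-supplied table, outputFile, dataFilePath, and pos are illustrative assumptions.

    // Sketch only; assumes it lives in org.apache.iceberg.spark.source because the
    // SparkAppenderFactory constructors are package-private.
    package org.apache.iceberg.spark.source;

    import java.io.IOException;
    import org.apache.iceberg.DeleteFile;
    import org.apache.iceberg.FileFormat;
    import org.apache.iceberg.Table;
    import org.apache.iceberg.deletes.PositionDeleteWriter;
    import org.apache.iceberg.encryption.EncryptedOutputFile;
    import org.apache.iceberg.spark.SparkSchemaUtil;
    import org.apache.spark.sql.catalyst.InternalRow;

    class PositionDeleteExample {
      // Writes a position-delete file marking row `pos` of `dataFilePath` as deleted
      // in an unpartitioned table (hence the null partition argument).
      static DeleteFile deleteOneRow(Table table, EncryptedOutputFile outputFile,
                                     CharSequence dataFilePath, long pos) throws IOException {
        SparkAppenderFactory appenderFactory = new SparkAppenderFactory(
            table.properties(), table.schema(), SparkSchemaUtil.convert(table.schema()), table.spec(),
            null, null, null); // no equality-field ids or delete row schemas: position deletes only

        PositionDeleteWriter<InternalRow> writer =
            appenderFactory.newPosDeleteWriter(outputFile, FileFormat.PARQUET, null);
        try {
          // Record the (path, pos) tuple; for Parquet the patch converts paths to
          // UTF8String via transformPaths before they are written.
          writer.delete(dataFilePath, pos);
        } finally {
          writer.close();
        }
        return writer.toDeleteFile();
      }
    }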