diff --git a/core/src/main/java/org/apache/iceberg/io/FanoutPositionDeleteWriter.java b/core/src/main/java/org/apache/iceberg/io/FanoutPositionDeleteWriter.java
new file mode 100644
index 000000000000..1968c2c31064
--- /dev/null
+++ b/core/src/main/java/org/apache/iceberg/io/FanoutPositionDeleteWriter.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.io;
+
+import java.util.List;
+import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.deletes.PositionDelete;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.util.CharSequenceSet;
+
+/**
+ * A position delete writer capable of writing to multiple specs and partitions that keeps position delete writers
+ * for each seen spec/partition pair open until this writer is closed.
+ */
+public class FanoutPositionDeleteWriter<T> extends FanoutWriter<PositionDelete<T>, DeleteWriteResult> {
+
+  private final FileWriterFactory<T> writerFactory;
+  private final OutputFileFactory fileFactory;
+  private final FileIO io;
+  private final long targetFileSizeInBytes;
+  private final List<DeleteFile> deleteFiles;
+  private final CharSequenceSet referencedDataFiles;
+
+  public FanoutPositionDeleteWriter(FileWriterFactory<T> writerFactory, OutputFileFactory fileFactory,
+                                    FileIO io, long targetFileSizeInBytes) {
+    this.writerFactory = writerFactory;
+    this.fileFactory = fileFactory;
+    this.io = io;
+    this.targetFileSizeInBytes = targetFileSizeInBytes;
+    this.deleteFiles = Lists.newArrayList();
+    this.referencedDataFiles = CharSequenceSet.empty();
+  }
+
+  @Override
+  protected FileWriter<PositionDelete<T>, DeleteWriteResult> newWriter(PartitionSpec spec, StructLike partition) {
+    return new RollingPositionDeleteWriter<>(writerFactory, fileFactory, io, targetFileSizeInBytes, spec, partition);
+  }
+
+  @Override
+  protected void addResult(DeleteWriteResult result) {
+    deleteFiles.addAll(result.deleteFiles());
+    referencedDataFiles.addAll(result.referencedDataFiles());
+  }
+
+  @Override
+  protected DeleteWriteResult aggregatedResult() {
+    return new DeleteWriteResult(deleteFiles, referencedDataFiles);
+  }
+}
diff --git a/data/src/test/java/org/apache/iceberg/io/TestPartitioningWriters.java b/data/src/test/java/org/apache/iceberg/io/TestPartitioningWriters.java
index 054881af458b..2788b800cb48 100644
--- a/data/src/test/java/org/apache/iceberg/io/TestPartitioningWriters.java
+++ b/data/src/test/java/org/apache/iceberg/io/TestPartitioningWriters.java
@@ -468,6 +468,18 @@ public void testFanoutDataWriterNoRecords() throws IOException {
     Assert.assertEquals("Must be no data files", 0, writer.result().dataFiles().size());
   }
 
+  @Test
+  public void testFanoutDeleteWriterNoRecords() throws IOException {
+    FileWriterFactory<T> writerFactory = newWriterFactory(table.schema());
+    FanoutPositionDeleteWriter<T> writer =
+        new FanoutPositionDeleteWriter<>(writerFactory, fileFactory, table.io(), TARGET_FILE_SIZE);
+
+    writer.close();
+    DeleteWriteResult result = writer.result();
+    Assert.assertEquals("Result should contain 0 delete files", 0, result.deleteFiles().size());
+    Assert.assertEquals("Result should reference 0 data files", 0, result.referencedDataFiles().size());
+  }
+
   @Test
   public void testFanoutDataWriterMultiplePartitions() throws IOException {
     table.updateSpec()
@@ -505,4 +517,92 @@ public void testFanoutDataWriterMultiplePartitions() throws IOException {
     );
     Assert.assertEquals("Records should match", toSet(expectedRows), actualRowSet("*"));
   }
+
+  @Test
+  public void testFanoutDeleteWriterMultipleSpecs() throws IOException {
+    FileWriterFactory<T> writerFactory = newWriterFactory(table.schema());
+
+    // insert some unpartitioned data
+    ImmutableList<T> unpartRows = ImmutableList.of(
+        toRow(1, "aaa"),
+        toRow(2, "bbb"),
+        toRow(3, "ccc")
+    );
+    DataFile dataFile1 = writeData(writerFactory, fileFactory, unpartRows, table.spec(), null);
+    table.newFastAppend()
+        .appendFile(dataFile1)
+        .commit();
+
+    // identity partition using the 'data' column
+    table.updateSpec().addField("data").commit();
+    // insert some partitioned data
+    ImmutableList<T> identityRows1 = ImmutableList.of(
+        toRow(4, "fff"),
+        toRow(5, "fff"),
+        toRow(6, "fff")
+    );
+    ImmutableList<T> identityRows2 = ImmutableList.of(
+        toRow(7, "rrr"),
+        toRow(8, "rrr"),
+        toRow(9, "rrr")
+    );
+    DataFile dataFile2 =
+        writeData(writerFactory, fileFactory, identityRows1, table.spec(), partitionKey(table.spec(), "fff"));
+    DataFile dataFile3 =
+        writeData(writerFactory, fileFactory, identityRows2, table.spec(), partitionKey(table.spec(), "rrr"));
+    table.newFastAppend()
+        .appendFile(dataFile2)
+        .appendFile(dataFile3)
+        .commit();
+
+    // switch to using bucket partitioning on the 'data' column
+    table.updateSpec().removeField("data").addField(Expressions.bucket("data", 16)).commit();
+    // insert some data
+    ImmutableList<T> bucketedRows = ImmutableList.of(
+        toRow(10, "rrr"),
+        toRow(11, "rrr"),
+        toRow(12, "rrr")
+    );
+    DataFile dataFile4 =
+        writeData(writerFactory, fileFactory, bucketedRows, table.spec(), partitionKey(table.spec(), "rrr"));
+    table.newFastAppend()
+        .appendFile(dataFile4)
+        .commit();
+
+    PartitionSpec unpartitionedSpec = table.specs().get(0);
+    PartitionSpec identitySpec = table.specs().get(1);
+    PartitionSpec bucketedSpec = table.specs().get(2);
+
+    // delete some records
+    FanoutPositionDeleteWriter<T> writer =
+        new FanoutPositionDeleteWriter<>(writerFactory, fileFactory, table.io(), TARGET_FILE_SIZE);
+    writer.write(positionDelete(dataFile1.path(), 0L, null), unpartitionedSpec, null);
+    writer.write(positionDelete(dataFile2.path(), 0L, null), identitySpec, partitionKey(identitySpec, "fff"));
+    writer.write(positionDelete(dataFile3.path(), 2L, null), identitySpec, partitionKey(identitySpec, "rrr"));
+    // test out-of-order partition
+    writer.write(positionDelete(dataFile2.path(), 1L, null), identitySpec, partitionKey(identitySpec, "fff"));
+    writer.write(positionDelete(dataFile4.path(), 0L, null), bucketedSpec, partitionKey(bucketedSpec, "rrr"));
+    // test out-of-order spec
+    writer.write(positionDelete(dataFile1.path(), 1L, null), unpartitionedSpec, null);
+    writer.write(positionDelete(dataFile2.path(), 2L, null), identitySpec, partitionKey(identitySpec, "fff"));
+    writer.close();
+
+    DeleteWriteResult result = writer.result();
+    Assert.assertEquals("Result should contain 4 delete files", 4, result.deleteFiles().size());
+    Assert.assertEquals("Result should reference 4 data files", 4, result.referencedDataFiles().size());
+
+    RowDelta rowDelta = table.newRowDelta();
+    result.deleteFiles().forEach(rowDelta::addDeletes);
+    rowDelta.commit();
+
+    // check if correct records are read back
+    List<T> expectedRows = ImmutableList.of(
+        toRow(3, "ccc"),
+        toRow(7, "rrr"),
+        toRow(8, "rrr"),
+        toRow(11, "rrr"),
+        toRow(12, "rrr")
+    );
+    Assert.assertEquals("Records should match", toSet(expectedRows), actualRowSet("*"));
+  }
 }