From 1c7e2508180b73345bf5cb3eea8a14ec15115c54 Mon Sep 17 00:00:00 2001 From: Marton Bod Date: Tue, 26 Oct 2021 13:40:55 +0200 Subject: [PATCH 1/4] Core: Implement FanoutDeleteWriter --- .../apache/iceberg/io/FanoutDeleteWriter.java | 68 ++++++++++++++ .../iceberg/io/TestPartitioningWriters.java | 93 +++++++++++++++++++ 2 files changed, 161 insertions(+) create mode 100644 core/src/main/java/org/apache/iceberg/io/FanoutDeleteWriter.java diff --git a/core/src/main/java/org/apache/iceberg/io/FanoutDeleteWriter.java b/core/src/main/java/org/apache/iceberg/io/FanoutDeleteWriter.java new file mode 100644 index 000000000000..bb734fdf2fd5 --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/io/FanoutDeleteWriter.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.io; + +import java.util.List; +import org.apache.iceberg.DeleteFile; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.StructLike; +import org.apache.iceberg.deletes.PositionDelete; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.util.CharSequenceSet; + +/** + * A delete writer capable of writing to multiple specs and partitions that keeps delete writers for each + * seen spec/partition pair open until this writer is closed. + */ +public class FanoutDeleteWriter extends FanoutWriter, DeleteWriteResult> { + + private final FileWriterFactory writerFactory; + private final OutputFileFactory fileFactory; + private final FileIO io; + private final long targetFileSizeInBytes; + private final List deleteFiles; + private final CharSequenceSet referencedDataFiles; + + public FanoutDeleteWriter(FileWriterFactory writerFactory, OutputFileFactory fileFactory, + FileIO io, long targetFileSizeInBytes) { + this.writerFactory = writerFactory; + this.fileFactory = fileFactory; + this.io = io; + this.targetFileSizeInBytes = targetFileSizeInBytes; + this.deleteFiles = Lists.newArrayList(); + this.referencedDataFiles = CharSequenceSet.empty(); + } + + @Override + protected FileWriter, DeleteWriteResult> newWriter(PartitionSpec spec, StructLike partition) { + return new RollingPositionDeleteWriter<>(writerFactory, fileFactory, io, targetFileSizeInBytes, spec, partition); + } + + @Override + protected void addResult(DeleteWriteResult result) { + deleteFiles.addAll(result.deleteFiles()); + referencedDataFiles.addAll(result.referencedDataFiles()); + } + + @Override + protected DeleteWriteResult aggregatedResult() { + return new DeleteWriteResult(deleteFiles, referencedDataFiles); + } +} diff --git a/data/src/test/java/org/apache/iceberg/io/TestPartitioningWriters.java b/data/src/test/java/org/apache/iceberg/io/TestPartitioningWriters.java index 054881af458b..0c7c122b5258 100644 --- a/data/src/test/java/org/apache/iceberg/io/TestPartitioningWriters.java +++ b/data/src/test/java/org/apache/iceberg/io/TestPartitioningWriters.java @@ -468,6 +468,17 @@ public void testFanoutDataWriterNoRecords() throws IOException { Assert.assertEquals("Must be no data files", 0, writer.result().dataFiles().size()); } + @Test + public void testFanoutDeleteWriterNoRecords() throws IOException { + FileWriterFactory writerFactory = newWriterFactory(table.schema()); + FanoutDeleteWriter writer = new FanoutDeleteWriter<>(writerFactory, fileFactory, table.io(), TARGET_FILE_SIZE); + + writer.close(); + DeleteWriteResult result = writer.result(); + Assert.assertEquals("Result should contain 0 delete files", 0, result.deleteFiles().size()); + Assert.assertEquals("Result should reference 0 data files", 0, result.referencedDataFiles().size()); + } + @Test public void testFanoutDataWriterMultiplePartitions() throws IOException { table.updateSpec() @@ -505,4 +516,86 @@ public void testFanoutDataWriterMultiplePartitions() throws IOException { ); Assert.assertEquals("Records should match", toSet(expectedRows), actualRowSet("*")); } + + @Test + public void testFanoutDeleteWriterMultipleSpecs() throws IOException { + FileWriterFactory writerFactory = newWriterFactory(table.schema()); + + // insert some unpartitioned data + ImmutableList unpart_rows = ImmutableList.of( + toRow(1, "aaa"), + toRow(2, "bbb"), + toRow(3, "ccc") + ); + DataFile dataFile1 = writeData(writerFactory, fileFactory, unpart_rows, table.spec(), null); + table.newFastAppend() + .appendFile(dataFile1) + .commit(); + + // identity partition using the 'data' column + table.updateSpec().addField("data").commit(); + // insert some partitioned data + ImmutableList part_fff = ImmutableList.of( + toRow(4, "fff"), + toRow(5, "fff"), + toRow(6, "fff") + ); + ImmutableList part_rrr = ImmutableList.of( + toRow(7, "rrr"), + toRow(8, "rrr"), + toRow(9, "rrr") + ); + DataFile dataFile2 = writeData(writerFactory, fileFactory, part_fff, table.spec(), partitionKey(table.spec(), "fff")); + DataFile dataFile3 = writeData(writerFactory, fileFactory, part_rrr, table.spec(), partitionKey(table.spec(), "rrr")); + table.newFastAppend() + .appendFile(dataFile2) + .appendFile(dataFile3) + .commit(); + + // switch to using bucket partitioning on the 'data' column + table.updateSpec().removeField("data").addField(Expressions.bucket("data", 16)).commit(); + // insert some data + ImmutableList bucketed_rows = ImmutableList.of( + toRow(10, "rrr"), + toRow(11, "rrr"), + toRow(12, "rrr") + ); + DataFile dataFile4 = writeData(writerFactory, fileFactory, bucketed_rows, table.spec(), partitionKey(table.spec(), "rrr")); + table.newFastAppend() + .appendFile(dataFile4) + .commit(); + + PartitionSpec unpartitionedSpec = table.specs().get(0); + PartitionSpec identitySpec = table.specs().get(1); + PartitionSpec bucketedSpec = table.specs().get(2); + + // delete some records + FanoutDeleteWriter writer = new FanoutDeleteWriter<>(writerFactory, fileFactory, table.io(), TARGET_FILE_SIZE); + writer.write(positionDelete(dataFile1.path(), 0L, null), unpartitionedSpec, null); + writer.write(positionDelete(dataFile2.path(), 1L, null), identitySpec, partitionKey(identitySpec, "fff")); + writer.write(positionDelete(dataFile2.path(), 2L, null), identitySpec, partitionKey(identitySpec, "fff")); + writer.write(positionDelete(dataFile3.path(), 2L, null), identitySpec, partitionKey(identitySpec, "rrr")); + writer.write(positionDelete(dataFile4.path(), 0L, null), bucketedSpec, partitionKey(bucketedSpec, "rrr")); + writer.close(); + + DeleteWriteResult result = writer.result(); + Assert.assertEquals("Result should contain 4 delete files", 4, result.deleteFiles().size()); + Assert.assertEquals("Result should reference 4 data files", 4, result.referencedDataFiles().size()); + + RowDelta rowDelta = table.newRowDelta(); + result.deleteFiles().forEach(rowDelta::addDeletes); + rowDelta.commit(); + + // check if correct records are read back + List expectedRows = ImmutableList.of( + toRow(2, "bbb"), + toRow(3, "ccc"), + toRow(4, "fff"), + toRow(7, "rrr"), + toRow(8, "rrr"), + toRow(11, "rrr"), + toRow(12, "rrr") + ); + Assert.assertEquals("Records should match", toSet(expectedRows), actualRowSet("*")); + } } From f3e08a1427bd46a45efcc2f802199e5673119f3e Mon Sep 17 00:00:00 2001 From: Marton Bod Date: Tue, 26 Oct 2021 13:59:26 +0200 Subject: [PATCH 2/4] Add out of order spec test --- .../iceberg/io/TestPartitioningWriters.java | 30 +++++++++++-------- 1 file changed, 17 insertions(+), 13 deletions(-) diff --git a/data/src/test/java/org/apache/iceberg/io/TestPartitioningWriters.java b/data/src/test/java/org/apache/iceberg/io/TestPartitioningWriters.java index 0c7c122b5258..a3e86f8f87bb 100644 --- a/data/src/test/java/org/apache/iceberg/io/TestPartitioningWriters.java +++ b/data/src/test/java/org/apache/iceberg/io/TestPartitioningWriters.java @@ -522,12 +522,12 @@ public void testFanoutDeleteWriterMultipleSpecs() throws IOException { FileWriterFactory writerFactory = newWriterFactory(table.schema()); // insert some unpartitioned data - ImmutableList unpart_rows = ImmutableList.of( + ImmutableList unpartRows = ImmutableList.of( toRow(1, "aaa"), toRow(2, "bbb"), toRow(3, "ccc") ); - DataFile dataFile1 = writeData(writerFactory, fileFactory, unpart_rows, table.spec(), null); + DataFile dataFile1 = writeData(writerFactory, fileFactory, unpartRows, table.spec(), null); table.newFastAppend() .appendFile(dataFile1) .commit(); @@ -535,18 +535,20 @@ public void testFanoutDeleteWriterMultipleSpecs() throws IOException { // identity partition using the 'data' column table.updateSpec().addField("data").commit(); // insert some partitioned data - ImmutableList part_fff = ImmutableList.of( + ImmutableList identityRows1 = ImmutableList.of( toRow(4, "fff"), toRow(5, "fff"), toRow(6, "fff") ); - ImmutableList part_rrr = ImmutableList.of( + ImmutableList identityRows2 = ImmutableList.of( toRow(7, "rrr"), toRow(8, "rrr"), toRow(9, "rrr") ); - DataFile dataFile2 = writeData(writerFactory, fileFactory, part_fff, table.spec(), partitionKey(table.spec(), "fff")); - DataFile dataFile3 = writeData(writerFactory, fileFactory, part_rrr, table.spec(), partitionKey(table.spec(), "rrr")); + DataFile dataFile2 = + writeData(writerFactory, fileFactory, identityRows1, table.spec(), partitionKey(table.spec(), "fff")); + DataFile dataFile3 = + writeData(writerFactory, fileFactory, identityRows2, table.spec(), partitionKey(table.spec(), "rrr")); table.newFastAppend() .appendFile(dataFile2) .appendFile(dataFile3) @@ -555,15 +557,16 @@ public void testFanoutDeleteWriterMultipleSpecs() throws IOException { // switch to using bucket partitioning on the 'data' column table.updateSpec().removeField("data").addField(Expressions.bucket("data", 16)).commit(); // insert some data - ImmutableList bucketed_rows = ImmutableList.of( + ImmutableList bucketedRows = ImmutableList.of( toRow(10, "rrr"), toRow(11, "rrr"), toRow(12, "rrr") ); - DataFile dataFile4 = writeData(writerFactory, fileFactory, bucketed_rows, table.spec(), partitionKey(table.spec(), "rrr")); + DataFile dataFile4 = + writeData(writerFactory, fileFactory, bucketedRows, table.spec(), partitionKey(table.spec(), "rrr")); table.newFastAppend() - .appendFile(dataFile4) - .commit(); + .appendFile(dataFile4) + .commit(); PartitionSpec unpartitionedSpec = table.specs().get(0); PartitionSpec identitySpec = table.specs().get(1); @@ -572,10 +575,13 @@ public void testFanoutDeleteWriterMultipleSpecs() throws IOException { // delete some records FanoutDeleteWriter writer = new FanoutDeleteWriter<>(writerFactory, fileFactory, table.io(), TARGET_FILE_SIZE); writer.write(positionDelete(dataFile1.path(), 0L, null), unpartitionedSpec, null); + writer.write(positionDelete(dataFile2.path(), 0L, null), identitySpec, partitionKey(identitySpec, "fff")); writer.write(positionDelete(dataFile2.path(), 1L, null), identitySpec, partitionKey(identitySpec, "fff")); - writer.write(positionDelete(dataFile2.path(), 2L, null), identitySpec, partitionKey(identitySpec, "fff")); writer.write(positionDelete(dataFile3.path(), 2L, null), identitySpec, partitionKey(identitySpec, "rrr")); writer.write(positionDelete(dataFile4.path(), 0L, null), bucketedSpec, partitionKey(bucketedSpec, "rrr")); + // pepper in some out-of-order spec deletes, which shouldn't cause problems for fanout writer + writer.write(positionDelete(dataFile1.path(), 1L, null), unpartitionedSpec, null); + writer.write(positionDelete(dataFile2.path(), 2L, null), identitySpec, partitionKey(identitySpec, "fff")); writer.close(); DeleteWriteResult result = writer.result(); @@ -588,9 +594,7 @@ public void testFanoutDeleteWriterMultipleSpecs() throws IOException { // check if correct records are read back List expectedRows = ImmutableList.of( - toRow(2, "bbb"), toRow(3, "ccc"), - toRow(4, "fff"), toRow(7, "rrr"), toRow(8, "rrr"), toRow(11, "rrr"), From 339821f4daa525fbb51c2347af9e1e5513f994a5 Mon Sep 17 00:00:00 2001 From: Marton Bod Date: Tue, 26 Oct 2021 14:14:45 +0200 Subject: [PATCH 3/4] Rename class --- ...leteWriter.java => FanoutPositionDeleteWriter.java} | 10 +++++----- .../org/apache/iceberg/io/TestPartitioningWriters.java | 6 ++++-- 2 files changed, 9 insertions(+), 7 deletions(-) rename core/src/main/java/org/apache/iceberg/io/{FanoutDeleteWriter.java => FanoutPositionDeleteWriter.java} (82%) diff --git a/core/src/main/java/org/apache/iceberg/io/FanoutDeleteWriter.java b/core/src/main/java/org/apache/iceberg/io/FanoutPositionDeleteWriter.java similarity index 82% rename from core/src/main/java/org/apache/iceberg/io/FanoutDeleteWriter.java rename to core/src/main/java/org/apache/iceberg/io/FanoutPositionDeleteWriter.java index bb734fdf2fd5..1968c2c31064 100644 --- a/core/src/main/java/org/apache/iceberg/io/FanoutDeleteWriter.java +++ b/core/src/main/java/org/apache/iceberg/io/FanoutPositionDeleteWriter.java @@ -28,10 +28,10 @@ import org.apache.iceberg.util.CharSequenceSet; /** - * A delete writer capable of writing to multiple specs and partitions that keeps delete writers for each - * seen spec/partition pair open until this writer is closed. + * A position delete writer capable of writing to multiple specs and partitions that keeps position delete writers + * for each seen spec/partition pair open until this writer is closed. */ -public class FanoutDeleteWriter extends FanoutWriter, DeleteWriteResult> { +public class FanoutPositionDeleteWriter extends FanoutWriter, DeleteWriteResult> { private final FileWriterFactory writerFactory; private final OutputFileFactory fileFactory; @@ -40,8 +40,8 @@ public class FanoutDeleteWriter extends FanoutWriter, Delet private final List deleteFiles; private final CharSequenceSet referencedDataFiles; - public FanoutDeleteWriter(FileWriterFactory writerFactory, OutputFileFactory fileFactory, - FileIO io, long targetFileSizeInBytes) { + public FanoutPositionDeleteWriter(FileWriterFactory writerFactory, OutputFileFactory fileFactory, + FileIO io, long targetFileSizeInBytes) { this.writerFactory = writerFactory; this.fileFactory = fileFactory; this.io = io; diff --git a/data/src/test/java/org/apache/iceberg/io/TestPartitioningWriters.java b/data/src/test/java/org/apache/iceberg/io/TestPartitioningWriters.java index a3e86f8f87bb..6dff2b2945ba 100644 --- a/data/src/test/java/org/apache/iceberg/io/TestPartitioningWriters.java +++ b/data/src/test/java/org/apache/iceberg/io/TestPartitioningWriters.java @@ -471,7 +471,8 @@ public void testFanoutDataWriterNoRecords() throws IOException { @Test public void testFanoutDeleteWriterNoRecords() throws IOException { FileWriterFactory writerFactory = newWriterFactory(table.schema()); - FanoutDeleteWriter writer = new FanoutDeleteWriter<>(writerFactory, fileFactory, table.io(), TARGET_FILE_SIZE); + FanoutPositionDeleteWriter writer = + new FanoutPositionDeleteWriter<>(writerFactory, fileFactory, table.io(), TARGET_FILE_SIZE); writer.close(); DeleteWriteResult result = writer.result(); @@ -573,7 +574,8 @@ public void testFanoutDeleteWriterMultipleSpecs() throws IOException { PartitionSpec bucketedSpec = table.specs().get(2); // delete some records - FanoutDeleteWriter writer = new FanoutDeleteWriter<>(writerFactory, fileFactory, table.io(), TARGET_FILE_SIZE); + FanoutPositionDeleteWriter writer = + new FanoutPositionDeleteWriter<>(writerFactory, fileFactory, table.io(), TARGET_FILE_SIZE); writer.write(positionDelete(dataFile1.path(), 0L, null), unpartitionedSpec, null); writer.write(positionDelete(dataFile2.path(), 0L, null), identitySpec, partitionKey(identitySpec, "fff")); writer.write(positionDelete(dataFile2.path(), 1L, null), identitySpec, partitionKey(identitySpec, "fff")); From 39e1079c7f5b3b36aea22b008f8501dcf7b05c44 Mon Sep 17 00:00:00 2001 From: Marton Bod Date: Wed, 27 Oct 2021 09:53:05 +0200 Subject: [PATCH 4/4] Test out of order partition too --- .../java/org/apache/iceberg/io/TestPartitioningWriters.java | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/data/src/test/java/org/apache/iceberg/io/TestPartitioningWriters.java b/data/src/test/java/org/apache/iceberg/io/TestPartitioningWriters.java index 6dff2b2945ba..2788b800cb48 100644 --- a/data/src/test/java/org/apache/iceberg/io/TestPartitioningWriters.java +++ b/data/src/test/java/org/apache/iceberg/io/TestPartitioningWriters.java @@ -578,10 +578,11 @@ public void testFanoutDeleteWriterMultipleSpecs() throws IOException { new FanoutPositionDeleteWriter<>(writerFactory, fileFactory, table.io(), TARGET_FILE_SIZE); writer.write(positionDelete(dataFile1.path(), 0L, null), unpartitionedSpec, null); writer.write(positionDelete(dataFile2.path(), 0L, null), identitySpec, partitionKey(identitySpec, "fff")); - writer.write(positionDelete(dataFile2.path(), 1L, null), identitySpec, partitionKey(identitySpec, "fff")); writer.write(positionDelete(dataFile3.path(), 2L, null), identitySpec, partitionKey(identitySpec, "rrr")); + // test out-of-order partition + writer.write(positionDelete(dataFile2.path(), 1L, null), identitySpec, partitionKey(identitySpec, "fff")); writer.write(positionDelete(dataFile4.path(), 0L, null), bucketedSpec, partitionKey(bucketedSpec, "rrr")); - // pepper in some out-of-order spec deletes, which shouldn't cause problems for fanout writer + // test out-of-order spec writer.write(positionDelete(dataFile1.path(), 1L, null), unpartitionedSpec, null); writer.write(positionDelete(dataFile2.path(), 2L, null), identitySpec, partitionKey(identitySpec, "fff")); writer.close();