From 609049e1e9a05d3fa76baca3fdd13b9f0a1dce99 Mon Sep 17 00:00:00 2001 From: huzheng Date: Wed, 25 Nov 2020 18:25:42 +0800 Subject: [PATCH 1/8] Core: Add data and delete writers in FileAppenderFactory. --- build.gradle | 2 + .../org/apache/iceberg/io/DataWriter.java | 80 +++++ .../apache/iceberg/io/DeleteSchemaUtil.java | 48 +++ .../iceberg/io/FileAppenderFactory.java | 34 ++ .../org/apache/iceberg/TableTestBase.java | 8 +- .../iceberg/io/TestAppenderFactory.java | 295 ++++++++++++++++++ .../iceberg/data/GenericAppenderFactory.java | 106 ++++++- .../iceberg/TestGenericAppenderFactory.java | 77 +++++ .../org/apache/iceberg/TestSplitScan.java | 4 +- .../iceberg/data/GenericAppenderHelper.java | 7 +- .../apache/iceberg/data/TestLocalScan.java | 5 +- .../parquet/TestParquetMergingMetrics.java | 5 +- .../flink/sink/FlinkAppenderFactory.java | 200 ++++++++++++ .../flink/sink/RowDataTaskWriterFactory.java | 64 +--- .../apache/iceberg/flink/SimpleDataUtil.java | 6 +- .../flink/sink/TestFlinkAppenderFactory.java | 78 +++++ .../flink/source/TestFlinkMergingMetrics.java | 9 +- .../org/apache/iceberg/parquet/Parquet.java | 14 +- .../iceberg/parquet/ParquetValueWriters.java | 7 +- .../iceberg/spark/source/RowDataRewriter.java | 2 +- .../spark/source/SparkAppenderFactory.java | 97 +++++- .../source/TestSparkAppenderFactory.java | 81 +++++ .../TestSparkParquetMergingMetrics.java | 8 +- .../spark/source/TestSparkReadProjection.java | 4 +- .../apache/iceberg/spark/source/Writer.java | 2 +- .../spark/source/TestFilteredScan.java | 4 +- .../iceberg/spark/source/SparkWrite.java | 2 +- .../spark/source/TestFilteredScan.java | 4 +- 28 files changed, 1142 insertions(+), 111 deletions(-) create mode 100644 core/src/main/java/org/apache/iceberg/io/DataWriter.java create mode 100644 core/src/main/java/org/apache/iceberg/io/DeleteSchemaUtil.java create mode 100644 core/src/test/java/org/apache/iceberg/io/TestAppenderFactory.java create mode 100644 data/src/test/java/org/apache/iceberg/TestGenericAppenderFactory.java create mode 100644 flink/src/main/java/org/apache/iceberg/flink/sink/FlinkAppenderFactory.java create mode 100644 flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkAppenderFactory.java create mode 100644 spark/src/test/java/org/apache/iceberg/spark/source/TestSparkAppenderFactory.java diff --git a/build.gradle b/build.gradle index 53a2bee15c79..d6a9403074d9 100644 --- a/build.gradle +++ b/build.gradle @@ -337,6 +337,7 @@ project(':iceberg-flink') { testCompile project(path: ':iceberg-hive-metastore', configuration: 'testArtifacts') testCompile project(path: ':iceberg-api', configuration: 'testArtifacts') + testCompile project(path: ':iceberg-core', configuration: 'testArtifacts') testCompile project(path: ':iceberg-data', configuration: 'testArtifacts') // By default, hive-exec is a fat/uber jar and it exports a guava library @@ -766,6 +767,7 @@ project(':iceberg-spark') { } testCompile project(path: ':iceberg-hive-metastore', configuration: 'testArtifacts') testCompile project(path: ':iceberg-api', configuration: 'testArtifacts') + testCompile project(path: ':iceberg-core', configuration: 'testArtifacts') testCompile project(path: ':iceberg-data', configuration: 'testArtifacts') } diff --git a/core/src/main/java/org/apache/iceberg/io/DataWriter.java b/core/src/main/java/org/apache/iceberg/io/DataWriter.java new file mode 100644 index 000000000000..3b01bdcdbc94 --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/io/DataWriter.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software 
Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.io; + +import java.io.Closeable; +import java.io.IOException; +import java.nio.ByteBuffer; +import org.apache.iceberg.DataFile; +import org.apache.iceberg.DataFiles; +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.StructLike; +import org.apache.iceberg.encryption.EncryptionKeyMetadata; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; + +public class DataWriter implements Closeable { + private final FileAppender appender; + private final FileFormat format; + private final String location; + private final PartitionSpec spec; + private final StructLike partition; + private final ByteBuffer keyMetadata; + private DataFile dataFile = null; + + public DataWriter(FileAppender appender, FileFormat format, String location, + PartitionSpec spec, StructLike partition, EncryptionKeyMetadata keyMetadata) { + this.appender = appender; + this.format = format; + this.location = location; + this.spec = spec; + this.partition = partition; + this.keyMetadata = keyMetadata != null ? keyMetadata.buffer() : null; + } + + public void add(T row) { + appender.add(row); + } + + public long length() { + return appender.length(); + } + + @Override + public void close() throws IOException { + if (dataFile == null) { + appender.close(); + this.dataFile = DataFiles.builder(spec) + .withFormat(format) + .withPath(location) + .withPartition(partition) + .withEncryptionKeyMetadata(keyMetadata) + .withFileSizeInBytes(appender.length()) + .withMetrics(appender.metrics()) + .withSplitOffsets(appender.splitOffsets()) + .build(); + } + } + + public DataFile toDataFile() { + Preconditions.checkState(dataFile != null, "Cannot create data file from unclosed writer"); + return dataFile; + } +} diff --git a/core/src/main/java/org/apache/iceberg/io/DeleteSchemaUtil.java b/core/src/main/java/org/apache/iceberg/io/DeleteSchemaUtil.java new file mode 100644 index 000000000000..3da96d765643 --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/io/DeleteSchemaUtil.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.io; + +import org.apache.iceberg.MetadataColumns; +import org.apache.iceberg.Schema; +import org.apache.iceberg.types.Types; + +public class DeleteSchemaUtil { + private DeleteSchemaUtil() { + } + + public static Schema pathPosSchema(Schema rowSchema) { + return new Schema( + MetadataColumns.DELETE_FILE_PATH, + MetadataColumns.DELETE_FILE_POS, + Types.NestedField.optional( + MetadataColumns.DELETE_FILE_ROW_FIELD_ID, "row", rowSchema.asStruct(), + MetadataColumns.DELETE_FILE_ROW_DOC)); + } + + public static Schema pathPosSchema() { + return new Schema( + MetadataColumns.DELETE_FILE_PATH, + MetadataColumns.DELETE_FILE_POS); + } + + public static Schema posDeleteSchema(Schema rowSchema) { + return rowSchema == null ? pathPosSchema() : pathPosSchema(rowSchema); + } +} diff --git a/core/src/main/java/org/apache/iceberg/io/FileAppenderFactory.java b/core/src/main/java/org/apache/iceberg/io/FileAppenderFactory.java index 9afdca460f0a..b093eab447fe 100644 --- a/core/src/main/java/org/apache/iceberg/io/FileAppenderFactory.java +++ b/core/src/main/java/org/apache/iceberg/io/FileAppenderFactory.java @@ -20,6 +20,10 @@ package org.apache.iceberg.io; import org.apache.iceberg.FileFormat; +import org.apache.iceberg.StructLike; +import org.apache.iceberg.deletes.EqualityDeleteWriter; +import org.apache.iceberg.deletes.PositionDeleteWriter; +import org.apache.iceberg.encryption.EncryptedOutputFile; /** * Factory to create a new {@link FileAppender} to write records. @@ -36,4 +40,34 @@ public interface FileAppenderFactory { * @return a newly created {@link FileAppender} */ FileAppender newAppender(OutputFile outputFile, FileFormat fileFormat); + + /** + * Create a new {@link DataWriter}. + * + * @param outputFile an OutputFile used to create an output stream. + * @param format a file format + * @param partition a tuple of partition values + * @return a newly created {@link DataWriter} for rows + */ + DataWriter newDataWriter(EncryptedOutputFile outputFile, FileFormat format, StructLike partition); + + /** + * Create a new {@link EqualityDeleteWriter}. + * + * @param outputFile an OutputFile used to create an output stream. + * @param format a file format + * @param partition a tuple of partition values + * @return a newly created {@link EqualityDeleteWriter} for equality deletes + */ + EqualityDeleteWriter newEqDeleteWriter(EncryptedOutputFile outputFile, FileFormat format, StructLike partition); + + /** + * Create a new {@link PositionDeleteWriter}. + * + * @param outputFile an OutputFile used to create an output stream. 
+ * @param format a file format + * @param partition a tuple of partition values + * @return a newly created {@link PositionDeleteWriter} for position deletes + */ + PositionDeleteWriter newPosDeleteWriter(EncryptedOutputFile outputFile, FileFormat format, StructLike partition); }
diff --git a/core/src/test/java/org/apache/iceberg/TableTestBase.java b/core/src/test/java/org/apache/iceberg/TableTestBase.java index 6009db38b858..184534fcc042 100644 --- a/core/src/test/java/org/apache/iceberg/TableTestBase.java +++ b/core/src/test/java/org/apache/iceberg/TableTestBase.java @@ -50,7 +50,7 @@ public class TableTestBase { ); // Partition spec used to create tables - static final PartitionSpec SPEC = PartitionSpec.builderFor(SCHEMA) + protected static final PartitionSpec SPEC = PartitionSpec.builderFor(SCHEMA) .bucket("data", 16) .build(); @@ -104,8 +104,8 @@ public class TableTestBase { @Rule public TemporaryFolder temp = new TemporaryFolder(); - File tableDir = null; - File metadataDir = null; + protected File tableDir = null; + protected File metadataDir = null; public TestTables.TestTable table = null; protected final int formatVersion; @@ -143,7 +143,7 @@ List listManifestFiles(File tableDirToList) { !name.startsWith("snap") && Files.getFileExtension(name).equalsIgnoreCase("avro"))); } - TestTables.TestTable create(Schema schema, PartitionSpec spec) { + protected TestTables.TestTable create(Schema schema, PartitionSpec spec) { return TestTables.create(tableDir, "test", schema, spec, formatVersion); }
diff --git a/core/src/test/java/org/apache/iceberg/io/TestAppenderFactory.java b/core/src/test/java/org/apache/iceberg/io/TestAppenderFactory.java new file mode 100644 index 000000000000..6fdadd6959a4 --- /dev/null +++ b/core/src/test/java/org/apache/iceberg/io/TestAppenderFactory.java @@ -0,0 +1,295 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iceberg.io; + +import java.io.File; +import java.io.IOException; +import java.util.List; +import java.util.Locale; +import org.apache.iceberg.DataFile; +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.PartitionKey; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.Schema; +import org.apache.iceberg.TableTestBase; +import org.apache.iceberg.data.GenericRecord; +import org.apache.iceberg.data.Record; +import org.apache.iceberg.deletes.EqualityDeleteWriter; +import org.apache.iceberg.deletes.PositionDelete; +import org.apache.iceberg.deletes.PositionDeleteWriter; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.util.Pair; +import org.apache.iceberg.util.StructLikeSet; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +@RunWith(Parameterized.class) +public abstract class TestAppenderFactory extends TableTestBase { + private static final int FORMAT_V2 = 2; + + private final FileFormat format; + private final boolean partitioned; + + private PartitionKey partition = null; + + @Parameterized.Parameters(name = "FileFormat={0}, Partitioned={1}") + public static Object[] parameters() { + return new Object[][] { + new Object[] {"avro", false}, + new Object[] {"avro", true}, + new Object[] {"parquet", false}, + new Object[] {"parquet", true} + }; + } + + + public TestAppenderFactory(String fileFormat, boolean partitioned) { + super(FORMAT_V2); + this.format = FileFormat.valueOf(fileFormat.toUpperCase(Locale.ENGLISH)); + this.partitioned = partitioned; + } + + @Before + public void setupTable() throws Exception { + this.tableDir = temp.newFolder(); + Assert.assertTrue(tableDir.delete()); // created by table create + + this.metadataDir = new File(tableDir, "metadata"); + + if (partitioned) { + this.table = create(SCHEMA, SPEC); + } else { + this.table = create(SCHEMA, PartitionSpec.unpartitioned()); + } + this.partition = createPartitionKey(); + + table.updateProperties() + .defaultFormat(format) + .commit(); + } + + protected abstract FileAppenderFactory createAppenderFactory(List equalityFieldIds, + Schema eqDeleteSchema, + Schema posDeleteRowSchema); + + protected abstract T createRow(Integer id, String data); + + protected abstract StructLikeSet expectedRowSet(Iterable records) throws IOException; + + protected abstract StructLikeSet actualRowSet(String... 
columns) throws IOException; + + + private OutputFileFactory createFileFactory() { + return new OutputFileFactory(SPEC, format, table.locationProvider(), table.io(), + table.encryption(), 1, 1); + } + + private PartitionKey createPartitionKey() { + if (table.spec().isUnpartitioned()) { + return null; + } + + Record record = GenericRecord.create(table.spec().schema()) + .copy(ImmutableMap.of("data", "aaa")); + + PartitionKey partitionKey = new PartitionKey(table.spec(), table.schema()); + partitionKey.partition(record); + + return partitionKey; + } + + @Test + public void testDataWriter() throws IOException { + FileAppenderFactory appenderFactory = createAppenderFactory(null, null, null); + OutputFileFactory outputFileFactory = createFileFactory(); + + List records = Lists.newArrayList( + createRow(1, "aaa"), + createRow(2, "bbb"), + createRow(3, "ccc"), + createRow(4, "ddd"), + createRow(5, "eee") + ); + + DataWriter writer = appenderFactory.newDataWriter(outputFileFactory.newOutputFile(), format, partition); + try (DataWriter closeableWriter = writer) { + for (T record : records) { + closeableWriter.add(record); + } + } + + table.newRowDelta() + .addRows(writer.toDataFile()) + .commit(); + + Assert.assertEquals("Should have the expected records.", expectedRowSet(records), actualRowSet("*")); + } + + @Test + public void testEqDeleteWriter() throws IOException { + List equalityFieldIds = Lists.newArrayList(table.schema().findField("id").fieldId()); + FileAppenderFactory appenderFactory = + createAppenderFactory(equalityFieldIds, table.schema().select("id"), null); + OutputFileFactory outputFileFactory = createFileFactory(); + + List records = Lists.newArrayList( + createRow(1, "aaa"), + createRow(2, "bbb"), + createRow(3, "ccc"), + createRow(4, "ddd"), + createRow(5, "eee") + ); + + DataWriter writer = appenderFactory.newDataWriter(outputFileFactory.newOutputFile(), format, partition); + try (DataWriter closeableWriter = writer) { + for (T record : records) { + closeableWriter.add(record); + } + } + + table.newRowDelta() + .addRows(writer.toDataFile()) + .commit(); + + List deletes = Lists.newArrayList( + createRow(1, "aaa"), + createRow(3, "bbb"), + createRow(5, "ccc") + ); + EqualityDeleteWriter eqDeleteWriter = + appenderFactory.newEqDeleteWriter(outputFileFactory.newOutputFile(), format, partition); + try (EqualityDeleteWriter closeableWriter = eqDeleteWriter) { + closeableWriter.deleteAll(deletes); + } + + table.newRowDelta() + .addDeletes(eqDeleteWriter.toDeleteFile()) + .commit(); + + List expected = Lists.newArrayList( + createRow(2, "bbb"), + createRow(4, "ddd") + ); + Assert.assertEquals("Should have the expected records", expectedRowSet(expected), actualRowSet("*")); + } + + @Test + public void testPosDeleteWriter() throws IOException { + // Initialize FileAppenderFactory without pos-delete row schema. 
+ FileAppenderFactory appenderFactory = createAppenderFactory(null, null, null); + OutputFileFactory outputFileFactory = createFileFactory(); + + List records = Lists.newArrayList( + createRow(1, "aaa"), + createRow(2, "bbb"), + createRow(3, "ccc"), + createRow(4, "ddd"), + createRow(5, "eee") + ); + + DataWriter writer = appenderFactory.newDataWriter(outputFileFactory.newOutputFile(), format, partition); + try (DataWriter closeableWriter = writer) { + for (T record : records) { + closeableWriter.add(record); + } + } + DataFile dataFile = writer.toDataFile(); + + List> deletes = Lists.newArrayList( + Pair.of(dataFile.path(), 0L), + Pair.of(dataFile.path(), 2L), + Pair.of(dataFile.path(), 4L) + ); + + PositionDeleteWriter eqDeleteWriter = + appenderFactory.newPosDeleteWriter(outputFileFactory.newOutputFile(), format, partition); + try (PositionDeleteWriter closeableWriter = eqDeleteWriter) { + for (Pair delete : deletes) { + closeableWriter.delete(delete.first(), delete.second()); + } + } + + table.newRowDelta() + .addRows(dataFile) + .addDeletes(eqDeleteWriter.toDeleteFile()) + .validateDataFilesExist(eqDeleteWriter.referencedDataFiles()) + .validateDeletedFiles() + .commit(); + + List expected = Lists.newArrayList( + createRow(2, "bbb"), + createRow(4, "ddd") + ); + Assert.assertEquals("Should have the expected records", expectedRowSet(expected), actualRowSet("*")); + } + + @Test + public void testPosDeleteWriterWithRowSchema() throws IOException { + FileAppenderFactory appenderFactory = createAppenderFactory(null, null, table.schema()); + OutputFileFactory outputFileFactory = createFileFactory(); + + List records = Lists.newArrayList( + createRow(1, "aaa"), + createRow(2, "bbb"), + createRow(3, "ccc"), + createRow(4, "ddd"), + createRow(5, "eee") + ); + + DataWriter writer = appenderFactory.newDataWriter(outputFileFactory.newOutputFile(), format, partition); + try (DataWriter closeableWriter = writer) { + for (T record : records) { + closeableWriter.add(record); + } + } + DataFile dataFile = writer.toDataFile(); + + List> deletes = Lists.newArrayList( + new PositionDelete().set(dataFile.path(), 0, records.get(0)), + new PositionDelete().set(dataFile.path(), 2, records.get(2)), + new PositionDelete().set(dataFile.path(), 4, records.get(4)) + ); + + PositionDeleteWriter eqDeleteWriter = + appenderFactory.newPosDeleteWriter(outputFileFactory.newOutputFile(), format, partition); + try (PositionDeleteWriter closeableWriter = eqDeleteWriter) { + for (PositionDelete delete : deletes) { + closeableWriter.delete(delete.path(), delete.pos(), delete.row()); + } + } + + table.newRowDelta() + .addRows(dataFile) + .addDeletes(eqDeleteWriter.toDeleteFile()) + .validateDataFilesExist(eqDeleteWriter.referencedDataFiles()) + .validateDeletedFiles() + .commit(); + + List expected = Lists.newArrayList( + createRow(2, "bbb"), + createRow(4, "ddd") + ); + Assert.assertEquals("Should have the expected records", expectedRowSet(expected), actualRowSet("*")); + } +} diff --git a/data/src/main/java/org/apache/iceberg/data/GenericAppenderFactory.java b/data/src/main/java/org/apache/iceberg/data/GenericAppenderFactory.java index e5d2652bcb11..cc99af4952b9 100644 --- a/data/src/main/java/org/apache/iceberg/data/GenericAppenderFactory.java +++ b/data/src/main/java/org/apache/iceberg/data/GenericAppenderFactory.java @@ -24,11 +24,16 @@ import java.util.Map; import org.apache.iceberg.FileFormat; import org.apache.iceberg.MetricsConfig; +import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.Schema; +import 
org.apache.iceberg.StructLike; import org.apache.iceberg.avro.Avro; import org.apache.iceberg.data.avro.DataWriter; import org.apache.iceberg.data.orc.GenericOrcWriter; import org.apache.iceberg.data.parquet.GenericParquetWriter; +import org.apache.iceberg.deletes.EqualityDeleteWriter; +import org.apache.iceberg.deletes.PositionDeleteWriter; +import org.apache.iceberg.encryption.EncryptedOutputFile; import org.apache.iceberg.io.FileAppender; import org.apache.iceberg.io.FileAppenderFactory; import org.apache.iceberg.io.OutputFile; @@ -42,10 +47,25 @@ public class GenericAppenderFactory implements FileAppenderFactory { private final Schema schema; + private final PartitionSpec spec; + private final int[] equalityFieldIds; + private final Schema eqDeleteRowSchema; + private final Schema posDeleteRowSchema; private final Map config = Maps.newHashMap(); - public GenericAppenderFactory(Schema schema) { + public GenericAppenderFactory(Schema schema, PartitionSpec spec) { + this(schema, spec, null, schema, null); + } + + public GenericAppenderFactory(Schema schema, PartitionSpec spec, + int[] equalityFieldIds, + Schema eqDeleteRowSchema, + Schema posDeleteRowSchema) { this.schema = schema; + this.spec = spec; + this.equalityFieldIds = equalityFieldIds; + this.eqDeleteRowSchema = eqDeleteRowSchema; + this.posDeleteRowSchema = posDeleteRowSchema; } public GenericAppenderFactory set(String property, String value) { @@ -95,4 +115,88 @@ public FileAppender newAppender(OutputFile outputFile, FileFormat fileFo throw new UncheckedIOException(e); } } + + @Override + public org.apache.iceberg.io.DataWriter newDataWriter(EncryptedOutputFile file, FileFormat format, + StructLike partition) { + return new org.apache.iceberg.io.DataWriter<>( + newAppender(file.encryptingOutputFile(), format), format, + file.encryptingOutputFile().location(), spec, partition, file.keyMetadata()); + } + + @Override + public EqualityDeleteWriter newEqDeleteWriter(EncryptedOutputFile file, FileFormat format, + StructLike partition) { + MetricsConfig metricsConfig = MetricsConfig.fromProperties(config); + try { + switch (format) { + case AVRO: + return Avro.writeDeletes(file.encryptingOutputFile()) + .createWriterFunc(DataWriter::create) + .withPartition(partition) + .overwrite() + .setAll(config) + .rowSchema(eqDeleteRowSchema) + .withSpec(spec) + .withKeyMetadata(file.keyMetadata()) + .equalityFieldIds(equalityFieldIds) + .buildEqualityWriter(); + + case PARQUET: + return Parquet.writeDeletes(file.encryptingOutputFile()) + .createWriterFunc(GenericParquetWriter::buildWriter) + .withPartition(partition) + .overwrite() + .setAll(config) + .metricsConfig(metricsConfig) + .rowSchema(eqDeleteRowSchema) + .withSpec(spec) + .withKeyMetadata(file.keyMetadata()) + .equalityFieldIds(equalityFieldIds) + .buildEqualityWriter(); + + default: + throw new UnsupportedOperationException("Cannot write unknown file format: " + format); + } + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } + + @Override + public PositionDeleteWriter newPosDeleteWriter(EncryptedOutputFile file, FileFormat format, + StructLike partition) { + MetricsConfig metricsConfig = MetricsConfig.fromProperties(config); + try { + switch (format) { + case AVRO: + return Avro.writeDeletes(file.encryptingOutputFile()) + .createWriterFunc(DataWriter::create) + .withPartition(partition) + .overwrite() + .setAll(config) + .rowSchema(posDeleteRowSchema) + .withSpec(spec) + .withKeyMetadata(file.keyMetadata()) + .buildPositionWriter(); + + case PARQUET: + return 
Parquet.writeDeletes(file.encryptingOutputFile()) + .createWriterFunc(GenericParquetWriter::buildWriter) + .withPartition(partition) + .overwrite() + .setAll(config) + .metricsConfig(metricsConfig) + .rowSchema(posDeleteRowSchema) + .withSpec(spec) + .withKeyMetadata(file.keyMetadata()) + .buildPositionWriter(); + + default: + throw new UnsupportedOperationException("Cannot write unknown file format: " + format); + } + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } } diff --git a/data/src/test/java/org/apache/iceberg/TestGenericAppenderFactory.java b/data/src/test/java/org/apache/iceberg/TestGenericAppenderFactory.java new file mode 100644 index 000000000000..6c1456965d26 --- /dev/null +++ b/data/src/test/java/org/apache/iceberg/TestGenericAppenderFactory.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg; + +import java.io.IOException; +import java.util.List; +import org.apache.iceberg.data.GenericAppenderFactory; +import org.apache.iceberg.data.GenericRecord; +import org.apache.iceberg.data.IcebergGenerics; +import org.apache.iceberg.data.Record; +import org.apache.iceberg.io.CloseableIterable; +import org.apache.iceberg.io.FileAppenderFactory; +import org.apache.iceberg.io.TestAppenderFactory; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.apache.iceberg.util.ArrayUtil; +import org.apache.iceberg.util.StructLikeSet; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +@RunWith(Parameterized.class) +public class TestGenericAppenderFactory extends TestAppenderFactory { + + private final GenericRecord gRecord; + + public TestGenericAppenderFactory(String fileFormat, boolean partitioned) { + super(fileFormat, partitioned); + this.gRecord = GenericRecord.create(SCHEMA); + } + + @Override + protected FileAppenderFactory createAppenderFactory(List equalityFieldIds, + Schema eqDeleteSchema, + Schema posDeleteRowSchema) { + return new GenericAppenderFactory(table.schema(), table.spec(), ArrayUtil.toIntArray(equalityFieldIds), + eqDeleteSchema, posDeleteRowSchema); + } + + @Override + protected Record createRow(Integer id, String data) { + return gRecord.copy(ImmutableMap.of("id", id, "data", data)); + } + + @Override + protected StructLikeSet expectedRowSet(Iterable records) { + StructLikeSet set = StructLikeSet.create(table.schema().asStruct()); + for (Record record : records) { + set.add(record); + } + return set; + } + + @Override + protected StructLikeSet actualRowSet(String... 
columns) throws IOException { + StructLikeSet set = StructLikeSet.create(table.schema().asStruct()); + try (CloseableIterable reader = IcebergGenerics.read(table).select(columns).build()) { + reader.forEach(set::add); + } + return set; + } +} diff --git a/data/src/test/java/org/apache/iceberg/TestSplitScan.java b/data/src/test/java/org/apache/iceberg/TestSplitScan.java index 2f191f6160d7..920b80f9f37a 100644 --- a/data/src/test/java/org/apache/iceberg/TestSplitScan.java +++ b/data/src/test/java/org/apache/iceberg/TestSplitScan.java @@ -116,8 +116,8 @@ private File writeToFile(List records, FileFormat fileFormat) throws IOE File file = temp.newFile(); Assert.assertTrue(file.delete()); - GenericAppenderFactory factory = new GenericAppenderFactory(SCHEMA).set( - TableProperties.PARQUET_ROW_GROUP_SIZE_BYTES, String.valueOf(SPLIT_SIZE)); + GenericAppenderFactory factory = new GenericAppenderFactory(SCHEMA, PartitionSpec.unpartitioned()) + .set(TableProperties.PARQUET_ROW_GROUP_SIZE_BYTES, String.valueOf(SPLIT_SIZE)); try (FileAppender appender = factory.newAppender(Files.localOutput(file), fileFormat)) { appender.addAll(records); } diff --git a/data/src/test/java/org/apache/iceberg/data/GenericAppenderHelper.java b/data/src/test/java/org/apache/iceberg/data/GenericAppenderHelper.java index c32be08263a9..55d778d0ba9d 100644 --- a/data/src/test/java/org/apache/iceberg/data/GenericAppenderHelper.java +++ b/data/src/test/java/org/apache/iceberg/data/GenericAppenderHelper.java @@ -27,6 +27,7 @@ import org.apache.iceberg.DataFiles; import org.apache.iceberg.FileFormat; import org.apache.iceberg.Files; +import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.StructLike; import org.apache.iceberg.Table; import org.apache.iceberg.io.FileAppender; @@ -73,13 +74,13 @@ public DataFile writeFile(StructLike partition, List records) throws IOE Preconditions.checkNotNull(table, "table not set"); File file = tmp.newFile(); Assert.assertTrue(file.delete()); - return appendToLocalFile(table, file, fileFormat, partition, records); + return appendToLocalFile(table, file, fileFormat, partition, records, table.spec()); } private static DataFile appendToLocalFile( - Table table, File file, FileFormat format, StructLike partition, List records) + Table table, File file, FileFormat format, StructLike partition, List records, PartitionSpec spec) throws IOException { - FileAppender appender = new GenericAppenderFactory(table.schema()).newAppender( + FileAppender appender = new GenericAppenderFactory(table.schema(), spec).newAppender( Files.localOutput(file), format); try (FileAppender fileAppender = appender) { fileAppender.addAll(records); diff --git a/data/src/test/java/org/apache/iceberg/data/TestLocalScan.java b/data/src/test/java/org/apache/iceberg/data/TestLocalScan.java index 1cc74ba029ff..efa37bbae066 100644 --- a/data/src/test/java/org/apache/iceberg/data/TestLocalScan.java +++ b/data/src/test/java/org/apache/iceberg/data/TestLocalScan.java @@ -449,8 +449,9 @@ private DataFile writeFile(String location, String filename, Schema schema, List FileFormat fileFormat = FileFormat.fromFileName(filename); Preconditions.checkNotNull(fileFormat, "Cannot determine format for file: %s", filename); - FileAppender fileAppender = new GenericAppenderFactory(schema).newAppender( - fromPath(path, CONF), fileFormat); + FileAppender fileAppender = + new GenericAppenderFactory(schema, PartitionSpec.unpartitioned()) + .newAppender(fromPath(path, CONF), fileFormat); try (FileAppender appender = fileAppender) { 
appender.addAll(records); } diff --git a/data/src/test/java/org/apache/iceberg/parquet/TestParquetMergingMetrics.java b/data/src/test/java/org/apache/iceberg/parquet/TestParquetMergingMetrics.java index 03118e3a65f2..9297ee092794 100644 --- a/data/src/test/java/org/apache/iceberg/parquet/TestParquetMergingMetrics.java +++ b/data/src/test/java/org/apache/iceberg/parquet/TestParquetMergingMetrics.java @@ -22,6 +22,7 @@ import java.io.IOException; import java.util.List; import org.apache.iceberg.FileFormat; +import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.TestMergingMetrics; import org.apache.iceberg.data.GenericAppenderFactory; import org.apache.iceberg.data.Record; @@ -31,8 +32,8 @@ public class TestParquetMergingMetrics extends TestMergingMetrics { @Override protected FileAppender writeAndGetAppender(List records) throws IOException { - FileAppender appender = new GenericAppenderFactory(SCHEMA).newAppender( - org.apache.iceberg.Files.localOutput(temp.newFile()), FileFormat.PARQUET); + FileAppender appender = new GenericAppenderFactory(SCHEMA, PartitionSpec.unpartitioned()) + .newAppender(org.apache.iceberg.Files.localOutput(temp.newFile()), FileFormat.PARQUET); try (FileAppender fileAppender = appender) { records.forEach(fileAppender::add); } diff --git a/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkAppenderFactory.java b/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkAppenderFactory.java new file mode 100644 index 000000000000..f30f1e8716e6 --- /dev/null +++ b/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkAppenderFactory.java @@ -0,0 +1,200 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iceberg.flink.sink; + +import java.io.IOException; +import java.io.Serializable; +import java.io.UncheckedIOException; +import java.util.Map; +import java.util.function.Function; +import org.apache.avro.io.DatumWriter; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.types.logical.RowType; +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.MetricsConfig; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.Schema; +import org.apache.iceberg.StructLike; +import org.apache.iceberg.avro.Avro; +import org.apache.iceberg.deletes.EqualityDeleteWriter; +import org.apache.iceberg.deletes.PositionDeleteWriter; +import org.apache.iceberg.encryption.EncryptedOutputFile; +import org.apache.iceberg.flink.FlinkSchemaUtil; +import org.apache.iceberg.flink.data.FlinkAvroWriter; +import org.apache.iceberg.flink.data.FlinkOrcWriter; +import org.apache.iceberg.flink.data.FlinkParquetWriters; +import org.apache.iceberg.io.DataWriter; +import org.apache.iceberg.io.FileAppender; +import org.apache.iceberg.io.FileAppenderFactory; +import org.apache.iceberg.io.OutputFile; +import org.apache.iceberg.orc.ORC; +import org.apache.iceberg.parquet.Parquet; + +public class FlinkAppenderFactory implements FileAppenderFactory, Serializable { + private final Schema schema; + private final RowType flinkSchema; + private final Map props; + private final PartitionSpec spec; + private final int[] equalityFieldIds; + private final Schema eqDeleteRowSchema; + private final Schema posDeleteRowSchema; + + public FlinkAppenderFactory(Schema schema, RowType flinkSchema, Map props, PartitionSpec spec) { + this(schema, flinkSchema, props, spec, null, schema, null); + } + + public FlinkAppenderFactory(Schema schema, RowType flinkSchema, Map props, + PartitionSpec spec, int[] equalityFieldIds, + Schema eqDeleteRowSchema, Schema posDeleteRowSchema) { + this.schema = schema; + this.flinkSchema = flinkSchema; + this.props = props; + this.spec = spec; + this.equalityFieldIds = equalityFieldIds; + this.eqDeleteRowSchema = eqDeleteRowSchema; + this.posDeleteRowSchema = posDeleteRowSchema; + } + + @Override + public FileAppender newAppender(OutputFile outputFile, FileFormat format) { + MetricsConfig metricsConfig = MetricsConfig.fromProperties(props); + try { + switch (format) { + case AVRO: + return Avro.write(outputFile) + .createWriterFunc(ignore -> new FlinkAvroWriter(flinkSchema)) + .setAll(props) + .schema(schema) + .overwrite() + .build(); + + case ORC: + return ORC.write(outputFile) + .createWriterFunc((iSchema, typDesc) -> FlinkOrcWriter.buildWriter(flinkSchema, iSchema)) + .setAll(props) + .schema(schema) + .overwrite() + .build(); + + case PARQUET: + return Parquet.write(outputFile) + .createWriterFunc(msgType -> FlinkParquetWriters.buildWriter(flinkSchema, msgType)) + .setAll(props) + .metricsConfig(metricsConfig) + .schema(schema) + .overwrite() + .build(); + + default: + throw new UnsupportedOperationException("Cannot write unknown file format: " + format); + } + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } + + @Override + public DataWriter newDataWriter(EncryptedOutputFile file, FileFormat format, StructLike partition) { + return new DataWriter<>( + newAppender(file.encryptingOutputFile(), format), format, + file.encryptingOutputFile().location(), spec, partition, file.keyMetadata()); + } + + @Override + public EqualityDeleteWriter newEqDeleteWriter(EncryptedOutputFile outputFile, FileFormat format, + StructLike 
partition) { + MetricsConfig metricsConfig = MetricsConfig.fromProperties(props); + try { + switch (format) { + case AVRO: + return Avro.writeDeletes(outputFile.encryptingOutputFile()) + .createWriterFunc(ignore -> new FlinkAvroWriter(flinkSchema)) + .withPartition(partition) + .overwrite() + .setAll(props) + .rowSchema(eqDeleteRowSchema) + .withSpec(spec) + .withKeyMetadata(outputFile.keyMetadata()) + .equalityFieldIds(equalityFieldIds) + .buildEqualityWriter(); + + case PARQUET: + return Parquet.writeDeletes(outputFile.encryptingOutputFile()) + .createWriterFunc(msgType -> FlinkParquetWriters.buildWriter(flinkSchema, msgType)) + .withPartition(partition) + .overwrite() + .setAll(props) + .metricsConfig(metricsConfig) + .rowSchema(eqDeleteRowSchema) + .withSpec(spec) + .withKeyMetadata(outputFile.keyMetadata()) + .equalityFieldIds(equalityFieldIds) + .buildEqualityWriter(); + + default: + throw new UnsupportedOperationException("Cannot write unknown file format: " + format); + } + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } + + @Override + public PositionDeleteWriter newPosDeleteWriter(EncryptedOutputFile outputFile, FileFormat format, + StructLike partition) { + MetricsConfig metricsConfig = MetricsConfig.fromProperties(props); + try { + switch (format) { + case AVRO: + Function> writeFunc = null; + if (posDeleteRowSchema != null) { + writeFunc = ignore -> new FlinkAvroWriter(FlinkSchemaUtil.convert(posDeleteRowSchema)); + } + + return Avro.writeDeletes(outputFile.encryptingOutputFile()) + .createWriterFunc(writeFunc) + .withPartition(partition) + .overwrite() + .setAll(props) + .rowSchema(posDeleteRowSchema) + .withSpec(spec) + .withKeyMetadata(outputFile.keyMetadata()) + .buildPositionWriter(); + + case PARQUET: + return Parquet.writeDeletes(outputFile.encryptingOutputFile()) + .createWriterFunc(msgType -> FlinkParquetWriters.buildWriter(flinkSchema, msgType)) + .withPartition(partition) + .overwrite() + .setAll(props) + .metricsConfig(metricsConfig) + .rowSchema(posDeleteRowSchema) + .withSpec(spec) + .withKeyMetadata(outputFile.keyMetadata()) + .buildPositionWriter(); + + default: + throw new UnsupportedOperationException("Cannot write unknown file format: " + format); + } + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } +} diff --git a/flink/src/main/java/org/apache/iceberg/flink/sink/RowDataTaskWriterFactory.java b/flink/src/main/java/org/apache/iceberg/flink/sink/RowDataTaskWriterFactory.java index 72e7a7941b77..c409e81fbc13 100644 --- a/flink/src/main/java/org/apache/iceberg/flink/sink/RowDataTaskWriterFactory.java +++ b/flink/src/main/java/org/apache/iceberg/flink/sink/RowDataTaskWriterFactory.java @@ -19,33 +19,21 @@ package org.apache.iceberg.flink.sink; -import java.io.IOException; -import java.io.Serializable; -import java.io.UncheckedIOException; import java.util.Map; import org.apache.flink.table.data.RowData; import org.apache.flink.table.types.logical.RowType; import org.apache.iceberg.FileFormat; -import org.apache.iceberg.MetricsConfig; import org.apache.iceberg.PartitionKey; import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.Schema; -import org.apache.iceberg.avro.Avro; import org.apache.iceberg.encryption.EncryptionManager; import org.apache.iceberg.flink.RowDataWrapper; -import org.apache.iceberg.flink.data.FlinkAvroWriter; -import org.apache.iceberg.flink.data.FlinkOrcWriter; -import org.apache.iceberg.flink.data.FlinkParquetWriters; -import org.apache.iceberg.io.FileAppender; import 
org.apache.iceberg.io.FileAppenderFactory; import org.apache.iceberg.io.FileIO; import org.apache.iceberg.io.LocationProvider; -import org.apache.iceberg.io.OutputFile; import org.apache.iceberg.io.OutputFileFactory; import org.apache.iceberg.io.TaskWriter; import org.apache.iceberg.io.UnpartitionedWriter; -import org.apache.iceberg.orc.ORC; -import org.apache.iceberg.parquet.Parquet; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; public class RowDataTaskWriterFactory implements TaskWriterFactory { @@ -78,7 +66,7 @@ public RowDataTaskWriterFactory(Schema schema, this.encryptionManager = encryptionManager; this.targetFileSizeBytes = targetFileSizeBytes; this.format = format; - this.appenderFactory = new FlinkFileAppenderFactory(schema, flinkSchema, tableProperties); + this.appenderFactory = new FlinkAppenderFactory(schema, flinkSchema, tableProperties, spec); } @Override @@ -118,54 +106,4 @@ protected PartitionKey partition(RowData row) { return partitionKey; } } - - public static class FlinkFileAppenderFactory implements FileAppenderFactory, Serializable { - private final Schema schema; - private final RowType flinkSchema; - private final Map props; - - public FlinkFileAppenderFactory(Schema schema, RowType flinkSchema, Map props) { - this.schema = schema; - this.flinkSchema = flinkSchema; - this.props = props; - } - - @Override - public FileAppender newAppender(OutputFile outputFile, FileFormat format) { - MetricsConfig metricsConfig = MetricsConfig.fromProperties(props); - try { - switch (format) { - case AVRO: - return Avro.write(outputFile) - .createWriterFunc(ignore -> new FlinkAvroWriter(flinkSchema)) - .setAll(props) - .schema(schema) - .overwrite() - .build(); - - case ORC: - return ORC.write(outputFile) - .createWriterFunc((iSchema, typDesc) -> FlinkOrcWriter.buildWriter(flinkSchema, iSchema)) - .setAll(props) - .schema(schema) - .overwrite() - .build(); - - case PARQUET: - return Parquet.write(outputFile) - .createWriterFunc(msgType -> FlinkParquetWriters.buildWriter(flinkSchema, msgType)) - .setAll(props) - .metricsConfig(metricsConfig) - .schema(schema) - .overwrite() - .build(); - - default: - throw new UnsupportedOperationException("Cannot write unknown file format: " + format); - } - } catch (IOException e) { - throw new UncheckedIOException(e); - } - } - } } diff --git a/flink/src/test/java/org/apache/iceberg/flink/SimpleDataUtil.java b/flink/src/test/java/org/apache/iceberg/flink/SimpleDataUtil.java index dc14fb683a91..c47e3e30850b 100644 --- a/flink/src/test/java/org/apache/iceberg/flink/SimpleDataUtil.java +++ b/flink/src/test/java/org/apache/iceberg/flink/SimpleDataUtil.java @@ -39,7 +39,7 @@ import org.apache.iceberg.data.GenericRecord; import org.apache.iceberg.data.IcebergGenerics; import org.apache.iceberg.data.Record; -import org.apache.iceberg.flink.sink.RowDataTaskWriterFactory; +import org.apache.iceberg.flink.sink.FlinkAppenderFactory; import org.apache.iceberg.hadoop.HadoopInputFile; import org.apache.iceberg.hadoop.HadoopTables; import org.apache.iceberg.io.CloseableIterable; @@ -102,8 +102,8 @@ public static DataFile writeFile(Schema schema, PartitionSpec spec, Configuratio Preconditions.checkNotNull(fileFormat, "Cannot determine format for file: %s", filename); RowType flinkSchema = FlinkSchemaUtil.convert(schema); - FileAppenderFactory appenderFactory = new RowDataTaskWriterFactory.FlinkFileAppenderFactory(schema, - flinkSchema, ImmutableMap.of()); + FileAppenderFactory appenderFactory = + new FlinkAppenderFactory(schema, 
flinkSchema, ImmutableMap.of(), spec); FileAppender appender = appenderFactory.newAppender(fromPath(path, conf), fileFormat); try (FileAppender closeableAppender = appender) { diff --git a/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkAppenderFactory.java b/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkAppenderFactory.java new file mode 100644 index 000000000000..e69b580f795e --- /dev/null +++ b/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkAppenderFactory.java @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.flink.sink; + +import java.io.IOException; +import java.util.List; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.types.logical.RowType; +import org.apache.iceberg.Schema; +import org.apache.iceberg.data.IcebergGenerics; +import org.apache.iceberg.data.Record; +import org.apache.iceberg.flink.FlinkSchemaUtil; +import org.apache.iceberg.flink.RowDataWrapper; +import org.apache.iceberg.flink.SimpleDataUtil; +import org.apache.iceberg.io.CloseableIterable; +import org.apache.iceberg.io.FileAppenderFactory; +import org.apache.iceberg.io.TestAppenderFactory; +import org.apache.iceberg.util.ArrayUtil; +import org.apache.iceberg.util.StructLikeSet; + +public class TestFlinkAppenderFactory extends TestAppenderFactory { + + private final RowType rowType; + + public TestFlinkAppenderFactory(String fileFormat, boolean partitioned) { + super(fileFormat, partitioned); + this.rowType = FlinkSchemaUtil.convert(SCHEMA); + } + + @Override + protected FileAppenderFactory createAppenderFactory(List equalityFieldIds, + Schema eqDeleteSchema, + Schema posDeleteRowSchema) { + return new FlinkAppenderFactory(table.schema(), rowType, table.properties(), table.spec(), + ArrayUtil.toIntArray(equalityFieldIds), eqDeleteSchema, posDeleteRowSchema); + } + + @Override + protected RowData createRow(Integer id, String data) { + return SimpleDataUtil.createRowData(id, data); + } + + @Override + protected StructLikeSet expectedRowSet(Iterable rows) { + StructLikeSet set = StructLikeSet.create(table.schema().asStruct()); + for (RowData row : rows) { + RowDataWrapper wrapper = new RowDataWrapper(rowType, table.schema().asStruct()); + set.add(wrapper.wrap(row)); + } + return set; + } + + @Override + protected StructLikeSet actualRowSet(String... 
columns) throws IOException { + StructLikeSet set = StructLikeSet.create(table.schema().asStruct()); + try (CloseableIterable reader = IcebergGenerics.read(table).select(columns).build()) { + reader.forEach(set::add); + } + return set; + } +} diff --git a/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkMergingMetrics.java b/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkMergingMetrics.java index b116faff23d1..cd0a89ba7c76 100644 --- a/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkMergingMetrics.java +++ b/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkMergingMetrics.java @@ -20,17 +20,18 @@ package org.apache.iceberg.flink.source; import java.io.IOException; -import java.util.HashMap; import java.util.List; import org.apache.flink.table.data.RowData; import org.apache.flink.table.types.logical.RowType; import org.apache.iceberg.FileFormat; +import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.TestMergingMetrics; import org.apache.iceberg.data.Record; import org.apache.iceberg.flink.FlinkSchemaUtil; import org.apache.iceberg.flink.RowDataConverter; -import org.apache.iceberg.flink.sink.RowDataTaskWriterFactory; +import org.apache.iceberg.flink.sink.FlinkAppenderFactory; import org.apache.iceberg.io.FileAppender; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; public class TestFlinkMergingMetrics extends TestMergingMetrics { @@ -39,8 +40,8 @@ protected FileAppender writeAndGetAppender(List records) throws RowType flinkSchema = FlinkSchemaUtil.convert(SCHEMA); FileAppender appender = - new RowDataTaskWriterFactory.FlinkFileAppenderFactory(SCHEMA, flinkSchema, new HashMap<>()).newAppender( - org.apache.iceberg.Files.localOutput(temp.newFile()), FileFormat.PARQUET); + new FlinkAppenderFactory(SCHEMA, flinkSchema, ImmutableMap.of(), PartitionSpec.unpartitioned()) + .newAppender(org.apache.iceberg.Files.localOutput(temp.newFile()), FileFormat.PARQUET); try (FileAppender fileAppender = appender) { records.stream().map(r -> RowDataConverter.convert(SCHEMA, r)).forEach(fileAppender::add); } diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java b/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java index fc35adc63f18..f2cd9e50d1e3 100644 --- a/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java +++ b/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java @@ -32,7 +32,6 @@ import org.apache.hadoop.conf.Configuration; import org.apache.iceberg.FileFormat; import org.apache.iceberg.Files; -import org.apache.iceberg.MetadataColumns; import org.apache.iceberg.MetricsConfig; import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.Schema; @@ -49,6 +48,7 @@ import org.apache.iceberg.hadoop.HadoopInputFile; import org.apache.iceberg.hadoop.HadoopOutputFile; import org.apache.iceberg.io.CloseableIterable; +import org.apache.iceberg.io.DeleteSchemaUtil; import org.apache.iceberg.io.FileAppender; import org.apache.iceberg.io.InputFile; import org.apache.iceberg.io.OutputFile; @@ -58,7 +58,6 @@ import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.relocated.com.google.common.collect.Maps; import org.apache.iceberg.relocated.com.google.common.collect.Sets; -import org.apache.iceberg.types.Types.NestedField; import org.apache.iceberg.util.ArrayUtil; import org.apache.parquet.HadoopReadOptions; import org.apache.parquet.ParquetReadOptions; @@ -387,12 +386,7 @@ public PositionDeleteWriter buildPositionWriter() throws 
IOException { if (rowSchema != null && createWriterFunc != null) { // the appender uses the row schema wrapped with position fields - appenderBuilder.schema(new org.apache.iceberg.Schema( - MetadataColumns.DELETE_FILE_PATH, - MetadataColumns.DELETE_FILE_POS, - NestedField.optional( - MetadataColumns.DELETE_FILE_ROW_FIELD_ID, "row", rowSchema.asStruct(), - MetadataColumns.DELETE_FILE_ROW_DOC))); + appenderBuilder.schema(DeleteSchemaUtil.posDeleteSchema(rowSchema)); appenderBuilder.createWriterFunc(parquetSchema -> { ParquetValueWriter writer = createWriterFunc.apply(parquetSchema); @@ -404,9 +398,7 @@ public PositionDeleteWriter buildPositionWriter() throws IOException { }); } else { - appenderBuilder.schema(new org.apache.iceberg.Schema( - MetadataColumns.DELETE_FILE_PATH, - MetadataColumns.DELETE_FILE_POS)); + appenderBuilder.schema(DeleteSchemaUtil.pathPosSchema()); appenderBuilder.createWriterFunc(parquetSchema -> new PositionDeleteStructWriter((StructWriter) GenericParquetWriter.buildWriter(parquetSchema))); diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetValueWriters.java b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetValueWriters.java index c17a2ffa173d..0a007a8ef42a 100644 --- a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetValueWriters.java +++ b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetValueWriters.java @@ -154,7 +154,7 @@ public void writeInteger(int repetitionLevel, int value) { column.writeInteger(repetitionLevel, value); } - public void writeLong(int repetitionLevel, long value) { + public void writeLong(int repetitionLevel, long value) { column.writeLong(repetitionLevel, value); } @@ -561,8 +561,9 @@ public Stream metrics() { } } - public static class PositionDeleteStructWriter extends StructWriter> { - public PositionDeleteStructWriter(StructWriter replacedWriter) { + static class PositionDeleteStructWriter extends StructWriter> { + + PositionDeleteStructWriter(StructWriter replacedWriter) { super(Arrays.asList(replacedWriter.writers)); } diff --git a/spark/src/main/java/org/apache/iceberg/spark/source/RowDataRewriter.java b/spark/src/main/java/org/apache/iceberg/spark/source/RowDataRewriter.java index 37ca56c700a4..6393061a6cd8 100644 --- a/spark/src/main/java/org/apache/iceberg/spark/source/RowDataRewriter.java +++ b/spark/src/main/java/org/apache/iceberg/spark/source/RowDataRewriter.java @@ -98,7 +98,7 @@ private List rewriteDataForTask(CombinedScanTask task) throws Exceptio task, schema, schema, nameMapping, io.value(), encryptionManager.value(), caseSensitive); StructType structType = SparkSchemaUtil.convert(schema); - SparkAppenderFactory appenderFactory = new SparkAppenderFactory(properties, schema, structType); + SparkAppenderFactory appenderFactory = new SparkAppenderFactory(properties, schema, structType, spec); OutputFileFactory fileFactory = new OutputFileFactory( spec, format, locations, io.value(), encryptionManager.value(), partitionId, taskId); diff --git a/spark/src/main/java/org/apache/iceberg/spark/source/SparkAppenderFactory.java b/spark/src/main/java/org/apache/iceberg/spark/source/SparkAppenderFactory.java index 3383fe7c29f7..025ff96e123b 100644 --- a/spark/src/main/java/org/apache/iceberg/spark/source/SparkAppenderFactory.java +++ b/spark/src/main/java/org/apache/iceberg/spark/source/SparkAppenderFactory.java @@ -20,12 +20,19 @@ package org.apache.iceberg.spark.source; import java.io.IOException; +import java.io.UncheckedIOException; import java.util.Map; import org.apache.iceberg.FileFormat; 
import org.apache.iceberg.MetricsConfig; +import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.Schema; +import org.apache.iceberg.StructLike; import org.apache.iceberg.avro.Avro; +import org.apache.iceberg.deletes.EqualityDeleteWriter; +import org.apache.iceberg.deletes.PositionDeleteWriter; +import org.apache.iceberg.encryption.EncryptedOutputFile; import org.apache.iceberg.exceptions.RuntimeIOException; +import org.apache.iceberg.io.DataWriter; import org.apache.iceberg.io.FileAppender; import org.apache.iceberg.io.FileAppenderFactory; import org.apache.iceberg.io.OutputFile; @@ -41,11 +48,24 @@ class SparkAppenderFactory implements FileAppenderFactory { private final Map properties; private final Schema writeSchema; private final StructType dsSchema; + private final PartitionSpec spec; + private final int[] equalityFieldIds; + private final Schema eqDeleteRowSchema; + private final Schema posDeleteRowSchema; - SparkAppenderFactory(Map properties, Schema writeSchema, StructType dsSchema) { + SparkAppenderFactory(Map properties, Schema writeSchema, StructType dsSchema, PartitionSpec spec) { + this(properties, writeSchema, dsSchema, spec, null, null, null); + } + + SparkAppenderFactory(Map properties, Schema writeSchema, StructType dsSchema, PartitionSpec spec, + int[] equalityFieldIds, Schema eqDeleteRowSchema, Schema posDeleteRowSchema) { this.properties = properties; this.writeSchema = writeSchema; this.dsSchema = dsSchema; + this.spec = spec; + this.equalityFieldIds = equalityFieldIds; + this.eqDeleteRowSchema = eqDeleteRowSchema; + this.posDeleteRowSchema = posDeleteRowSchema; } @Override @@ -86,4 +106,79 @@ public FileAppender newAppender(OutputFile file, FileFormat fileFor throw new RuntimeIOException(e); } } + + @Override + public DataWriter newDataWriter(EncryptedOutputFile file, FileFormat format, StructLike partition) { + return new DataWriter<>(newAppender(file.encryptingOutputFile(), format), format, + file.encryptingOutputFile().location(), spec, partition, file.keyMetadata()); + } + + @Override + public EqualityDeleteWriter newEqDeleteWriter(EncryptedOutputFile file, FileFormat format, + StructLike partition) { + try { + switch (format) { + case PARQUET: + return Parquet.writeDeletes(file.encryptingOutputFile()) + .createWriterFunc(msgType -> SparkParquetWriters.buildWriter(dsSchema, msgType)) + .overwrite() + .rowSchema(eqDeleteRowSchema) + .withSpec(spec) + .withPartition(partition) + .equalityFieldIds(equalityFieldIds) + .withKeyMetadata(file.keyMetadata()) + .buildEqualityWriter(); + + case AVRO: + return Avro.writeDeletes(file.encryptingOutputFile()) + .createWriterFunc(ignored -> new SparkAvroWriter(dsSchema)) + .overwrite() + .rowSchema(eqDeleteRowSchema) + .withSpec(spec) + .withPartition(partition) + .equalityFieldIds(equalityFieldIds) + .withKeyMetadata(file.keyMetadata()) + .buildEqualityWriter(); + + default: + throw new UnsupportedOperationException("Cannot write unsupported format: " + format); + } + } catch (IOException e) { + throw new UncheckedIOException("Failed to create new equality delete writer", e); + } + } + + @Override + public PositionDeleteWriter newPosDeleteWriter(EncryptedOutputFile file, FileFormat format, + StructLike partition) { + try { + switch (format) { + case PARQUET: + return Parquet.writeDeletes(file.encryptingOutputFile()) + .createWriterFunc(msgType -> SparkParquetWriters.buildWriter(dsSchema, msgType)) + .overwrite() + .rowSchema(posDeleteRowSchema) + .withSpec(spec) + .withPartition(partition) + 
.withKeyMetadata(file.keyMetadata()) + .buildPositionWriter(); + + case AVRO: + return Avro.writeDeletes(file.encryptingOutputFile()) + .createWriterFunc(ignored -> new SparkAvroWriter(dsSchema)) + .overwrite() + .rowSchema(posDeleteRowSchema) + .withSpec(spec) + .withPartition(partition) + .withKeyMetadata(file.keyMetadata()) + .buildPositionWriter(); + + default: + throw new UnsupportedOperationException("Cannot write unsupported format: " + format); + } + + } catch (IOException e) { + throw new UncheckedIOException("Failed to create new equality delete writer", e); + } + } } diff --git a/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkAppenderFactory.java b/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkAppenderFactory.java new file mode 100644 index 000000000000..ae2b04779280 --- /dev/null +++ b/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkAppenderFactory.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.spark.source; + +import java.io.IOException; +import java.util.List; +import org.apache.iceberg.Schema; +import org.apache.iceberg.data.IcebergGenerics; +import org.apache.iceberg.data.Record; +import org.apache.iceberg.io.CloseableIterable; +import org.apache.iceberg.io.FileAppenderFactory; +import org.apache.iceberg.io.TestAppenderFactory; +import org.apache.iceberg.spark.SparkSchemaUtil; +import org.apache.iceberg.util.ArrayUtil; +import org.apache.iceberg.util.StructLikeSet; +import org.apache.spark.sql.catalyst.InternalRow; +import org.apache.spark.sql.catalyst.expressions.GenericInternalRow; +import org.apache.spark.sql.types.StructType; +import org.apache.spark.unsafe.types.UTF8String; + +public class TestSparkAppenderFactory extends TestAppenderFactory { + + private final StructType sparkType; + + public TestSparkAppenderFactory(String fileFormat, boolean partitioned) { + super(fileFormat, partitioned); + this.sparkType = SparkSchemaUtil.convert(SCHEMA); + } + + @Override + protected FileAppenderFactory createAppenderFactory(List equalityFieldIds, + Schema eqDeleteSchema, + Schema posDeleteRowSchema) { + return new SparkAppenderFactory(table.properties(), table.schema(), sparkType, table.spec(), + ArrayUtil.toIntArray(equalityFieldIds), eqDeleteSchema, posDeleteRowSchema); + } + + @Override + protected InternalRow createRow(Integer id, String data) { + InternalRow row = new GenericInternalRow(2); + row.update(0, id); + row.update(1, UTF8String.fromString(data)); + return row; + } + + @Override + protected StructLikeSet expectedRowSet(Iterable rows) { + StructLikeSet set = StructLikeSet.create(table.schema().asStruct()); + for (InternalRow row : rows) { + InternalRowWrapper wrapper = new InternalRowWrapper(sparkType); + 
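The position-delete path is symmetric but is keyed by data-file path and row ordinal rather than by column values. A hedged fragment continuing the sketch above (`factory`, `fileFactory`, and `dataFile` are assumed; no position-delete row schema is configured here):

    // Sketch: mark the 1st and 3rd rows of an existing data file as deleted.
    PositionDeleteWriter<InternalRow> posDeleteWriter =
        factory.newPosDeleteWriter(fileFactory.newOutputFile(), FileFormat.PARQUET, null /* partition */);
    try (PositionDeleteWriter<InternalRow> writer = posDeleteWriter) {
      writer.delete(dataFile.path(), 0L);
      writer.delete(dataFile.path(), 2L);
    }

    table.newRowDelta()
        .addRows(dataFile)                         // the data file the positions refer to
        .addDeletes(posDeleteWriter.toDeleteFile())
        .commit();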
set.add(wrapper.wrap(row)); + } + return set; + } + + @Override + protected StructLikeSet actualRowSet(String... columns) throws IOException { + StructLikeSet set = StructLikeSet.create(table.schema().asStruct()); + try (CloseableIterable reader = IcebergGenerics.read(table).select(columns).build()) { + reader.forEach(set::add); + } + return set; + } +} diff --git a/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkParquetMergingMetrics.java b/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkParquetMergingMetrics.java index e49a18be5338..f1215056243b 100644 --- a/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkParquetMergingMetrics.java +++ b/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkParquetMergingMetrics.java @@ -20,12 +20,13 @@ package org.apache.iceberg.spark.source; import java.io.IOException; -import java.util.HashMap; import java.util.List; import org.apache.iceberg.FileFormat; +import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.TestMergingMetrics; import org.apache.iceberg.data.Record; import org.apache.iceberg.io.FileAppender; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; import org.apache.iceberg.spark.SparkSchemaUtil; import org.apache.spark.sql.catalyst.InternalRow; @@ -34,8 +35,9 @@ public class TestSparkParquetMergingMetrics extends TestMergingMetrics writeAndGetAppender(List records) throws IOException { FileAppender appender = - new SparkAppenderFactory(new HashMap<>(), SCHEMA, SparkSchemaUtil.convert(SCHEMA)).newAppender( - org.apache.iceberg.Files.localOutput(temp.newFile()), FileFormat.PARQUET); + new SparkAppenderFactory(ImmutableMap.of(), SCHEMA, SparkSchemaUtil.convert(SCHEMA), + PartitionSpec.unpartitioned()) + .newAppender(org.apache.iceberg.Files.localOutput(temp.newFile()), FileFormat.PARQUET); try (FileAppender fileAppender = appender) { records.stream().map(r -> new StructInternalRow(SCHEMA.asStruct()).setStruct(r)).forEach(fileAppender::add); } diff --git a/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReadProjection.java b/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReadProjection.java index b3c8e3134490..804ad9c94680 100644 --- a/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReadProjection.java +++ b/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReadProjection.java @@ -106,8 +106,8 @@ protected Record writeAndRead(String desc, Schema writeSchema, Schema readSchema // When tables are created, the column ids are reassigned. 
Schema tableSchema = table.schema(); - try (FileAppender writer = new GenericAppenderFactory(tableSchema).newAppender( - localOutput(testFile), format)) { + try (FileAppender writer = new GenericAppenderFactory(tableSchema, table.spec()) + .newAppender(localOutput(testFile), format)) { writer.add(record); } diff --git a/spark2/src/main/java/org/apache/iceberg/spark/source/Writer.java b/spark2/src/main/java/org/apache/iceberg/spark/source/Writer.java index 6b6739a06a25..65b123b1e831 100644 --- a/spark2/src/main/java/org/apache/iceberg/spark/source/Writer.java +++ b/spark2/src/main/java/org/apache/iceberg/spark/source/Writer.java @@ -266,7 +266,7 @@ static class WriterFactory implements DataWriterFactory { public DataWriter createDataWriter(int partitionId, long taskId, long epochId) { OutputFileFactory fileFactory = new OutputFileFactory( spec, format, locations, io.value(), encryptionManager.value(), partitionId, taskId); - SparkAppenderFactory appenderFactory = new SparkAppenderFactory(properties, writeSchema, dsSchema); + SparkAppenderFactory appenderFactory = new SparkAppenderFactory(properties, writeSchema, dsSchema, spec); if (spec.fields().isEmpty()) { return new Unpartitioned24Writer(spec, format, appenderFactory, fileFactory, io.value(), targetFileSize); diff --git a/spark2/src/test/java/org/apache/iceberg/spark/source/TestFilteredScan.java b/spark2/src/test/java/org/apache/iceberg/spark/source/TestFilteredScan.java index 60d044aed801..3d8ffc555dd0 100644 --- a/spark2/src/test/java/org/apache/iceberg/spark/source/TestFilteredScan.java +++ b/spark2/src/test/java/org/apache/iceberg/spark/source/TestFilteredScan.java @@ -186,8 +186,8 @@ public void writeUnpartitionedTable() throws IOException { // create records using the table's schema this.records = testRecords(tableSchema); - try (FileAppender writer = new GenericAppenderFactory(tableSchema).newAppender( - localOutput(testFile), fileFormat)) { + try (FileAppender writer = new GenericAppenderFactory(tableSchema, table.spec()) + .newAppender(localOutput(testFile), fileFormat)) { writer.addAll(records); } diff --git a/spark3/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java b/spark3/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java index efbc319197f0..fd6b611aefbd 100644 --- a/spark3/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java +++ b/spark3/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java @@ -416,7 +416,7 @@ public DataWriter createWriter(int partitionId, long taskId) { public DataWriter createWriter(int partitionId, long taskId, long epochId) { OutputFileFactory fileFactory = new OutputFileFactory( spec, format, locations, io.value(), encryptionManager.value(), partitionId, taskId); - SparkAppenderFactory appenderFactory = new SparkAppenderFactory(properties, writeSchema, dsSchema); + SparkAppenderFactory appenderFactory = new SparkAppenderFactory(properties, writeSchema, dsSchema, spec); if (spec.fields().isEmpty()) { return new Unpartitioned3Writer(spec, format, appenderFactory, fileFactory, io.value(), targetFileSize); } else { diff --git a/spark3/src/test/java/org/apache/iceberg/spark/source/TestFilteredScan.java b/spark3/src/test/java/org/apache/iceberg/spark/source/TestFilteredScan.java index 5824dca7a4e1..c3bf1606676a 100644 --- a/spark3/src/test/java/org/apache/iceberg/spark/source/TestFilteredScan.java +++ b/spark3/src/test/java/org/apache/iceberg/spark/source/TestFilteredScan.java @@ -183,8 +183,8 @@ public void writeUnpartitionedTable() throws IOException { this.records 
= testRecords(tableSchema); - try (FileAppender writer = new GenericAppenderFactory(tableSchema).newAppender( - localOutput(testFile), fileFormat)) { + try (FileAppender writer = new GenericAppenderFactory(tableSchema, table.spec()) + .newAppender(localOutput(testFile), fileFormat)) { writer.addAll(records); } From c1a124772f4e48fd38ea8b81a316464712057b09 Mon Sep 17 00:00:00 2001 From: huzheng Date: Fri, 27 Nov 2020 12:59:12 +0800 Subject: [PATCH 2/8] Minor fixes --- .../apache/iceberg/io/DeleteSchemaUtil.java | 2 +- .../iceberg/io/TestAppenderFactory.java | 98 +++++++------------ .../iceberg/TestGenericAppenderFactory.java | 4 +- .../iceberg/data/GenericAppenderHelper.java | 5 +- .../flink/sink/FlinkAppenderFactory.java | 7 +- .../iceberg/parquet/ParquetValueWriters.java | 7 +- 6 files changed, 42 insertions(+), 81 deletions(-) diff --git a/core/src/main/java/org/apache/iceberg/io/DeleteSchemaUtil.java b/core/src/main/java/org/apache/iceberg/io/DeleteSchemaUtil.java index 3da96d765643..cfab6eda404b 100644 --- a/core/src/main/java/org/apache/iceberg/io/DeleteSchemaUtil.java +++ b/core/src/main/java/org/apache/iceberg/io/DeleteSchemaUtil.java @@ -31,7 +31,7 @@ public static Schema pathPosSchema(Schema rowSchema) { return new Schema( MetadataColumns.DELETE_FILE_PATH, MetadataColumns.DELETE_FILE_POS, - Types.NestedField.optional( + Types.NestedField.required( MetadataColumns.DELETE_FILE_ROW_FIELD_ID, "row", rowSchema.asStruct(), MetadataColumns.DELETE_FILE_ROW_DOC)); } diff --git a/core/src/test/java/org/apache/iceberg/io/TestAppenderFactory.java b/core/src/test/java/org/apache/iceberg/io/TestAppenderFactory.java index 6fdadd6959a4..151195ec318c 100644 --- a/core/src/test/java/org/apache/iceberg/io/TestAppenderFactory.java +++ b/core/src/test/java/org/apache/iceberg/io/TestAppenderFactory.java @@ -99,9 +99,8 @@ protected abstract FileAppenderFactory createAppenderFactory(List eq protected abstract StructLikeSet actualRowSet(String... 
columns) throws IOException; - private OutputFileFactory createFileFactory() { - return new OutputFileFactory(SPEC, format, table.locationProvider(), table.io(), + return new OutputFileFactory(table.spec(), format, table.locationProvider(), table.io(), table.encryption(), 1, 1); } @@ -110,8 +109,7 @@ private PartitionKey createPartitionKey() { return null; } - Record record = GenericRecord.create(table.spec().schema()) - .copy(ImmutableMap.of("data", "aaa")); + Record record = GenericRecord.create(table.spec().schema()).copy(ImmutableMap.of("data", "aaa")); PartitionKey partitionKey = new PartitionKey(table.spec(), table.schema()); partitionKey.partition(record); @@ -119,57 +117,55 @@ private PartitionKey createPartitionKey() { return partitionKey; } - @Test - public void testDataWriter() throws IOException { - FileAppenderFactory appenderFactory = createAppenderFactory(null, null, null); - OutputFileFactory outputFileFactory = createFileFactory(); - - List records = Lists.newArrayList( + private List testRowSet() { + return Lists.newArrayList( createRow(1, "aaa"), createRow(2, "bbb"), createRow(3, "ccc"), createRow(4, "ddd"), createRow(5, "eee") ); + } + private DataFile prepareDataFile(List rowSet, FileAppenderFactory appenderFactory, + OutputFileFactory outputFileFactory) throws IOException { DataWriter writer = appenderFactory.newDataWriter(outputFileFactory.newOutputFile(), format, partition); try (DataWriter closeableWriter = writer) { - for (T record : records) { - closeableWriter.add(record); + for (T row : rowSet) { + closeableWriter.add(row); } } + return writer.toDataFile(); + } + + @Test + public void testDataWriter() throws IOException { + FileAppenderFactory appenderFactory = createAppenderFactory(null, null, null); + OutputFileFactory outputFileFactory = createFileFactory(); + + List rowSet = testRowSet(); + DataFile dataFile = prepareDataFile(rowSet, appenderFactory, outputFileFactory); + table.newRowDelta() - .addRows(writer.toDataFile()) + .addRows(dataFile) .commit(); - Assert.assertEquals("Should have the expected records.", expectedRowSet(records), actualRowSet("*")); + Assert.assertEquals("Should have the expected records.", expectedRowSet(rowSet), actualRowSet("*")); } @Test public void testEqDeleteWriter() throws IOException { List equalityFieldIds = Lists.newArrayList(table.schema().findField("id").fieldId()); - FileAppenderFactory appenderFactory = - createAppenderFactory(equalityFieldIds, table.schema().select("id"), null); + FileAppenderFactory appenderFactory = createAppenderFactory(equalityFieldIds, + table.schema().select("id"), null); OutputFileFactory outputFileFactory = createFileFactory(); - List records = Lists.newArrayList( - createRow(1, "aaa"), - createRow(2, "bbb"), - createRow(3, "ccc"), - createRow(4, "ddd"), - createRow(5, "eee") - ); - - DataWriter writer = appenderFactory.newDataWriter(outputFileFactory.newOutputFile(), format, partition); - try (DataWriter closeableWriter = writer) { - for (T record : records) { - closeableWriter.add(record); - } - } + List rowSet = testRowSet(); + DataFile dataFile = prepareDataFile(rowSet, appenderFactory, outputFileFactory); table.newRowDelta() - .addRows(writer.toDataFile()) + .addRows(dataFile) .commit(); List deletes = Lists.newArrayList( @@ -200,21 +196,8 @@ public void testPosDeleteWriter() throws IOException { FileAppenderFactory appenderFactory = createAppenderFactory(null, null, null); OutputFileFactory outputFileFactory = createFileFactory(); - List records = Lists.newArrayList( - createRow(1, 
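The refactored `prepareDataFile` helper above captures the intended data-writer flow; the same pattern works outside the tests through the generic factory. A hedged sketch with iceberg-data Records (`table` and `fileFactory` are assumed, unpartitioned for brevity):

    // Sketch: write a data file of generic Records, then commit it to the table.
    GenericAppenderFactory factory = new GenericAppenderFactory(table.schema(), table.spec());
    GenericRecord template = GenericRecord.create(table.schema());

    DataWriter<Record> dataWriter =
        factory.newDataWriter(fileFactory.newOutputFile(), FileFormat.PARQUET, null /* partition */);
    try (DataWriter<Record> writer = dataWriter) {
      writer.add(template.copy(ImmutableMap.of("id", 1, "data", "aaa")));
      writer.add(template.copy(ImmutableMap.of("id", 2, "data", "bbb")));
    }

    table.newAppend()
        .appendFile(dataWriter.toDataFile())   // valid only after the writer is closed
        .commit();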
"aaa"), - createRow(2, "bbb"), - createRow(3, "ccc"), - createRow(4, "ddd"), - createRow(5, "eee") - ); - - DataWriter writer = appenderFactory.newDataWriter(outputFileFactory.newOutputFile(), format, partition); - try (DataWriter closeableWriter = writer) { - for (T record : records) { - closeableWriter.add(record); - } - } - DataFile dataFile = writer.toDataFile(); + List rowSet = testRowSet(); + DataFile dataFile = prepareDataFile(rowSet, appenderFactory, outputFileFactory); List> deletes = Lists.newArrayList( Pair.of(dataFile.path(), 0L), @@ -249,26 +232,13 @@ public void testPosDeleteWriterWithRowSchema() throws IOException { FileAppenderFactory appenderFactory = createAppenderFactory(null, null, table.schema()); OutputFileFactory outputFileFactory = createFileFactory(); - List records = Lists.newArrayList( - createRow(1, "aaa"), - createRow(2, "bbb"), - createRow(3, "ccc"), - createRow(4, "ddd"), - createRow(5, "eee") - ); - - DataWriter writer = appenderFactory.newDataWriter(outputFileFactory.newOutputFile(), format, partition); - try (DataWriter closeableWriter = writer) { - for (T record : records) { - closeableWriter.add(record); - } - } - DataFile dataFile = writer.toDataFile(); + List rowSet = testRowSet(); + DataFile dataFile = prepareDataFile(rowSet, appenderFactory, outputFileFactory); List> deletes = Lists.newArrayList( - new PositionDelete().set(dataFile.path(), 0, records.get(0)), - new PositionDelete().set(dataFile.path(), 2, records.get(2)), - new PositionDelete().set(dataFile.path(), 4, records.get(4)) + new PositionDelete().set(dataFile.path(), 0, rowSet.get(0)), + new PositionDelete().set(dataFile.path(), 2, rowSet.get(2)), + new PositionDelete().set(dataFile.path(), 4, rowSet.get(4)) ); PositionDeleteWriter eqDeleteWriter = diff --git a/data/src/test/java/org/apache/iceberg/TestGenericAppenderFactory.java b/data/src/test/java/org/apache/iceberg/TestGenericAppenderFactory.java index 6c1456965d26..846b896fa543 100644 --- a/data/src/test/java/org/apache/iceberg/TestGenericAppenderFactory.java +++ b/data/src/test/java/org/apache/iceberg/TestGenericAppenderFactory.java @@ -60,9 +60,7 @@ protected Record createRow(Integer id, String data) { @Override protected StructLikeSet expectedRowSet(Iterable records) { StructLikeSet set = StructLikeSet.create(table.schema().asStruct()); - for (Record record : records) { - set.add(record); - } + records.forEach(set::add); return set; } diff --git a/data/src/test/java/org/apache/iceberg/data/GenericAppenderHelper.java b/data/src/test/java/org/apache/iceberg/data/GenericAppenderHelper.java index 55d778d0ba9d..ddf8ce4784bb 100644 --- a/data/src/test/java/org/apache/iceberg/data/GenericAppenderHelper.java +++ b/data/src/test/java/org/apache/iceberg/data/GenericAppenderHelper.java @@ -77,9 +77,8 @@ public DataFile writeFile(StructLike partition, List records) throws IOE return appendToLocalFile(table, file, fileFormat, partition, records, table.spec()); } - private static DataFile appendToLocalFile( - Table table, File file, FileFormat format, StructLike partition, List records, PartitionSpec spec) - throws IOException { + private static DataFile appendToLocalFile(Table table, File file, FileFormat format, StructLike partition, + List records, PartitionSpec spec) throws IOException { FileAppender appender = new GenericAppenderFactory(table.schema(), spec).newAppender( Files.localOutput(file), format); try (FileAppender fileAppender = appender) { diff --git a/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkAppenderFactory.java 
b/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkAppenderFactory.java index f30f1e8716e6..8f4482e6d32c 100644 --- a/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkAppenderFactory.java +++ b/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkAppenderFactory.java @@ -163,13 +163,8 @@ public PositionDeleteWriter newPosDeleteWriter(EncryptedOutputFile outp try { switch (format) { case AVRO: - Function> writeFunc = null; - if (posDeleteRowSchema != null) { - writeFunc = ignore -> new FlinkAvroWriter(FlinkSchemaUtil.convert(posDeleteRowSchema)); - } - return Avro.writeDeletes(outputFile.encryptingOutputFile()) - .createWriterFunc(writeFunc) + .createWriterFunc(ignore -> new FlinkAvroWriter(FlinkSchemaUtil.convert(posDeleteRowSchema))) .withPartition(partition) .overwrite() .setAll(props) diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetValueWriters.java b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetValueWriters.java index 0a007a8ef42a..c17a2ffa173d 100644 --- a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetValueWriters.java +++ b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetValueWriters.java @@ -154,7 +154,7 @@ public void writeInteger(int repetitionLevel, int value) { column.writeInteger(repetitionLevel, value); } - public void writeLong(int repetitionLevel, long value) { + public void writeLong(int repetitionLevel, long value) { column.writeLong(repetitionLevel, value); } @@ -561,9 +561,8 @@ public Stream metrics() { } } - static class PositionDeleteStructWriter extends StructWriter> { - - PositionDeleteStructWriter(StructWriter replacedWriter) { + public static class PositionDeleteStructWriter extends StructWriter> { + public PositionDeleteStructWriter(StructWriter replacedWriter) { super(Arrays.asList(replacedWriter.writers)); } From d6cebc65cb8b2e9ede9feb37dd54cdcc880ab45c Mon Sep 17 00:00:00 2001 From: huzheng Date: Fri, 27 Nov 2020 14:46:56 +0800 Subject: [PATCH 3/8] Fix broken unit tests. 
--- .../flink/sink/FlinkAppenderFactory.java | 50 ++++++++++++++++--- .../org/apache/iceberg/parquet/Parquet.java | 13 +++-- .../iceberg/parquet/ParquetValueWriters.java | 29 +++++++++-- .../spark/source/SparkAppenderFactory.java | 50 +++++++++++++++++-- 4 files changed, 123 insertions(+), 19 deletions(-) diff --git a/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkAppenderFactory.java b/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkAppenderFactory.java index 8f4482e6d32c..158086be2b75 100644 --- a/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkAppenderFactory.java +++ b/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkAppenderFactory.java @@ -23,9 +23,8 @@ import java.io.Serializable; import java.io.UncheckedIOException; import java.util.Map; -import java.util.function.Function; -import org.apache.avro.io.DatumWriter; import org.apache.flink.table.data.RowData; +import org.apache.flink.table.data.StringData; import org.apache.flink.table.types.logical.RowType; import org.apache.iceberg.FileFormat; import org.apache.iceberg.MetricsConfig; @@ -41,11 +40,14 @@ import org.apache.iceberg.flink.data.FlinkOrcWriter; import org.apache.iceberg.flink.data.FlinkParquetWriters; import org.apache.iceberg.io.DataWriter; +import org.apache.iceberg.io.DeleteSchemaUtil; import org.apache.iceberg.io.FileAppender; import org.apache.iceberg.io.FileAppenderFactory; import org.apache.iceberg.io.OutputFile; import org.apache.iceberg.orc.ORC; import org.apache.iceberg.parquet.Parquet; +import org.apache.iceberg.parquet.ParquetValueWriters; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; public class FlinkAppenderFactory implements FileAppenderFactory, Serializable { private final Schema schema; @@ -56,6 +58,9 @@ public class FlinkAppenderFactory implements FileAppenderFactory, Seria private final Schema eqDeleteRowSchema; private final Schema posDeleteRowSchema; + private RowType eqDeleteFlinkSchema = null; + private RowType posDeleteFlinkSchema = null; + public FlinkAppenderFactory(Schema schema, RowType flinkSchema, Map props, PartitionSpec spec) { this(schema, flinkSchema, props, spec, null, schema, null); } @@ -72,6 +77,22 @@ public FlinkAppenderFactory(Schema schema, RowType flinkSchema, Map newAppender(OutputFile outputFile, FileFormat format) { MetricsConfig metricsConfig = MetricsConfig.fromProperties(props); @@ -125,7 +146,7 @@ public EqualityDeleteWriter newEqDeleteWriter(EncryptedOutputFile outpu switch (format) { case AVRO: return Avro.writeDeletes(outputFile.encryptingOutputFile()) - .createWriterFunc(ignore -> new FlinkAvroWriter(flinkSchema)) + .createWriterFunc(ignore -> new FlinkAvroWriter(lazyEqDeleteFlinkSchema())) .withPartition(partition) .overwrite() .setAll(props) @@ -137,7 +158,7 @@ public EqualityDeleteWriter newEqDeleteWriter(EncryptedOutputFile outpu case PARQUET: return Parquet.writeDeletes(outputFile.encryptingOutputFile()) - .createWriterFunc(msgType -> FlinkParquetWriters.buildWriter(flinkSchema, msgType)) + .createWriterFunc(msgType -> FlinkParquetWriters.buildWriter(lazyEqDeleteFlinkSchema(), msgType)) .withPartition(partition) .overwrite() .setAll(props) @@ -164,7 +185,7 @@ public PositionDeleteWriter newPosDeleteWriter(EncryptedOutputFile outp switch (format) { case AVRO: return Avro.writeDeletes(outputFile.encryptingOutputFile()) - .createWriterFunc(ignore -> new FlinkAvroWriter(FlinkSchemaUtil.convert(posDeleteRowSchema))) + .createWriterFunc(ignore -> new FlinkAvroWriter(lazyPosDeleteFlinkSchema())) 
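The Flink factory now mirrors the Spark one: the delete row schemas are converted to Flink RowTypes lazily, only when a delete writer is actually requested. A hedged construction sketch; the equality field ids, delete row schemas, and `outputFileFactory` below are illustrative assumptions, not part of this patch:

    // Sketch: a FlinkAppenderFactory configured for equality deletes on the 'id' column.
    RowType flinkSchema = FlinkSchemaUtil.convert(table.schema());
    int[] equalityFieldIds = new int[] { table.schema().findField("id").fieldId() };
    Schema eqDeleteRowSchema = table.schema().select("id");

    FlinkAppenderFactory appenderFactory = new FlinkAppenderFactory(
        table.schema(), flinkSchema, table.properties(), table.spec(),
        equalityFieldIds, eqDeleteRowSchema, null /* posDeleteRowSchema */);

    EqualityDeleteWriter<RowData> eqDeleteWriter = appenderFactory.newEqDeleteWriter(
        outputFileFactory.newOutputFile(), FileFormat.PARQUET, null /* partition */);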
.withPartition(partition) .overwrite() .setAll(props) @@ -174,8 +195,9 @@ public PositionDeleteWriter newPosDeleteWriter(EncryptedOutputFile outp .buildPositionWriter(); case PARQUET: + RowType flinkPosDeleteSchema = FlinkSchemaUtil.convert(DeleteSchemaUtil.posDeleteSchema(posDeleteRowSchema)); return Parquet.writeDeletes(outputFile.encryptingOutputFile()) - .createWriterFunc(msgType -> FlinkParquetWriters.buildWriter(flinkSchema, msgType)) + .createWriterFunc(msgType -> FlinkParquetWriters.buildWriter(flinkPosDeleteSchema, msgType)) .withPartition(partition) .overwrite() .setAll(props) @@ -183,7 +205,7 @@ public PositionDeleteWriter newPosDeleteWriter(EncryptedOutputFile outp .rowSchema(posDeleteRowSchema) .withSpec(spec) .withKeyMetadata(outputFile.keyMetadata()) - .buildPositionWriter(); + .buildPositionWriter(FlinkPosPathAccessor.INSTANCE); default: throw new UnsupportedOperationException("Cannot write unknown file format: " + format); @@ -192,4 +214,18 @@ public PositionDeleteWriter newPosDeleteWriter(EncryptedOutputFile outp throw new UncheckedIOException(e); } } + + private static class FlinkPosPathAccessor implements ParquetValueWriters.PathPosAccessor { + private static final ParquetValueWriters.PathPosAccessor INSTANCE = new FlinkPosPathAccessor(); + + @Override + public StringData accessPath(CharSequence path) { + return StringData.fromString(path.toString()); + } + + @Override + public Long accessPos(Long pos) { + return pos; + } + } } diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java b/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java index f2cd9e50d1e3..ee95fe0da76f 100644 --- a/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java +++ b/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java @@ -378,8 +378,8 @@ public EqualityDeleteWriter buildEqualityWriter() throws IOException { appenderBuilder.build(), FileFormat.PARQUET, location, spec, partition, keyMetadata, equalityFieldIds); } - - public PositionDeleteWriter buildPositionWriter() throws IOException { + public PositionDeleteWriter buildPositionWriter(ParquetValueWriters.PathPosAccessor accessor) + throws IOException { Preconditions.checkState(equalityFieldIds == null, "Cannot create position delete file using delete field ids"); meta("delete-type", "position"); @@ -391,7 +391,7 @@ public PositionDeleteWriter buildPositionWriter() throws IOException { appenderBuilder.createWriterFunc(parquetSchema -> { ParquetValueWriter writer = createWriterFunc.apply(parquetSchema); if (writer instanceof StructWriter) { - return new PositionDeleteStructWriter((StructWriter) writer); + return new PositionDeleteStructWriter((StructWriter) writer, accessor); } else { throw new UnsupportedOperationException("Cannot wrap writer for position deletes: " + writer.getClass()); } @@ -401,12 +401,17 @@ public PositionDeleteWriter buildPositionWriter() throws IOException { appenderBuilder.schema(DeleteSchemaUtil.pathPosSchema()); appenderBuilder.createWriterFunc(parquetSchema -> - new PositionDeleteStructWriter((StructWriter) GenericParquetWriter.buildWriter(parquetSchema))); + new PositionDeleteStructWriter((StructWriter) GenericParquetWriter.buildWriter(parquetSchema), + ParquetValueWriters.IdentifyPathPosAccessor.INSTANCE)); } return new PositionDeleteWriter<>( appenderBuilder.build(), FileFormat.PARQUET, location, spec, partition, keyMetadata); } + + public PositionDeleteWriter buildPositionWriter() throws IOException { + return 
buildPositionWriter(ParquetValueWriters.IdentifyPathPosAccessor.INSTANCE); + } } private static class ParquetWriteBuilder extends ParquetWriter.Builder> { diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetValueWriters.java b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetValueWriters.java index c17a2ffa173d..f23dd8c763de 100644 --- a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetValueWriters.java +++ b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetValueWriters.java @@ -561,18 +561,41 @@ public Stream metrics() { } } + public interface PathPosAccessor { + PATH accessPath(CharSequence path); + + POS accessPos(Long pos); + } + + public static class IdentifyPathPosAccessor implements PathPosAccessor { + public static final PathPosAccessor INSTANCE = new IdentifyPathPosAccessor(); + + @Override + public CharSequence accessPath(CharSequence path) { + return path; + } + + @Override + public Long accessPos(Long pos) { + return pos; + } + } + public static class PositionDeleteStructWriter extends StructWriter> { - public PositionDeleteStructWriter(StructWriter replacedWriter) { + private final PathPosAccessor accessor; + + public PositionDeleteStructWriter(StructWriter replacedWriter, PathPosAccessor accessor) { super(Arrays.asList(replacedWriter.writers)); + this.accessor = accessor; } @Override protected Object get(PositionDelete delete, int index) { switch (index) { case 0: - return delete.path(); + return accessor.accessPath(delete.path()); case 1: - return delete.pos(); + return accessor.accessPos(delete.pos()); case 2: return delete.row(); } diff --git a/spark/src/main/java/org/apache/iceberg/spark/source/SparkAppenderFactory.java b/spark/src/main/java/org/apache/iceberg/spark/source/SparkAppenderFactory.java index 025ff96e123b..0bc4a016801f 100644 --- a/spark/src/main/java/org/apache/iceberg/spark/source/SparkAppenderFactory.java +++ b/spark/src/main/java/org/apache/iceberg/spark/source/SparkAppenderFactory.java @@ -33,16 +33,21 @@ import org.apache.iceberg.encryption.EncryptedOutputFile; import org.apache.iceberg.exceptions.RuntimeIOException; import org.apache.iceberg.io.DataWriter; +import org.apache.iceberg.io.DeleteSchemaUtil; import org.apache.iceberg.io.FileAppender; import org.apache.iceberg.io.FileAppenderFactory; import org.apache.iceberg.io.OutputFile; import org.apache.iceberg.orc.ORC; import org.apache.iceberg.parquet.Parquet; +import org.apache.iceberg.parquet.ParquetValueWriters; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.spark.SparkSchemaUtil; import org.apache.iceberg.spark.data.SparkAvroWriter; import org.apache.iceberg.spark.data.SparkOrcWriter; import org.apache.iceberg.spark.data.SparkParquetWriters; import org.apache.spark.sql.catalyst.InternalRow; import org.apache.spark.sql.types.StructType; +import org.apache.spark.unsafe.types.UTF8String; class SparkAppenderFactory implements FileAppenderFactory { private final Map properties; @@ -53,6 +58,9 @@ class SparkAppenderFactory implements FileAppenderFactory { private final Schema eqDeleteRowSchema; private final Schema posDeleteRowSchema; + private StructType eqDeleteSparkType = null; + private StructType posDeleteSparkType = null; + SparkAppenderFactory(Map properties, Schema writeSchema, StructType dsSchema, PartitionSpec spec) { this(properties, writeSchema, dsSchema, spec, null, null, null); } @@ -68,6 +76,22 @@ class SparkAppenderFactory implements FileAppenderFactory { this.posDeleteRowSchema = 
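The accessor indirection exists because PositionDelete holds a CharSequence path and a Long position, while the engine-specific Parquet writers expect engine-native values (StringData for Flink, UTF8String for Spark); the no-argument buildPositionWriter() keeps the previous behavior by falling back to the identity accessor. A hedged fragment showing how an engine-specific accessor is threaded through the builder (`outputFile`, `spec`, `partition`, `posDeleteRowSchema`, and `sparkPosDeleteSchema` are assumed):

    // Sketch: convert path/pos into Spark-native values before the struct writer sees them.
    ParquetValueWriters.PathPosAccessor<UTF8String, Long> accessor =
        new ParquetValueWriters.PathPosAccessor<UTF8String, Long>() {
          @Override
          public UTF8String accessPath(CharSequence path) {
            return UTF8String.fromString(path.toString());
          }

          @Override
          public Long accessPos(Long pos) {
            return pos;   // positions are written as plain longs
          }
        };

    PositionDeleteWriter<InternalRow> writer = Parquet.writeDeletes(outputFile)
        .createWriterFunc(msgType -> SparkParquetWriters.buildWriter(sparkPosDeleteSchema, msgType))
        .overwrite()
        .rowSchema(posDeleteRowSchema)
        .withSpec(spec)
        .withPartition(partition)
        .buildPositionWriter(accessor);   // wraps the struct writer in PositionDeleteStructWriter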
posDeleteRowSchema; } + private StructType lazyEqDeleteSparkType() { + if (eqDeleteSparkType == null) { + Preconditions.checkNotNull(eqDeleteRowSchema, "Equality delete row schema shouldn't be null"); + this.eqDeleteSparkType = SparkSchemaUtil.convert(eqDeleteRowSchema); + } + return eqDeleteSparkType; + } + + private StructType lazyPosDeleteSparkType() { + if (posDeleteSparkType == null) { + Preconditions.checkNotNull(posDeleteRowSchema, "Position delete row schema shouldn't be null"); + this.posDeleteSparkType = SparkSchemaUtil.convert(posDeleteRowSchema); + } + return posDeleteSparkType; + } + @Override public FileAppender newAppender(OutputFile file, FileFormat fileFormat) { MetricsConfig metricsConfig = MetricsConfig.fromProperties(properties); @@ -120,7 +144,7 @@ public EqualityDeleteWriter newEqDeleteWriter(EncryptedOutputFile f switch (format) { case PARQUET: return Parquet.writeDeletes(file.encryptingOutputFile()) - .createWriterFunc(msgType -> SparkParquetWriters.buildWriter(dsSchema, msgType)) + .createWriterFunc(msgType -> SparkParquetWriters.buildWriter(lazyEqDeleteSparkType(), msgType)) .overwrite() .rowSchema(eqDeleteRowSchema) .withSpec(spec) @@ -131,7 +155,7 @@ public EqualityDeleteWriter newEqDeleteWriter(EncryptedOutputFile f case AVRO: return Avro.writeDeletes(file.encryptingOutputFile()) - .createWriterFunc(ignored -> new SparkAvroWriter(dsSchema)) + .createWriterFunc(ignored -> new SparkAvroWriter(lazyEqDeleteSparkType())) .overwrite() .rowSchema(eqDeleteRowSchema) .withSpec(spec) @@ -154,18 +178,20 @@ public PositionDeleteWriter newPosDeleteWriter(EncryptedOutputFile try { switch (format) { case PARQUET: + StructType sparkPosDeleteSchema = + SparkSchemaUtil.convert(DeleteSchemaUtil.posDeleteSchema(posDeleteRowSchema)); return Parquet.writeDeletes(file.encryptingOutputFile()) - .createWriterFunc(msgType -> SparkParquetWriters.buildWriter(dsSchema, msgType)) + .createWriterFunc(msgType -> SparkParquetWriters.buildWriter(sparkPosDeleteSchema, msgType)) .overwrite() .rowSchema(posDeleteRowSchema) .withSpec(spec) .withPartition(partition) .withKeyMetadata(file.keyMetadata()) - .buildPositionWriter(); + .buildPositionWriter(SparkPosPathAccessor.INSTANCE); case AVRO: return Avro.writeDeletes(file.encryptingOutputFile()) - .createWriterFunc(ignored -> new SparkAvroWriter(dsSchema)) + .createWriterFunc(ignored -> new SparkAvroWriter(lazyPosDeleteSparkType())) .overwrite() .rowSchema(posDeleteRowSchema) .withSpec(spec) @@ -181,4 +207,18 @@ public PositionDeleteWriter newPosDeleteWriter(EncryptedOutputFile throw new UncheckedIOException("Failed to create new equality delete writer", e); } } + + private static class SparkPosPathAccessor implements ParquetValueWriters.PathPosAccessor { + private static final SparkPosPathAccessor INSTANCE = new SparkPosPathAccessor(); + + @Override + public UTF8String accessPath(CharSequence path) { + return UTF8String.fromString(path.toString()); + } + + @Override + public Long accessPos(Long pos) { + return pos; + } + } } From 8006b63bb4f2539d4afc7e0c4ede4d56b2295979 Mon Sep 17 00:00:00 2001 From: huzheng Date: Tue, 1 Dec 2020 11:43:42 +0800 Subject: [PATCH 4/8] Address comment --- .../iceberg/data/GenericAppenderFactory.java | 19 ++- .../org/apache/iceberg/TestSplitScan.java | 4 +- .../iceberg/data/GenericAppenderHelper.java | 10 +- .../apache/iceberg/data/TestLocalScan.java | 5 +- .../iceberg/io/TestAppenderFactory.java | 112 ++++++++++++++---- .../parquet/TestParquetMergingMetrics.java | 5 +- .../flink/sink/FlinkAppenderFactory.java | 12 +- 
.../spark/source/SparkAppenderFactory.java | 15 ++- .../TestSparkParquetMergingMetrics.java | 8 +- .../spark/source/TestSparkReadProjection.java | 4 +- .../spark/source/TestFilteredScan.java | 4 +- 11 files changed, 142 insertions(+), 56 deletions(-) rename {core => data}/src/test/java/org/apache/iceberg/io/TestAppenderFactory.java (64%) diff --git a/data/src/main/java/org/apache/iceberg/data/GenericAppenderFactory.java b/data/src/main/java/org/apache/iceberg/data/GenericAppenderFactory.java index cc99af4952b9..8f27c0ccff94 100644 --- a/data/src/main/java/org/apache/iceberg/data/GenericAppenderFactory.java +++ b/data/src/main/java/org/apache/iceberg/data/GenericAppenderFactory.java @@ -39,6 +39,7 @@ import org.apache.iceberg.io.OutputFile; import org.apache.iceberg.orc.ORC; import org.apache.iceberg.parquet.Parquet; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.relocated.com.google.common.collect.Maps; /** @@ -53,8 +54,12 @@ public class GenericAppenderFactory implements FileAppenderFactory { private final Schema posDeleteRowSchema; private final Map config = Maps.newHashMap(); + public GenericAppenderFactory(Schema schema) { + this(schema, PartitionSpec.unpartitioned(), null, null, null); + } + public GenericAppenderFactory(Schema schema, PartitionSpec spec) { - this(schema, spec, null, schema, null); + this(schema, spec, null, null, null); } public GenericAppenderFactory(Schema schema, PartitionSpec spec, @@ -109,7 +114,7 @@ public FileAppender newAppender(OutputFile outputFile, FileFormat fileFo .build(); default: - throw new UnsupportedOperationException("Cannot write format: " + fileFormat); + throw new UnsupportedOperationException("Cannot write unknown file format: " + fileFormat); } } catch (IOException e) { throw new UncheckedIOException(e); @@ -127,6 +132,11 @@ public org.apache.iceberg.io.DataWriter newDataWriter(EncryptedOutputFil @Override public EqualityDeleteWriter newEqDeleteWriter(EncryptedOutputFile file, FileFormat format, StructLike partition) { + Preconditions.checkState(equalityFieldIds != null && equalityFieldIds.length > 0, + "Equality field ids shouldn't be null when creating equality-delete writer"); + Preconditions.checkNotNull(eqDeleteRowSchema, + "Equality delete row schema shouldn't be null when creating equality-delete writer"); + MetricsConfig metricsConfig = MetricsConfig.fromProperties(config); try { switch (format) { @@ -156,7 +166,8 @@ public EqualityDeleteWriter newEqDeleteWriter(EncryptedOutputFile file, .buildEqualityWriter(); default: - throw new UnsupportedOperationException("Cannot write unknown file format: " + format); + throw new UnsupportedOperationException( + "Cannot write equality-deletes for unsupported file format: " + format); } } catch (IOException e) { throw new UncheckedIOException(e); @@ -193,7 +204,7 @@ public PositionDeleteWriter newPosDeleteWriter(EncryptedOutputFile file, .buildPositionWriter(); default: - throw new UnsupportedOperationException("Cannot write unknown file format: " + format); + throw new UnsupportedOperationException("Cannot write pos-deletes for unsupported file format: " + format); } } catch (IOException e) { throw new UncheckedIOException(e); diff --git a/data/src/test/java/org/apache/iceberg/TestSplitScan.java b/data/src/test/java/org/apache/iceberg/TestSplitScan.java index 920b80f9f37a..2f191f6160d7 100644 --- a/data/src/test/java/org/apache/iceberg/TestSplitScan.java +++ b/data/src/test/java/org/apache/iceberg/TestSplitScan.java @@ -116,8 +116,8 @@ 
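With the constructor changes above, the generic factory now has three entry points. A hedged sketch of when each applies; the five-argument form and the `id` equality column are inferred from the delegation and the tests in this series:

    // Append-only writing on an unpartitioned table.
    GenericAppenderFactory appendOnly = new GenericAppenderFactory(schema);

    // Append-only writing on a partitioned table (delete schemas left null).
    GenericAppenderFactory partitionedAppends = new GenericAppenderFactory(schema, spec);

    // Data plus delete writers: equality field ids and delete row schemas supplied explicitly.
    GenericAppenderFactory withDeletes = new GenericAppenderFactory(
        schema, spec,
        new int[] { schema.findField("id").fieldId() },   // equality field ids
        schema.select("id"),                              // eqDeleteRowSchema
        schema);                                          // posDeleteRowSchema (keep deleted rows)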
private File writeToFile(List records, FileFormat fileFormat) throws IOE File file = temp.newFile(); Assert.assertTrue(file.delete()); - GenericAppenderFactory factory = new GenericAppenderFactory(SCHEMA, PartitionSpec.unpartitioned()) - .set(TableProperties.PARQUET_ROW_GROUP_SIZE_BYTES, String.valueOf(SPLIT_SIZE)); + GenericAppenderFactory factory = new GenericAppenderFactory(SCHEMA).set( + TableProperties.PARQUET_ROW_GROUP_SIZE_BYTES, String.valueOf(SPLIT_SIZE)); try (FileAppender appender = factory.newAppender(Files.localOutput(file), fileFormat)) { appender.addAll(records); } diff --git a/data/src/test/java/org/apache/iceberg/data/GenericAppenderHelper.java b/data/src/test/java/org/apache/iceberg/data/GenericAppenderHelper.java index ddf8ce4784bb..c32be08263a9 100644 --- a/data/src/test/java/org/apache/iceberg/data/GenericAppenderHelper.java +++ b/data/src/test/java/org/apache/iceberg/data/GenericAppenderHelper.java @@ -27,7 +27,6 @@ import org.apache.iceberg.DataFiles; import org.apache.iceberg.FileFormat; import org.apache.iceberg.Files; -import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.StructLike; import org.apache.iceberg.Table; import org.apache.iceberg.io.FileAppender; @@ -74,12 +73,13 @@ public DataFile writeFile(StructLike partition, List records) throws IOE Preconditions.checkNotNull(table, "table not set"); File file = tmp.newFile(); Assert.assertTrue(file.delete()); - return appendToLocalFile(table, file, fileFormat, partition, records, table.spec()); + return appendToLocalFile(table, file, fileFormat, partition, records); } - private static DataFile appendToLocalFile(Table table, File file, FileFormat format, StructLike partition, - List records, PartitionSpec spec) throws IOException { - FileAppender appender = new GenericAppenderFactory(table.schema(), spec).newAppender( + private static DataFile appendToLocalFile( + Table table, File file, FileFormat format, StructLike partition, List records) + throws IOException { + FileAppender appender = new GenericAppenderFactory(table.schema()).newAppender( Files.localOutput(file), format); try (FileAppender fileAppender = appender) { fileAppender.addAll(records); diff --git a/data/src/test/java/org/apache/iceberg/data/TestLocalScan.java b/data/src/test/java/org/apache/iceberg/data/TestLocalScan.java index efa37bbae066..1cc74ba029ff 100644 --- a/data/src/test/java/org/apache/iceberg/data/TestLocalScan.java +++ b/data/src/test/java/org/apache/iceberg/data/TestLocalScan.java @@ -449,9 +449,8 @@ private DataFile writeFile(String location, String filename, Schema schema, List FileFormat fileFormat = FileFormat.fromFileName(filename); Preconditions.checkNotNull(fileFormat, "Cannot determine format for file: %s", filename); - FileAppender fileAppender = - new GenericAppenderFactory(schema, PartitionSpec.unpartitioned()) - .newAppender(fromPath(path, CONF), fileFormat); + FileAppender fileAppender = new GenericAppenderFactory(schema).newAppender( + fromPath(path, CONF), fileFormat); try (FileAppender appender = fileAppender) { appender.addAll(records); } diff --git a/core/src/test/java/org/apache/iceberg/io/TestAppenderFactory.java b/data/src/test/java/org/apache/iceberg/io/TestAppenderFactory.java similarity index 64% rename from core/src/test/java/org/apache/iceberg/io/TestAppenderFactory.java rename to data/src/test/java/org/apache/iceberg/io/TestAppenderFactory.java index 151195ec318c..93689d18c981 100644 --- a/core/src/test/java/org/apache/iceberg/io/TestAppenderFactory.java +++ 
b/data/src/test/java/org/apache/iceberg/io/TestAppenderFactory.java @@ -23,19 +23,26 @@ import java.io.IOException; import java.util.List; import java.util.Locale; +import java.util.Set; import org.apache.iceberg.DataFile; import org.apache.iceberg.FileFormat; import org.apache.iceberg.PartitionKey; import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.Schema; import org.apache.iceberg.TableTestBase; +import org.apache.iceberg.avro.Avro; import org.apache.iceberg.data.GenericRecord; import org.apache.iceberg.data.Record; +import org.apache.iceberg.data.avro.DataReader; +import org.apache.iceberg.data.parquet.GenericParquetReaders; import org.apache.iceberg.deletes.EqualityDeleteWriter; import org.apache.iceberg.deletes.PositionDelete; import org.apache.iceberg.deletes.PositionDeleteWriter; +import org.apache.iceberg.encryption.EncryptedOutputFile; +import org.apache.iceberg.parquet.Parquet; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.relocated.com.google.common.collect.Sets; import org.apache.iceberg.util.Pair; import org.apache.iceberg.util.StructLikeSet; import org.junit.Assert; @@ -52,6 +59,7 @@ public abstract class TestAppenderFactory extends TableTestBase { private final boolean partitioned; private PartitionKey partition = null; + private OutputFileFactory fileFactory = null; @Parameterized.Parameters(name = "FileFormat={0}, Partitioned={1}") public static Object[] parameters() { @@ -83,6 +91,8 @@ public void setupTable() throws Exception { this.table = create(SCHEMA, PartitionSpec.unpartitioned()); } this.partition = createPartitionKey(); + this.fileFactory = new OutputFileFactory(table.spec(), format, table.locationProvider(), table.io(), + table.encryption(), 1, 1); table.updateProperties() .defaultFormat(format) @@ -99,17 +109,12 @@ protected abstract FileAppenderFactory createAppenderFactory(List eq protected abstract StructLikeSet actualRowSet(String... 
columns) throws IOException; - private OutputFileFactory createFileFactory() { - return new OutputFileFactory(table.spec(), format, table.locationProvider(), table.io(), - table.encryption(), 1, 1); - } - private PartitionKey createPartitionKey() { if (table.spec().isUnpartitioned()) { return null; } - Record record = GenericRecord.create(table.spec().schema()).copy(ImmutableMap.of("data", "aaa")); + Record record = GenericRecord.create(table.schema()).copy(ImmutableMap.of("data", "aaa")); PartitionKey partitionKey = new PartitionKey(table.spec(), table.schema()); partitionKey.partition(record); @@ -117,6 +122,14 @@ private PartitionKey createPartitionKey() { return partitionKey; } + private EncryptedOutputFile createEncryptedOutputFile() { + if (partition == null) { + return fileFactory.newOutputFile(); + } else { + return fileFactory.newOutputFile(partition); + } + } + private List testRowSet() { return Lists.newArrayList( createRow(1, "aaa"), @@ -127,9 +140,8 @@ private List testRowSet() { ); } - private DataFile prepareDataFile(List rowSet, FileAppenderFactory appenderFactory, - OutputFileFactory outputFileFactory) throws IOException { - DataWriter writer = appenderFactory.newDataWriter(outputFileFactory.newOutputFile(), format, partition); + private DataFile prepareDataFile(List rowSet, FileAppenderFactory appenderFactory) throws IOException { + DataWriter writer = appenderFactory.newDataWriter(createEncryptedOutputFile(), format, partition); try (DataWriter closeableWriter = writer) { for (T row : rowSet) { closeableWriter.add(row); @@ -142,10 +154,9 @@ private DataFile prepareDataFile(List rowSet, FileAppenderFactory appender @Test public void testDataWriter() throws IOException { FileAppenderFactory appenderFactory = createAppenderFactory(null, null, null); - OutputFileFactory outputFileFactory = createFileFactory(); List rowSet = testRowSet(); - DataFile dataFile = prepareDataFile(rowSet, appenderFactory, outputFileFactory); + DataFile dataFile = prepareDataFile(rowSet, appenderFactory); table.newRowDelta() .addRows(dataFile) @@ -157,28 +168,39 @@ public void testDataWriter() throws IOException { @Test public void testEqDeleteWriter() throws IOException { List equalityFieldIds = Lists.newArrayList(table.schema().findField("id").fieldId()); - FileAppenderFactory appenderFactory = createAppenderFactory(equalityFieldIds, - table.schema().select("id"), null); - OutputFileFactory outputFileFactory = createFileFactory(); + Schema eqDeleteRowSchema = table.schema().select("id"); + FileAppenderFactory appenderFactory = createAppenderFactory(equalityFieldIds, eqDeleteRowSchema, null); List rowSet = testRowSet(); - DataFile dataFile = prepareDataFile(rowSet, appenderFactory, outputFileFactory); + DataFile dataFile = prepareDataFile(rowSet, appenderFactory); table.newRowDelta() .addRows(dataFile) .commit(); + // The equality field is 'id'. No matter what the value of 'data' field is, we should delete the 1th, 3th, 5th + // rows. List deletes = Lists.newArrayList( createRow(1, "aaa"), createRow(3, "bbb"), createRow(5, "ccc") ); - EqualityDeleteWriter eqDeleteWriter = - appenderFactory.newEqDeleteWriter(outputFileFactory.newOutputFile(), format, partition); + EncryptedOutputFile out = createEncryptedOutputFile(); + EqualityDeleteWriter eqDeleteWriter = appenderFactory.newEqDeleteWriter(out, format, partition); try (EqualityDeleteWriter closeableWriter = eqDeleteWriter) { closeableWriter.deleteAll(deletes); } + // Check that the delete equality file has the expected equality deletes. 
+ GenericRecord gRecord = GenericRecord.create(eqDeleteRowSchema); + Set expectedDeletes = Sets.newHashSet( + gRecord.copy("id", 1), + gRecord.copy("id", 3), + gRecord.copy("id", 5) + ); + Assert.assertEquals(expectedDeletes, + Sets.newHashSet(createReader(eqDeleteRowSchema, out.encryptingOutputFile().toInputFile()))); + table.newRowDelta() .addDeletes(eqDeleteWriter.toDeleteFile()) .commit(); @@ -194,10 +216,9 @@ public void testEqDeleteWriter() throws IOException { public void testPosDeleteWriter() throws IOException { // Initialize FileAppenderFactory without pos-delete row schema. FileAppenderFactory appenderFactory = createAppenderFactory(null, null, null); - OutputFileFactory outputFileFactory = createFileFactory(); List rowSet = testRowSet(); - DataFile dataFile = prepareDataFile(rowSet, appenderFactory, outputFileFactory); + DataFile dataFile = prepareDataFile(rowSet, appenderFactory); List> deletes = Lists.newArrayList( Pair.of(dataFile.path(), 0L), @@ -205,14 +226,25 @@ public void testPosDeleteWriter() throws IOException { Pair.of(dataFile.path(), 4L) ); - PositionDeleteWriter eqDeleteWriter = - appenderFactory.newPosDeleteWriter(outputFileFactory.newOutputFile(), format, partition); + EncryptedOutputFile out = createEncryptedOutputFile(); + PositionDeleteWriter eqDeleteWriter = appenderFactory.newPosDeleteWriter(out, format, partition); try (PositionDeleteWriter closeableWriter = eqDeleteWriter) { for (Pair delete : deletes) { closeableWriter.delete(delete.first(), delete.second()); } } + // Check that the pos delete file has the expected pos deletes. + Schema pathPosSchema = DeleteSchemaUtil.pathPosSchema(); + GenericRecord gRecord = GenericRecord.create(pathPosSchema); + Set expectedDeletes = Sets.newHashSet( + gRecord.copy("file_path", dataFile.path(), "pos", 0L), + gRecord.copy("file_path", dataFile.path(), "pos", 2L), + gRecord.copy("file_path", dataFile.path(), "pos", 4L) + ); + Assert.assertEquals(expectedDeletes, + Sets.newHashSet(createReader(pathPosSchema, out.encryptingOutputFile().toInputFile()))); + table.newRowDelta() .addRows(dataFile) .addDeletes(eqDeleteWriter.toDeleteFile()) @@ -230,10 +262,9 @@ public void testPosDeleteWriter() throws IOException { @Test public void testPosDeleteWriterWithRowSchema() throws IOException { FileAppenderFactory appenderFactory = createAppenderFactory(null, null, table.schema()); - OutputFileFactory outputFileFactory = createFileFactory(); List rowSet = testRowSet(); - DataFile dataFile = prepareDataFile(rowSet, appenderFactory, outputFileFactory); + DataFile dataFile = prepareDataFile(rowSet, appenderFactory); List> deletes = Lists.newArrayList( new PositionDelete().set(dataFile.path(), 0, rowSet.get(0)), @@ -241,14 +272,26 @@ public void testPosDeleteWriterWithRowSchema() throws IOException { new PositionDelete().set(dataFile.path(), 4, rowSet.get(4)) ); - PositionDeleteWriter eqDeleteWriter = - appenderFactory.newPosDeleteWriter(outputFileFactory.newOutputFile(), format, partition); + EncryptedOutputFile out = createEncryptedOutputFile(); + PositionDeleteWriter eqDeleteWriter = appenderFactory.newPosDeleteWriter(out, format, partition); try (PositionDeleteWriter closeableWriter = eqDeleteWriter) { for (PositionDelete delete : deletes) { closeableWriter.delete(delete.path(), delete.pos(), delete.row()); } } + // Check that the pos delete file has the expected pos deletes. 
+    Schema pathPosRowSchema = DeleteSchemaUtil.posDeleteSchema(table.schema());
+    GenericRecord gRecord = GenericRecord.create(pathPosRowSchema);
+    GenericRecord rowRecord = GenericRecord.create(table.schema());
+    Set<Record> expectedDeletes = Sets.newHashSet(
+        gRecord.copy("file_path", dataFile.path(), "pos", 0L, "row", rowRecord.copy("id", 1, "data", "aaa")),
+        gRecord.copy("file_path", dataFile.path(), "pos", 2L, "row", rowRecord.copy("id", 3, "data", "ccc")),
+        gRecord.copy("file_path", dataFile.path(), "pos", 4L, "row", rowRecord.copy("id", 5, "data", "eee"))
+    );
+    Assert.assertEquals(expectedDeletes,
+        Sets.newHashSet(createReader(pathPosRowSchema, out.encryptingOutputFile().toInputFile())));
+
     table.newRowDelta()
         .addRows(dataFile)
         .addDeletes(eqDeleteWriter.toDeleteFile())
@@ -262,4 +305,23 @@ public void testPosDeleteWriterWithRowSchema() throws IOException {
     );

     Assert.assertEquals("Should have the expected records", expectedRowSet(expected), actualRowSet("*"));
   }
+
+  private CloseableIterable<Record> createReader(Schema schema, InputFile inputFile) {
+    switch (format) {
+      case PARQUET:
+        return Parquet.read(inputFile)
+            .project(schema)
+            .createReaderFunc(fileSchema -> GenericParquetReaders.buildReader(schema, fileSchema))
+            .build();
+
+      case AVRO:
+        return Avro.read(inputFile)
+            .project(schema)
+            .createReaderFunc(DataReader::create)
+            .build();
+
+      default:
+        throw new UnsupportedOperationException("Unsupported file format: " + format);
+    }
+  }
 }
diff --git a/data/src/test/java/org/apache/iceberg/parquet/TestParquetMergingMetrics.java b/data/src/test/java/org/apache/iceberg/parquet/TestParquetMergingMetrics.java
index 9297ee092794..03118e3a65f2 100644
--- a/data/src/test/java/org/apache/iceberg/parquet/TestParquetMergingMetrics.java
+++ b/data/src/test/java/org/apache/iceberg/parquet/TestParquetMergingMetrics.java
@@ -22,7 +22,6 @@
 import java.io.IOException;
 import java.util.List;
 import org.apache.iceberg.FileFormat;
-import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.TestMergingMetrics;
 import org.apache.iceberg.data.GenericAppenderFactory;
 import org.apache.iceberg.data.Record;
@@ -32,8 +31,8 @@ public class TestParquetMergingMetrics extends TestMergingMetrics<Record> {
   @Override
   protected FileAppender<Record> writeAndGetAppender(List<Record> records) throws IOException {
-    FileAppender<Record> appender = new GenericAppenderFactory(SCHEMA, PartitionSpec.unpartitioned())
-        .newAppender(org.apache.iceberg.Files.localOutput(temp.newFile()), FileFormat.PARQUET);
+    FileAppender<Record> appender = new GenericAppenderFactory(SCHEMA).newAppender(
+        org.apache.iceberg.Files.localOutput(temp.newFile()), FileFormat.PARQUET);
     try (FileAppender<Record> fileAppender = appender) {
       records.forEach(fileAppender::add);
     }
diff --git a/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkAppenderFactory.java b/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkAppenderFactory.java
index 158086be2b75..7c67d2f651c8 100644
--- a/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkAppenderFactory.java
+++ b/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkAppenderFactory.java
@@ -62,7 +62,7 @@ public class FlinkAppenderFactory implements FileAppenderFactory<RowData>, Seria
   private RowType posDeleteFlinkSchema = null;

   public FlinkAppenderFactory(Schema schema, RowType flinkSchema, Map<String, String> props, PartitionSpec spec) {
-    this(schema, flinkSchema, props, spec, null, schema, null);
+    this(schema, flinkSchema, props, spec, null, null, null);
   }

   public FlinkAppenderFactory(Schema schema, RowType flinkSchema, Map<String, String> props,
@@ -141,6 +141,11 @@ public DataWriter<RowData> newDataWriter(EncryptedOutputFile file, FileFormat fo
   @Override
   public EqualityDeleteWriter<RowData> newEqDeleteWriter(EncryptedOutputFile outputFile, FileFormat format,
                                                          StructLike partition) {
+    Preconditions.checkState(equalityFieldIds != null && equalityFieldIds.length > 0,
+        "Equality field ids shouldn't be null when creating equality-delete writer");
+    Preconditions.checkNotNull(eqDeleteRowSchema,
+        "Equality delete row schema shouldn't be null when creating equality-delete writer");
+
     MetricsConfig metricsConfig = MetricsConfig.fromProperties(props);
     try {
       switch (format) {
@@ -170,7 +175,8 @@ public EqualityDeleteWriter<RowData> newEqDeleteWriter(EncryptedOutputFile outpu
               .buildEqualityWriter();

         default:
-          throw new UnsupportedOperationException("Cannot write unknown file format: " + format);
+          throw new UnsupportedOperationException(
+              "Cannot write equality-deletes for unsupported file format: " + format);
       }
     } catch (IOException e) {
       throw new UncheckedIOException(e);
@@ -208,7 +214,7 @@ public PositionDeleteWriter<RowData> newPosDeleteWriter(EncryptedOutputFile outp
               .buildPositionWriter(FlinkPosPathAccessor.INSTANCE);

         default:
-          throw new UnsupportedOperationException("Cannot write unknown file format: " + format);
+          throw new UnsupportedOperationException("Cannot write pos-deletes for unsupported file format: " + format);
       }
     } catch (IOException e) {
       throw new UncheckedIOException(e);
diff --git a/spark/src/main/java/org/apache/iceberg/spark/source/SparkAppenderFactory.java b/spark/src/main/java/org/apache/iceberg/spark/source/SparkAppenderFactory.java
index 0bc4a016801f..175e7c65c8e7 100644
--- a/spark/src/main/java/org/apache/iceberg/spark/source/SparkAppenderFactory.java
+++ b/spark/src/main/java/org/apache/iceberg/spark/source/SparkAppenderFactory.java
@@ -61,6 +61,10 @@ class SparkAppenderFactory implements FileAppenderFactory<InternalRow> {
   private StructType eqDeleteSparkType = null;
   private StructType posDeleteSparkType = null;

+  SparkAppenderFactory(Map<String, String> properties, Schema writeSchema, StructType dsSchema) {
+    this(properties, writeSchema, dsSchema, PartitionSpec.unpartitioned(), null, null, null);
+  }
+
   SparkAppenderFactory(Map<String, String> properties, Schema writeSchema, StructType dsSchema, PartitionSpec spec) {
     this(properties, writeSchema, dsSchema, spec, null, null, null);
   }
@@ -140,6 +144,11 @@ public DataWriter<InternalRow> newDataWriter(EncryptedOutputFile file, FileForma
   @Override
   public EqualityDeleteWriter<InternalRow> newEqDeleteWriter(EncryptedOutputFile file, FileFormat format,
                                                              StructLike partition) {
+    Preconditions.checkState(equalityFieldIds != null && equalityFieldIds.length > 0,
+        "Equality field ids shouldn't be null when creating equality-delete writer");
+    Preconditions.checkNotNull(eqDeleteRowSchema,
+        "Equality delete row schema shouldn't be null when creating equality-delete writer");
+
     try {
       switch (format) {
         case PARQUET:
@@ -165,7 +174,8 @@ public EqualityDeleteWriter<InternalRow> newEqDeleteWriter(EncryptedOutputFile f
               .buildEqualityWriter();

         default:
-          throw new UnsupportedOperationException("Cannot write unsupported format: " + format);
+          throw new UnsupportedOperationException(
+              "Cannot write equality-deletes for unsupported file format: " + format);
       }
     } catch (IOException e) {
       throw new UncheckedIOException("Failed to create new equality delete writer", e);
@@ -200,7 +210,8 @@ public PositionDeleteWriter<InternalRow> newPosDeleteWriter(EncryptedOutputFile
               .buildPositionWriter();

         default:
-          throw new UnsupportedOperationException("Cannot write unsupported format: " + format);
+          throw new UnsupportedOperationException(
+              "Cannot write pos-deletes for unsupported file format: " + format);
       }
     } catch (IOException e) {
diff --git a/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkParquetMergingMetrics.java b/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkParquetMergingMetrics.java
index f1215056243b..e49a18be5338 100644
--- a/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkParquetMergingMetrics.java
+++ b/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkParquetMergingMetrics.java
@@ -20,13 +20,12 @@
 package org.apache.iceberg.spark.source;

 import java.io.IOException;
+import java.util.HashMap;
 import java.util.List;
 import org.apache.iceberg.FileFormat;
-import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.TestMergingMetrics;
 import org.apache.iceberg.data.Record;
 import org.apache.iceberg.io.FileAppender;
-import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.spark.SparkSchemaUtil;
 import org.apache.spark.sql.catalyst.InternalRow;

@@ -35,9 +34,8 @@ public class TestSparkParquetMergingMetrics extends TestMergingMetrics<InternalRow> {
   protected FileAppender<InternalRow> writeAndGetAppender(List<Record> records) throws IOException {
     FileAppender<InternalRow> appender =
-        new SparkAppenderFactory(ImmutableMap.of(), SCHEMA, SparkSchemaUtil.convert(SCHEMA),
-            PartitionSpec.unpartitioned())
-            .newAppender(org.apache.iceberg.Files.localOutput(temp.newFile()), FileFormat.PARQUET);
+        new SparkAppenderFactory(new HashMap<>(), SCHEMA, SparkSchemaUtil.convert(SCHEMA)).newAppender(
+            org.apache.iceberg.Files.localOutput(temp.newFile()), FileFormat.PARQUET);
     try (FileAppender<InternalRow> fileAppender = appender) {
       records.stream().map(r -> new StructInternalRow(SCHEMA.asStruct()).setStruct(r)).forEach(fileAppender::add);
     }
diff --git a/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReadProjection.java b/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReadProjection.java
index 804ad9c94680..b3c8e3134490 100644
--- a/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReadProjection.java
+++ b/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReadProjection.java
@@ -106,8 +106,8 @@ protected Record writeAndRead(String desc, Schema writeSchema, Schema readSchema
     // When tables are created, the column ids are reassigned.
     Schema tableSchema = table.schema();

-    try (FileAppender<Record> writer = new GenericAppenderFactory(tableSchema, table.spec())
-        .newAppender(localOutput(testFile), format)) {
+    try (FileAppender<Record> writer = new GenericAppenderFactory(tableSchema).newAppender(
+        localOutput(testFile), format)) {
       writer.add(record);
     }
diff --git a/spark2/src/test/java/org/apache/iceberg/spark/source/TestFilteredScan.java b/spark2/src/test/java/org/apache/iceberg/spark/source/TestFilteredScan.java
index 3d8ffc555dd0..60d044aed801 100644
--- a/spark2/src/test/java/org/apache/iceberg/spark/source/TestFilteredScan.java
+++ b/spark2/src/test/java/org/apache/iceberg/spark/source/TestFilteredScan.java
@@ -186,8 +186,8 @@ public void writeUnpartitionedTable() throws IOException {
     // create records using the table's schema
     this.records = testRecords(tableSchema);

-    try (FileAppender<Record> writer = new GenericAppenderFactory(tableSchema, table.spec())
-        .newAppender(localOutput(testFile), fileFormat)) {
+    try (FileAppender<Record> writer = new GenericAppenderFactory(tableSchema).newAppender(
+        localOutput(testFile), fileFormat)) {
       writer.addAll(records);
     }
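Taken together, the changes above give each engine a single FileAppenderFactory entry point for appenders, data writers and delete writers. The sketch below is an illustrative usage example, not part of the patch: it assumes an unpartitioned table, an OutputFileFactory named fileFactory that hands out EncryptedOutputFiles, and the GenericAppenderFactory constructor exercised by TestAppenderFactory; record and deleteRecord are placeholder values.

// ---- Illustrative example, not part of the patch ----
// Minimal sketch of driving the new data-writer and equality-delete-writer methods.
// `table`, `fileFactory`, `record` and `deleteRecord` are assumed helpers, not patch code.
FileAppenderFactory<Record> factory = new GenericAppenderFactory(
    table.schema(), table.spec(),
    new int[] {table.schema().findField("id").fieldId()},  // equality field ids
    table.schema().select("id"),                            // eqDeleteRowSchema
    null);                                                  // posDeleteRowSchema (unused here)

DataWriter<Record> dataWriter = factory.newDataWriter(
    fileFactory.newOutputFile(), FileFormat.PARQUET, null /* unpartitioned */);
try {
  dataWriter.add(record);
} finally {
  dataWriter.close();  // closing materializes the DataFile
}

EqualityDeleteWriter<Record> eqDeleteWriter = factory.newEqDeleteWriter(
    fileFactory.newOutputFile(), FileFormat.PARQUET, null);
try {
  eqDeleteWriter.delete(deleteRecord);  // deletes rows whose equality fields match
} finally {
  eqDeleteWriter.close();
}

table.newRowDelta()
    .addRows(dataWriter.toDataFile())
    .addDeletes(eqDeleteWriter.toDeleteFile())
    .commit();
// ---- End of illustrative example ----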
From b26898d4746294bdec0d0bfce18060d864171c61 Mon Sep 17 00:00:00 2001
From: huzheng
Date: Tue, 1 Dec 2020 15:05:25 +0800
Subject: [PATCH 5/8] Minor changes

---
 .../iceberg/TestGenericAppenderFactory.java   | 15 ---------
 .../iceberg/io/TestAppenderFactory.java       |  9 +++++-
 .../flink/sink/FlinkAppenderFactory.java      | 18 ++---------
 .../flink/sink/TestFlinkAppenderFactory.java  | 13 --------
 .../org/apache/iceberg/parquet/Parquet.java   | 16 +++++-----
 .../iceberg/parquet/ParquetValueWriters.java  | 31 ++++---------------
 .../spark/source/SparkAppenderFactory.java    | 18 ++---------
 .../source/TestSparkAppenderFactory.java      | 13 --------
 .../spark/source/TestFilteredScan.java        |  4 +--
 9 files changed, 29 insertions(+), 108 deletions(-)

diff --git a/data/src/test/java/org/apache/iceberg/TestGenericAppenderFactory.java b/data/src/test/java/org/apache/iceberg/TestGenericAppenderFactory.java
index 846b896fa543..60dcd633eda4 100644
--- a/data/src/test/java/org/apache/iceberg/TestGenericAppenderFactory.java
+++ b/data/src/test/java/org/apache/iceberg/TestGenericAppenderFactory.java
@@ -19,22 +19,16 @@

 package org.apache.iceberg;

-import java.io.IOException;
 import java.util.List;
 import org.apache.iceberg.data.GenericAppenderFactory;
 import org.apache.iceberg.data.GenericRecord;
-import org.apache.iceberg.data.IcebergGenerics;
 import org.apache.iceberg.data.Record;
-import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.io.FileAppenderFactory;
 import org.apache.iceberg.io.TestAppenderFactory;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.util.ArrayUtil;
 import org.apache.iceberg.util.StructLikeSet;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;

-@RunWith(Parameterized.class)
 public class TestGenericAppenderFactory extends TestAppenderFactory<Record> {

   private final GenericRecord gRecord;
@@ -63,13 +57,4 @@ protected StructLikeSet expectedRowSet(Iterable<Record> records) {
     records.forEach(set::add);
     return set;
   }
-
-  @Override
-  protected StructLikeSet actualRowSet(String... columns) throws IOException {
-    StructLikeSet set = StructLikeSet.create(table.schema().asStruct());
-    try (CloseableIterable<Record> reader = IcebergGenerics.read(table).select(columns).build()) {
-      reader.forEach(set::add);
-    }
-    return set;
-  }
 }
diff --git a/data/src/test/java/org/apache/iceberg/io/TestAppenderFactory.java b/data/src/test/java/org/apache/iceberg/io/TestAppenderFactory.java
index 93689d18c981..44ac5190eeef 100644
--- a/data/src/test/java/org/apache/iceberg/io/TestAppenderFactory.java
+++ b/data/src/test/java/org/apache/iceberg/io/TestAppenderFactory.java
@@ -32,6 +32,7 @@
 import org.apache.iceberg.TableTestBase;
 import org.apache.iceberg.avro.Avro;
 import org.apache.iceberg.data.GenericRecord;
+import org.apache.iceberg.data.IcebergGenerics;
 import org.apache.iceberg.data.Record;
 import org.apache.iceberg.data.avro.DataReader;
 import org.apache.iceberg.data.parquet.GenericParquetReaders;
@@ -107,7 +108,13 @@ protected abstract FileAppenderFactory<T> createAppenderFactory(List<Integer> eq

   protected abstract StructLikeSet expectedRowSet(Iterable<T> records) throws IOException;

-  protected abstract StructLikeSet actualRowSet(String... columns) throws IOException;
+  private StructLikeSet actualRowSet(String... columns) throws IOException {
+    StructLikeSet set = StructLikeSet.create(table.schema().asStruct());
+    try (CloseableIterable<Record> reader = IcebergGenerics.read(table).select(columns).build()) {
+      reader.forEach(set::add);
+    }
+    return set;
+  }

   private PartitionKey createPartitionKey() {
     if (table.spec().isUnpartitioned()) {
diff --git a/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkAppenderFactory.java b/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkAppenderFactory.java
index 7c67d2f651c8..cda55ab2f14c 100644
--- a/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkAppenderFactory.java
+++ b/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkAppenderFactory.java
@@ -46,7 +46,6 @@
 import org.apache.iceberg.io.OutputFile;
 import org.apache.iceberg.orc.ORC;
 import org.apache.iceberg.parquet.Parquet;
-import org.apache.iceberg.parquet.ParquetValueWriters;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;

 public class FlinkAppenderFactory implements FileAppenderFactory<RowData>, Serializable {
@@ -211,7 +210,8 @@ public PositionDeleteWriter<RowData> newPosDeleteWriter(EncryptedOutputFile outp
               .rowSchema(posDeleteRowSchema)
               .withSpec(spec)
               .withKeyMetadata(outputFile.keyMetadata())
-              .buildPositionWriter(FlinkPosPathAccessor.INSTANCE);
+              .transformPaths(path -> StringData.fromString(path.toString()))
+              .buildPositionWriter();

         default:
           throw new UnsupportedOperationException("Cannot write pos-deletes for unsupported file format: " + format);
@@ -220,18 +220,4 @@ public PositionDeleteWriter<RowData> newPosDeleteWriter(EncryptedOutputFile outp
       throw new UncheckedIOException(e);
     }
   }
-
-  private static class FlinkPosPathAccessor implements ParquetValueWriters.PathPosAccessor<StringData, Long> {
-    private static final ParquetValueWriters.PathPosAccessor<StringData, Long> INSTANCE = new FlinkPosPathAccessor();
-
-    @Override
-    public StringData accessPath(CharSequence path) {
-      return StringData.fromString(path.toString());
-    }
-
-    @Override
-    public Long accessPos(Long pos) {
-      return pos;
-    }
-  }
 }
diff --git a/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkAppenderFactory.java b/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkAppenderFactory.java
index e69b580f795e..8d7fa86eac50 100644
--- a/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkAppenderFactory.java
+++ b/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkAppenderFactory.java
@@ -19,17 +19,13 @@

 package org.apache.iceberg.flink.sink;

-import java.io.IOException;
 import java.util.List;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.types.logical.RowType;
 import org.apache.iceberg.Schema;
-import org.apache.iceberg.data.IcebergGenerics;
-import org.apache.iceberg.data.Record;
 import org.apache.iceberg.flink.FlinkSchemaUtil;
 import org.apache.iceberg.flink.RowDataWrapper;
 import org.apache.iceberg.flink.SimpleDataUtil;
-import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.io.FileAppenderFactory;
 import org.apache.iceberg.io.TestAppenderFactory;
 import org.apache.iceberg.util.ArrayUtil;
@@ -66,13 +62,4 @@ protected StructLikeSet expectedRowSet(Iterable<RowData> rows) {
     }
     return set;
   }
-
-  @Override
-  protected StructLikeSet actualRowSet(String... columns) throws IOException {
-    StructLikeSet set = StructLikeSet.create(table.schema().asStruct());
-    try (CloseableIterable<Record> reader = IcebergGenerics.read(table).select(columns).build()) {
-      reader.forEach(set::add);
-    }
-    return set;
-  }
 }
diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java b/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java
index ee95fe0da76f..4838e223892b 100644
--- a/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java
+++ b/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java
@@ -280,6 +280,7 @@ public static class DeleteWriteBuilder {
     private StructLike partition = null;
     private EncryptionKeyMetadata keyMetadata = null;
     private int[] equalityFieldIds = null;
+    private Function<CharSequence, ?> pathTransformFunc = t -> t;

     private DeleteWriteBuilder(OutputFile file) {
       this.appenderBuilder = write(file);
@@ -359,6 +360,11 @@ public DeleteWriteBuilder equalityFieldIds(int... fieldIds) {
       return this;
     }

+    public DeleteWriteBuilder transformPaths(Function<CharSequence, ?> newPathTransformFunc) {
+      this.pathTransformFunc = newPathTransformFunc;
+      return this;
+    }
+
     public EqualityDeleteWriter<T> buildEqualityWriter() throws IOException {
       Preconditions.checkState(rowSchema != null, "Cannot create equality delete file without a schema`");
       Preconditions.checkState(equalityFieldIds != null, "Cannot create equality delete file without delete field ids");
@@ -378,7 +384,7 @@ public EqualityDeleteWriter<T> buildEqualityWriter() throws IOException {
           appenderBuilder.build(), FileFormat.PARQUET, location, spec, partition, keyMetadata, equalityFieldIds);
     }

-    public PositionDeleteWriter<T> buildPositionWriter(ParquetValueWriters.PathPosAccessor accessor)
+    public PositionDeleteWriter<T> buildPositionWriter()
         throws IOException {
       Preconditions.checkState(equalityFieldIds == null, "Cannot create position delete file using delete field ids");
@@ -391,7 +397,7 @@ public PositionDeleteWriter<T> buildPositionWriter(ParquetValueWriters.PathP
         appenderBuilder.createWriterFunc(parquetSchema -> {
           ParquetValueWriter<?> writer = createWriterFunc.apply(parquetSchema);
           if (writer instanceof StructWriter) {
-            return new PositionDeleteStructWriter<>((StructWriter<?>) writer, accessor);
+            return new PositionDeleteStructWriter<>((StructWriter<?>) writer, pathTransformFunc);
           } else {
             throw new UnsupportedOperationException("Cannot wrap writer for position deletes: " + writer.getClass());
           }
@@ -402,16 +408,12 @@ public PositionDeleteWriter<T> buildPositionWriter(ParquetValueWriters.PathP
         appenderBuilder.createWriterFunc(parquetSchema ->
             new PositionDeleteStructWriter<>((StructWriter<?>) GenericParquetWriter.buildWriter(parquetSchema),
-                ParquetValueWriters.IdentifyPathPosAccessor.INSTANCE));
+                t -> t));
       }

       return new PositionDeleteWriter<>(
           appenderBuilder.build(), FileFormat.PARQUET, location, spec, partition, keyMetadata);
     }
-
-    public PositionDeleteWriter<T> buildPositionWriter() throws IOException {
-      return buildPositionWriter(ParquetValueWriters.IdentifyPathPosAccessor.INSTANCE);
-    }
   }

   private static class ParquetWriteBuilder<T> extends ParquetWriter.Builder<T, ParquetWriteBuilder<T>> {
diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetValueWriters.java b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetValueWriters.java
index f23dd8c763de..7d77996e9968 100644
--- a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetValueWriters.java
+++ b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetValueWriters.java
@@ -27,6 +27,7 @@
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.function.Function;
 import java.util.stream.Stream;
 import org.apache.avro.util.Utf8;
 import org.apache.iceberg.FieldMetrics;
@@ -561,41 +562,21 @@ public Stream<FieldMetrics> metrics() {
     }
   }

-  public interface PathPosAccessor<PATH, POS> {
-    PATH accessPath(CharSequence path);
-
-    POS accessPos(Long pos);
-  }
-
-  public static class IdentifyPathPosAccessor implements PathPosAccessor<CharSequence, Long> {
-    public static final PathPosAccessor<CharSequence, Long> INSTANCE = new IdentifyPathPosAccessor();
-
-    @Override
-    public CharSequence accessPath(CharSequence path) {
-      return path;
-    }
-
-    @Override
-    public Long accessPos(Long pos) {
-      return pos;
-    }
-  }
-
   public static class PositionDeleteStructWriter<R> extends StructWriter<PositionDelete<R>> {
-    private final PathPosAccessor accessor;
+    private final Function<CharSequence, ?> pathTransformFunc;

-    public PositionDeleteStructWriter(StructWriter<?> replacedWriter, PathPosAccessor accessor) {
+    public PositionDeleteStructWriter(StructWriter<?> replacedWriter, Function<CharSequence, ?> pathTransformFunc) {
       super(Arrays.asList(replacedWriter.writers));
-      this.accessor = accessor;
+      this.pathTransformFunc = pathTransformFunc;
     }

     @Override
     protected Object get(PositionDelete<R> delete, int index) {
       switch (index) {
         case 0:
-          return accessor.accessPath(delete.path());
+          return pathTransformFunc.apply(delete.path());
         case 1:
-          return accessor.accessPos(delete.pos());
+          return delete.pos();
         case 2:
           return delete.row();
       }
diff --git a/spark/src/main/java/org/apache/iceberg/spark/source/SparkAppenderFactory.java b/spark/src/main/java/org/apache/iceberg/spark/source/SparkAppenderFactory.java
index 175e7c65c8e7..39f1e17f0d53 100644
--- a/spark/src/main/java/org/apache/iceberg/spark/source/SparkAppenderFactory.java
+++ b/spark/src/main/java/org/apache/iceberg/spark/source/SparkAppenderFactory.java
@@ -39,7 +39,6 @@
 import org.apache.iceberg.io.OutputFile;
 import org.apache.iceberg.orc.ORC;
 import org.apache.iceberg.parquet.Parquet;
-import org.apache.iceberg.parquet.ParquetValueWriters;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.spark.SparkSchemaUtil;
 import org.apache.iceberg.spark.data.SparkAvroWriter;
@@ -197,7 +196,8 @@ public PositionDeleteWriter<InternalRow> newPosDeleteWriter(EncryptedOutputFile
               .withSpec(spec)
               .withPartition(partition)
               .withKeyMetadata(file.keyMetadata())
-              .buildPositionWriter(SparkPosPathAccessor.INSTANCE);
+              .transformPaths(path -> UTF8String.fromString(path.toString()))
+              .buildPositionWriter();

         case AVRO:
           return Avro.writeDeletes(file.encryptingOutputFile())
@@ -218,18 +218,4 @@ public PositionDeleteWriter<InternalRow> newPosDeleteWriter(EncryptedOutputFile
       throw new UncheckedIOException("Failed to create new equality delete writer", e);
     }
   }
-
-  private static class SparkPosPathAccessor implements ParquetValueWriters.PathPosAccessor<UTF8String, Long> {
-    private static final SparkPosPathAccessor INSTANCE = new SparkPosPathAccessor();
-
-    @Override
-    public UTF8String accessPath(CharSequence path) {
-      return UTF8String.fromString(path.toString());
-    }
-
-    @Override
-    public Long accessPos(Long pos) {
-      return pos;
-    }
-  }
 }
diff --git a/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkAppenderFactory.java b/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkAppenderFactory.java
index ae2b04779280..cac021d57feb 100644
--- a/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkAppenderFactory.java
+++ b/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkAppenderFactory.java
@@ -19,12 +19,8 @@

 package org.apache.iceberg.spark.source;

-import java.io.IOException;
 import java.util.List;
 import org.apache.iceberg.Schema;
-import org.apache.iceberg.data.IcebergGenerics;
-import org.apache.iceberg.data.Record;
-import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.io.FileAppenderFactory;
 import org.apache.iceberg.io.TestAppenderFactory;
 import org.apache.iceberg.spark.SparkSchemaUtil;
@@ -69,13 +65,4 @@ protected StructLikeSet expectedRowSet(Iterable<InternalRow> rows) {
     }
     return set;
   }
-
-  @Override
-  protected StructLikeSet actualRowSet(String... columns) throws IOException {
-    StructLikeSet set = StructLikeSet.create(table.schema().asStruct());
-    try (CloseableIterable<Record> reader = IcebergGenerics.read(table).select(columns).build()) {
-      reader.forEach(set::add);
-    }
-    return set;
-  }
 }
diff --git a/spark3/src/test/java/org/apache/iceberg/spark/source/TestFilteredScan.java b/spark3/src/test/java/org/apache/iceberg/spark/source/TestFilteredScan.java
index c3bf1606676a..5824dca7a4e1 100644
--- a/spark3/src/test/java/org/apache/iceberg/spark/source/TestFilteredScan.java
+++ b/spark3/src/test/java/org/apache/iceberg/spark/source/TestFilteredScan.java
@@ -183,8 +183,8 @@ public void writeUnpartitionedTable() throws IOException {
     this.records = testRecords(tableSchema);

-    try (FileAppender<Record> writer = new GenericAppenderFactory(tableSchema, table.spec())
-        .newAppender(localOutput(testFile), fileFormat)) {
+    try (FileAppender<Record> writer = new GenericAppenderFactory(tableSchema).newAppender(
+        localOutput(testFile), fileFormat)) {
       writer.addAll(records);
     }
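The "Minor changes" patch above replaces the PathPosAccessor indirection with a single transformPaths(Function<CharSequence, ?>) hook on Parquet's DeleteWriteBuilder, so the Spark factory converts paths to UTF8String and the Flink factory to StringData, while generic writers keep plain CharSequence. From the caller's side the position-delete flow is unchanged; the sketch below is illustrative only and reuses the assumed helpers from the earlier example.

// ---- Illustrative example, not part of the patch ----
// Minimal sketch of writing position deletes against a previously committed data file.
// `factory`, `fileFactory`, `table`, `dataFile` and `deletedRecord` are assumed helpers.
PositionDeleteWriter<Record> posDeleteWriter = factory.newPosDeleteWriter(
    fileFactory.newOutputFile(), FileFormat.PARQUET, null /* unpartitioned */);
try {
  posDeleteWriter.delete(dataFile.path(), 0L);                 // delete the first row by position
  posDeleteWriter.delete(dataFile.path(), 2L, deletedRecord);  // optionally carry the deleted row
} finally {
  posDeleteWriter.close();
}

table.newRowDelta()
    .addDeletes(posDeleteWriter.toDeleteFile())
    .commit();
// ---- End of illustrative example ----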
From 8e423fb34d02da49e78a9750b6204f6ce23cead2 Mon Sep 17 00:00:00 2001
From: huzheng
Date: Tue, 1 Dec 2020 15:59:17 +0800
Subject: [PATCH 6/8] Align the exception messages

---
 .../java/org/apache/iceberg/data/GenericAppenderFactory.java   | 2 +-
 .../org/apache/iceberg/flink/sink/FlinkAppenderFactory.java    | 2 +-
 .../org/apache/iceberg/spark/source/SparkAppenderFactory.java  | 2 +-
 3 files changed, 3 insertions(+), 3 deletions(-)

diff --git a/data/src/main/java/org/apache/iceberg/data/GenericAppenderFactory.java b/data/src/main/java/org/apache/iceberg/data/GenericAppenderFactory.java
index 8f27c0ccff94..b9b77fd571b5 100644
--- a/data/src/main/java/org/apache/iceberg/data/GenericAppenderFactory.java
+++ b/data/src/main/java/org/apache/iceberg/data/GenericAppenderFactory.java
@@ -133,7 +133,7 @@ public org.apache.iceberg.io.DataWriter<Record> newDataWriter(EncryptedOutputFil
   public EqualityDeleteWriter<Record> newEqDeleteWriter(EncryptedOutputFile file, FileFormat format,
                                                         StructLike partition) {
     Preconditions.checkState(equalityFieldIds != null && equalityFieldIds.length > 0,
-        "Equality field ids shouldn't be null when creating equality-delete writer");
+        "Equality field ids shouldn't be null or empty when creating equality-delete writer");
     Preconditions.checkNotNull(eqDeleteRowSchema,
         "Equality delete row schema shouldn't be null when creating equality-delete writer");

diff --git a/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkAppenderFactory.java b/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkAppenderFactory.java
index cda55ab2f14c..1ddf0929fbdc 100644
--- a/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkAppenderFactory.java
+++ b/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkAppenderFactory.java
@@ -141,7 +141,7 @@ public DataWriter<RowData> newDataWriter(EncryptedOutputFile file, FileFormat fo
   public EqualityDeleteWriter<RowData> newEqDeleteWriter(EncryptedOutputFile outputFile, FileFormat format,
                                                          StructLike partition) {
     Preconditions.checkState(equalityFieldIds != null && equalityFieldIds.length > 0,
-        "Equality field ids shouldn't be null when creating equality-delete writer");
+        "Equality field ids shouldn't be null or empty when creating equality-delete writer");
     Preconditions.checkNotNull(eqDeleteRowSchema,
         "Equality delete row schema shouldn't be null when creating equality-delete writer");

diff --git a/spark/src/main/java/org/apache/iceberg/spark/source/SparkAppenderFactory.java b/spark/src/main/java/org/apache/iceberg/spark/source/SparkAppenderFactory.java
index 39f1e17f0d53..3fe7513db6cd 100644
--- a/spark/src/main/java/org/apache/iceberg/spark/source/SparkAppenderFactory.java
+++ b/spark/src/main/java/org/apache/iceberg/spark/source/SparkAppenderFactory.java
@@ -144,7 +144,7 @@ public DataWriter<InternalRow> newDataWriter(EncryptedOutputFile file, FileForma
   public EqualityDeleteWriter<InternalRow> newEqDeleteWriter(EncryptedOutputFile file, FileFormat format,
                                                              StructLike partition) {
     Preconditions.checkState(equalityFieldIds != null && equalityFieldIds.length > 0,
-        "Equality field ids shouldn't be null when creating equality-delete writer");
+        "Equality field ids shouldn't be null or empty when creating equality-delete writer");
     Preconditions.checkNotNull(eqDeleteRowSchema,
         "Equality delete row schema shouldn't be null when creating equality-delete writer");

From 4b2b04bd45bafa5ea794fd35366e819afc45b574 Mon Sep 17 00:00:00 2001
From: huzheng
Date: Tue, 1 Dec 2020 16:16:49 +0800
Subject: [PATCH 7/8] Make the pathPosSchema to be private

---
 core/src/main/java/org/apache/iceberg/io/DeleteSchemaUtil.java | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/core/src/main/java/org/apache/iceberg/io/DeleteSchemaUtil.java b/core/src/main/java/org/apache/iceberg/io/DeleteSchemaUtil.java
index cfab6eda404b..466a2c01d76a 100644
--- a/core/src/main/java/org/apache/iceberg/io/DeleteSchemaUtil.java
+++ b/core/src/main/java/org/apache/iceberg/io/DeleteSchemaUtil.java
@@ -27,7 +27,7 @@ public class DeleteSchemaUtil {
   private DeleteSchemaUtil() {
   }

-  public static Schema pathPosSchema(Schema rowSchema) {
+  private static Schema pathPosSchema(Schema rowSchema) {
     return new Schema(
         MetadataColumns.DELETE_FILE_PATH,
         MetadataColumns.DELETE_FILE_POS,

From e6a8169000cbb740e3e7a19ae8e843f605e642ed Mon Sep 17 00:00:00 2001
From: huzheng
Date: Wed, 2 Dec 2020 09:51:09 +0800
Subject: [PATCH 8/8] Address nit issues.

---
 .../src/main/java/org/apache/iceberg/parquet/Parquet.java | 7 +++----
 1 file changed, 3 insertions(+), 4 deletions(-)

diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java b/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java
index 4838e223892b..17e1275216e0 100644
--- a/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java
+++ b/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java
@@ -280,7 +280,7 @@ public static class DeleteWriteBuilder {
     private StructLike partition = null;
     private EncryptionKeyMetadata keyMetadata = null;
     private int[] equalityFieldIds = null;
-    private Function<CharSequence, ?> pathTransformFunc = t -> t;
+    private Function<CharSequence, ?> pathTransformFunc = Function.identity();

     private DeleteWriteBuilder(OutputFile file) {
       this.appenderBuilder = write(file);
@@ -384,8 +384,7 @@ public EqualityDeleteWriter<T> buildEqualityWriter() throws IOException {
           appenderBuilder.build(), FileFormat.PARQUET, location, spec, partition, keyMetadata, equalityFieldIds);
     }

-    public PositionDeleteWriter<T> buildPositionWriter()
-        throws IOException {
+    public PositionDeleteWriter<T> buildPositionWriter() throws IOException {
       Preconditions.checkState(equalityFieldIds == null, "Cannot create position delete file using delete field ids");

       meta("delete-type", "position");
@@ -408,7 +407,7 @@ public PositionDeleteWriter<T> buildPositionWriter()
         appenderBuilder.createWriterFunc(parquetSchema ->
             new PositionDeleteStructWriter<>((StructWriter<?>) GenericParquetWriter.buildWriter(parquetSchema),
-                t -> t));
+                Function.identity()));
       }

       return new PositionDeleteWriter<>(