diff --git a/core/src/main/java/org/apache/iceberg/TableMetadata.java b/core/src/main/java/org/apache/iceberg/TableMetadata.java index d00e2a8f37a1..c6e6ddd19358 100644 --- a/core/src/main/java/org/apache/iceberg/TableMetadata.java +++ b/core/src/main/java/org/apache/iceberg/TableMetadata.java @@ -43,12 +43,14 @@ import org.apache.iceberg.types.TypeUtil; import org.apache.iceberg.util.PropertyUtil; +import static org.apache.iceberg.TableProperties.FORMAT_VERSION; +import static org.apache.iceberg.TableProperties.FORMAT_VERSION_DEFAULT; + /** * Metadata for a table. */ public class TableMetadata implements Serializable { static final long INITIAL_SEQUENCE_NUMBER = 0; - static final int DEFAULT_TABLE_FORMAT_VERSION = 1; static final int SUPPORTED_TABLE_FORMAT_VERSION = 2; static final int INITIAL_SPEC_ID = 0; static final int INITIAL_SORT_ORDER_ID = 1; @@ -64,7 +66,8 @@ public static TableMetadata newTableMetadata(TableOperations ops, PartitionSpec spec, String location, Map properties) { - return newTableMetadata(schema, spec, SortOrder.unsorted(), location, properties, DEFAULT_TABLE_FORMAT_VERSION); + int formatVersion = PropertyUtil.propertyAsInt(properties, FORMAT_VERSION, FORMAT_VERSION_DEFAULT); + return newTableMetadata(schema, spec, SortOrder.unsorted(), location, properties, formatVersion); } public static TableMetadata newTableMetadata(Schema schema, @@ -72,14 +75,16 @@ public static TableMetadata newTableMetadata(Schema schema, SortOrder sortOrder, String location, Map properties) { - return newTableMetadata(schema, spec, sortOrder, location, properties, DEFAULT_TABLE_FORMAT_VERSION); + int formatVersion = PropertyUtil.propertyAsInt(properties, FORMAT_VERSION, FORMAT_VERSION_DEFAULT); + return newTableMetadata(schema, spec, sortOrder, location, properties, formatVersion); } public static TableMetadata newTableMetadata(Schema schema, PartitionSpec spec, String location, Map properties) { - return newTableMetadata(schema, spec, SortOrder.unsorted(), location, properties, DEFAULT_TABLE_FORMAT_VERSION); + int formatVersion = PropertyUtil.propertyAsInt(properties, FORMAT_VERSION, FORMAT_VERSION_DEFAULT); + return newTableMetadata(schema, spec, SortOrder.unsorted(), location, properties, formatVersion); } static TableMetadata newTableMetadata(Schema schema, diff --git a/core/src/main/java/org/apache/iceberg/TableProperties.java b/core/src/main/java/org/apache/iceberg/TableProperties.java index e9930c854151..c2ce1c4076c3 100644 --- a/core/src/main/java/org/apache/iceberg/TableProperties.java +++ b/core/src/main/java/org/apache/iceberg/TableProperties.java @@ -137,4 +137,7 @@ private TableProperties() { public static final String GC_ENABLED = "gc.enabled"; public static final boolean GC_ENABLED_DEFAULT = true; + + public static final String FORMAT_VERSION = "format.version"; + public static final int FORMAT_VERSION_DEFAULT = 1; } diff --git a/core/src/main/java/org/apache/iceberg/deletes/EqualityDeleteWriter.java b/core/src/main/java/org/apache/iceberg/deletes/EqualityDeleteWriter.java index 1838ef749b08..05bb8ec77131 100644 --- a/core/src/main/java/org/apache/iceberg/deletes/EqualityDeleteWriter.java +++ b/core/src/main/java/org/apache/iceberg/deletes/EqualityDeleteWriter.java @@ -61,6 +61,10 @@ public void delete(T row) { appender.add(row); } + public long length() { + return appender.length(); + } + @Override public void close() throws IOException { if (deleteFile == null) { diff --git a/core/src/main/java/org/apache/iceberg/io/BaseTaskWriter.java 
b/core/src/main/java/org/apache/iceberg/io/BaseTaskWriter.java index ca16318d8bdc..9e1eac271630 100644 --- a/core/src/main/java/org/apache/iceberg/io/BaseTaskWriter.java +++ b/core/src/main/java/org/apache/iceberg/io/BaseTaskWriter.java @@ -23,17 +23,26 @@ import java.io.IOException; import java.util.List; import org.apache.iceberg.DataFile; -import org.apache.iceberg.DataFiles; +import org.apache.iceberg.DeleteFile; import org.apache.iceberg.FileFormat; -import org.apache.iceberg.Metrics; import org.apache.iceberg.PartitionKey; import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.Schema; +import org.apache.iceberg.StructLike; +import org.apache.iceberg.deletes.EqualityDeleteWriter; import org.apache.iceberg.encryption.EncryptedOutputFile; +import org.apache.iceberg.relocated.com.google.common.base.MoreObjects; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.Iterables; import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.relocated.com.google.common.collect.Sets; +import org.apache.iceberg.types.TypeUtil; +import org.apache.iceberg.util.StructLikeMap; import org.apache.iceberg.util.Tasks; public abstract class BaseTaskWriter implements TaskWriter { private final List completedFiles = Lists.newArrayList(); + private final List completedDeletes = Lists.newArrayList(); private final PartitionSpec spec; private final FileFormat format; private final FileAppenderFactory appenderFactory; @@ -51,39 +60,149 @@ protected BaseTaskWriter(PartitionSpec spec, FileFormat format, FileAppenderFact this.targetFileSize = targetFileSize; } + protected PartitionSpec spec() { + return spec; + } + + protected FileAppenderFactory appenderFactory() { + return appenderFactory; + } + @Override public void abort() throws IOException { close(); // clean up files created by this writer - Tasks.foreach(completedFiles) + Tasks.foreach(Iterables.concat(completedFiles, completedDeletes)) .throwFailureWhenFinished() .noRetry() .run(file -> io.deleteFile(file.path().toString())); } @Override - public DataFile[] complete() throws IOException { + public WriterResult complete() throws IOException { close(); - return completedFiles.toArray(new DataFile[0]); + return WriterResult.builder() + .addDataFiles(completedFiles) + .addDeleteFiles(completedDeletes) + .build(); + } + + protected abstract class BaseDeltaWriter implements Closeable { + private final RollingFileWriter dataWriter; + + private final boolean enableEqDelete; + private RollingEqDeleteWriter eqDeleteWriter = null; + private SortedPosDeleteWriter posDeleteWriter = null; + private StructLikeMap insertedRowMap = null; + + public BaseDeltaWriter(PartitionKey partition, List equalityFieldIds, Schema schema) { + this.dataWriter = new RollingFileWriter(partition); + + this.enableEqDelete = equalityFieldIds != null && !equalityFieldIds.isEmpty(); + if (enableEqDelete) { + this.eqDeleteWriter = new RollingEqDeleteWriter(partition); + this.posDeleteWriter = new SortedPosDeleteWriter<>(appenderFactory, fileFactory, format, partition); + + Schema deleteSchema = TypeUtil.select(schema, Sets.newHashSet(equalityFieldIds)); + this.insertedRowMap = StructLikeMap.create(deleteSchema.asStruct()); + } + } + + protected abstract StructLike asKey(T row); + + protected abstract StructLike asCopiedKey(T row); + + public void write(T row) throws IOException { + if (enableEqDelete) { + FilePos filePos = FilePos.create(dataWriter.currentPath(), 
dataWriter.currentRows()); + + StructLike copiedKey = asCopiedKey(row); + // Adding a pos-delete to replace the old filePos. + FilePos previous = insertedRowMap.put(copiedKey, filePos); + if (previous != null) { + posDeleteWriter.delete(previous.path, previous.rowOffset, null /* TODO set non-nullable row*/); + } + } + + dataWriter.write(row); + } + + public void delete(T row) throws IOException { + Preconditions.checkState(enableEqDelete, "Could not accept equality deletion."); + + StructLike key = asKey(row); + FilePos previous = insertedRowMap.remove(key); + + if (previous != null) { + posDeleteWriter.delete(previous.path, previous.rowOffset, null /* TODO set non-nullable row */); + } + + eqDeleteWriter.write(row); + } + + @Override + public void close() throws IOException { + // Moving the completed data files into task writer's completedFiles automatically. + dataWriter.close(); + + if (enableEqDelete) { + // Moving the completed eq-delete files into task writer's completedDeletes automatically. + eqDeleteWriter.close(); + insertedRowMap.clear(); + + // Moving the completed pos-delete files into completedDeletes. + completedDeletes.addAll(posDeleteWriter.complete()); + } + } + } + + private static class FilePos { + private final CharSequence path; + private final long rowOffset; + + private FilePos(CharSequence path, long rowOffset) { + this.path = path; + this.rowOffset = rowOffset; + } + + private static FilePos create(CharSequence path, long rowOffset) { + return new FilePos(path, rowOffset); + } + + @Override + public String toString() { + return MoreObjects.toStringHelper(this) + .add("path", path) + .add("row_offset", rowOffset) + .toString(); + } } - protected class RollingFileWriter implements Closeable { + private abstract class BaseRollingWriter implements Closeable { private static final int ROWS_DIVISOR = 1000; private final PartitionKey partitionKey; private EncryptedOutputFile currentFile = null; - private FileAppender currentAppender = null; + private W currentWriter = null; private long currentRows = 0; - public RollingFileWriter(PartitionKey partitionKey) { + private BaseRollingWriter(PartitionKey partitionKey) { this.partitionKey = partitionKey; openCurrent(); } - public void add(T record) throws IOException { - this.currentAppender.add(record); + abstract W newWriter(EncryptedOutputFile file, PartitionKey key); + + abstract long length(W writer); + + abstract void write(W writer, T record); + + abstract void complete(W closedWriter); + + public void write(T record) throws IOException { + write(currentWriter, record); this.currentRows++; if (shouldRollToNewFile()) { @@ -92,48 +211,45 @@ public void add(T record) throws IOException { } } + public CharSequence currentPath() { + Preconditions.checkNotNull(currentFile, "The currentFile shouldn't be null"); + return currentFile.encryptingOutputFile().location(); + } + + public long currentRows() { + return currentRows; + } + private void openCurrent() { if (partitionKey == null) { // unpartitioned - currentFile = fileFactory.newOutputFile(); + this.currentFile = fileFactory.newOutputFile(); } else { // partitioned - currentFile = fileFactory.newOutputFile(partitionKey); + this.currentFile = fileFactory.newOutputFile(partitionKey); } - currentAppender = appenderFactory.newAppender(currentFile.encryptingOutputFile(), format); - currentRows = 0; + this.currentWriter = newWriter(currentFile, partitionKey); + this.currentRows = 0; } private boolean shouldRollToNewFile() { // TODO: ORC file now not support target file size 
before closed return !format.equals(FileFormat.ORC) && - currentRows % ROWS_DIVISOR == 0 && currentAppender.length() >= targetFileSize; + currentRows % ROWS_DIVISOR == 0 && length(currentWriter) >= targetFileSize; } private void closeCurrent() throws IOException { - if (currentAppender != null) { - currentAppender.close(); - // metrics are only valid after the appender is closed - Metrics metrics = currentAppender.metrics(); - long fileSizeInBytes = currentAppender.length(); - List splitOffsets = currentAppender.splitOffsets(); - this.currentAppender = null; - - if (metrics.recordCount() == 0L) { + if (currentWriter != null) { + currentWriter.close(); + + if (currentRows == 0L) { io.deleteFile(currentFile.encryptingOutputFile()); } else { - DataFile dataFile = DataFiles.builder(spec) - .withEncryptionKeyMetadata(currentFile.keyMetadata()) - .withPath(currentFile.encryptingOutputFile().location()) - .withFileSizeInBytes(fileSizeInBytes) - .withPartition(spec.fields().size() == 0 ? null : partitionKey) // set null if unpartitioned - .withMetrics(metrics) - .withSplitOffsets(splitOffsets) - .build(); - completedFiles.add(dataFile); + complete(currentWriter); } this.currentFile = null; + this.currentWriter = null; this.currentRows = 0; } } @@ -143,4 +259,56 @@ public void close() throws IOException { closeCurrent(); } } + + protected class RollingFileWriter extends BaseRollingWriter> { + public RollingFileWriter(PartitionKey partitionKey) { + super(partitionKey); + } + + @Override + DataWriter newWriter(EncryptedOutputFile file, PartitionKey key) { + return appenderFactory.newDataWriter(file, format, key); + } + + @Override + long length(DataWriter writer) { + return writer.length(); + } + + @Override + void write(DataWriter writer, T record) { + writer.add(record); + } + + @Override + void complete(DataWriter closedWriter) { + completedFiles.add(closedWriter.toDataFile()); + } + } + + private class RollingEqDeleteWriter extends BaseRollingWriter> { + private RollingEqDeleteWriter(PartitionKey partitionKey) { + super(partitionKey); + } + + @Override + EqualityDeleteWriter newWriter(EncryptedOutputFile file, PartitionKey key) { + return appenderFactory.newEqDeleteWriter(file, format, key); + } + + @Override + long length(EqualityDeleteWriter writer) { + return writer.length(); + } + + @Override + void write(EqualityDeleteWriter writer, T record) { + writer.delete(record); + } + + @Override + void complete(EqualityDeleteWriter closedWriter) { + completedDeletes.add(closedWriter.toDeleteFile()); + } + } } diff --git a/core/src/main/java/org/apache/iceberg/io/DataWriter.java b/core/src/main/java/org/apache/iceberg/io/DataWriter.java new file mode 100644 index 000000000000..3b01bdcdbc94 --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/io/DataWriter.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.io; + +import java.io.Closeable; +import java.io.IOException; +import java.nio.ByteBuffer; +import org.apache.iceberg.DataFile; +import org.apache.iceberg.DataFiles; +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.StructLike; +import org.apache.iceberg.encryption.EncryptionKeyMetadata; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; + +public class DataWriter implements Closeable { + private final FileAppender appender; + private final FileFormat format; + private final String location; + private final PartitionSpec spec; + private final StructLike partition; + private final ByteBuffer keyMetadata; + private DataFile dataFile = null; + + public DataWriter(FileAppender appender, FileFormat format, String location, + PartitionSpec spec, StructLike partition, EncryptionKeyMetadata keyMetadata) { + this.appender = appender; + this.format = format; + this.location = location; + this.spec = spec; + this.partition = partition; + this.keyMetadata = keyMetadata != null ? keyMetadata.buffer() : null; + } + + public void add(T row) { + appender.add(row); + } + + public long length() { + return appender.length(); + } + + @Override + public void close() throws IOException { + if (dataFile == null) { + appender.close(); + this.dataFile = DataFiles.builder(spec) + .withFormat(format) + .withPath(location) + .withPartition(partition) + .withEncryptionKeyMetadata(keyMetadata) + .withFileSizeInBytes(appender.length()) + .withMetrics(appender.metrics()) + .withSplitOffsets(appender.splitOffsets()) + .build(); + } + } + + public DataFile toDataFile() { + Preconditions.checkState(dataFile != null, "Cannot create data file from unclosed writer"); + return dataFile; + } +} diff --git a/core/src/main/java/org/apache/iceberg/io/DeleteUtil.java b/core/src/main/java/org/apache/iceberg/io/DeleteUtil.java new file mode 100644 index 000000000000..49f7699ef16d --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/io/DeleteUtil.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iceberg.io; + +import org.apache.iceberg.MetadataColumns; +import org.apache.iceberg.Schema; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.types.Types; + +public class DeleteUtil { + + private DeleteUtil() { + } + + public static Schema pathPosSchema() { + return new Schema( + MetadataColumns.DELETE_FILE_PATH, + MetadataColumns.DELETE_FILE_POS); + } + + public static Schema pathPosSchema(Schema rowSchema) { + Preconditions.checkNotNull(rowSchema, "Row schema should not be null when constructing the pos-delete schema."); + + return new Schema( + MetadataColumns.DELETE_FILE_PATH, + MetadataColumns.DELETE_FILE_POS, + Types.NestedField.required( + MetadataColumns.DELETE_FILE_ROW_FIELD_ID, "row", rowSchema.asStruct(), + MetadataColumns.DELETE_FILE_ROW_DOC)); + } + + public static Schema posDeleteSchema(Schema rowSchema) { + return rowSchema == null ? pathPosSchema() : pathPosSchema(rowSchema); + } +} diff --git a/core/src/main/java/org/apache/iceberg/io/FileAppenderFactory.java b/core/src/main/java/org/apache/iceberg/io/FileAppenderFactory.java index 9afdca460f0a..e2d8f88f1fe9 100644 --- a/core/src/main/java/org/apache/iceberg/io/FileAppenderFactory.java +++ b/core/src/main/java/org/apache/iceberg/io/FileAppenderFactory.java @@ -20,6 +20,10 @@ package org.apache.iceberg.io; import org.apache.iceberg.FileFormat; +import org.apache.iceberg.StructLike; +import org.apache.iceberg.deletes.EqualityDeleteWriter; +import org.apache.iceberg.deletes.PositionDeleteWriter; +import org.apache.iceberg.encryption.EncryptedOutputFile; /** * Factory to create a new {@link FileAppender} to write records. @@ -36,4 +40,34 @@ public interface FileAppenderFactory { * @return a newly created {@link FileAppender} */ FileAppender newAppender(OutputFile outputFile, FileFormat fileFormat); + + /** + * Create a new {@link DataWriter}. + * + * @param outputFile an OutputFile used to create an output stream. + * @param format a file format + * @param partition a tuple of partition values + * @return a newly created {@link DataWriter} for rows + */ + DataWriter newDataWriter(EncryptedOutputFile outputFile, FileFormat format, StructLike partition); + + /** + * Create a new {@link EqualityDeleteWriter}. + * + * @param outputFile an OutputFile used to create an output stream. + * @param format a file format + * @param partition a tuple of partition values + * @return a newly created {@link EqualityDeleteWriter} for equality deletes + */ + EqualityDeleteWriter newEqDeleteWriter(EncryptedOutputFile outputFile, FileFormat format, StructLike partition); + + /** + * Create a new {@link PositionDeleteWriter}. + * + * @param outputFile an OutputFile used to create an output stream. 
+ * @param format a file format + * @param partition a tuple of partition values + * @return a newly created {@link EqualityDeleteWriter} for position deletes + */ + PositionDeleteWriter newPosDeleteWriter(EncryptedOutputFile outputFile, FileFormat format, StructLike partition); } diff --git a/core/src/main/java/org/apache/iceberg/io/PartitionedWriter.java b/core/src/main/java/org/apache/iceberg/io/PartitionedWriter.java index 7b5fa90e4502..566702cd03c8 100644 --- a/core/src/main/java/org/apache/iceberg/io/PartitionedWriter.java +++ b/core/src/main/java/org/apache/iceberg/io/PartitionedWriter.java @@ -73,7 +73,7 @@ public void write(T row) throws IOException { currentWriter = new RollingFileWriter(currentKey); } - currentWriter.add(row); + currentWriter.write(row); } @Override diff --git a/core/src/main/java/org/apache/iceberg/io/SortedPosDeleteWriter.java b/core/src/main/java/org/apache/iceberg/io/SortedPosDeleteWriter.java new file mode 100644 index 000000000000..f59fafd6e3ff --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/io/SortedPosDeleteWriter.java @@ -0,0 +1,132 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iceberg.io; + +import java.io.Closeable; +import java.io.IOException; +import java.io.UncheckedIOException; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import org.apache.iceberg.DeleteFile; +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.PartitionKey; +import org.apache.iceberg.deletes.PositionDeleteWriter; +import org.apache.iceberg.encryption.EncryptedOutputFile; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.apache.iceberg.types.Comparators; + +public class SortedPosDeleteWriter implements Closeable { + private static final int RECORDS_NUM_THRESHOLD = 1000_000; + + private final Map> posDeletes = Maps.newHashMap(); + private final List completedFiles = Lists.newArrayList(); + + private final FileAppenderFactory appenderFactory; + private final OutputFileFactory fileFactory; + private final FileFormat format; + private final PartitionKey partitionKey; + + private int records = 0; + + public SortedPosDeleteWriter(FileAppenderFactory appenderFactory, + OutputFileFactory fileFactory, + FileFormat format, + PartitionKey partitionKey) { + this.appenderFactory = appenderFactory; + this.fileFactory = fileFactory; + this.format = format; + this.partitionKey = partitionKey; + } + + public void delete(CharSequence path, long pos) { + delete(path, pos, null); + } + + public void delete(CharSequence path, long pos, T row) { + // TODO support non-null row in future. + Preconditions.checkArgument(row == null, "Does not support non-null row in pos-delete now."); + + posDeletes.compute(path, (k, v) -> { + if (v == null) { + return Lists.newArrayList(pos); + } else { + v.add(pos); + return v; + } + }); + + records += 1; + if (records >= RECORDS_NUM_THRESHOLD) { + flush(); + records = 0; + } + } + + public List complete() { + flush(); + + return completedFiles; + } + + + @Override + public void close() throws IOException { + flush(); + } + + private void flush() { + if (posDeletes.isEmpty()) { + return; + } + + EncryptedOutputFile outputFile; + if (partitionKey == null) { + outputFile = fileFactory.newOutputFile(); + } else { + outputFile = fileFactory.newOutputFile(partitionKey); + } + + PositionDeleteWriter writer = appenderFactory.newPosDeleteWriter(outputFile, format, partitionKey); + try (PositionDeleteWriter closeableWriter = writer) { + CharSequence[] paths = posDeletes.keySet().toArray(new CharSequence[0]); + Arrays.sort(paths, Comparators.charSequences()); + + for (CharSequence path : paths) { + List positions = posDeletes.get(path); + Collections.sort(positions); + + for (Long position : positions) { + closeableWriter.delete(path, position); + } + } + } catch (IOException e) { + throw new UncheckedIOException(e); + } + + // Clear the buffered pos-deletions. + posDeletes.clear(); + + completedFiles.add(writer.toDeleteFile()); + } +} diff --git a/core/src/main/java/org/apache/iceberg/io/TaskWriter.java b/core/src/main/java/org/apache/iceberg/io/TaskWriter.java index 806e37de1bee..aedc6da8d815 100644 --- a/core/src/main/java/org/apache/iceberg/io/TaskWriter.java +++ b/core/src/main/java/org/apache/iceberg/io/TaskWriter.java @@ -21,7 +21,6 @@ import java.io.Closeable; import java.io.IOException; -import org.apache.iceberg.DataFile; /** * The writer interface could accept records and provide the generated data files. 
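For reference, a minimal usage sketch of the `SortedPosDeleteWriter` introduced in the new `SortedPosDeleteWriter.java` above (not part of the patch): it buffers `(path, position)` pairs and flushes them as sorted position-delete files. The helper method, its parameters, and the hard-coded positions are illustrative only; generic type parameters dropped by the flattened diff are restored, and rows are passed as `null` because the writer currently rejects non-null rows.

```java
import java.util.List;
import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.PartitionKey;
import org.apache.iceberg.io.FileAppenderFactory;
import org.apache.iceberg.io.OutputFileFactory;
import org.apache.iceberg.io.SortedPosDeleteWriter;

public class SortedPosDeleteWriterSketch {
  // Marks two rows of an existing data file as deleted and returns the
  // resulting position-delete files.
  static <T> List<DeleteFile> deleteRows(FileAppenderFactory<T> appenderFactory,
                                         OutputFileFactory fileFactory,
                                         FileFormat format,
                                         PartitionKey partition,
                                         CharSequence dataFilePath) {
    SortedPosDeleteWriter<T> writer =
        new SortedPosDeleteWriter<>(appenderFactory, fileFactory, format, partition);

    writer.delete(dataFilePath, 0L);        // row 0 of the data file
    writer.delete(dataFilePath, 3L, null);  // row 3; non-null rows are not supported yet

    // complete() flushes the buffered, sorted deletes into DeleteFiles.
    return writer.complete();
  }
}
```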
@@ -43,9 +42,9 @@ public interface TaskWriter extends Closeable { void abort() throws IOException; /** - * Close the writer and get the completed data files. + * Close the writer and get the completed data and delete files. * - * @return the completed data files of this task writer. + * @return the completed files of this task writer. */ - DataFile[] complete() throws IOException; + WriterResult complete() throws IOException; } diff --git a/core/src/main/java/org/apache/iceberg/io/UnpartitionedWriter.java b/core/src/main/java/org/apache/iceberg/io/UnpartitionedWriter.java index 37f3db49aef4..2e98706816c7 100644 --- a/core/src/main/java/org/apache/iceberg/io/UnpartitionedWriter.java +++ b/core/src/main/java/org/apache/iceberg/io/UnpartitionedWriter.java @@ -35,7 +35,7 @@ public UnpartitionedWriter(PartitionSpec spec, FileFormat format, FileAppenderFa @Override public void write(T record) throws IOException { - currentWriter.add(record); + currentWriter.write(record); } @Override diff --git a/core/src/main/java/org/apache/iceberg/io/WriterResult.java b/core/src/main/java/org/apache/iceberg/io/WriterResult.java new file mode 100644 index 000000000000..543d36cf3894 --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/io/WriterResult.java @@ -0,0 +1,124 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iceberg.io; + +import java.io.Serializable; +import java.util.List; +import org.apache.iceberg.ContentFile; +import org.apache.iceberg.DataFile; +import org.apache.iceberg.DeleteFile; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; + +public class WriterResult implements Serializable { + private static final DataFile[] EMPTY_DATA_FILES = new DataFile[0]; + private static final DeleteFile[] EMPTY_DELETE_FILES = new DeleteFile[0]; + + private DataFile[] dataFiles; + private DeleteFile[] deleteFiles; + + private WriterResult(DataFile[] dataFiles, DeleteFile[] deleteFiles) { + this.dataFiles = dataFiles; + this.deleteFiles = deleteFiles; + } + + private WriterResult(List dataFiles, List deleteFiles) { + this.dataFiles = dataFiles.toArray(new DataFile[0]); + this.deleteFiles = deleteFiles.toArray(new DeleteFile[0]); + } + + public DataFile[] dataFiles() { + return dataFiles; + } + + public DeleteFile[] deleteFiles() { + return deleteFiles; + } + + public static WriterResult create(DataFile dataFile) { + return new WriterResult(new DataFile[] {dataFile}, EMPTY_DELETE_FILES); + } + + public static WriterResult create(DeleteFile deleteFile) { + return new WriterResult(EMPTY_DATA_FILES, new DeleteFile[] {deleteFile}); + } + + public static Builder builder() { + return new Builder(); + } + + public static class Builder { + private final List dataFiles; + private final List deleteFiles; + + private Builder() { + this.dataFiles = Lists.newArrayList(); + this.deleteFiles = Lists.newArrayList(); + } + + public Builder add(WriterResult result) { + for (DataFile dataFile : result.dataFiles()) { + add(dataFile); + } + for (DeleteFile deleteFile : result.deleteFiles()) { + add(deleteFile); + } + + return this; + } + + public Builder addDataFiles(Iterable iterable) { + for (DataFile dataFile : iterable) { + add(dataFile); + } + + return this; + } + + public Builder addDeleteFiles(Iterable iterable) { + for (DeleteFile deleteFile : iterable) { + add(deleteFile); + } + + return this; + } + + public void add(ContentFile contentFile) { + Preconditions.checkNotNull(contentFile, "Content file shouldn't be null."); + switch (contentFile.content()) { + case DATA: + this.dataFiles.add((DataFile) contentFile); + break; + + case EQUALITY_DELETES: + case POSITION_DELETES: + this.deleteFiles.add((DeleteFile) contentFile); + break; + + default: + throw new UnsupportedOperationException("Unknown content file: " + contentFile.content()); + } + } + + public WriterResult build() { + return new WriterResult(dataFiles, deleteFiles); + } + } +} diff --git a/core/src/main/java/org/apache/iceberg/util/StructLikeMap.java b/core/src/main/java/org/apache/iceberg/util/StructLikeMap.java new file mode 100644 index 000000000000..5b045e66d0f6 --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/util/StructLikeMap.java @@ -0,0 +1,128 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.util; + +import java.util.AbstractMap; +import java.util.Collection; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; +import org.apache.iceberg.StructLike; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.apache.iceberg.types.Types; + +public class StructLikeMap extends AbstractMap implements Map { + + public static StructLikeMap create(Types.StructType type) { + return new StructLikeMap<>(type); + } + + private final Types.StructType type; + private final Map wrapperMap; + private final ThreadLocal wrappers; + + private StructLikeMap(Types.StructType type) { + this.type = type; + this.wrapperMap = Maps.newHashMap(); + this.wrappers = ThreadLocal.withInitial(() -> StructLikeWrapper.forType(type)); + } + + @Override + public int size() { + return wrapperMap.size(); + } + + @Override + public boolean isEmpty() { + return wrapperMap.isEmpty(); + } + + @Override + public boolean containsKey(Object key) { + if (key instanceof StructLike) { + StructLikeWrapper wrapper = wrappers.get(); + boolean result = wrapperMap.containsKey(wrapper.set((StructLike) key)); + wrapper.set(null); + return result; + } + return false; + } + + @Override + public boolean containsValue(Object value) { + throw new UnsupportedOperationException(); + } + + @Override + public T get(Object key) { + if (key instanceof StructLike) { + StructLikeWrapper wrapper = wrappers.get(); + T value = wrapperMap.get(wrapper.set((StructLike) key)); + wrapper.set(null); + return value; + } + return null; + } + + @Override + public T put(StructLike key, T value) { + return wrapperMap.put(StructLikeWrapper.forType(type).set(key), value); + } + + @Override + public T remove(Object key) { + if (key instanceof StructLike) { + StructLikeWrapper wrapper = wrappers.get(); + T value = wrapperMap.remove(wrapper.set((StructLike) key)); + wrapper.set(null); // don't hold a reference to the value. 
+ return value; + } + return null; + } + + @Override + public void putAll(Map keyValues) { + if (keyValues != null && !keyValues.isEmpty()) { + for (Map.Entry pair : keyValues.entrySet()) { + wrapperMap.put(StructLikeWrapper.forType(type).set(pair.getKey()), pair.getValue()); + } + } + } + + @Override + public void clear() { + wrapperMap.clear(); + } + + @Override + public Set keySet() { + return wrapperMap.keySet().stream().map(StructLikeWrapper::get).collect(Collectors.toSet()); + } + + @Override + public Collection values() { + return wrapperMap.values(); + } + + @Override + public Set> entrySet() { + throw new UnsupportedOperationException(); + } +} diff --git a/data/src/main/java/org/apache/iceberg/data/GenericAppenderFactory.java b/data/src/main/java/org/apache/iceberg/data/GenericAppenderFactory.java index e5d2652bcb11..1b77cd0d39d9 100644 --- a/data/src/main/java/org/apache/iceberg/data/GenericAppenderFactory.java +++ b/data/src/main/java/org/apache/iceberg/data/GenericAppenderFactory.java @@ -24,11 +24,16 @@ import java.util.Map; import org.apache.iceberg.FileFormat; import org.apache.iceberg.MetricsConfig; +import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.Schema; +import org.apache.iceberg.StructLike; import org.apache.iceberg.avro.Avro; import org.apache.iceberg.data.avro.DataWriter; import org.apache.iceberg.data.orc.GenericOrcWriter; import org.apache.iceberg.data.parquet.GenericParquetWriter; +import org.apache.iceberg.deletes.EqualityDeleteWriter; +import org.apache.iceberg.deletes.PositionDeleteWriter; +import org.apache.iceberg.encryption.EncryptedOutputFile; import org.apache.iceberg.io.FileAppender; import org.apache.iceberg.io.FileAppenderFactory; import org.apache.iceberg.io.OutputFile; @@ -42,10 +47,12 @@ public class GenericAppenderFactory implements FileAppenderFactory { private final Schema schema; + private final PartitionSpec spec; private final Map config = Maps.newHashMap(); - public GenericAppenderFactory(Schema schema) { + public GenericAppenderFactory(Schema schema, PartitionSpec spec) { this.schema = schema; + this.spec = spec; } public GenericAppenderFactory set(String property, String value) { @@ -95,4 +102,24 @@ public FileAppender newAppender(OutputFile outputFile, FileFormat fileFo throw new UncheckedIOException(e); } } + + @Override + public org.apache.iceberg.io.DataWriter newDataWriter(EncryptedOutputFile file, FileFormat format, + StructLike partition) { + return new org.apache.iceberg.io.DataWriter<>( + newAppender(file.encryptingOutputFile(), format), format, + file.encryptingOutputFile().location(), spec, partition, file.keyMetadata()); + } + + @Override + public EqualityDeleteWriter newEqDeleteWriter(EncryptedOutputFile outputFile, FileFormat format, + StructLike partition) { + throw new UnsupportedOperationException("Cannot create equality-delete writer for generic record now."); + } + + @Override + public PositionDeleteWriter newPosDeleteWriter(EncryptedOutputFile outputFile, FileFormat format, + StructLike partition) { + throw new UnsupportedOperationException("Cannot create pos-delete writer for generic record now."); + } } diff --git a/data/src/test/java/org/apache/iceberg/TestSplitScan.java b/data/src/test/java/org/apache/iceberg/TestSplitScan.java index 2f191f6160d7..3500a552787f 100644 --- a/data/src/test/java/org/apache/iceberg/TestSplitScan.java +++ b/data/src/test/java/org/apache/iceberg/TestSplitScan.java @@ -63,7 +63,7 @@ public class TestSplitScan { @Parameterized.Parameters(name = "format = {0}") public static 
Object[] parameters() { - return new Object[] { "parquet", "avro" }; + return new Object[] {"parquet", "avro"}; } private final FileFormat format; @@ -116,7 +116,7 @@ private File writeToFile(List records, FileFormat fileFormat) throws IOE File file = temp.newFile(); Assert.assertTrue(file.delete()); - GenericAppenderFactory factory = new GenericAppenderFactory(SCHEMA).set( + GenericAppenderFactory factory = new GenericAppenderFactory(SCHEMA, table.spec()).set( TableProperties.PARQUET_ROW_GROUP_SIZE_BYTES, String.valueOf(SPLIT_SIZE)); try (FileAppender appender = factory.newAppender(Files.localOutput(file), fileFormat)) { appender.addAll(records); diff --git a/data/src/test/java/org/apache/iceberg/data/GenericAppenderHelper.java b/data/src/test/java/org/apache/iceberg/data/GenericAppenderHelper.java index c32be08263a9..edd4ae65f742 100644 --- a/data/src/test/java/org/apache/iceberg/data/GenericAppenderHelper.java +++ b/data/src/test/java/org/apache/iceberg/data/GenericAppenderHelper.java @@ -79,7 +79,7 @@ public DataFile writeFile(StructLike partition, List records) throws IOE private static DataFile appendToLocalFile( Table table, File file, FileFormat format, StructLike partition, List records) throws IOException { - FileAppender appender = new GenericAppenderFactory(table.schema()).newAppender( + FileAppender appender = new GenericAppenderFactory(table.schema(), table.spec()).newAppender( Files.localOutput(file), format); try (FileAppender fileAppender = appender) { fileAppender.addAll(records); diff --git a/data/src/test/java/org/apache/iceberg/data/TestLocalScan.java b/data/src/test/java/org/apache/iceberg/data/TestLocalScan.java index 1cc74ba029ff..5847247406df 100644 --- a/data/src/test/java/org/apache/iceberg/data/TestLocalScan.java +++ b/data/src/test/java/org/apache/iceberg/data/TestLocalScan.java @@ -449,7 +449,7 @@ private DataFile writeFile(String location, String filename, Schema schema, List FileFormat fileFormat = FileFormat.fromFileName(filename); Preconditions.checkNotNull(fileFormat, "Cannot determine format for file: %s", filename); - FileAppender fileAppender = new GenericAppenderFactory(schema).newAppender( + FileAppender fileAppender = new GenericAppenderFactory(schema, PartitionSpec.unpartitioned()).newAppender( fromPath(path, CONF), fileFormat); try (FileAppender appender = fileAppender) { appender.addAll(records); diff --git a/data/src/test/java/org/apache/iceberg/parquet/TestParquetMergingMetrics.java b/data/src/test/java/org/apache/iceberg/parquet/TestParquetMergingMetrics.java index 03118e3a65f2..02580f2ff66c 100644 --- a/data/src/test/java/org/apache/iceberg/parquet/TestParquetMergingMetrics.java +++ b/data/src/test/java/org/apache/iceberg/parquet/TestParquetMergingMetrics.java @@ -22,6 +22,7 @@ import java.io.IOException; import java.util.List; import org.apache.iceberg.FileFormat; +import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.TestMergingMetrics; import org.apache.iceberg.data.GenericAppenderFactory; import org.apache.iceberg.data.Record; @@ -31,7 +32,7 @@ public class TestParquetMergingMetrics extends TestMergingMetrics { @Override protected FileAppender writeAndGetAppender(List records) throws IOException { - FileAppender appender = new GenericAppenderFactory(SCHEMA).newAppender( + FileAppender appender = new GenericAppenderFactory(SCHEMA, PartitionSpec.unpartitioned()).newAppender( org.apache.iceberg.Files.localOutput(temp.newFile()), FileFormat.PARQUET); try (FileAppender fileAppender = appender) { records.forEach(fileAppender::add); 
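To illustrate the reworked `data` module pieces above (not part of the patch), a sketch of wrapping a generic-record appender in the new `DataWriter` so the closed file can be turned into a `DataFile` without assembling `DataFiles.builder(...)` by hand. The helper method, the Parquet format choice, and the unpartitioned spec are assumptions for illustration; generic type parameters dropped by the flattened diff are restored.

```java
import java.io.File;
import java.io.IOException;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.Files;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.data.GenericAppenderFactory;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.io.DataWriter;
import org.apache.iceberg.io.FileAppender;
import org.apache.iceberg.io.OutputFile;

public class DataWriterSketch {
  // Writes generic records to a local Parquet file and returns the DataFile
  // produced by the writer once it is closed.
  static DataFile writeRecords(Schema schema, Iterable<Record> records, File localFile) throws IOException {
    PartitionSpec spec = PartitionSpec.unpartitioned();
    OutputFile outputFile = Files.localOutput(localFile);

    // GenericAppenderFactory now also takes the partition spec (see the hunk above).
    FileAppender<Record> appender =
        new GenericAppenderFactory(schema, spec).newAppender(outputFile, FileFormat.PARQUET);

    DataWriter<Record> writer = new DataWriter<>(
        appender, FileFormat.PARQUET, outputFile.location(), spec,
        null /* unpartitioned */, null /* no encryption key metadata */);
    try (DataWriter<Record> closeableWriter = writer) {
      for (Record record : records) {
        closeableWriter.add(record);
      }
    }

    // toDataFile() is only valid after the writer has been closed.
    return writer.toDataFile();
  }
}
```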
diff --git a/flink/src/main/java/org/apache/iceberg/flink/RowDataWrapper.java b/flink/src/main/java/org/apache/iceberg/flink/RowDataWrapper.java index 401e9db65992..19c7735779e6 100644 --- a/flink/src/main/java/org/apache/iceberg/flink/RowDataWrapper.java +++ b/flink/src/main/java/org/apache/iceberg/flink/RowDataWrapper.java @@ -53,6 +53,16 @@ public RowDataWrapper(RowType rowType, Types.StructType struct) { } } + private RowDataWrapper(RowDataWrapper toCopy) { + this.types = toCopy.types; + this.getters = toCopy.getters; + this.rowData = toCopy.rowData; + } + + public RowDataWrapper copy() { + return new RowDataWrapper(this); + } + public RowDataWrapper wrap(RowData data) { this.rowData = data; return this; diff --git a/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkFileAppenderFactory.java b/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkFileAppenderFactory.java new file mode 100644 index 000000000000..17d21ae95d1c --- /dev/null +++ b/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkFileAppenderFactory.java @@ -0,0 +1,201 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iceberg.flink.sink; + +import java.io.IOException; +import java.io.Serializable; +import java.io.UncheckedIOException; +import java.util.List; +import java.util.Map; +import java.util.function.Function; +import org.apache.avro.io.DatumWriter; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.types.logical.RowType; +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.MetricsConfig; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.Schema; +import org.apache.iceberg.StructLike; +import org.apache.iceberg.avro.Avro; +import org.apache.iceberg.deletes.EqualityDeleteWriter; +import org.apache.iceberg.deletes.PositionDeleteWriter; +import org.apache.iceberg.encryption.EncryptedOutputFile; +import org.apache.iceberg.flink.FlinkSchemaUtil; +import org.apache.iceberg.flink.data.FlinkAvroWriter; +import org.apache.iceberg.flink.data.FlinkOrcWriter; +import org.apache.iceberg.flink.data.FlinkParquetWriters; +import org.apache.iceberg.io.DataWriter; +import org.apache.iceberg.io.DeleteUtil; +import org.apache.iceberg.io.FileAppender; +import org.apache.iceberg.io.FileAppenderFactory; +import org.apache.iceberg.io.OutputFile; +import org.apache.iceberg.orc.ORC; +import org.apache.iceberg.parquet.Parquet; + +public class FlinkFileAppenderFactory implements FileAppenderFactory, Serializable { + private final Schema schema; + private final RowType flinkSchema; + private final Map props; + private final PartitionSpec spec; + private final List equalityFieldIds; + + private final boolean posDeleteWithRow; + + public FlinkFileAppenderFactory(Schema schema, RowType flinkSchema, Map props, + PartitionSpec spec, List equalityFieldIds) { + this.schema = schema; + this.flinkSchema = flinkSchema; + this.props = props; + this.spec = spec; + this.equalityFieldIds = equalityFieldIds; + + // TODO Set this switch to true if needed. 
+ this.posDeleteWithRow = false; + } + + @Override + public FileAppender newAppender(OutputFile outputFile, FileFormat format) { + MetricsConfig metricsConfig = MetricsConfig.fromProperties(props); + try { + switch (format) { + case AVRO: + return Avro.write(outputFile) + .createWriterFunc(ignore -> new FlinkAvroWriter(flinkSchema)) + .setAll(props) + .schema(schema) + .overwrite() + .build(); + + case ORC: + return ORC.write(outputFile) + .createWriterFunc((iSchema, typDesc) -> FlinkOrcWriter.buildWriter(flinkSchema, iSchema)) + .setAll(props) + .schema(schema) + .overwrite() + .build(); + + case PARQUET: + return Parquet.write(outputFile) + .createWriterFunc(msgType -> FlinkParquetWriters.buildWriter(flinkSchema, msgType)) + .setAll(props) + .metricsConfig(metricsConfig) + .schema(schema) + .overwrite() + .build(); + + default: + throw new UnsupportedOperationException("Cannot write unknown file format: " + format); + } + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } + + @Override + public DataWriter newDataWriter(EncryptedOutputFile file, FileFormat format, StructLike partition) { + return new DataWriter<>( + newAppender(file.encryptingOutputFile(), format), format, + file.encryptingOutputFile().location(), spec, partition, file.keyMetadata()); + } + + @Override + public EqualityDeleteWriter newEqDeleteWriter(EncryptedOutputFile outputFile, FileFormat format, + StructLike partition) { + MetricsConfig metricsConfig = MetricsConfig.fromProperties(props); + try { + switch (format) { + case AVRO: + return Avro.writeDeletes(outputFile.encryptingOutputFile()) + .createWriterFunc(ignore -> new FlinkAvroWriter(flinkSchema)) + .withPartition(partition) + .overwrite() + .setAll(props) + .rowSchema(schema) + .withSpec(spec) + .withKeyMetadata(outputFile.keyMetadata()) + .equalityFieldIds(equalityFieldIds) + .buildEqualityWriter(); + + case PARQUET: + return Parquet.writeDeletes(outputFile.encryptingOutputFile()) + .createWriterFunc(msgType -> FlinkParquetWriters.buildWriter(flinkSchema, msgType)) + .withPartition(partition) + .overwrite() + .setAll(props) + .metricsConfig(metricsConfig) + .rowSchema(schema) + .withSpec(spec) + .withKeyMetadata(outputFile.keyMetadata()) + .equalityFieldIds(equalityFieldIds) + .buildEqualityWriter(); + + default: + throw new UnsupportedOperationException("Cannot write unknown file format: " + format); + } + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } + + @Override + public PositionDeleteWriter newPosDeleteWriter(EncryptedOutputFile outputFile, FileFormat format, + StructLike partition) { + MetricsConfig metricsConfig = MetricsConfig.fromProperties(props); + Schema rowSchema = posDeleteWithRow ? 
schema : null; + try { + switch (format) { + case AVRO: + Function> writeFunc = null; + if (rowSchema != null) { + writeFunc = ignore -> new FlinkAvroWriter(FlinkSchemaUtil.convert(rowSchema)); + } + + return Avro.writeDeletes(outputFile.encryptingOutputFile()) + .createWriterFunc(writeFunc) + .withPartition(partition) + .overwrite() + .setAll(props) + .rowSchema(rowSchema) + .withSpec(spec) + .withKeyMetadata(outputFile.keyMetadata()) + .buildPositionWriter(); + + case PARQUET: + RowType flinkParquetRowType = FlinkSchemaUtil.convert(DeleteUtil.posDeleteSchema(rowSchema)); + + return Parquet.writeDeletes(outputFile.encryptingOutputFile()) + .createWriterFunc(msgType -> FlinkParquetWriters.buildWriter(flinkParquetRowType, msgType)) + .withPartition(partition) + .overwrite() + .setAll(props) + .metricsConfig(metricsConfig) + .rowSchema(rowSchema) + .withSpec(spec) + .withKeyMetadata(outputFile.keyMetadata()) + .buildPositionWriter(); + + default: + throw new UnsupportedOperationException("Cannot write unknown file format: " + format); + } + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } +} diff --git a/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java b/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java index 7f1d9ca021fa..47546ee67bb9 100644 --- a/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java +++ b/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java @@ -21,6 +21,7 @@ import java.io.IOException; import java.io.UncheckedIOException; +import java.util.List; import java.util.Locale; import java.util.Map; import org.apache.flink.api.common.functions.MapFunction; @@ -43,6 +44,7 @@ import org.apache.iceberg.flink.FlinkSchemaUtil; import org.apache.iceberg.flink.TableLoader; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.types.TypeUtil; import org.apache.iceberg.util.PropertyUtil; @@ -112,6 +114,7 @@ public static class Builder { private TableSchema tableSchema; private boolean overwrite = false; private Integer writeParallelism = null; + private List equalityColumns = null; private Builder() { } @@ -168,6 +171,11 @@ public Builder writeParallelism(int newWriteParallelism) { return this; } + public Builder equalityColumns(List newEqualityColumns) { + this.equalityColumns = newEqualityColumns; + return this; + } + @SuppressWarnings("unchecked") public DataStreamSink build() { Preconditions.checkArgument(rowDataInput != null, @@ -183,7 +191,17 @@ public DataStreamSink build() { } } - IcebergStreamWriter streamWriter = createStreamWriter(table, tableSchema); + List equalityFieldIds = Lists.newArrayList(); + if (equalityColumns != null && !equalityColumns.isEmpty()) { + for (String column : equalityColumns) { + org.apache.iceberg.types.Types.NestedField field = table.schema().findField(column); + Preconditions.checkNotNull(field, + "Field with column %s does not find in iceberg schema.", column); + equalityFieldIds.add(field.fieldId()); + } + } + + IcebergStreamWriter streamWriter = createStreamWriter(table, tableSchema, equalityFieldIds); IcebergFilesCommitter filesCommitter = new IcebergFilesCommitter(tableLoader, overwrite); this.writeParallelism = writeParallelism == null ? 
rowDataInput.getParallelism() : writeParallelism; @@ -201,7 +219,8 @@ public DataStreamSink build() { } } - static IcebergStreamWriter createStreamWriter(Table table, TableSchema requestedSchema) { + static IcebergStreamWriter createStreamWriter(Table table, TableSchema requestedSchema, + List equalityFieldIds) { Preconditions.checkArgument(table != null, "Iceberg table should't be null"); RowType flinkSchema; @@ -224,7 +243,8 @@ static IcebergStreamWriter createStreamWriter(Table table, TableSchema FileFormat fileFormat = getFileFormat(props); TaskWriterFactory taskWriterFactory = new RowDataTaskWriterFactory(table.schema(), flinkSchema, - table.spec(), table.locationProvider(), table.io(), table.encryption(), targetFileSize, fileFormat, props); + table.spec(), table.locationProvider(), table.io(), table.encryption(), targetFileSize, fileFormat, props, + equalityFieldIds); return new IcebergStreamWriter<>(table.name(), taskWriterFactory); } diff --git a/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergStreamWriter.java b/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergStreamWriter.java index 95daa9656c68..8e9861171534 100644 --- a/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergStreamWriter.java +++ b/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergStreamWriter.java @@ -62,7 +62,7 @@ public void open() { @Override public void prepareSnapshotPreBarrier(long checkpointId) throws Exception { // close all open files and emit files to downstream committer operator - for (DataFile dataFile : writer.complete()) { + for (DataFile dataFile : writer.complete().dataFiles()) { emit(dataFile); } @@ -87,7 +87,7 @@ public void dispose() throws Exception { public void endInput() throws IOException { // For bounded stream, it may don't enable the checkpoint mechanism so we'd better to emit the remaining // data files to downstream before closing the writer so that we won't miss any of them. - for (DataFile dataFile : writer.complete()) { + for (DataFile dataFile : writer.complete().dataFiles()) { emit(dataFile); } } diff --git a/flink/src/main/java/org/apache/iceberg/flink/sink/PartitionedFanoutWriter.java b/flink/src/main/java/org/apache/iceberg/flink/sink/PartitionedFanoutWriter.java deleted file mode 100644 index ad846974adcf..000000000000 --- a/flink/src/main/java/org/apache/iceberg/flink/sink/PartitionedFanoutWriter.java +++ /dev/null @@ -1,74 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ - -package org.apache.iceberg.flink.sink; - -import java.io.IOException; -import java.util.Map; -import org.apache.iceberg.FileFormat; -import org.apache.iceberg.PartitionKey; -import org.apache.iceberg.PartitionSpec; -import org.apache.iceberg.io.BaseTaskWriter; -import org.apache.iceberg.io.FileAppenderFactory; -import org.apache.iceberg.io.FileIO; -import org.apache.iceberg.io.OutputFileFactory; -import org.apache.iceberg.relocated.com.google.common.collect.Maps; - -abstract class PartitionedFanoutWriter extends BaseTaskWriter { - private final Map writers = Maps.newHashMap(); - - PartitionedFanoutWriter(PartitionSpec spec, FileFormat format, FileAppenderFactory appenderFactory, - OutputFileFactory fileFactory, FileIO io, long targetFileSize) { - super(spec, format, appenderFactory, fileFactory, io, targetFileSize); - } - - /** - * Create a PartitionKey from the values in row. - *
- * Any PartitionKey returned by this method can be reused by the implementation. - * - * @param row a data row - */ - protected abstract PartitionKey partition(T row); - - @Override - public void write(T row) throws IOException { - PartitionKey partitionKey = partition(row); - - RollingFileWriter writer = writers.get(partitionKey); - if (writer == null) { - // NOTICE: we need to copy a new partition key here, in case of messing up the keys in writers. - PartitionKey copiedKey = partitionKey.copy(); - writer = new RollingFileWriter(copiedKey); - writers.put(copiedKey, writer); - } - - writer.add(row); - } - - @Override - public void close() throws IOException { - if (!writers.isEmpty()) { - for (PartitionKey key : writers.keySet()) { - writers.get(key).close(); - } - writers.clear(); - } - } -} diff --git a/flink/src/main/java/org/apache/iceberg/flink/sink/RowDataTaskWriter.java b/flink/src/main/java/org/apache/iceberg/flink/sink/RowDataTaskWriter.java new file mode 100644 index 000000000000..2c4d5b645e40 --- /dev/null +++ b/flink/src/main/java/org/apache/iceberg/flink/sink/RowDataTaskWriter.java @@ -0,0 +1,141 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iceberg.flink.sink; + +import java.io.IOException; +import java.io.UncheckedIOException; +import java.util.List; +import java.util.Map; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.types.logical.RowType; +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.PartitionKey; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.Schema; +import org.apache.iceberg.StructLike; +import org.apache.iceberg.flink.RowDataWrapper; +import org.apache.iceberg.io.BaseTaskWriter; +import org.apache.iceberg.io.FileAppenderFactory; +import org.apache.iceberg.io.FileIO; +import org.apache.iceberg.io.OutputFileFactory; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.apache.iceberg.util.Tasks; + +public class RowDataTaskWriter extends BaseTaskWriter { + + private static final PartitionKey UNPARTITIONED_KEY = new PartitionKey(PartitionSpec.unpartitioned(), null); + + private final Schema schema; + private final PartitionKey partitionKey; + private final RowDataWrapper rowDataWrapper; + private final List equalityFieldIds; + + private final Map deltaWriterMap = Maps.newHashMap(); + + RowDataTaskWriter(PartitionSpec spec, + FileFormat format, + FileAppenderFactory appenderFactory, + OutputFileFactory fileFactory, + FileIO io, + long targetFileSize, + Schema schema, + RowType flinkSchema, + List equalityFieldIds) { + super(spec, format, appenderFactory, fileFactory, io, targetFileSize); + + this.schema = schema; + this.equalityFieldIds = equalityFieldIds; + + this.partitionKey = new PartitionKey(spec, schema); + this.rowDataWrapper = new RowDataWrapper(flinkSchema, schema.asStruct()); + } + + @Override + public void write(RowData row) throws IOException { + RowDataDeltaWriter deltaWriter; + + if (spec().fields().size() <= 0) { + // Create and cache the delta writer if absent for unpartitioned table. + deltaWriter = deltaWriterMap.get(UNPARTITIONED_KEY); + if (deltaWriter == null) { + deltaWriter = new RowDataDeltaWriter(null, equalityFieldIds); + deltaWriterMap.put(UNPARTITIONED_KEY, deltaWriter); + } + } else { + // Refresh the current partition key. + partitionKey.partition(rowDataWrapper.wrap(row)); + + // Create and cache the delta writer if absent for partitioned table. + deltaWriter = deltaWriterMap.get(partitionKey); + if (deltaWriter == null) { + // NOTICE: we need to copy a new partition key here, in case of messing up the keys in writers. 
+ PartitionKey copiedKey = partitionKey.copy(); + deltaWriter = new RowDataDeltaWriter(copiedKey, equalityFieldIds); + deltaWriterMap.put(copiedKey, deltaWriter); + } + } + + switch (row.getRowKind()) { + case INSERT: + case UPDATE_AFTER: + deltaWriter.write(row); + break; + + case DELETE: + case UPDATE_BEFORE: + deltaWriter.delete(row); + break; + + default: + throw new UnsupportedOperationException("Unknown row kind: " + row.getRowKind()); + } + } + + @Override + public void close() { + Tasks.foreach(deltaWriterMap.values()) + .throwFailureWhenFinished() + .noRetry() + .run(deltaWriter -> { + try { + deltaWriter.close(); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + }); + } + + private class RowDataDeltaWriter extends BaseDeltaWriter { + + private RowDataDeltaWriter(PartitionKey partition, List equalityFieldIds) { + super(partition, equalityFieldIds, schema); + } + + @Override + protected StructLike asKey(RowData row) { + return rowDataWrapper.wrap(row); + } + + @Override + protected StructLike asCopiedKey(RowData row) { + return rowDataWrapper.wrap(row).copy(); + } + } +} diff --git a/flink/src/main/java/org/apache/iceberg/flink/sink/RowDataTaskWriterFactory.java b/flink/src/main/java/org/apache/iceberg/flink/sink/RowDataTaskWriterFactory.java index 72e7a7941b77..fc1c168eba55 100644 --- a/flink/src/main/java/org/apache/iceberg/flink/sink/RowDataTaskWriterFactory.java +++ b/flink/src/main/java/org/apache/iceberg/flink/sink/RowDataTaskWriterFactory.java @@ -19,33 +19,19 @@ package org.apache.iceberg.flink.sink; -import java.io.IOException; -import java.io.Serializable; -import java.io.UncheckedIOException; +import java.util.List; import java.util.Map; import org.apache.flink.table.data.RowData; import org.apache.flink.table.types.logical.RowType; import org.apache.iceberg.FileFormat; -import org.apache.iceberg.MetricsConfig; -import org.apache.iceberg.PartitionKey; import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.Schema; -import org.apache.iceberg.avro.Avro; import org.apache.iceberg.encryption.EncryptionManager; -import org.apache.iceberg.flink.RowDataWrapper; -import org.apache.iceberg.flink.data.FlinkAvroWriter; -import org.apache.iceberg.flink.data.FlinkOrcWriter; -import org.apache.iceberg.flink.data.FlinkParquetWriters; -import org.apache.iceberg.io.FileAppender; import org.apache.iceberg.io.FileAppenderFactory; import org.apache.iceberg.io.FileIO; import org.apache.iceberg.io.LocationProvider; -import org.apache.iceberg.io.OutputFile; import org.apache.iceberg.io.OutputFileFactory; import org.apache.iceberg.io.TaskWriter; -import org.apache.iceberg.io.UnpartitionedWriter; -import org.apache.iceberg.orc.ORC; -import org.apache.iceberg.parquet.Parquet; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; public class RowDataTaskWriterFactory implements TaskWriterFactory { @@ -57,6 +43,7 @@ public class RowDataTaskWriterFactory implements TaskWriterFactory { private final EncryptionManager encryptionManager; private final long targetFileSizeBytes; private final FileFormat format; + private final List equalityFieldIds; private final FileAppenderFactory appenderFactory; private transient OutputFileFactory outputFileFactory; @@ -69,7 +56,8 @@ public RowDataTaskWriterFactory(Schema schema, EncryptionManager encryptionManager, long targetFileSizeBytes, FileFormat format, - Map tableProperties) { + Map tableProperties, + List equalityFieldIds) { this.schema = schema; this.flinkSchema = flinkSchema; this.spec = spec; @@ -78,7 
+66,8 @@ public RowDataTaskWriterFactory(Schema schema, this.encryptionManager = encryptionManager; this.targetFileSizeBytes = targetFileSizeBytes; this.format = format; - this.appenderFactory = new FlinkFileAppenderFactory(schema, flinkSchema, tableProperties); + this.equalityFieldIds = equalityFieldIds; + this.appenderFactory = new FlinkFileAppenderFactory(schema, flinkSchema, tableProperties, spec, equalityFieldIds); } @Override @@ -91,81 +80,7 @@ public TaskWriter create() { Preconditions.checkNotNull(outputFileFactory, "The outputFileFactory shouldn't be null if we have invoked the initialize()."); - if (spec.fields().isEmpty()) { - return new UnpartitionedWriter<>(spec, format, appenderFactory, outputFileFactory, io, targetFileSizeBytes); - } else { - return new RowDataPartitionedFanoutWriter(spec, format, appenderFactory, outputFileFactory, - io, targetFileSizeBytes, schema, flinkSchema); - } - } - - private static class RowDataPartitionedFanoutWriter extends PartitionedFanoutWriter { - - private final PartitionKey partitionKey; - private final RowDataWrapper rowDataWrapper; - - RowDataPartitionedFanoutWriter(PartitionSpec spec, FileFormat format, FileAppenderFactory appenderFactory, - OutputFileFactory fileFactory, FileIO io, long targetFileSize, Schema schema, - RowType flinkSchema) { - super(spec, format, appenderFactory, fileFactory, io, targetFileSize); - this.partitionKey = new PartitionKey(spec, schema); - this.rowDataWrapper = new RowDataWrapper(flinkSchema, schema.asStruct()); - } - - @Override - protected PartitionKey partition(RowData row) { - partitionKey.partition(rowDataWrapper.wrap(row)); - return partitionKey; - } - } - - public static class FlinkFileAppenderFactory implements FileAppenderFactory, Serializable { - private final Schema schema; - private final RowType flinkSchema; - private final Map props; - - public FlinkFileAppenderFactory(Schema schema, RowType flinkSchema, Map props) { - this.schema = schema; - this.flinkSchema = flinkSchema; - this.props = props; - } - - @Override - public FileAppender newAppender(OutputFile outputFile, FileFormat format) { - MetricsConfig metricsConfig = MetricsConfig.fromProperties(props); - try { - switch (format) { - case AVRO: - return Avro.write(outputFile) - .createWriterFunc(ignore -> new FlinkAvroWriter(flinkSchema)) - .setAll(props) - .schema(schema) - .overwrite() - .build(); - - case ORC: - return ORC.write(outputFile) - .createWriterFunc((iSchema, typDesc) -> FlinkOrcWriter.buildWriter(flinkSchema, iSchema)) - .setAll(props) - .schema(schema) - .overwrite() - .build(); - - case PARQUET: - return Parquet.write(outputFile) - .createWriterFunc(msgType -> FlinkParquetWriters.buildWriter(flinkSchema, msgType)) - .setAll(props) - .metricsConfig(metricsConfig) - .schema(schema) - .overwrite() - .build(); - - default: - throw new UnsupportedOperationException("Cannot write unknown file format: " + format); - } - } catch (IOException e) { - throw new UncheckedIOException(e); - } - } + return new RowDataTaskWriter(spec, format, appenderFactory, outputFileFactory, io, targetFileSizeBytes, schema, + flinkSchema, equalityFieldIds); } } diff --git a/flink/src/main/java/org/apache/iceberg/flink/source/RowDataRewriter.java b/flink/src/main/java/org/apache/iceberg/flink/source/RowDataRewriter.java index 65a3ca92c4d0..2e005ea1bd8b 100644 --- a/flink/src/main/java/org/apache/iceberg/flink/source/RowDataRewriter.java +++ b/flink/src/main/java/org/apache/iceberg/flink/source/RowDataRewriter.java @@ -80,7 +80,9 @@ public 
RowDataRewriter(Table table, boolean caseSensitive, FileIO io, Encryption encryptionManager, Long.MAX_VALUE, format, - table.properties()); + table.properties(), + /* TODO does not support rewriting with delete files.*/ + Lists.newArrayList()); } public List rewriteDataForTasks(DataStream dataStream, int parallelism) { @@ -131,7 +133,7 @@ public List map(CombinedScanTask task) throws Exception { RowData rowData = iterator.next(); writer.write(rowData); } - return Lists.newArrayList(writer.complete()); + return Lists.newArrayList(writer.complete().dataFiles()); } catch (Throwable originalThrowable) { try { LOG.error("Aborting commit for (subTaskId {}, attemptId {})", subTaskId, attemptId); diff --git a/flink/src/test/java/org/apache/iceberg/flink/SimpleDataUtil.java b/flink/src/test/java/org/apache/iceberg/flink/SimpleDataUtil.java index dc14fb683a91..439080f31c07 100644 --- a/flink/src/test/java/org/apache/iceberg/flink/SimpleDataUtil.java +++ b/flink/src/test/java/org/apache/iceberg/flink/SimpleDataUtil.java @@ -28,6 +28,7 @@ import org.apache.flink.table.data.RowData; import org.apache.flink.table.data.StringData; import org.apache.flink.table.types.logical.RowType; +import org.apache.flink.types.RowKind; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.iceberg.DataFile; @@ -39,7 +40,7 @@ import org.apache.iceberg.data.GenericRecord; import org.apache.iceberg.data.IcebergGenerics; import org.apache.iceberg.data.Record; -import org.apache.iceberg.flink.sink.RowDataTaskWriterFactory; +import org.apache.iceberg.flink.sink.FlinkFileAppenderFactory; import org.apache.iceberg.hadoop.HadoopInputFile; import org.apache.iceberg.hadoop.HadoopTables; import org.apache.iceberg.io.CloseableIterable; @@ -94,6 +95,10 @@ public static RowData createRowData(Integer id, String data) { return GenericRowData.of(id, StringData.fromString(data)); } + public static RowData createEqDelete(Integer id, String data) { + return GenericRowData.ofKind(RowKind.DELETE, id, StringData.fromString(data)); + } + public static DataFile writeFile(Schema schema, PartitionSpec spec, Configuration conf, String location, String filename, List rows) throws IOException { @@ -102,8 +107,8 @@ public static DataFile writeFile(Schema schema, PartitionSpec spec, Configuratio Preconditions.checkNotNull(fileFormat, "Cannot determine format for file: %s", filename); RowType flinkSchema = FlinkSchemaUtil.convert(schema); - FileAppenderFactory appenderFactory = new RowDataTaskWriterFactory.FlinkFileAppenderFactory(schema, - flinkSchema, ImmutableMap.of()); + FileAppenderFactory appenderFactory = + new FlinkFileAppenderFactory(schema, flinkSchema, ImmutableMap.of(), spec, null); FileAppender appender = appenderFactory.newAppender(fromPath(path, conf), fileFormat); try (FileAppender closeableAppender = appender) { diff --git a/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergStreamWriter.java b/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergStreamWriter.java index 80771a4d233f..51e5e1cd1ab2 100644 --- a/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergStreamWriter.java +++ b/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergStreamWriter.java @@ -317,7 +317,7 @@ private OneInputStreamOperatorTestHarness createIcebergStream private OneInputStreamOperatorTestHarness createIcebergStreamWriter( Table icebergTable, TableSchema flinkSchema) throws Exception { - IcebergStreamWriter streamWriter = FlinkSink.createStreamWriter(icebergTable, flinkSchema); + 
IcebergStreamWriter streamWriter = FlinkSink.createStreamWriter(icebergTable, flinkSchema, null); OneInputStreamOperatorTestHarness harness = new OneInputStreamOperatorTestHarness<>( streamWriter, 1, 1, 0); diff --git a/flink/src/test/java/org/apache/iceberg/flink/sink/TestTaskWriters.java b/flink/src/test/java/org/apache/iceberg/flink/sink/TestTaskWriters.java index 60e6cefd45c0..a9e942875ea4 100644 --- a/flink/src/test/java/org/apache/iceberg/flink/sink/TestTaskWriters.java +++ b/flink/src/test/java/org/apache/iceberg/flink/sink/TestTaskWriters.java @@ -24,6 +24,7 @@ import java.util.List; import java.util.Locale; import java.util.Map; +import java.util.Set; import org.apache.flink.table.data.RowData; import org.apache.flink.table.types.logical.RowType; import org.apache.hadoop.conf.Configuration; @@ -31,15 +32,24 @@ import org.apache.hadoop.fs.Path; import org.apache.iceberg.AppendFiles; import org.apache.iceberg.DataFile; +import org.apache.iceberg.DeleteFile; import org.apache.iceberg.FileFormat; +import org.apache.iceberg.RowDelta; import org.apache.iceberg.Table; import org.apache.iceberg.TableProperties; +import org.apache.iceberg.data.IcebergGenerics; import org.apache.iceberg.data.Record; +import org.apache.iceberg.flink.RowDataWrapper; import org.apache.iceberg.flink.SimpleDataUtil; import org.apache.iceberg.flink.data.RandomRowData; import org.apache.iceberg.io.TaskWriter; +import org.apache.iceberg.io.WriterResult; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet; +import org.apache.iceberg.relocated.com.google.common.collect.Iterables; import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.relocated.com.google.common.collect.Sets; +import org.apache.iceberg.util.StructLikeSet; import org.junit.Assert; import org.junit.Before; import org.junit.Rule; @@ -48,6 +58,11 @@ import org.junit.runner.RunWith; import org.junit.runners.Parameterized; +import static org.apache.iceberg.flink.SimpleDataUtil.ROW_TYPE; +import static org.apache.iceberg.flink.SimpleDataUtil.SCHEMA; +import static org.apache.iceberg.flink.SimpleDataUtil.createEqDelete; +import static org.apache.iceberg.flink.SimpleDataUtil.createRowData; + @RunWith(Parameterized.class) public class TestTaskWriters { private static final Configuration CONF = new Configuration(); @@ -85,7 +100,9 @@ public void before() throws IOException { path = folder.getAbsolutePath(); // Construct the iceberg table with the specified file format. - Map props = ImmutableMap.of(TableProperties.DEFAULT_FILE_FORMAT, format.name()); + Map props = ImmutableMap.of( + TableProperties.DEFAULT_FILE_FORMAT, format.name(), + TableProperties.FORMAT_VERSION, "2"); table = SimpleDataUtil.createTable(path, props, partitioned); } @@ -94,13 +111,13 @@ public void testWriteZeroRecord() throws IOException { try (TaskWriter taskWriter = createTaskWriter(TARGET_FILE_SIZE)) { taskWriter.close(); - DataFile[] dataFiles = taskWriter.complete(); + DataFile[] dataFiles = taskWriter.complete().dataFiles(); Assert.assertNotNull(dataFiles); Assert.assertEquals(0, dataFiles.length); // Close again. 
taskWriter.close(); - dataFiles = taskWriter.complete(); + dataFiles = taskWriter.complete().dataFiles(); Assert.assertNotNull(dataFiles); Assert.assertEquals(0, dataFiles.length); } @@ -109,13 +126,13 @@ public void testWriteZeroRecord() throws IOException { @Test public void testCloseTwice() throws IOException { try (TaskWriter taskWriter = createTaskWriter(TARGET_FILE_SIZE)) { - taskWriter.write(SimpleDataUtil.createRowData(1, "hello")); - taskWriter.write(SimpleDataUtil.createRowData(2, "world")); + taskWriter.write(createRowData(1, "hello")); + taskWriter.write(createRowData(2, "world")); taskWriter.close(); // The first close taskWriter.close(); // The second close int expectedFiles = partitioned ? 2 : 1; - DataFile[] dataFiles = taskWriter.complete(); + DataFile[] dataFiles = taskWriter.complete().dataFiles(); Assert.assertEquals(expectedFiles, dataFiles.length); FileSystem fs = FileSystem.get(CONF); @@ -128,11 +145,11 @@ public void testCloseTwice() throws IOException { @Test public void testAbort() throws IOException { try (TaskWriter taskWriter = createTaskWriter(TARGET_FILE_SIZE)) { - taskWriter.write(SimpleDataUtil.createRowData(1, "hello")); - taskWriter.write(SimpleDataUtil.createRowData(2, "world")); + taskWriter.write(createRowData(1, "hello")); + taskWriter.write(createRowData(2, "world")); taskWriter.abort(); - DataFile[] dataFiles = taskWriter.complete(); + DataFile[] dataFiles = taskWriter.complete().dataFiles(); int expectedFiles = partitioned ? 2 : 1; Assert.assertEquals(expectedFiles, dataFiles.length); @@ -147,16 +164,16 @@ public void testAbort() throws IOException { @Test public void testCompleteFiles() throws IOException { try (TaskWriter taskWriter = createTaskWriter(TARGET_FILE_SIZE)) { - taskWriter.write(SimpleDataUtil.createRowData(1, "a")); - taskWriter.write(SimpleDataUtil.createRowData(2, "b")); - taskWriter.write(SimpleDataUtil.createRowData(3, "c")); - taskWriter.write(SimpleDataUtil.createRowData(4, "d")); + taskWriter.write(createRowData(1, "a")); + taskWriter.write(createRowData(2, "b")); + taskWriter.write(createRowData(3, "c")); + taskWriter.write(createRowData(4, "d")); - DataFile[] dataFiles = taskWriter.complete(); + DataFile[] dataFiles = taskWriter.complete().dataFiles(); int expectedFiles = partitioned ? 
4 : 1; Assert.assertEquals(expectedFiles, dataFiles.length); - dataFiles = taskWriter.complete(); + dataFiles = taskWriter.complete().dataFiles(); Assert.assertEquals(expectedFiles, dataFiles.length); FileSystem fs = FileSystem.get(CONF); @@ -191,7 +208,7 @@ public void testRollingWithTargetFileSize() throws IOException { List records = Lists.newArrayListWithCapacity(8000); for (int i = 0; i < 2000; i++) { for (String data : new String[] {"a", "b", "c", "d"}) { - rows.add(SimpleDataUtil.createRowData(i, data)); + rows.add(createRowData(i, data)); records.add(SimpleDataUtil.createRecord(i, data)); } } @@ -200,7 +217,7 @@ public void testRollingWithTargetFileSize() throws IOException { taskWriter.write(row); } - DataFile[] dataFiles = taskWriter.complete(); + DataFile[] dataFiles = taskWriter.complete().dataFiles(); Assert.assertEquals(8, dataFiles.length); AppendFiles appendFiles = table.newAppend(); @@ -217,13 +234,13 @@ public void testRollingWithTargetFileSize() throws IOException { @Test public void testRandomData() throws IOException { try (TaskWriter taskWriter = createTaskWriter(TARGET_FILE_SIZE)) { - Iterable rows = RandomRowData.generate(SimpleDataUtil.SCHEMA, 100, 1996); + Iterable rows = RandomRowData.generate(SCHEMA, 100, 1996); for (RowData row : rows) { taskWriter.write(row); } taskWriter.close(); - DataFile[] dataFiles = taskWriter.complete(); + DataFile[] dataFiles = taskWriter.complete().dataFiles(); AppendFiles appendFiles = table.newAppend(); for (DataFile dataFile : dataFiles) { appendFiles.appendFile(dataFile); @@ -235,11 +252,115 @@ public void testRandomData() throws IOException { } } + @Test + public void testWriteEqualityDelete() throws IOException { + if (format == FileFormat.ORC) { + return; + } + + try (TaskWriter taskWriter = createEqDeleteTaskWriter(TARGET_FILE_SIZE)) { + taskWriter.write(createRowData(1, "aaa")); + taskWriter.write(createRowData(2, "bbb")); + + taskWriter.write(createEqDelete(1, "aaa")); + taskWriter.write(createEqDelete(2, "bbb")); + + taskWriter.write(createRowData(3, "ccc")); + + WriterResult result = taskWriter.complete(); + Assert.assertEquals(result.dataFiles().length, partitioned ? 3 : 1); + Assert.assertEquals(result.deleteFiles().length, partitioned ? 
4 : 2); + commitTransaction(result); + + assertTableRecords(Sets.newHashSet(createRowData(3, "ccc"))); + } + + try (TaskWriter taskWriter = createEqDeleteTaskWriter(TARGET_FILE_SIZE)) { + taskWriter.write(createEqDelete(3, "ccc")); + + WriterResult result = taskWriter.complete(); + Assert.assertEquals(result.dataFiles().length, 0); + Assert.assertEquals(result.deleteFiles().length, 1); + commitTransaction(result); + + assertTableRecords(ImmutableSet.of()); + } + } + + @Test + public void testEqualityDeleteSameRow() throws IOException { + if (format == FileFormat.ORC) { + return; + } + + try (TaskWriter taskWriter = createEqDeleteTaskWriter(TARGET_FILE_SIZE)) { + taskWriter.write(createRowData(1, "aaa")); + taskWriter.write(createEqDelete(1, "aaa")); + taskWriter.write(createRowData(1, "aaa")); + taskWriter.write(createEqDelete(1, "aaa")); + taskWriter.write(createRowData(1, "aaa")); + + WriterResult result = taskWriter.complete(); + Assert.assertEquals(result.dataFiles().length, 1); + Assert.assertEquals(result.deleteFiles().length, 2); + commitTransaction(result); + + assertTableRecords(ImmutableSet.of(createRowData(1, "aaa"))); + } + + try (TaskWriter taskWriter = createEqDeleteTaskWriter(TARGET_FILE_SIZE)) { + taskWriter.write(createRowData(1, "aaa")); + + WriterResult result = taskWriter.complete(); + Assert.assertEquals(result.dataFiles().length, 1); + Assert.assertEquals(result.deleteFiles().length, 0); + commitTransaction(result); + + assertTableRecords(ImmutableSet.of(createRowData(1, "aaa"), createRowData(1, "aaa"))); + } + } + + private void commitTransaction(WriterResult result) { + RowDelta rowDelta = table.newRowDelta(); + + for (DataFile dataFile : result.dataFiles()) { + rowDelta.addRows(dataFile); + } + + for (DeleteFile deleteFile : result.deleteFiles()) { + rowDelta.addDeletes(deleteFile); + } + + rowDelta.commit(); + } + + private void assertTableRecords(Set expectedRowDataSet) { + StructLikeSet expectedSet = StructLikeSet.create(SCHEMA.asStruct()); + for (RowData rowData : expectedRowDataSet) { + RowDataWrapper wrapper = new RowDataWrapper(ROW_TYPE, SCHEMA.asStruct()); + expectedSet.add(wrapper.wrap(rowData)); + } + + StructLikeSet actualSet = StructLikeSet.create(SCHEMA.asStruct()); + Iterables.addAll(actualSet, IcebergGenerics.read(table).build()); + Assert.assertEquals(expectedSet, actualSet); + } + + private TaskWriter createEqDeleteTaskWriter(long targetFileSize) { + List equalityFieldIds = Lists.newArrayList(table.schema().findField("id").fieldId()); + + return createTaskWriter(targetFileSize, equalityFieldIds); + } + private TaskWriter createTaskWriter(long targetFileSize) { + return createTaskWriter(targetFileSize, null); + } + + private TaskWriter createTaskWriter(long targetFileSize, List equalityFieldIds) { TaskWriterFactory taskWriterFactory = new RowDataTaskWriterFactory(table.schema(), (RowType) SimpleDataUtil.FLINK_SCHEMA.toRowDataType().getLogicalType(), table.spec(), table.locationProvider(), table.io(), table.encryption(), - targetFileSize, format, table.properties()); + targetFileSize, format, table.properties(), equalityFieldIds); taskWriterFactory.initialize(1, 1); return taskWriterFactory.create(); } diff --git a/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkMergingMetrics.java b/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkMergingMetrics.java index b116faff23d1..0c341bcc5186 100644 --- a/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkMergingMetrics.java +++ 
b/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkMergingMetrics.java @@ -20,17 +20,18 @@ package org.apache.iceberg.flink.source; import java.io.IOException; -import java.util.HashMap; import java.util.List; import org.apache.flink.table.data.RowData; import org.apache.flink.table.types.logical.RowType; import org.apache.iceberg.FileFormat; +import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.TestMergingMetrics; import org.apache.iceberg.data.Record; import org.apache.iceberg.flink.FlinkSchemaUtil; import org.apache.iceberg.flink.RowDataConverter; -import org.apache.iceberg.flink.sink.RowDataTaskWriterFactory; +import org.apache.iceberg.flink.sink.FlinkFileAppenderFactory; import org.apache.iceberg.io.FileAppender; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; public class TestFlinkMergingMetrics extends TestMergingMetrics { @@ -39,8 +40,8 @@ protected FileAppender writeAndGetAppender(List records) throws RowType flinkSchema = FlinkSchemaUtil.convert(SCHEMA); FileAppender appender = - new RowDataTaskWriterFactory.FlinkFileAppenderFactory(SCHEMA, flinkSchema, new HashMap<>()).newAppender( - org.apache.iceberg.Files.localOutput(temp.newFile()), FileFormat.PARQUET); + new FlinkFileAppenderFactory(SCHEMA, flinkSchema, Maps.newHashMap(), PartitionSpec.unpartitioned(), null) + .newAppender(org.apache.iceberg.Files.localOutput(temp.newFile()), FileFormat.PARQUET); try (FileAppender fileAppender = appender) { records.stream().map(r -> RowDataConverter.convert(SCHEMA, r)).forEach(fileAppender::add); } diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java b/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java index fc35adc63f18..ac86c73de947 100644 --- a/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java +++ b/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java @@ -32,7 +32,6 @@ import org.apache.hadoop.conf.Configuration; import org.apache.iceberg.FileFormat; import org.apache.iceberg.Files; -import org.apache.iceberg.MetadataColumns; import org.apache.iceberg.MetricsConfig; import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.Schema; @@ -49,6 +48,7 @@ import org.apache.iceberg.hadoop.HadoopInputFile; import org.apache.iceberg.hadoop.HadoopOutputFile; import org.apache.iceberg.io.CloseableIterable; +import org.apache.iceberg.io.DeleteUtil; import org.apache.iceberg.io.FileAppender; import org.apache.iceberg.io.InputFile; import org.apache.iceberg.io.OutputFile; @@ -58,7 +58,6 @@ import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.relocated.com.google.common.collect.Maps; import org.apache.iceberg.relocated.com.google.common.collect.Sets; -import org.apache.iceberg.types.Types.NestedField; import org.apache.iceberg.util.ArrayUtil; import org.apache.parquet.HadoopReadOptions; import org.apache.parquet.ParquetReadOptions; @@ -379,7 +378,6 @@ public EqualityDeleteWriter buildEqualityWriter() throws IOException { appenderBuilder.build(), FileFormat.PARQUET, location, spec, partition, keyMetadata, equalityFieldIds); } - public PositionDeleteWriter buildPositionWriter() throws IOException { Preconditions.checkState(equalityFieldIds == null, "Cannot create position delete file using delete field ids"); @@ -387,12 +385,7 @@ public PositionDeleteWriter buildPositionWriter() throws IOException { if (rowSchema != null && createWriterFunc != null) { // the appender uses the row schema wrapped with position fields - appenderBuilder.schema(new 
org.apache.iceberg.Schema( - MetadataColumns.DELETE_FILE_PATH, - MetadataColumns.DELETE_FILE_POS, - NestedField.optional( - MetadataColumns.DELETE_FILE_ROW_FIELD_ID, "row", rowSchema.asStruct(), - MetadataColumns.DELETE_FILE_ROW_DOC))); + appenderBuilder.schema(DeleteUtil.posDeleteSchema(rowSchema)); appenderBuilder.createWriterFunc(parquetSchema -> { ParquetValueWriter writer = createWriterFunc.apply(parquetSchema); @@ -404,9 +397,7 @@ public PositionDeleteWriter buildPositionWriter() throws IOException { }); } else { - appenderBuilder.schema(new org.apache.iceberg.Schema( - MetadataColumns.DELETE_FILE_PATH, - MetadataColumns.DELETE_FILE_POS)); + appenderBuilder.schema(DeleteUtil.pathPosSchema()); appenderBuilder.createWriterFunc(parquetSchema -> new PositionDeleteStructWriter((StructWriter) GenericParquetWriter.buildWriter(parquetSchema))); diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetValueWriters.java b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetValueWriters.java index c17a2ffa173d..86bcd77e778b 100644 --- a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetValueWriters.java +++ b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetValueWriters.java @@ -562,6 +562,7 @@ public Stream metrics() { } public static class PositionDeleteStructWriter extends StructWriter> { + public PositionDeleteStructWriter(StructWriter replacedWriter) { super(Arrays.asList(replacedWriter.writers)); } diff --git a/spark/src/main/java/org/apache/iceberg/spark/source/RowDataRewriter.java b/spark/src/main/java/org/apache/iceberg/spark/source/RowDataRewriter.java index 37ca56c700a4..7077cb89efc5 100644 --- a/spark/src/main/java/org/apache/iceberg/spark/source/RowDataRewriter.java +++ b/spark/src/main/java/org/apache/iceberg/spark/source/RowDataRewriter.java @@ -98,7 +98,7 @@ private List rewriteDataForTask(CombinedScanTask task) throws Exceptio task, schema, schema, nameMapping, io.value(), encryptionManager.value(), caseSensitive); StructType structType = SparkSchemaUtil.convert(schema); - SparkAppenderFactory appenderFactory = new SparkAppenderFactory(properties, schema, structType); + SparkAppenderFactory appenderFactory = new SparkAppenderFactory(properties, schema, structType, spec); OutputFileFactory fileFactory = new OutputFileFactory( spec, format, locations, io.value(), encryptionManager.value(), partitionId, taskId); @@ -120,7 +120,7 @@ private List rewriteDataForTask(CombinedScanTask task) throws Exceptio dataReader = null; writer.close(); - return Lists.newArrayList(writer.complete()); + return Lists.newArrayList(writer.complete().dataFiles()); } catch (Throwable originalThrowable) { try { diff --git a/spark/src/main/java/org/apache/iceberg/spark/source/SparkAppenderFactory.java b/spark/src/main/java/org/apache/iceberg/spark/source/SparkAppenderFactory.java index 3383fe7c29f7..a63535b3a5d7 100644 --- a/spark/src/main/java/org/apache/iceberg/spark/source/SparkAppenderFactory.java +++ b/spark/src/main/java/org/apache/iceberg/spark/source/SparkAppenderFactory.java @@ -23,9 +23,15 @@ import java.util.Map; import org.apache.iceberg.FileFormat; import org.apache.iceberg.MetricsConfig; +import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.Schema; +import org.apache.iceberg.StructLike; import org.apache.iceberg.avro.Avro; +import org.apache.iceberg.deletes.EqualityDeleteWriter; +import org.apache.iceberg.deletes.PositionDeleteWriter; +import org.apache.iceberg.encryption.EncryptedOutputFile; import org.apache.iceberg.exceptions.RuntimeIOException; 
+import org.apache.iceberg.io.DataWriter; import org.apache.iceberg.io.FileAppender; import org.apache.iceberg.io.FileAppenderFactory; import org.apache.iceberg.io.OutputFile; @@ -41,11 +47,13 @@ class SparkAppenderFactory implements FileAppenderFactory { private final Map properties; private final Schema writeSchema; private final StructType dsSchema; + private final PartitionSpec spec; - SparkAppenderFactory(Map properties, Schema writeSchema, StructType dsSchema) { + SparkAppenderFactory(Map properties, Schema writeSchema, StructType dsSchema, PartitionSpec spec) { this.properties = properties; this.writeSchema = writeSchema; this.dsSchema = dsSchema; + this.spec = spec; } @Override @@ -86,4 +94,24 @@ public FileAppender newAppender(OutputFile file, FileFormat fileFor throw new RuntimeIOException(e); } } + + @Override + public DataWriter newDataWriter(EncryptedOutputFile file, FileFormat fileFormat, + StructLike partition) { + return new DataWriter<>( + newAppender(file.encryptingOutputFile(), fileFormat), fileFormat, + file.encryptingOutputFile().location(), spec, partition, file.keyMetadata()); + } + + @Override + public EqualityDeleteWriter newEqDeleteWriter(EncryptedOutputFile outputFile, FileFormat format, + StructLike partition) { + throw new UnsupportedOperationException("Cannot create equality-delete writer for spark now."); + } + + @Override + public PositionDeleteWriter newPosDeleteWriter(EncryptedOutputFile outputFile, FileFormat format, + StructLike partition) { + throw new UnsupportedOperationException("Cannot create pos-delete writer for spark now."); + } } diff --git a/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkParquetMergingMetrics.java b/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkParquetMergingMetrics.java index e49a18be5338..5ce6ac78c9f0 100644 --- a/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkParquetMergingMetrics.java +++ b/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkParquetMergingMetrics.java @@ -23,6 +23,7 @@ import java.util.HashMap; import java.util.List; import org.apache.iceberg.FileFormat; +import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.TestMergingMetrics; import org.apache.iceberg.data.Record; import org.apache.iceberg.io.FileAppender; @@ -34,7 +35,8 @@ public class TestSparkParquetMergingMetrics extends TestMergingMetrics writeAndGetAppender(List records) throws IOException { FileAppender appender = - new SparkAppenderFactory(new HashMap<>(), SCHEMA, SparkSchemaUtil.convert(SCHEMA)).newAppender( + new SparkAppenderFactory(new HashMap<>(), SCHEMA, SparkSchemaUtil.convert(SCHEMA), + PartitionSpec.unpartitioned()).newAppender( org.apache.iceberg.Files.localOutput(temp.newFile()), FileFormat.PARQUET); try (FileAppender fileAppender = appender) { records.stream().map(r -> new StructInternalRow(SCHEMA.asStruct()).setStruct(r)).forEach(fileAppender::add); diff --git a/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReadProjection.java b/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReadProjection.java index b3c8e3134490..4ff22704ef15 100644 --- a/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReadProjection.java +++ b/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReadProjection.java @@ -106,8 +106,8 @@ protected Record writeAndRead(String desc, Schema writeSchema, Schema readSchema // When tables are created, the column ids are reassigned. 
Schema tableSchema = table.schema(); - try (FileAppender writer = new GenericAppenderFactory(tableSchema).newAppender( - localOutput(testFile), format)) { + try (FileAppender writer = new GenericAppenderFactory(tableSchema, PartitionSpec.unpartitioned()) + .newAppender(localOutput(testFile), format)) { writer.add(record); } diff --git a/spark2/src/main/java/org/apache/iceberg/spark/source/Writer.java b/spark2/src/main/java/org/apache/iceberg/spark/source/Writer.java index 6b6739a06a25..167f0c0d97bd 100644 --- a/spark2/src/main/java/org/apache/iceberg/spark/source/Writer.java +++ b/spark2/src/main/java/org/apache/iceberg/spark/source/Writer.java @@ -266,7 +266,7 @@ static class WriterFactory implements DataWriterFactory { public DataWriter createDataWriter(int partitionId, long taskId, long epochId) { OutputFileFactory fileFactory = new OutputFileFactory( spec, format, locations, io.value(), encryptionManager.value(), partitionId, taskId); - SparkAppenderFactory appenderFactory = new SparkAppenderFactory(properties, writeSchema, dsSchema); + SparkAppenderFactory appenderFactory = new SparkAppenderFactory(properties, writeSchema, dsSchema, spec); if (spec.fields().isEmpty()) { return new Unpartitioned24Writer(spec, format, appenderFactory, fileFactory, io.value(), targetFileSize); @@ -288,7 +288,7 @@ private static class Unpartitioned24Writer extends UnpartitionedWriter writer = new GenericAppenderFactory(tableSchema).newAppender( - localOutput(testFile), fileFormat)) { + try (FileAppender writer = new GenericAppenderFactory(tableSchema, PartitionSpec.unpartitioned()) + .newAppender(localOutput(testFile), fileFormat)) { writer.addAll(records); } diff --git a/spark3/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java b/spark3/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java index efbc319197f0..8f001d7f5e93 100644 --- a/spark3/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java +++ b/spark3/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java @@ -416,7 +416,7 @@ public DataWriter createWriter(int partitionId, long taskId) { public DataWriter createWriter(int partitionId, long taskId, long epochId) { OutputFileFactory fileFactory = new OutputFileFactory( spec, format, locations, io.value(), encryptionManager.value(), partitionId, taskId); - SparkAppenderFactory appenderFactory = new SparkAppenderFactory(properties, writeSchema, dsSchema); + SparkAppenderFactory appenderFactory = new SparkAppenderFactory(properties, writeSchema, dsSchema, spec); if (spec.fields().isEmpty()) { return new Unpartitioned3Writer(spec, format, appenderFactory, fileFactory, io.value(), targetFileSize); } else { @@ -437,7 +437,7 @@ private static class Unpartitioned3Writer extends UnpartitionedWriter writer = new GenericAppenderFactory(tableSchema).newAppender( - localOutput(testFile), fileFormat)) { + try (FileAppender writer = new GenericAppenderFactory(tableSchema, PartitionSpec.unpartitioned()) + .newAppender(localOutput(testFile), fileFormat)) { writer.addAll(records); }
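
For review context, a minimal end-to-end sketch of how the pieces in this patch fit together, assembled only from the signatures that appear above (the RowDataTaskWriterFactory constructor used in TestTaskWriters, WriterResult, and the RowDelta commit in commitTransaction). The class name RowDeltaExample, the writeAndCommit helper, the "id" equality column, and the PARQUET format are illustrative assumptions, and the target table is assumed to be a format-version-2 table (TableProperties.FORMAT_VERSION set to "2"), as in the updated tests; this is a sketch, not part of the change itself.

package org.apache.iceberg.flink.sink;

import java.io.IOException;
import java.util.List;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.types.RowKind;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.RowDelta;
import org.apache.iceberg.Table;
import org.apache.iceberg.io.TaskWriter;
import org.apache.iceberg.io.WriterResult;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;

public class RowDeltaExample {

  private RowDeltaExample() {
  }

  // Writes one insert and one equality delete for the same key, then commits the data
  // files and delete files together in a single RowDelta snapshot.
  public static void writeAndCommit(Table table, RowType flinkRowType, long targetFileSizeBytes) throws IOException {
    // Assumed: the table schema has an "id" column that serves as the equality key.
    List<Integer> equalityFieldIds = Lists.newArrayList(table.schema().findField("id").fieldId());

    TaskWriterFactory<RowData> factory = new RowDataTaskWriterFactory(
        table.schema(), flinkRowType, table.spec(), table.locationProvider(), table.io(),
        table.encryption(), targetFileSizeBytes, FileFormat.PARQUET, table.properties(), equalityFieldIds);
    factory.initialize(1, 1);

    try (TaskWriter<RowData> writer = factory.create()) {
      // INSERT / UPDATE_AFTER rows are routed to data files by RowDataTaskWriter.
      writer.write(GenericRowData.ofKind(RowKind.INSERT, 1, StringData.fromString("aaa")));
      // DELETE / UPDATE_BEFORE rows are routed to delete files.
      writer.write(GenericRowData.ofKind(RowKind.DELETE, 1, StringData.fromString("aaa")));

      WriterResult result = writer.complete();

      RowDelta rowDelta = table.newRowDelta();
      for (DataFile dataFile : result.dataFiles()) {
        rowDelta.addRows(dataFile);
      }
      for (DeleteFile deleteFile : result.deleteFiles()) {
        rowDelta.addDeletes(deleteFile);
      }
      rowDelta.commit();
    }
  }
}

Committing data files and delete files in one RowDelta snapshot is the reason complete() now returns a WriterResult instead of a bare DataFile[] array.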