Closed
Changes from all commits
41 commits
6de9d80
Abstract the TaskWriter to accept both INSERT and DELETE records.
openinx Oct 23, 2020
ea1a2d4
Introduce DataFileWriterFactory and EqualityDeleteWriterFactory
openinx Oct 26, 2020
a7e2e69
Refactor the BaseTaskWriter to use the generic ContentFileWriterFactory.
openinx Oct 26, 2020
7f06d23
Introduce the MixedTaskWriter to accept both INSERT and DELETE records.
openinx Oct 26, 2020
7e4fcf2
Minor fixes
openinx Oct 27, 2020
6f742f0
Add contentFiles in TaskWriterResult.
openinx Oct 28, 2020
9a58c5c
temp
openinx Oct 30, 2020
47972c8
Introduce the BasePartitionWriter and StructLikeMap
openinx Nov 5, 2020
8b7bb62
temp2
openinx Nov 5, 2020
f29e37d
Revert "temp2"
openinx Nov 5, 2020
cce17ce
Revert "Introduce the BasePartitionWriter and StructLikeMap"
openinx Nov 5, 2020
b286bdd
Revert "temp"
openinx Nov 5, 2020
c0f4983
Revert "Add contentFiles in TaskWriterResult."
openinx Nov 5, 2020
ddb326a
Revert "Minor fixes"
openinx Nov 5, 2020
f8c072c
Revert "Introduce the MixedTaskWriter to accept both INSERT and DELET…
openinx Nov 5, 2020
1cb68ad
Make PositionDeleteWriter to be a ContentFileWriter.
openinx Nov 5, 2020
e41fb8e
Introduce a separate RollingContentFileWriter to decouple with TaskWr…
openinx Nov 5, 2020
b8c4795
Introduce DeltaWriter to write insert and delete rows.
openinx Nov 5, 2020
54673c1
Implement GenericDeltaWriter.
openinx Nov 6, 2020
360bc4b
Fix the broken unit tests.
openinx Nov 6, 2020
af75785
Rebase to master.
openinx Nov 6, 2020
b83ac21
Add more unit tests for GenericDeltaWriter.
openinx Nov 6, 2020
a374fd6
Fix the broken unit tests.
openinx Nov 9, 2020
11e61de
Add more unit tests.
openinx Nov 9, 2020
9deb59d
Add flink delta writers.
openinx Nov 9, 2020
0195145
Remove the useless FlinkEqualityDeleterFactory.
openinx Nov 9, 2020
3449d87
Fix the broken TestTaskWriters.
openinx Nov 9, 2020
69b2eb2
Integrate the flink sink with delta writer.
openinx Nov 9, 2020
ee87c08
Fix the broken TestIcebergSourceHadoopTables3 tests
openinx Nov 10, 2020
fb2e6f8
Unified the flink file appender factory.
openinx Nov 10, 2020
18a6808
Add equality field ids to the FlinkSink builder.
openinx Nov 10, 2020
0788b63
Flink: commit transaction with both data files and delete files.
openinx Nov 10, 2020
0b8ded6
Fix the broken unit tests.
openinx Nov 11, 2020
d432d3f
Minor changes.
openinx Nov 11, 2020
65971bf
Minor fixes
openinx Nov 11, 2020
fb556e4
Addressing comments.
openinx Nov 19, 2020
7ce7bc5
Separate the posDeleteRowSchema and eqDeleteRowSchema.
openinx Nov 19, 2020
dd94466
Fix the compile errors after rebase.
openinx Nov 19, 2020
3ba759b
Create the asCopiedKey and asKey in BaseDeltaWriter to avoid messing …
openinx Nov 19, 2020
957e3be
Create a RollingPosDeleteWriter to simplify the position delete writers.
openinx Nov 19, 2020
72940c3
Minor fixes
openinx Nov 20, 2020
1 change: 1 addition & 0 deletions build.gradle
@@ -311,6 +311,7 @@ project(':iceberg-flink') {
testCompile project(path: ':iceberg-hive-metastore', configuration: 'testArtifacts')
testCompile project(path: ':iceberg-api', configuration: 'testArtifacts')
testCompile project(path: ':iceberg-data', configuration: 'testArtifacts')
testCompile project(path: ':iceberg-core', configuration: 'testArtifacts')

// By default, hive-exec is a fat/uber jar and it exports a guava library
// that's really old. We use the core classifier to be able to override our guava
48 changes: 48 additions & 0 deletions core/src/main/java/org/apache/iceberg/ContentFileWriter.java
@@ -0,0 +1,48 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iceberg;

import java.io.Closeable;
import java.util.Iterator;

public interface ContentFileWriter<T, R> extends Closeable {
Contributor

I'm a little hesitant to add this. I think it is needed so that the same RollingContentFileWriter can be used for delete files and data files, but this introduces a lot of changes and new interfaces just to share about 20 lines of code. I'm not sure that it is worth the extra complexity, when compared to having one for data files and one for delete files.

Member Author

In my mind, the whole write workflow should look like the following:


                                               TaskWriter
                                                   |
                                                   |
                                                   |
                          --------------------------------------------------
                          |                                                |
                          |                                                |
                          V                                                V
                     DeltaWriter                                     DeltaWriter
                    (Partition-1)                                    (Partition-2)
                          |                                  
                          |
    ------------------------------------------------
   |                      |                        |
   |                      |                        |
   V                      V                        V
RollingFileWriter    RollingFileWriter      RollingFileWriter
 (Pos-Delete)          (Insert)              (Equality-Delete)
                           |
                           |
                           V
              -----------------------------
              |             |             |
              |             |             |
              |             |             |
              V             V             V
         FileAppender    FileAppender    ...
(closed)        (Opening)
            

For each executor/task in the compute engine, there is a TaskWriter to write
generic records. If it uses the fanout policy to write records, then it will
have multiple DeltaWriters, each writing records for a single partition; if it
uses the grouped policy in Spark, then we might have just one DeltaWriter per
TaskWriter. The DeltaWriter can accept INSERT, EQ-DELETE, and POS-DELETE
records, and for each kind of record we have a RollingFileWriter that rolls its
file appender over to a newly opened one once its size reaches the threshold.

Each of those RollingFileWriters should have the same rolling logic, so in theory it's good to define an abstract ContentFileWriter so that we don't have to define three kinds of RollingFileWriter.
Another way is to define a BaseRollingFileWriter and put the common logic there; the DeltaWriter would then use the BaseRollingFileWriter. When constructing the DeltaWriter, we would need to pass the subclasses PosDeleteRollingFileWriter, EqDeleteRollingFileWriter, and DataRollingFileWriter to it.
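
A minimal sketch of the rolling step described above, written against the ContentFileWriter and ContentFileWriterFactory interfaces added in this PR. The class name, the Supplier used to obtain fresh output files, and the per-record size check are assumptions for illustration, not the PR's actual RollingContentFileWriter.

import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;
import org.apache.iceberg.ContentFileWriter;
import org.apache.iceberg.ContentFileWriterFactory;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.PartitionKey;
import org.apache.iceberg.encryption.EncryptedOutputFile;

// Illustrative sketch only: write into the current ContentFileWriter and open a fresh one
// once the open file's size passes the target threshold.
class RollingWriterSketch<T, R> {
  private final ContentFileWriterFactory<T, R> writerFactory;
  private final Supplier<EncryptedOutputFile> newOutputFile; // assumed engine-specific file factory
  private final PartitionKey partitionKey;                   // null if the table is unpartitioned
  private final FileFormat format;
  private final long targetFileSizeInBytes;
  private final List<T> completedFiles = new ArrayList<>();  // closed DataFiles or DeleteFiles

  private ContentFileWriter<T, R> currentWriter;

  RollingWriterSketch(ContentFileWriterFactory<T, R> writerFactory,
                      Supplier<EncryptedOutputFile> newOutputFile,
                      PartitionKey partitionKey, FileFormat format, long targetFileSizeInBytes) {
    this.writerFactory = writerFactory;
    this.newOutputFile = newOutputFile;
    this.partitionKey = partitionKey;
    this.format = format;
    this.targetFileSizeInBytes = targetFileSizeInBytes;
    this.currentWriter = writerFactory.createWriter(partitionKey, newOutputFile.get(), format);
  }

  void write(R record) {
    currentWriter.write(record);
    if (currentWriter.length() >= targetFileSizeInBytes) {
      closeCurrent(); // finish the open file and start the next one
      this.currentWriter = writerFactory.createWriter(partitionKey, newOutputFile.get(), format);
    }
  }

  List<T> complete() {
    closeCurrent();
    return completedFiles;
  }

  private void closeCurrent() {
    try {
      currentWriter.close();
      completedFiles.add(currentWriter.toContentFile()); // DataFile/DeleteFile of the closed writer
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }
}

The same sketch works unchanged for data files and equality deletes, which is the point of sharing the rolling logic behind ContentFileWriter.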

Member Author

Another way is to define a BaseRollingFileWriter and put the common logic there; the DeltaWriter would then use the BaseRollingFileWriter. When constructing the DeltaWriter, we would need to pass the subclasses PosDeleteRollingFileWriter, EqDeleteRollingFileWriter, and DataRollingFileWriter to it.

In this way, we would need to create the PosDeleteRollingFileWriter, EqDeleteRollingFileWriter, and DataRollingFileWriter for each engine, for example FlinkPosDeleteRollingWriter and SparkPosDeleteRollingWriter, because different engines need to construct different FileAppenders to convert their data types into the unified Parquet/ORC/Avro files. I don't think that would be less complex than the current solution.

Contributor

I don't think we would require a different class for each engine. The file writers are currently created using an AppenderFactory and we could continue using that.

Also, we would not need 3 rolling writers that are nearly identical. We would only need 2, because the position delete writer will be substantially different due to its sort order requirement.

Because deletes may come in any order relative to inserts and we need to write out a sorted delete file, we will need to buffer the deletes in memory. That's not as expensive as it seems at first because the file location will be reused (multiple deletes in the same data file), so the main cost is the number of positions that get deleted, which is the number of rows written and deleted in the same checkpoint per partition. The position delete writer and the logic to roll over to a new file are probably not going to be shared. I don't think I would even build a rolling position delete writer unless we see real cases where it is needed.

That leaves just the equality delete writer and the data file writer. I think it would be cleaner to just have two rolling file writers because the rolling logic is so small. I went ahead and started a PR to show what it would look like: #1802
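
A minimal sketch of the buffering described above: collect positions per data file and replay them in sorted (path, pos) order. It assumes PositionDeleteWriter keeps its existing delete(path, pos) overload; the class name and flush trigger are illustrative, not part of this PR or #1802.

import java.util.Map;
import java.util.TreeMap;
import java.util.TreeSet;
import org.apache.iceberg.deletes.PositionDeleteWriter;

// Sketch only: deletes arrive in arbitrary order, so buffer positions per data file and
// replay them sorted by (path, pos) when flushing into the position delete writer.
class SortedPosDeleteBufferSketch<T> {
  // data file path -> sorted positions deleted in that file
  private final Map<String, TreeSet<Long>> deletesByPath = new TreeMap<>();

  void delete(CharSequence path, long pos) {
    deletesByPath.computeIfAbsent(path.toString(), p -> new TreeSet<>()).add(pos);
  }

  // write the buffered deletes in sorted order; deleted row payloads are omitted for brevity
  void flushTo(PositionDeleteWriter<T> writer) {
    for (Map.Entry<String, TreeSet<Long>> entry : deletesByPath.entrySet()) {
      for (long pos : entry.getValue()) {
        writer.delete(entry.getKey(), pos);
      }
    }
    deletesByPath.clear();
  }
}

Since a path key is shared by many positions, the dominant memory cost is the number of buffered positions, which matches the cost estimate above.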

Member Author

I read the PR; here are my thoughts:

First of all, I like the idea of using a single FileAppenderFactory to customize different writers for different compute engines; it's unified and elegant. Developers would find it easy to understand and customize.

Second, I agree that it's necessary to consider the sort order for the position delete writer. We have to sort the pairs in memory (similar to flushing a sorted memstore to HFiles in HBase); once the buffered memory size reaches the threshold, we would flush it to a pos-delete file, so the rolling policy is decided by the memory size rather than the current file size. It sounds reasonable to make it a separate pos-writer.

Third, I'd like to finish the whole CDC write path work (PoC) based on #1802 to see whether there are other issues.

Thanks.
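
A small sketch of that flush policy. The byte estimate, the threshold, and the flush hook are illustrative assumptions; the class only decides when to flush, and the sorting and writing would happen in the supplied hook.

// Sketch only: for buffered position deletes the decision to cut a new pos-delete file is
// driven by how much is held in memory, not by the length of an open file.
class MemoryBoundRollPolicySketch {
  private final long maxBufferedBytes;          // assumed memory threshold
  private final Runnable flushAndStartNewFile;  // assumed hook: sort, write, open the next file
  private long bufferedBytes = 0;

  MemoryBoundRollPolicySketch(long maxBufferedBytes, Runnable flushAndStartNewFile) {
    this.maxBufferedBytes = maxBufferedBytes;
    this.flushAndStartNewFile = flushAndStartNewFile;
  }

  // call once per buffered (path, pos) pair with a rough size estimate for the pair
  void recordBuffered(long estimatedBytes) {
    bufferedBytes += estimatedBytes;
    if (bufferedBytes >= maxBufferedBytes) {    // memory threshold reached, flush like a memstore
      flushAndStartNewFile.run();
      bufferedBytes = 0;
    }
  }
}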


void write(R record);

default void writeAll(Iterator<R> values) {
while (values.hasNext()) {
write(values.next());
}
}

default void writeAll(Iterable<R> values) {
writeAll(values.iterator());
}

/**
* Returns the length of this file.
*/
long length();

/**
* Returns a {@link ContentFile}, which is either a {@link DeleteFile} or a {@link DataFile}.
*/
T toContentFile();
}
43 changes: 43 additions & 0 deletions core/src/main/java/org/apache/iceberg/ContentFileWriterFactory.java
@@ -0,0 +1,43 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iceberg;

import org.apache.iceberg.encryption.EncryptedOutputFile;

/**
* Factory to create a new {@link ContentFileWriter} to write INSERT or DELETE records.
*
* @param <T> content file type, either {@link DataFile} or {@link DeleteFile}.
* @param <R> data type of the rows to write.
*/
public interface ContentFileWriterFactory<T, R> {

/**
* Create a new {@link ContentFileWriter}
*
* @param partitionKey a partition key indicating which partition the rows will be written to. Null if the table is
unpartitioned.
* @param outputFile an OutputFile used to create an output stream.
* @param fileFormat File format.
* @return a newly created {@link ContentFileWriter}
*/
ContentFileWriter<T, R> createWriter(PartitionKey partitionKey, EncryptedOutputFile outputFile,
FileFormat fileFormat);
}
79 changes: 79 additions & 0 deletions core/src/main/java/org/apache/iceberg/DataFileWriter.java
@@ -0,0 +1,79 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iceberg;

import java.io.IOException;
import java.nio.ByteBuffer;
import org.apache.iceberg.encryption.EncryptionKeyMetadata;
import org.apache.iceberg.io.FileAppender;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;

public class DataFileWriter<T> implements ContentFileWriter<DataFile, T> {
Contributor

Much of this duplicates FileAppender. Should we use that interface instead and add toDataFile to it?

Member Author

I don't think so, because FileAppender is the basic writer shared by the data file writer, the equality-delete writer, and the position-delete writer; even the manifest writer is based on it. So putting toDataFile into FileAppender is not a good choice.
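
For context, a tiny usage sketch of the lifecycle DataFileWriter adds on top of a plain FileAppender. The helper class and method are hypothetical and appender construction is elided; it only shows the write, close, and toContentFile steps from the code below.

import java.io.IOException;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DataFileWriter;

class DataFileWriterUsageSketch {
  // Hypothetical helper, not part of this PR: shows the write -> close -> toContentFile
  // lifecycle that DataFileWriter adds on top of a plain FileAppender.
  static <T> DataFile writeAndCollect(DataFileWriter<T> writer, Iterable<T> rows) throws IOException {
    writer.writeAll(rows);         // default method inherited from ContentFileWriter
    writer.close();                // builds the DataFile from the appender's metrics and length
    return writer.toContentFile();
  }
}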

private final FileAppender<T> appender;
private final FileFormat format;
private final String location;
private final PartitionKey partitionKey;
private final PartitionSpec spec;
private final ByteBuffer keyMetadata;
private DataFile dataFile = null;

public DataFileWriter(FileAppender<T> appender, FileFormat format,
String location, PartitionKey partitionKey, PartitionSpec spec,
EncryptionKeyMetadata keyMetadata) {
this.appender = appender;
this.format = format;
this.location = location;
this.partitionKey = partitionKey; // set null if unpartitioned.
this.spec = spec;
this.keyMetadata = keyMetadata != null ? keyMetadata.buffer() : null;
}

@Override
public void write(T row) {
appender.add(row);
}

@Override
public long length() {
return appender.length();
}

@Override
public DataFile toContentFile() {
Preconditions.checkState(dataFile != null, "Cannot create data file from unclosed writer");
return dataFile;
}

@Override
public void close() throws IOException {
if (dataFile == null) {
appender.close();
this.dataFile = DataFiles.builder(spec)
.withEncryptionKeyMetadata(keyMetadata)
.withFormat(format)
.withPath(location)
.withFileSizeInBytes(appender.length())
.withPartition(partitionKey) // set null if unpartitioned
.withMetrics(appender.metrics())
.withSplitOffsets(appender.splitOffsets())
.build();
}
}
}
16 changes: 3 additions & 13 deletions core/src/main/java/org/apache/iceberg/avro/Avro.java
@@ -38,11 +38,11 @@
import org.apache.avro.io.Encoder;
import org.apache.avro.specific.SpecificData;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.MetadataColumns;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.SchemaParser;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.Table;
import org.apache.iceberg.deletes.DeletesUtil;
import org.apache.iceberg.deletes.EqualityDeleteWriter;
import org.apache.iceberg.deletes.PositionDelete;
import org.apache.iceberg.deletes.PositionDeleteWriter;
@@ -53,7 +53,6 @@
import org.apache.iceberg.mapping.NameMapping;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.types.Types.NestedField;
import org.apache.iceberg.util.ArrayUtil;

import static org.apache.iceberg.TableProperties.AVRO_COMPRESSION;
@@ -301,21 +300,12 @@ public <T> PositionDeleteWriter<T> buildPositionWriter() throws IOException {
meta("delete-type", "position");

if (rowSchema != null && createWriterFunc != null) {
// the appender uses the row schema wrapped with position fields
appenderBuilder.schema(new org.apache.iceberg.Schema(
MetadataColumns.DELETE_FILE_PATH,
MetadataColumns.DELETE_FILE_POS,
NestedField.optional(
MetadataColumns.DELETE_FILE_ROW_FIELD_ID, "row", rowSchema.asStruct(),
MetadataColumns.DELETE_FILE_ROW_DOC)));
appenderBuilder.schema(DeletesUtil.pathPosSchema(rowSchema));

appenderBuilder.createWriterFunc(
avroSchema -> new PositionAndRowDatumWriter<>(createWriterFunc.apply(avroSchema)));

} else {
appenderBuilder.schema(new org.apache.iceberg.Schema(
MetadataColumns.DELETE_FILE_PATH,
MetadataColumns.DELETE_FILE_POS));
appenderBuilder.schema(DeletesUtil.pathPosSchema());

appenderBuilder.createWriterFunc(ignored -> new PositionDatumWriter());
}
53 changes: 53 additions & 0 deletions core/src/main/java/org/apache/iceberg/deletes/DeletesUtil.java
@@ -0,0 +1,53 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iceberg.deletes;

import org.apache.iceberg.MetadataColumns;
import org.apache.iceberg.Schema;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.types.Types;

public class DeletesUtil {

private DeletesUtil() {
}

public static Schema pathPosSchema() {
return new Schema(
MetadataColumns.DELETE_FILE_PATH,
MetadataColumns.DELETE_FILE_POS);
}

public static Schema pathPosSchema(Schema rowSchema) {
Preconditions.checkNotNull(rowSchema, "Row schema should not be null when constructing the pos-delete schema.");

// the appender uses the row schema wrapped with position fields
return new Schema(
MetadataColumns.DELETE_FILE_PATH,
MetadataColumns.DELETE_FILE_POS,
Types.NestedField.required(
MetadataColumns.DELETE_FILE_ROW_FIELD_ID, "row", rowSchema.asStruct(),
MetadataColumns.DELETE_FILE_ROW_DOC));
}

public static Schema posDeleteSchema(Schema rowSchema) {
return rowSchema == null ? pathPosSchema() : pathPosSchema(rowSchema);
}
}
core/src/main/java/org/apache/iceberg/deletes/EqualityDeleteWriter.java
@@ -19,9 +19,9 @@

package org.apache.iceberg.deletes;

import java.io.Closeable;
import java.io.IOException;
import java.nio.ByteBuffer;
import org.apache.iceberg.ContentFileWriter;
import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.FileMetadata;
Expand All @@ -31,7 +31,7 @@
import org.apache.iceberg.io.FileAppender;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;

public class EqualityDeleteWriter<T> implements Closeable {
public class EqualityDeleteWriter<T> implements ContentFileWriter<DeleteFile, T> {
private final FileAppender<T> appender;
private final FileFormat format;
private final String location;
@@ -53,11 +53,13 @@ public EqualityDeleteWriter(FileAppender<T> appender, FileFormat format, String
this.equalityFieldIds = equalityFieldIds;
}

public void deleteAll(Iterable<T> rows) {
appender.addAll(rows);
@Override
public long length() {
return appender.length();
}

public void delete(T row) {
@Override
public void write(T row) {
appender.add(row);
}

@@ -77,7 +79,8 @@ public void close() throws IOException {
}
}

public DeleteFile toDeleteFile() {
@Override
public DeleteFile toContentFile() {
Preconditions.checkState(deleteFile != null, "Cannot create delete file from unclosed writer");
return deleteFile;
}
core/src/main/java/org/apache/iceberg/deletes/PositionDeleteWriter.java
@@ -19,10 +19,10 @@

package org.apache.iceberg.deletes;

import java.io.Closeable;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Set;
import org.apache.iceberg.ContentFileWriter;
import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.FileMetadata;
@@ -33,7 +33,7 @@
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.util.CharSequenceSet;

public class PositionDeleteWriter<T> implements Closeable {
public class PositionDeleteWriter<T> implements ContentFileWriter<DeleteFile, PositionDelete<T>> {
private final FileAppender<StructLike> appender;
private final FileFormat format;
private final String location;
@@ -85,7 +85,18 @@ public Set<CharSequence> referencedDataFiles() {
return pathSet;
}

public DeleteFile toDeleteFile() {
@Override
public void write(PositionDelete<T> record) {
delete(record.path(), record.pos(), record.row());
}

@Override
public long length() {
return appender.length();
}

@Override
public DeleteFile toContentFile() {
Preconditions.checkState(deleteFile != null, "Cannot create delete file from unclosed writer");
return deleteFile;
}