
Conversation

openinx (Member) commented Nov 24, 2020

@rdblue I've continued the work from #1802 and implemented a Flink RowDataTaskWriter that accepts both insert rows and equality deletes.

public class SortedPosDeleteWriter<T> implements Closeable {
private static final int RECORDS_NUM_THRESHOLD = 1000_000;

private final Map<CharSequence, List<Long>> posDeletes = Maps.newHashMap();
openinx (Member, Author):

Maybe we could just put the <path, pos> pairs into a flat array; that seems more memory efficient?

rdblue (Contributor):

That would be memory efficient if all of the paths that are equal are also the same reference. I like how using a map doesn't have that assumption, but it probably isn't a big concern.

this.dataWriter = new RollingFileWriter(partition);

this.enableEqDelete = equalityFieldIds != null && !equalityFieldIds.isEmpty();
if (enableEqDelete) {
rdblue (Contributor):

Why use a delta writer if eq deletes are disabled?

I typically like to use classes that don't need to check configuration in a tight loop. This setting introduces at least one check per row. I'd prefer using either a normal task writer or a delta writer depending on whether deletes are expected in the stream.
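
A minimal sketch of that split, using hypothetical factory method names (not the PR's actual API):

  // Pick the writer type once at construction time instead of checking the
  // equality-delete setting for every row inside the writer.
  TaskWriter<RowData> createTaskWriter(List<Integer> equalityFieldIds) {
    if (equalityFieldIds == null || equalityFieldIds.isEmpty()) {
      return createAppendOnlyTaskWriter();             // plain task writer, no per-row check
    } else {
      return createDeltaTaskWriter(equalityFieldIds);  // delta writer for changelog streams
    }
  }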

openinx (Member, Author):

Why use a delta writer if eq deletes are disabled?

Because I only want to expose BaseDeltaWriter to compute engines, I planned to make BaseRollingWriter, RollingFileWriter, and RollingEqDeleteWriter private. To implement a compute-engine-specific TaskWriter, the only thing we need to do is implement the asKey and asCopiedKey methods and customize the policy for dispatching records to the DeltaWriter.

// Adding a pos-delete to replace the old filePos.
FilePos previous = insertedRowMap.put(copiedKey, filePos);
if (previous != null) {
posDeleteWriter.delete(previous.path, previous.rowOffset, null /* TODO set non-nullable row*/);
rdblue (Contributor):

How would this set the row? Would we need to keep track of it somehow?

openinx (Member, Author):

The straightforward way is to add a row field to FilePos that references the old inserted row, but that would hold references to all the inserted rows within a checkpoint. If rows are large while the equality fields are small, the ideal approach is to keep only the equality fields and the file position in the insertedRowMap; but if we want to attach the row when writing the pos-delete file, then memory consumption becomes an issue. I'm considering that we may need an embedded KV library that can spill to disk in the future.
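
For illustration, a sketch of what carrying the row in FilePos could look like; the field names follow the snippet above, and the row field is exactly the part that makes memory a concern:

  // Hypothetical FilePos that also references the inserted row. Keeping `row`
  // holds every inserted row in memory until the checkpoint completes.
  class FilePos<T> {
    private final CharSequence path;
    private final long rowOffset;
    private final T row;  // could stay null if we decide not to retain the row

    FilePos(CharSequence path, long rowOffset, T row) {
      this.path = path;
      this.rowOffset = rowOffset;
      this.row = row;
    }
  }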

@Override
public void close() throws IOException {
// Moving the completed data files into task writer's completedFiles automatically.
dataWriter.close();
rdblue (Contributor):

Minor: dataWriter should be set to null so that it can be garbage collected and so any further calls to write will fail.
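
A sketch of that change, assuming close() should stay safe to call more than once:

  @Override
  public void close() throws IOException {
    if (dataWriter != null) {
      // Moving the completed data files into the task writer's completedFiles.
      dataWriter.close();
      // Drop the reference so the rolling writer can be garbage collected and
      // any later write() call fails fast instead of writing to a closed file.
      this.dataWriter = null;
    }
  }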

private SortedPosDeleteWriter<T> posDeleteWriter = null;
private StructLikeMap<FilePos> insertedRowMap = null;

public BaseDeltaWriter(PartitionKey partition, List<Integer> equalityFieldIds, Schema schema) {
rdblue (Contributor):

The list equalityFieldIds is only used in this constructor and it is used to create a projection of the schema that is passed in. I think it would be better to pass the delete schema or null in, so that we don't need each writer to create a new projection of the row schema.
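
A sketch of the suggested constructor shape; the deleteSchema projection would be built once by the caller (for example with TypeUtil.select over the equality field ids), and null would mean equality deletes are disabled. The body here is illustrative, including the SortedPosDeleteWriter arguments and the StructLikeMap factory:

  public BaseDeltaWriter(PartitionKey partition, Schema schema, Schema deleteSchema) {
    this.dataWriter = new RollingFileWriter(partition);

    if (deleteSchema != null) {
      // Only changelog streams need the pos-delete writer and the inserted-row map.
      this.posDeleteWriter = new SortedPosDeleteWriter<>(partition);
      this.insertedRowMap = StructLikeMap.create(deleteSchema.asStruct());
    }
  }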

MetadataColumns.DELETE_FILE_POS);
}

public static Schema pathPosSchema(Schema rowSchema) {
rdblue (Contributor):

This one doesn't need to be public.

}
}
} catch (IOException e) {
throw new UncheckedIOException(e);
rdblue (Contributor), Nov 24, 2020:

I always like to include whatever context is available. Here, it may be helpful to know which partition writer failed. What about using:

  throw new UncheckedIOException("Failed to write position delete file for partition " + partitionKey, e);

}

public List<DeleteFile> complete() {
flush();
rdblue (Contributor):

I would expect this to call close rather than flush. While close just calls flush so they are equivalent right now, I think using close is better in the long term. If close is modified in the future, it is unlikely that someone will go here and make the same change.

openinx (Member, Author):

Yeah, makes sense! It's better to use close in this complete method.
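
For example (a sketch, assuming the writer collects its finished delete files in a completedFiles list):

  public List<DeleteFile> complete() throws IOException {
    // Call close() rather than flush(), so that future changes to close()
    // automatically apply here as well.
    close();
    return completedFiles;
  }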

}
for (DeleteFile deleteFile : result.deleteFiles()) {
add(deleteFile);
}
rdblue (Contributor):

Rather than copying the loops, could this call addDataFiles and addDeleteFiles?

openinx (Member, Author):

The dataFiles() will return an array (I defined it as an array to avoid the serialization issue with ImmutableList), while the current addDataFiles only accepts Iterable<DataFile>. I think I can add an addDataFiles(DataFile... dataFiles) overload.
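
A possible shape for those varargs overloads, sketched against the existing addDataFiles/addDeleteFiles methods mentioned above:

  // Lets callers holding arrays pass them directly without wrapping in a List first.
  public void addDataFiles(DataFile... files) {
    addDataFiles(Arrays.asList(files));
  }

  public void addDeleteFiles(DeleteFile... files) {
    addDeleteFiles(Arrays.asList(files));
  }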


@Override
public Set<StructLike> keySet() {
return wrapperMap.keySet().stream().map(StructLikeWrapper::get).collect(Collectors.toSet());
rdblue (Contributor):

I think this should use a StructLikeSet, or else there is no expectation that the returned set will function properly.

openinx (Member, Author):

Makes sense. I will provide a full unit test to cover all of these map APIs.
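
One way to build the key set with a StructLikeSet, assuming the map keeps its key struct type in a type field (a sketch):

  @Override
  public Set<StructLike> keySet() {
    // StructLikeSet compares keys field by field, so the returned set behaves
    // correctly even though StructLike has no equals/hashCode contract.
    StructLikeSet keys = StructLikeSet.create(type);
    for (StructLikeWrapper wrapper : wrapperMap.keySet()) {
      keys.add(wrapper.get());
    }
    return keys;
  }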


@Override
public Set<Entry<StructLike, T>> entrySet() {
throw new UnsupportedOperationException();
rdblue (Contributor):

I think this should be implemented.

It isn't too difficult to implement Map.Entry with a getKey method that calls StructLikeWrapper::get. This method is commonly called on maps. It's even called from putAll above in this class.
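
A sketch along those lines; AbstractMap.SimpleEntry is enough if the returned entries don't need to write back into the underlying map:

  @Override
  public Set<Entry<StructLike, T>> entrySet() {
    Set<Entry<StructLike, T>> entries = Sets.newHashSet();
    for (Entry<StructLikeWrapper, T> entry : wrapperMap.entrySet()) {
      // Unwrap the key so callers see the original StructLike.
      entries.add(new AbstractMap.SimpleEntry<>(entry.getKey().get(), entry.getValue()));
    }
    return entries;
  }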


@Override
public boolean containsValue(Object value) {
throw new UnsupportedOperationException();
rdblue (Contributor):

Can't this just delegate?

  @Override
  public boolean containsValue(Object value) {
    return wrapperMap.containsValue(value);
  }

public class SortedPosDeleteWriter<T> implements Closeable {
private static final int RECORDS_NUM_THRESHOLD = 1000_000;

private final Map<CharSequence, List<Long>> posDeletes = Maps.newHashMap();
rdblue (Contributor):

This needs to use a new CharSequenceMap or Map<CharSequenceWrapper, List<Long>>.
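
For example, keyed by CharSequenceWrapper so that equal paths hash and compare by content rather than by CharSequence identity (a sketch; the helper method name is illustrative):

  private final Map<CharSequenceWrapper, List<Long>> posDeletes = Maps.newHashMap();

  private void addPosDelete(CharSequence path, long pos) {
    posDeletes.computeIfAbsent(CharSequenceWrapper.wrap(path), ignored -> Lists.newArrayList())
        .add(pos);
  }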

@Override
public EqualityDeleteWriter<Record> newEqDeleteWriter(EncryptedOutputFile outputFile, FileFormat format,
StructLike partition) {
throw new UnsupportedOperationException("Cannot create equality-delete writer for generic record now.");
rdblue (Contributor):

Why not implement this?

openinx (Member, Author):

I intended to provide implemented methods with full unit-test coverage. But to get the PoC out as soon as possible, I didn't have time to write those tests, so I left these unsupported for now.

@Parameterized.Parameters(name = "format = {0}")
public static Object[] parameters() {
return new Object[] { "parquet", "avro" };
return new Object[] {"parquet", "avro"};
rdblue (Contributor):

Nit: this line doesn't need to change. Can you revert it to avoid commit conflicts?

.buildPositionWriter();

case PARQUET:
RowType flinkParquetRowType = FlinkSchemaUtil.convert(DeleteUtil.posDeleteSchema(rowSchema));
rdblue (Contributor):

I think we should separately fix the schema that gets passed to the createWriterFunc. That's a bug.

RowType flinkParquetRowType = FlinkSchemaUtil.convert(DeleteUtil.posDeleteSchema(rowSchema));

return Parquet.writeDeletes(outputFile.encryptingOutputFile())
.createWriterFunc(msgType -> FlinkParquetWriters.buildWriter(flinkParquetRowType, msgType))
rdblue (Contributor):

I don't think this will work because the writer that is returned will be wrapped by a PositionDeleteStructWriter. That would duplicate the position delete struct because this is going to produce a writer for it as well. That's why the schema passed here is a bug, like I noted above.

rdblue (Contributor):

Yeah, this needs to be fixed. I'm looking through how Avro actually works right now and it is okay because we're calling setSchema a second time from the position writer, which basically discards the original writer with the extra record and rebuilds it with just the row schema.

I'll open a PR to fix it.

public void prepareSnapshotPreBarrier(long checkpointId) throws Exception {
// close all open files and emit files to downstream committer operator
for (DataFile dataFile : writer.complete()) {
for (DataFile dataFile : writer.complete().dataFiles()) {
rdblue (Contributor):

Should this also check that there are no delete files?

openinx (Member, Author):

Yes, it should emit the deleteFiles downstream and commit both delete files and data files into the Iceberg table via the RowDelta API. I think a separate PR should address that.
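
Roughly, the committer side could then apply both kinds of files with a row delta (a sketch of the RowDelta usage, not the committer code from this PR):

  RowDelta rowDelta = table.newRowDelta();
  for (DataFile dataFile : result.dataFiles()) {
    rowDelta.addRows(dataFile);
  }
  for (DeleteFile deleteFile : result.deleteFiles()) {
    rowDelta.addDeletes(deleteFile);
  }
  rowDelta.commit();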

}

public static class PositionDeleteStructWriter<R> extends StructWriter<PositionDelete<R>> {

rdblue (Contributor):

Nit: this file doesn't need to change.

}

@Override
public EqualityDeleteWriter<InternalRow> newEqDeleteWriter(EncryptedOutputFile outputFile, FileFormat format,
rdblue (Contributor):

I think one of the first commits to get this in could make these small changes to both Spark and Flink to update the appender factory. We could do the same for generics as well.


@Override
protected StructLike asKey(RowData row) {
return rowDataWrapper.wrap(row);
rdblue (Contributor):

I think this is a bug. It doesn't extract the key fields, it just wraps the row as a StructLike. It should extract the equality fields to produce a key, probably using StructProjection.

rdblue (Contributor):

I think the tests work because the ID column is the first column in the record.

openinx (Member, Author):

The tests work because the StructLikeMap has a StructLikeWrapper which will only compare the equality fields, even though we provided the full columns here. The names asKey and asCopiedKey are not appropriate here; asStructLike and asCopiedStructLike would be better.

rdblue (Contributor):

The tests work because the StructLikeMap has a StructLikeWrapper which will only compare the equality fields

But this happens using the key schema and fields are accessed by position. Wouldn't that fail if the key schema wasn't a prefix of the row schema?
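
A sketch of extracting only the equality fields with StructProjection; the keyProjection field and how it is built are assumptions, shown here to illustrate the idea:

  // keyProjection is assumed to be built once, e.g. in the constructor:
  //   this.keyProjection = StructProjection.create(schema,
  //       TypeUtil.select(schema, Sets.newHashSet(equalityFieldIds)));

  @Override
  protected StructLike asKey(RowData row) {
    // Wrap the Flink row as a StructLike, then project it down to the equality
    // fields, so the key works even when those columns are not a prefix of the row.
    return keyProjection.wrap(rowDataWrapper.wrap(row));
  }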

public void write(RowData row) throws IOException {
RowDataDeltaWriter deltaWriter;

if (spec().fields().size() <= 0) {
rdblue (Contributor):

You can use spec.isUnpartitioned().

case DELETE:
case UPDATE_BEFORE:
deltaWriter.delete(row);
break;
rdblue (Contributor):

I'm not familiar with UPDATE_AFTER or UPDATE_BEFORE. Can you help me understand what's going on here?

openinx (Member, Author):

For CDC events, such as the MySQL binlog for UPDATE test SET a=2, b=2 WHERE a=1 AND b=1, the flink-cdc-connectors will produce two change logs for it:

-U (a=1, b=1)
+U (a=2, b=2)

The first RowData means we need to delete the old row (1, 1); it is also called an UPDATE_BEFORE row. The second RowData means we need to insert the new row (2, 2); it is also called an UPDATE_AFTER row.
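
Putting that next to the switch above, the dispatch by Flink RowKind looks roughly like this (a sketch):

  switch (row.getRowKind()) {
    case INSERT:
    case UPDATE_AFTER:
      // +I and +U carry the new image of the row.
      deltaWriter.write(row);
      break;
    case DELETE:
    case UPDATE_BEFORE:
      // -D and -U carry the old image of the row that must be removed.
      deltaWriter.delete(row);
      break;
    default:
      throw new UnsupportedOperationException("Unknown row kind: " + row.getRowKind());
  }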

rdblue (Contributor):

What distinguishes UPDATE_BEFORE from DELETE? Is it that it will be associated with UPDATE_AFTER so the two are combined to form a replace operation?

openinx (Member, Author):

Is it that it will be associated with UPDATE_AFTER so the two are combined to form a replace operation?

Yes, that's right.

Tasks.foreach(deltaWriterMap.values())
.throwFailureWhenFinished()
.noRetry()
.run(deltaWriter -> {
rdblue (Contributor), Nov 25, 2020:

You can supply a checked exception class that will be thrown. That way, you can move the try/catch outside of the run block:

  @Override
  public void close() {
    try {
      Tasks.foreach(deltaWriterMap.values())
          .throwFailureWhenFinished()
          .noRetry()
          .run(RowDataDeltaWriter::close, IOException.class);
    } catch (IOException e) {
      throw new UncheckedIOException("Failed to close writers", e);
    }
  }

And you can also add a callback that receives the exception for each failure with onFailure if you want to log the exceptions individually.


@Test
public void testWriteEqualityDelete() throws IOException {
if (format == FileFormat.ORC) {
rdblue (Contributor):

Can you use Assume.assumeTrue? That way it doesn't look like ORC is passing. It shows up as skipped.

openinx (Member, Author):

Yes, makes sense!
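
For example (the assumption message below is illustrative):

  @Test
  public void testWriteEqualityDelete() throws IOException {
    // Reports the ORC case as skipped instead of silently passing.
    Assume.assumeTrue("ORC equality-delete write is not supported yet", format != FileFormat.ORC);
    // ... rest of the test
  }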

rdblue (Contributor) commented Nov 25, 2020

@openinx, this looks great to me!

I found a few issues to address, but I think the general design and structure is good. Should we start breaking it into smaller commits to get the changes into master?

openinx (Member, Author) commented Dec 24, 2020

All of this work has been merged in several individual PRs, so I plan to close this PR now.

openinx closed this on Dec 24, 2020
openinx deleted the flink-cdc-writers branch on December 24, 2020 at 10:08