Core: Add data and delete writers in FileAppenderFactory. #1836
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
flink/src/main/java/org/apache/iceberg/flink/sink/FlinkAppenderFactory.java (review thread resolved)
```java
          .buildPositionWriter();

      default:
        throw new UnsupportedOperationException("Cannot write unknown file format: " + format);
```
I think it should be "unsupported" rather than "unknown" because ORC is known, but not supported.
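A minimal sketch of the suggested wording, reusing the switch arm from the diff above:

```java
default:
  // ORC is a known format here; it simply is not supported by this writer yet
  throw new UnsupportedOperationException("Cannot write unsupported file format: " + format);
```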
```java
public GenericAppenderFactory(Schema schema) {
public GenericAppenderFactory(Schema schema, PartitionSpec spec) {
  this(schema, spec, null, schema, null);
```
Why not set eqDeleteRowSchema to null since equalityFieldIds is null?
OK, it's more reasonable to set eqDeleteRowSchema to null when equalityFieldIds is null.
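A minimal sketch of the agreed change, assuming the five-argument constructor order (schema, spec, equalityFieldIds, eqDeleteRowSchema, posDeleteRowSchema) implied by the diff above:

```java
public GenericAppenderFactory(Schema schema, PartitionSpec spec) {
  // no equality field ids are configured, so there is no equality-delete row schema either
  this(schema, spec, null, null, null);
}
```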
```java
// before:
FileAppender<Record> appender = new GenericAppenderFactory(table.schema()).newAppender(

// after:
private static DataFile appendToLocalFile(Table table, File file, FileFormat format, StructLike partition,
                                          List<Record> records, PartitionSpec spec) throws IOException {
  FileAppender<Record> appender = new GenericAppenderFactory(table.schema(), spec).newAppender(
```
Could we keep the constructor this was using before so we don't need to change any tests that only use newAppender? There are 4 files just here that don't appear to need any change other than adding a spec that won't be used.
I think I've counted at least 10 files that would not need to change if we kept the original constructor.
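A minimal sketch of keeping the schema-only constructor as an overload so test code that only calls newAppender does not need to change; defaulting to PartitionSpec.unpartitioned() is an assumption here, not a quote from the PR:

```java
public GenericAppenderFactory(Schema schema) {
  // delegate to the spec-aware constructor with an unpartitioned spec
  this(schema, PartitionSpec.unpartitioned());
}
```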
```java
public <T> PositionDeleteWriter<T> buildPositionWriter() throws IOException {
public <T> PositionDeleteWriter<T> buildPositionWriter(ParquetValueWriters.PathPosAccessor<?, ?> accessor)
```
I'd rather not add this to the build method. Nothing distinguishes it from other options, so I think we should add transformations as configuration methods, like we do for equalityFieldIds.

I'm also thinking that it would be good to have a more light-weight way to add these transforms. Rather than an additional accessor that has two abstract methods, why not just register functions? It could look like this:

```java
Avro.writeDeletes(outFile)
    ...
    .transformPaths(StringData::fromString)
    .buildPositionWriter();
```

That way it's easier to use a method reference rather than creating a class. And nothing actually needs to transform pos yet, so we can just leave that out.
I like the idea of registering a light-weight function to convert the CharSequence to StringData.
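A minimal sketch of what such a configuration method might look like on the delete-writer builder; the DeleteWriteBuilder name and the field shown here are assumptions based on this discussion, not the exact code in the PR:

```java
import java.util.function.Function;

// inside the (assumed) DeleteWriteBuilder class
private Function<CharSequence, ?> pathTransformFunc = Function.identity();

public DeleteWriteBuilder transformPaths(Function<CharSequence, ?> newPathTransformFunc) {
  this.pathTransformFunc = newPathTransformFunc;
  return this;
}
```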
```java
Record record = GenericRecord.create(table.spec().schema()).copy(ImmutableMap.of("data", "aaa"));
```
I think this could just be table.schema(). No need to go through the spec to get the table schema.
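The suggested simplification as a one-line sketch:

```java
Record record = GenericRecord.create(table.schema()).copy(ImmutableMap.of("data", "aaa"));
```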
```java
List<T> deletes = Lists.newArrayList(
    createRow(1, "aaa"),
    createRow(3, "bbb"),
```
If the data here is bbb instead of ccc on purpose, then could you add a comment that this is testing that just id is used for comparison?
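A minimal sketch of the requested comment on the test data; the assumption here is that the table row for id 3 carries different data, so the mismatch exercises the equality-field comparison:

```java
List<T> deletes = Lists.newArrayList(
    createRow(1, "aaa"),
    // "bbb" intentionally differs from the row stored in the table for id = 3,
    // verifying that only the equality field (id) is used for the delete comparison
    createRow(3, "bbb"));
```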
```java
    createRow(2, "bbb"),
    createRow(4, "ddd")
);
Assert.assertEquals("Should have the expected records", expectedRowSet(expected), actualRowSet("*"));
```
Instead of reading from the table, I would rather see a test that the equality delete file contains the expected row data. In this case, it should not contain the data column. I would like to see that checked. And it would be good to add a case where the whole original row is written to the file.
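A minimal sketch of what such a check could look like; readRecords is a hypothetical test helper that reads the written delete file back with the given schema, and eqDeleteRowSchema is assumed to project only the id column:

```java
// read the equality delete file back; it should contain only the projected equality fields
List<Record> written = readRecords(format, eqDeleteRowSchema, eqDeleteFile.path());

Assert.assertEquals("Equality delete file should not contain the data column",
    Lists.newArrayList(1, 3),
    written.stream().map(rec -> rec.getField("id")).collect(Collectors.toList()));
```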
```java
    createRow(2, "bbb"),
    createRow(4, "ddd")
);
Assert.assertEquals("Should have the expected records", expectedRowSet(expected), actualRowSet("*"));
```
Similar to above, I think this should check that only the path and position columns are written to the file and that they are the expected values. The test below should check that the row column is present and set correctly for each row.
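A minimal sketch of the position-delete check, again using the hypothetical readRecords helper; pathPosSchema is an assumed schema containing only the file_path and pos metadata columns:

```java
// read the position delete file back and verify it only records the path and position
List<Record> posDeletes = readRecords(format, pathPosSchema, posDeleteFile.path());

Record first = posDeletes.get(0);
Assert.assertEquals("Should record the data file path", dataFile.path().toString(),
    first.getField("file_path").toString());
Assert.assertEquals("Should record the deleted position", 0L, first.getField("pos"));
```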
```java
StructType structType = SparkSchemaUtil.convert(schema);
// before:
SparkAppenderFactory appenderFactory = new SparkAppenderFactory(properties, schema, structType);
// after:
SparkAppenderFactory appenderFactory = new SparkAppenderFactory(properties, schema, structType, spec);
```
Here I use the constructor that has the spec argument because I believe we will use the DataWriter to append records once we switch to the RollingFileWriter: https://github.com/apache/iceberg/pull/1818/files#diff-fc9a9fd84d24c607fd85e053b08a559f56dd2dd2a46f1341c528e7a0269f873cR263.
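A minimal sketch of how the spec-aware factory could then be used to build a data writer; the newDataWriter call and the writer methods used here are assumptions based on this PR's description, not a quote of the final API:

```java
SparkAppenderFactory appenderFactory = new SparkAppenderFactory(properties, schema, structType, spec);

// assumed signature: newDataWriter(EncryptedOutputFile, FileFormat, StructLike)
DataWriter<InternalRow> writer = appenderFactory.newDataWriter(outputFile, format, partition);
try {
  for (InternalRow row : rows) {
    writer.add(row);   // assumed method for appending rows
  }
} finally {
  writer.close();
}
DataFile dataFile = writer.toDataFile();
```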
```java
private StructLike partition = null;
private EncryptionKeyMetadata keyMetadata = null;
private int[] equalityFieldIds = null;
private Function<CharSequence, ?> pathTransformFunc = t -> t;
```
Nit: I think it would be better to use Function.identity().
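The suggested default as a one-line sketch:

```java
private Function<CharSequence, ?> pathTransformFunc = Function.identity();
```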
```java
public <T> PositionDeleteWriter<T> buildPositionWriter() throws IOException {
public <T> PositionDeleteWriter<T> buildPositionWriter()
    throws IOException {
```
Does this line need to change? I'm fine removing the empty line, but I think throws can still fit on the previous line.
I can revert it.
```java
appenderBuilder.createWriterFunc(parquetSchema ->
    new PositionDeleteStructWriter<T>((StructWriter<?>) GenericParquetWriter.buildWriter(parquetSchema)));
// changed to:
appenderBuilder.createWriterFunc(parquetSchema ->
    new PositionDeleteStructWriter<T>((StructWriter<?>) GenericParquetWriter.buildWriter(parquetSchema),
        t -> t));
```
Shouldn't this pass pathTransformFunc as well?
We shouldn't pass pathTransformFunc here, because in this path we will use GenericParquetWriter (rather than FlinkParquetWriter or SparkParquetWriter) to write the PositionDelete. If we converted the path CharSequence to StringData, the GenericParquetWriter could not find the correct writer to write the values.
It's good to use Function.identity() here.
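A minimal sketch of the agreed fix for the generic Parquet path, using the two-argument PositionDeleteStructWriter constructor shown in the diff above:

```java
// the generic path keeps delete paths as CharSequence, so the identity transform
// is used here instead of the engine-specific pathTransformFunc
appenderBuilder.createWriterFunc(parquetSchema ->
    new PositionDeleteStructWriter<T>(
        (StructWriter<?>) GenericParquetWriter.buildWriter(parquetSchema),
        Function.identity()));
```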
rdblue left a comment:
+1 overall. There is only one issue, which is the case where the new transform func isn't passed in Parquet. I'm not sure whether that is a bug, so please have a look.
Once that's done, please merge!
All checks passed, let me merge this PR. Thanks @rdblue for reviewing.

This is a PR that was split out from #1818.