Core: Remove unnecessary row filtering in deleted manifest file #4316
Conversation
Force-pushed from 34d2c06 to 3fe16db.
kbendick left a comment
Thank you so much for the patch @hililiwei!
And thank you for expanding on the issue in the other PR from @xloya. Can you please link that PR and possibly restate some of the facts from over there so reviewers have more of the facts in one place? I’ll DM you to not add too much unneeded stuff to the PR.
Given the situation doesn’t occur with Avro files, I’d like to get input from others on the best way to resolve this.
This seems like the solution to go with, but I’d like to get input from others on whether there is possibly a better way to compose this with existing Predicate and related interfaces.
But overall this is really great work. And thank you to @xloya for opening the original PR to bring attention to this issue.
cc @szehon-ho @stevenzwu (when you get a chance)
Resolved (outdated) review threads on: flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/TestFlinkUpsert.java and api/src/main/java/org/apache/iceberg/expressions/ExpressionRemove.java
We seem to be hitting the same issue as @xloya. Here are some comments from #4311:
Force-pushed from 70d46f2 to 4889692.
Can you help me understand what's happening? Are you saying that the lower/upper bounds in the delete file for other columns are not accurate for the deleted rows? That's what it sounds like when you say the bounds are "new" -- are these values for the upserted row columns rather than the deleted row columns?
I'm going to try to explain what happened, which may be a bit long 😄

Prerequisite, the table:

```sql
CREATE TABLE test_upsert_query (
  id INT NOT NULL,
  province STRING NOT NULL,
  dt DATE,
  PRIMARY KEY(id, province) NOT ENFORCED
) PARTITIONED BY (province)
WITH (
  'write.format.default'='PARQUET',
  'write.upsert.enabled'='true',
  'format-version'='2'
)
```

Insert the initial data:

```sql
INSERT INTO test_upsert_query
VALUES
  (1, 'a', TO_DATE('2022-03-01')),
  (2, 'b', TO_DATE('2022-03-01')),
  (1, 'b', TO_DATE('2022-03-01'))
```

Two manifest files are generated. m1.avro is a delete manifest file; viewing it, notice the lower/upper bounds it records for the delete file.

Execute the upsert:

```sql
INSERT INTO test_upsert_query
VALUES
  (4, 'a', TO_DATE('2022-03-02')),
  (5, 'b', TO_DATE('2022-03-02')),
  (1, 'b', TO_DATE('2022-03-02'))
```

The dt of (1, 'b') is updated to '2022-03-02' (stored internally as the date value 19053). Checking again, the following two manifest files are displayed. This time we're still just looking at c3fd1626-d26f-4067-b4b0-a245d59a0615-m1.avro: now the value of dt (key=3) in its lower/upper bounds is the new value, '2022-03-02', rather than the dt of the row that was actually deleted.

If we query data at this time:

```sql
SELECT * FROM test_upsert_query WHERE dt < '2022-03-02'
```

During the query, the manifest file is filtered based on these metric values, so the delete file is skipped (a rough sketch of this check follows below) and the result still contains the old row (1, 'b', '2022-03-01'), even though it has been upserted.

In this PR, I tried to trim the filter predicate so that only the fields in `identifierFieldIds` are retained when filtering the delete manifest file.

If the process and result of our analysis are wrong, please do not hesitate to tell me.
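A rough numeric illustration of that pruning check (a sketch only, not Iceberg's actual metrics evaluator; the class and variable names below are made up for the example):

```java
import java.time.LocalDate;

// Sketch: after the upsert, the delete file's dt lower/upper bounds both hold the NEW
// value 2022-03-02 (epoch day 19053), even though the row it deletes has dt = 2022-03-01.
class PruningSketch {
  public static void main(String[] args) {
    LocalDate dtLowerBound = LocalDate.ofEpochDay(19053);  // 2022-03-02
    LocalDate queryValue = LocalDate.parse("2022-03-02");  // predicate: dt < '2022-03-02'

    // dt < '2022-03-02' can only match a file whose dt lower bound is below the value,
    // so the delete file is pruned and the stale row (1, 'b', '2022-03-01') survives.
    boolean mightMatch = dtLowerBound.isBefore(queryValue);
    System.out.println(mightMatch);  // false
  }
}
```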
Thanks, @hililiwei! Great repro case. I just went through this with @kbendick and we have a slightly different fix to prevent this with future tables. He's also going to open a PR with a fix for existing tables that we'll ping you to review.

It's the least I could do. Looking forward to your PR. Please feel free to let me know if there's anything I can do to help. 😄
…ert mode by updating equalityDeleteSchema
Co-authored-by: hililiwei <hililiwei@gmail.com>
@kbendick and I are trying to solve this, ref: kbendick#71
Force-pushed from 4889692 to 3abe264.
Force-pushed from 3abe264 to 23836a0.
Hi @kbendick, based on our previous discussion, I've raised a preliminary solution. It deletes data based on the key. Please take a look at it. Thx.
```java
case UPDATE_AFTER:
  if (upsert) {
    writer.delete(row);
    RowData wrap = RowDataProjection.create(schema, deleteSchema).wrap(row);
```
This shouldn't create a new projection every time. Instead it should create one and reuse it by calling wrap every time.
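A minimal sketch of the reuse pattern being suggested (the class and method names here are illustrative, not the PR's code; `RowDataProjection.create` and `wrap` are the calls shown in the hunk above):

```java
import org.apache.flink.table.data.RowData;
import org.apache.iceberg.Schema;
import org.apache.iceberg.flink.RowDataProjection;

// Build the key projection once (e.g. in the writer's constructor) and re-wrap it per row,
// instead of calling RowDataProjection.create(...) for every upserted record.
class KeyProjectionSketch {
  private final RowDataProjection keyProjection;

  KeyProjectionSketch(Schema schema, Schema deleteSchema) {
    this.keyProjection = RowDataProjection.create(schema, deleteSchema);
  }

  RowData keyOf(RowData row) {
    // wrap(...) re-points the single projection at the current row; no per-record allocation.
    return keyProjection.wrap(row);
  }
}
```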
```java
    writer.deleteKey(wrap);
  } else {
    writer.delete(row);
  }
```
Why does this need to change? I think you just want to fix the upsert case.
Additionally, in the upsert case, data doesn't come through as UPDATE_BEFORE, though this might be needed to keep CDC data in check.
We've been working on the PR in my fork, but I'll run some tests.
```java
if (struct.isNullAt(index)) {
  return null;
}
return fieldGetter[index].getFieldOrNull(struct);
```
Why did this need to change? NPE in a test?
There is an NPE in some test cases, yes.
However, I'm going to investigate a bit further, as I do think it might be indicative of a bug.
I think if we use the correct deletion schema in all cases, the NPEs will go away. I am testing now.
Or actually, I don't think this change is needed. If there's no fieldGetter for a given index, that's likely indicative of a bug.
Yeah, this didn't need to be changed if we use the full schema as the deletion schema outside of the upsert case, like here: https://github.com/apache/iceberg/pull/4364/files#diff-bbdfbcbd83d2e6f53d402e804dcd0a6fd28ab39d1cc3f74a88641ae8763fde3bR75-R87
```java
this.schema = schema;
this.deleteSchema = TypeUtil.select(schema, Sets.newHashSet(equalityFieldIds));
this.wrapper = new RowDataWrapper(flinkSchema, schema.asStruct());
this.wrapperDelete = new RowDataWrapper(FlinkSchemaUtil.convert(deleteSchema), deleteSchema.asStruct());
```
How about keyWrapper instead?
```java
  return wrapper;
}

RowDataWrapper wrapperDelete() {
```
Why is there an accessor for this?
```diff
  // TODO provide the ability to customize the equality-delete row schema.
  this.appenderFactory = new FlinkAppenderFactory(schema, flinkSchema, table.properties(), spec,
-     ArrayUtil.toIntArray(equalityFieldIds), schema, null);
+     ArrayUtil.toIntArray(equalityFieldIds), TypeUtil.select(schema, Sets.newHashSet(equalityFieldIds)), null);
```
This is the correct schema for upsert but should not be used for delete when the row passed to the delete file is the deleted row.
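A small sketch of that distinction (the helper and class names are made up for illustration; `TypeUtil.select` and the equality field ids are the pieces from the hunk above): in upsert mode only the equality key columns go into the delete file, while a plain CDC delete writes the full deleted row and therefore should keep the full schema.

```java
import java.util.List;
import org.apache.iceberg.Schema;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.types.TypeUtil;

class EqualityDeleteSchemaSketch {
  // Upsert: the delete file only receives the key columns, so project the schema down to
  // the equality field ids. Plain delete: the full deleted row is written, so the full
  // table schema is the right equality-delete row schema.
  static Schema equalityDeleteSchema(Schema schema, List<Integer> equalityFieldIds, boolean upsert) {
    return upsert ? TypeUtil.select(schema, Sets.newHashSet(equalityFieldIds)) : schema;
  }
}
```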
```java
RowDataWrapper wrapperDelete() {
  return wrapperDelete;
}
```
Can you revert the changes in all modules other than 1.14? That makes reviewing and updating for review comments much easier. Once this goes in we can backport to 1.12 and 1.13.
Yeah agreed. I left a similar comment on another file because the changes were a bit much.
It keeps the discussion of a specific change all in one place.
In this case, I believe that @hililiwei made the changes as existing tests might not pass in earlier versions of Flink.
But for something important, we should still keep the changes in one PR while reviewing. Otherwise it's difficult for others to review.
rdblue left a comment
Overall this looks good, but we don't want to change how Flink handles CDC deletes or updates. This should be a narrower change that only applies to upserts.
```java
if (struct.isNullAt(index)) {
  return null;
}
return this.fieldGetter[index].getFieldOrNull(struct);
```
Let's keep the work to just Flink 1.14 for now, so comments don't get duplicated on many things.
After we update data via upsert, querying on a field that is not in the table's `identifierFieldIds` may return inaccurate results. The metric values (such as `upper_bounds` / `lower_bounds`) recorded for non-identifier fields in the delete manifest file reflect the new data, and the row filter is evaluated against these new values. This can cause the update to the old data to not take effect: even though the row has been upserted, the old data is still fetched and put into the result set.

In this PR I try to come up with a solution: when filtering the delete manifest file, if `identifierFieldIds` is not empty, only the fields in `identifierFieldIds` are retained in the row filter, and the filter predicates on non-identifier fields are always kept as true.
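To make the intended trimming concrete, here is an illustration using the Iceberg expression API (this shows only the intended effect, not the actual ExpressionRemove implementation; `id` is taken as an identifier field and `dt` as a non-identifier field, matching the repro above):

```java
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.expressions.Expressions;

class TrimmedFilterSketch {
  public static void main(String[] args) {
    // Row filter as written by the user.
    Expression rowFilter = Expressions.and(
        Expressions.equal("id", 1),
        Expressions.lessThan("dt", "2022-03-02"));

    // Filter used when scanning delete manifests after trimming: the predicate on the
    // non-identifier column dt is replaced with alwaysTrue(), so the delete file can no
    // longer be pruned away by its (rewritten) dt bounds.
    Expression deleteManifestFilter = Expressions.and(
        Expressions.equal("id", 1),
        Expressions.alwaysTrue());

    System.out.println(rowFilter);
    System.out.println(deleteManifestFilter);
  }
}
```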