Flink: FlinkSink & IcebergSink desynchronized tests alignment #11249
@rodmeneses @pvary please take a look
@pvary @stevenzwu
This one looks good to me.
…-unit-tests-alignment
@pvary can you add @rodmeneses as a reviewer or look at this PR on your own? This is a very non-controversial change: only cleanup of unused code and synchronization of improvements that have already been made elsewhere in the code.
flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkV2Branch.java
assertThat(partitionFiles("aaa"))
    .as("There should be only 1 data file in partition 'aaa'")
    .isEqualTo(1);
assertThat(partitionFiles("bbb"))
    .as("There should be only 1 data file in partition 'bbb'")
    .isEqualTo(1);
assertThat(partitionFiles("ccc"))
    .as("There should be only 1 data file in partition 'ccc'")
    .isEqualTo(1);
Do the comments in this file add any value above the ones we can read from the code?
I can't answer this question because I'm not up to date with AssertJ. I copied it from this PR: https://github.com/apache/iceberg/pull/10179/files#diff-42c31c3c7e3e8287ea451196f6c0b4d0a47aa931391df70369a87ed3d19a1452R221 @rodmeneses copied this code for SinkV2 and (I assume) forgot to backport it to the original source of the code. We can synchronize in the opposite direction (remove these descriptions from the duplicated places) if we find out that they don't add value.
OK, I'm fine either way, and I agree with the sync effort.
@rodmeneses: What is the history of this difference? Is this intentional?
// Use schema identifier field IDs as equality field id list by default
assertThat(SinkUtil.checkAndGetEqualityFieldIds(table, null))
    .containsExactlyInAnyOrderElementsOf(table.schema().identifierFieldIds());

// Use user-provided equality field column as equality field id list
builder.equalityFieldColumns(Lists.newArrayList("id"));
assertThat(SinkUtil.checkAndGetEqualityFieldIds(table, Lists.newArrayList("id")))
    .containsExactlyInAnyOrder(table.schema().findField("id").fieldId());

assertThat(SinkUtil.checkAndGetEqualityFieldIds(table, Lists.newArrayList("type")))
    .containsExactlyInAnyOrder(table.schema().findField("type").fieldId());
Does this feature have anything to do with the SinkV2? Shall we create a different test for SinkUtil?
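To make the idea of a separate test concrete, here is a minimal, self-contained sketch of the behaviour the assertions above exercise: with no user-provided columns the identifier field IDs are used, otherwise the IDs of the named columns. `MiniSchema` and `resolveEqualityFieldIds` are hypothetical stand-ins written only for illustration; they are not Iceberg's `Schema` or `SinkUtil.checkAndGetEqualityFieldIds`, whose real implementation may differ (for example in validation).

```java
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

final class EqualityFieldIdsSketch {
  private EqualityFieldIdsSketch() {}

  // Hypothetical stand-in for a table schema (NOT Iceberg's Schema):
  // maps column names to field IDs and remembers which IDs are identifier fields.
  static final class MiniSchema {
    private final Map<String, Integer> fieldIdsByName = new LinkedHashMap<>();
    private final Set<Integer> identifierFieldIds = new LinkedHashSet<>();

    MiniSchema field(String name, int id, boolean identifier) {
      fieldIdsByName.put(name, id);
      if (identifier) {
        identifierFieldIds.add(id);
      }
      return this;
    }

    Set<Integer> identifierFieldIds() {
      return identifierFieldIds;
    }

    Integer fieldId(String name) {
      return fieldIdsByName.get(name);
    }
  }

  // Assumed behaviour, modelled on the assertions quoted in this thread.
  // Treating an empty list like null is an additional assumption of this sketch.
  static Set<Integer> resolveEqualityFieldIds(MiniSchema schema, List<String> columns) {
    if (columns == null || columns.isEmpty()) {
      return new LinkedHashSet<>(schema.identifierFieldIds());
    }
    Set<Integer> ids = new LinkedHashSet<>();
    for (String column : columns) {
      Integer id = schema.fieldId(column);
      if (id == null) {
        throw new IllegalArgumentException("Unknown equality field column: " + column);
      }
      ids.add(id);
    }
    return ids;
  }

  public static void main(String[] args) {
    MiniSchema schema = new MiniSchema().field("id", 1, true).field("type", 2, false);
    System.out.println(resolveEqualityFieldIds(schema, null));
    System.out.println(resolveEqualityFieldIds(schema, List.of("type")));
  }
}
```

Because the logic only needs a schema and a column list, a test like this would not have to construct either sink, which is the point of moving it out of the sink test classes.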
TestIcebergSinkV2 is a copy of TestFlinkIcebergSinkV2 (which tests the V1-based FlinkSink). For some reason, these checks weren't copied. At first glance, it looks like most of the tests in this class don't check anything related to SinkV2. Note that most of the test cases use TestFlinkIcebergSinkV2Base.testChangeLogs, which uses FlinkSink (V1-based). My proposed plan is:
- Synchronize the duplicated code
- Replace the duplicated classes with a parameterized test case
- Extract things that are not related to either the V1 or V2 sinks into other classes, as you suggested
WDYT?
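As a rough illustration of the second step, the shape of a single test body run once per sink could look like the plain-Java sketch below. Everything in it is hypothetical: `SinkUnderTest`, `fake`, and `runSharedCheck` are invented for this example, the fake sinks just echo their input, and the real tests would presumably rely on the test framework's parameterization support rather than a hand-written loop.

```java
import java.util.ArrayList;
import java.util.List;

final class SharedSinkChecksSketch {
  private SharedSinkChecksSketch() {}

  // Hypothetical abstraction over "the sink under test". Neither FlinkSink nor
  // IcebergSink exposes this interface; it only marks where the two classes differ.
  interface SinkUnderTest {
    String name();

    List<String> write(List<String> rows);
  }

  // Fake sink that simply returns what it was given (illustration only).
  static SinkUnderTest fake(String name) {
    return new SinkUnderTest() {
      @Override
      public String name() {
        return name;
      }

      @Override
      public List<String> write(List<String> rows) {
        return new ArrayList<>(rows);
      }
    };
  }

  // The parameter list replaces the two duplicated test classes.
  static List<SinkUnderTest> parameters() {
    return List.of(fake("FlinkSink (V1)"), fake("IcebergSink (SinkV2)"));
  }

  // One shared check, executed for every sink; returns the names of failing sinks.
  static List<String> runSharedCheck(List<String> rows) {
    List<String> failures = new ArrayList<>();
    for (SinkUnderTest sink : parameters()) {
      if (!sink.write(rows).equals(rows)) {
        failures.add(sink.name());
      }
    }
    return failures;
  }

  public static void main(String[] args) {
    System.out.println("failing sinks: " + runSharedCheck(List.of("aaa", "bbb", "ccc")));
  }
}
```

With this structure, an enhancement added to the shared check applies to both sinks at once, so the two test classes can no longer drift apart.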
@rodmeneses: Isn't this a mistake in the tests? Shouldn't these tests use something like testChangeLogsV2 and use the new sink?
This pull request has been marked as stale due to 30 days of inactivity. It will be closed in 1 week if no further activity occurs. If you think that’s incorrect or this pull request requires a review, please simply write any comment. If closed, you can revive the PR at any time and @mention a reviewer or discuss it on the dev@iceberg.apache.org list. Thank you for your contributions.
Summary
Currently, the unit tests for `IcebergSink` (based on the Flink SinkV2 interface) duplicate code from the `FlinkSink` tests. Some enhancements made in the `IcebergSink` tests haven't been migrated to the `FlinkSink` tests.
The scope of this change is to migrate these enhancements.
List of enhancements:
- `Flink.checkAndGetEqualityFieldIds` has been replaced by `SinkUtil.checkAndGetEqualityFieldIds`
- `as("description")` has been added to some assertions
Other improvements that were done:
- `testOperatorsUidNameWitUidSuffix`
- `TestFlinkIcebergSinkV2Branch` and `TestFlinkIcebergSinkV2Base` extended `TestFlinkIcebergSinkV2Base`, which has test parameters defined, but they weren't used; the default, initial values of the fields were used instead. After this change, test parameters are only defined in leaf classes
- `TestFlinkIcebergSinkV2Base.testChangeLogs` has a `keySelector` parameter that wasn't used anywhere; this change removes it. To discuss: are the tests based on this selector correctly checking what was intended?