Support for Flink's SpeculativeExecution in batch execution mode #10548
becketqin left a comment
@venkata91 Thanks for the patch. Left a comment. It looks like there is a simpler approach.
...link/src/main/java/org/apache/iceberg/flink/source/enumerator/AbstractIcebergEnumerator.java
bcc1550 to 347e25a
cc @stevenzwu for review. Should this change also be made in other Flink versions like Flink-1.17 and Flink-1.18?
347e25a to 7bcfdb2
@venkata91: How can we be sure that the tests are exercising the speculative execution code path? Do any of the tests read some splits multiple times and use the result of the faster attempt? I think it would be useful to have a test demonstrating that the behavior works, so that an unrelated change cannot disable it by accident.
Sure, sounds good. Will add a test.
@pvary Added an integration test to verify that the tasks are speculated and produce the expected output. PTAL. By the way, should this change also be made in other Flink versions like Flink-1.17 and Flink-1.18?
Gentle ping for reviews. cc @pvary @stevenzwu Thanks!
flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSpecExecSupport.java
  }

  @Test
  public void testSpeculativeExecution() throws Exception {
In my local tests, this passes even without the changes in AbstractIcebergEnumerator.
Does that mean that speculative execution is supported even without this change?
Nice catch!
Looks like I didn't set table.exec.iceberg.use-flip27-source to true. With that option set, and without the changes in AbstractIcebergEnumerator, the test fails with an exception saying:
"The split enumerator StaticIcebergEnumerator must implement SupportsHandleExecutionAttemptSourceEvent to be used in concurrent execution attempts scenario (e.g. if speculative execution is enabled)"
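The exception above suggests that the enumerator needs an attempt-aware event handler. A minimal sketch of one way to satisfy such a requirement is to forward the attempt-aware call to the existing per-subtask handler. Everything below is a local stand-in: `SourceEvent`, `AttemptAwareEventHandler`, `SplitRequestEvent`, and `SketchEnumerator` are hypothetical names defined only for this example, and they are not the Flink or Iceberg classes. Whether the merged change takes exactly this form is an assumption.

```java
import java.util.ArrayList;
import java.util.List;

public class AttemptAwareEnumeratorSketch {

  /** Hypothetical stand-in for a source event sent from a reader to the enumerator. */
  interface SourceEvent {}

  /** Hypothetical stand-in for an attempt-aware handler interface. */
  interface AttemptAwareEventHandler {
    void handleSourceEvent(int subtaskId, int attemptNumber, SourceEvent event);
  }

  /** Hypothetical event type used only to drive the example. */
  static final class SplitRequestEvent implements SourceEvent {}

  /** Hypothetical enumerator that already has an attempt-unaware handler. */
  static class SketchEnumerator implements AttemptAwareEventHandler {
    final List<String> handled = new ArrayList<>();

    // Existing handler: records which subtask sent an event.
    public void handleSourceEvent(int subtaskId, SourceEvent event) {
      handled.add("subtask-" + subtaskId);
    }

    // Attempt-aware handler: ignores the attempt number and delegates,
    // so every concurrent attempt of a subtask is served the same way.
    @Override
    public void handleSourceEvent(int subtaskId, int attemptNumber, SourceEvent event) {
      handleSourceEvent(subtaskId, event);
    }
  }

  public static void main(String[] args) {
    SketchEnumerator enumerator = new SketchEnumerator();
    // Two attempts of the same subtask, as would happen under speculation.
    enumerator.handleSourceEvent(0, 0, new SplitRequestEvent());
    enumerator.handleSourceEvent(0, 1, new SplitRequestEvent());
    System.out.println(enumerator.handled);
  }
}
```

Delegation keeps the change small, but it only makes sense when the enumerator's response does not depend on which attempt sent the event.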
@pvary should this change be ported to other Flink versions like
In a follow-up backport PR.
Merged to main. Could you please create the backport PR to the other Flink versions? The PR could be generated like this:
@pvary Should we have two backport PRs, one for 1.17 and one for 1.18, or is it fine to do it in a single PR?
A single PR would be fine, as we don't expect any serious changes to review.
…che#10548) (cherry picked from commit 91585e9)
(cherry picked from commit 84c9125)
Summary
Add support for Flink's Speculative Execution in batch execution mode
Testing
Added an integration test to verify the expected speculative execution behavior with IcebergSource
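For readers who want to reproduce the test setup, the following is a minimal sketch of the kind of settings such an integration test might apply. Only `table.exec.iceberg.use-flip27-source` comes from the conversation above. The other keys are Flink option names as I recall them from the Flink configuration documentation, and the values are illustrative, so both should be checked against the Flink version in use. The class and method names are hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class SpeculativeConfigSketch {

  /** Builds an illustrative set of key/value settings for a speculative batch job. */
  static Map<String, String> speculativeBatchSettings() {
    Map<String, String> conf = new LinkedHashMap<>();
    // Speculative execution applies to batch jobs only.
    conf.put("execution.runtime-mode", "BATCH");
    // Assumed key for enabling speculative execution (verify for your Flink version).
    conf.put("execution.batch.speculative.enabled", "true");
    // From the discussion: route reads through the FLIP-27 IcebergSource.
    conf.put("table.exec.iceberg.use-flip27-source", "true");
    // Assumed slow-task detector keys; aggressive values make speculation
    // trigger quickly in a small test. The values are illustrative only.
    conf.put("slow-task-detector.check-interval", "100 ms");
    conf.put("slow-task-detector.execution-time.baseline-lower-bound", "0 ms");
    conf.put("slow-task-detector.execution-time.baseline-multiplier", "1.0");
    return conf;
  }

  public static void main(String[] args) {
    // Print the settings in insertion order.
    speculativeBatchSettings().forEach((k, v) -> System.out.println(k + ": " + v));
  }
}
```

As the review exchange shows, leaving out the FLIP-27 flag lets the test pass without touching the new code path, so a test like this should also assert that speculative attempts actually ran.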