[BEAM-12740] [BEAM-8268] Improve error handling and retries for GCS rename used by FileBasedSink #15301
chamikaramj
left a comment
Thanks.
I think we still need removeTemporaryFiles() to remove the temporary directory for batch.
Adding it back. I think we also need this when we filter on whether the destination exists, because we still need to remove the source in that case.
It might make sense to update the semantics of "StandardMoveOptions.SKIP_IF_DESTINATION_EXISTS" to delete the source during rename. That would let us avoid the double delete in the case where the source existed.
Also, removeTemporaryFiles() should probably be updated to just clean up the temporary directory (where appropriate) instead of trying to delete files that have already been renamed.
I modified FileSystems.rename to delete srcs that existed but were filtered out because the dest already exists.
I kept the existing methods in FileBasedSink because they appear to be designed to be called by subclasses. I changed the call after the rename to pass an empty set of known files, which avoids the unnecessary delete.
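For readers following the thread, here is a minimal sketch of the semantics described above: a source whose destination already exists is skipped, but the source is still deleted so nothing needs a second cleanup pass. It is not the Beam implementation. The class name, the in-memory `files` map, and the `rename` signature are all hypothetical stand-ins for a real file system.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class SkipIfDestinationExistsSketch {
  // Toy in-memory stand-in for a file system: path -> contents (hypothetical).
  static final Map<String, String> files = new HashMap<>();

  // Renames srcs[i] to dests[i], skipping pairs whose destination already exists.
  static void rename(List<String> srcs, List<String> dests) {
    if (srcs.size() != dests.size()) {
      throw new IllegalArgumentException("srcs and dests must have the same size");
    }
    for (int i = 0; i < srcs.size(); i++) {
      String src = srcs.get(i);
      String dest = dests.get(i);
      if (!files.containsKey(src)) {
        continue; // Missing source: nothing to move and nothing to delete.
      }
      if (files.containsKey(dest)) {
        // Destination exists: keep it untouched, but still remove the source
        // so the caller does not have to delete it again later.
        files.remove(src);
        continue;
      }
      files.put(dest, files.remove(src));
    }
  }

  public static void main(String[] args) {
    files.put("tmp/a", "new");
    files.put("out/a", "old");
    files.put("tmp/b", "b");
    rename(
        Arrays.asList("tmp/a", "tmp/b", "tmp/missing"),
        Arrays.asList("out/a", "out/b", "out/c"));
    System.out.println(files.keySet());
  }
}
```

With this behavior, a caller that tracks temporary files can pass an empty set to its cleanup step after the rename, since every source that existed has already been removed.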
sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystems.java
sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
...oogle-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtil.java
...oogle-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtil.java
Force-pushed from 8d30e0a to e288ade.
PTAL. I removed the hacks for testing, since the performance seemed greatly improved.
chamikaramj
left a comment
Thanks. LGTM other than one comment.
We should raise an UnsupportedOperationException for this and other FileSystems when MoveOptions that are not fully supported are used.
Done. I changed FileSystems to catch the exception and retry without move options, which keeps the existing behavior for filesystems that don't support options.
I also fixed a bug where FileSystems.rename was not forwarding the move options to the filesystem as I had thought (and so was not using the improved error handling in GcsFileSystem).
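A rough sketch of the fallback described in this reply, under stated assumptions: `MoveOption`, `SimpleFileSystem`, and `renameWithFallback` are hypothetical names for illustration, not Beam's actual types or signatures. The idea is to try the rename with options first and, if the file system rejects them, retry without options.

```java
import java.util.Arrays;
import java.util.List;

class RenameFallbackSketch {
  // Hypothetical stand-in for the move options discussed in the review.
  enum MoveOption { IGNORE_MISSING_FILES, SKIP_IF_DESTINATION_EXISTS }

  // Hypothetical minimal file system interface.
  interface SimpleFileSystem {
    void rename(List<String> srcs, List<String> dests, MoveOption... options);
  }

  // Returns true if the file system honored the options, false if the
  // rename had to be retried without them.
  static boolean renameWithFallback(
      SimpleFileSystem fs, List<String> srcs, List<String> dests, MoveOption... options) {
    try {
      fs.rename(srcs, dests, options);
      return true;
    } catch (UnsupportedOperationException e) {
      // The file system does not support these options: keep the old
      // behavior by retrying the same rename with no options.
      fs.rename(srcs, dests);
      return false;
    }
  }

  public static void main(String[] args) {
    // A "legacy" file system that rejects any options.
    SimpleFileSystem legacy = (srcs, dests, opts) -> {
      if (opts.length > 0) {
        throw new UnsupportedOperationException("options not supported");
      }
    };
    boolean honored = renameWithFallback(
        legacy, Arrays.asList("tmp/a"), Arrays.asList("out/a"),
        MoveOption.IGNORE_MISSING_FILES);
    System.out.println("options honored: " + honored);
  }
}
```

The fallback only helps if the options really are forwarded on the first attempt, which is the bug mentioned above.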
Run Java PreCommit
Run Java_Examples_Dataflow PreCommit
Run Java_Examples_Dataflow_Java11 PreCommit
Not sure why word count failed, but retrying the failed tests.
…ename operations
LGTM. Thanks.
Run Java PreCommit
Run Java PreCommit
Run Java_Examples_Dataflow PreCommit
Run Java_Examples_Dataflow_Java11 PreCommit
Run Java PreCommit
Run Java_Examples_Dataflow PreCommit
Run Java_Examples_Dataflow_Java11 PreCommit
Run Java_Examples_Dataflow PreCommit
Run Java_Examples_Dataflow_Java11 PreCommit
@chamikaramj @lukecwik Tests are passing now, I think this is ready to merge
Thanks. Yeah, we can merge.
Previously, exceptions such as timeouts or unavailable errors for a single file were not caught
and caused retries and backoff of the entire DoFn processing.
Additionally, plumb the option to ignore missing sources through to the underlying FileSystem.rename
so that file systems can handle such errors intelligently per file. The filtering in FileSystems.rename
that removes such files is beneficial because it may use cheaper match operations, but it is insufficient
on its own: earlier deletes that were observed as timeouts may still delete files between the match and the rename.
Further reduce operations by modifying FileBasedSink to not delete files that have already been renamed.
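As an illustration of the per-file handling described above, here is a minimal sketch that retries transient errors for one file at a time and treats a missing source as skippable. It is not the GcsUtil code. `FileOp`, `renameAll`, and the attempt limit are hypothetical, and backoff between attempts is omitted for brevity.

```java
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;

class PerFileRetrySketch {
  // Hypothetical single-file rename operation that may fail transiently.
  interface FileOp {
    void run(String src, String dest) throws IOException;
  }

  // Renames each pair independently. Returns the sources that were skipped
  // because they were missing and ignoreMissing was set.
  static List<String> renameAll(
      List<String> srcs, List<String> dests, FileOp op, int maxAttempts, boolean ignoreMissing) {
    if (maxAttempts < 1 || srcs.size() != dests.size()) {
      throw new IllegalArgumentException("bad arguments");
    }
    List<String> skipped = new ArrayList<>();
    for (int i = 0; i < srcs.size(); i++) {
      String src = srcs.get(i);
      IOException last = null;
      boolean done = false;
      for (int attempt = 1; attempt <= maxAttempts && !done; attempt++) {
        try {
          op.run(src, dests.get(i));
          done = true;
        } catch (FileNotFoundException e) {
          // The source may have been removed by an earlier attempt that was
          // observed as a timeout; only this file is affected.
          if (!ignoreMissing) {
            throw new UncheckedIOException(e);
          }
          skipped.add(src);
          done = true;
        } catch (IOException e) {
          last = e; // Treated as transient here: retry this file only.
        }
      }
      if (!done) {
        throw new UncheckedIOException("rename failed for " + src, last);
      }
    }
    return skipped;
  }
}
```

The point of the structure is that one slow or missing file no longer forces the whole batch, or the surrounding DoFn, to be retried.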