Hive: OutputCommitter implementation for Hive writes #1861
@marton-bod: Could you please review?
marton-bod
left a comment
Thanks @pvary! Looks great, just a few questions.
287ccc4 to 9c6451c
    // Reading the committed files. The assumption here is that the taskIds are generated in sequential order
    // starting from 0.
    Tasks.range(expectedFiles)
I also like to explicitly set the failure behavior. This should probably use throwFailureWhenFinished() and stopOnFailure() because this can't continue if any task fails.
Not using stopOnFailure is for tasks like cleaning up files, where each task should at least attempt.
Added throwFailureWhenFinished, but did not use stopOnFailure. This way I was able to reuse the code for both abort and commit. Since this only affects the exception handling, I think this could be an acceptable compromise.
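To make the trade-off in this thread concrete, here is a minimal, self-contained sketch. It does not use Iceberg's `Tasks` utility: `FailurePolicyDemo` and `runAll` are hypothetical stand-ins that only mimic the two behaviours being discussed, namely attempting every task and rethrowing at the end (roughly what `throwFailureWhenFinished()` without `stopOnFailure()` gives) versus aborting at the first failure.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.IntConsumer;

// Hypothetical stand-in for illustration only; this is not Iceberg's Tasks class.
public class FailurePolicyDemo {

  // Runs the task for ids 0..count-1 (sequential task ids starting from 0).
  // With stopOnFailure the first failure ends the loop immediately; otherwise
  // every id is attempted. In both cases the first failure is rethrown at the
  // end, with any later failures attached as suppressed exceptions.
  static void runAll(int count, IntConsumer task, boolean stopOnFailure) {
    RuntimeException firstFailure = null;
    for (int taskId = 0; taskId < count; taskId++) {
      try {
        task.accept(taskId);
      } catch (RuntimeException e) {
        if (firstFailure == null) {
          firstFailure = e;
        } else {
          firstFailure.addSuppressed(e);
        }
        if (stopOnFailure) {
          break;
        }
      }
    }
    if (firstFailure != null) {
      throw firstFailure;
    }
  }

  public static void main(String[] args) {
    List<Integer> attempted = new ArrayList<>();
    // A task that records each attempt and fails for task id 1.
    IntConsumer failsOnOne = id -> {
      attempted.add(id);
      if (id == 1) {
        throw new IllegalStateException("task " + id + " failed");
      }
    };

    try {
      runAll(4, failsOnOne, false);
    } catch (IllegalStateException e) {
      System.out.println("attempted without stopOnFailure: " + attempted);
    }

    attempted.clear();
    try {
      runAll(4, failsOnOne, true);
    } catch (IllegalStateException e) {
      System.out.println("attempted with stopOnFailure: " + attempted);
    }
  }
}
```

Without `stopOnFailure` all four ids are attempted before the exception surfaces, which suits cleanup during abort. With it, the loop stops after id 1, which is the behaviour suggested for the commit path.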
    if (dataFiles.size() > 0) {
      // Appending data files to the table
      AppendFiles append = table.newAppend();
Not a blocker since this is scoped to appends only, but can we detect when the user called INSERT OVERWRITE at least?
I prefer to do it in another PR. I do not see how specific writes could be handled with SerDes. This might be trivial, but since I have too many things in progress, I prefer not to delve into a new issue before closing some already open tasks.
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    class HiveIcebergRecordWriter extends org.apache.hadoop.mapreduce.RecordWriter<NullWritable, Container>
Minor: Since this lives in Iceberg, we can probably remove Iceberg from the class names.
I prefer to have it in the name of the classes. When we have to debug code in Hive it could be very helpful to see that we are looking at an Iceberg class without examining the package first.
    Path toDelete = new Path(file);
    FileSystem fs = Util.getFs(toDelete, jobContext.getJobConf());
    try {
      fs.delete(toDelete, true /* recursive */);
This won't fail if the path doesn't exist, but if you delete just the file directly, you'd want to check that it exists first, since some tasks may not have finished and committed and have aborted instead.
If there are runaway tasks they might write into the temp directory even after the job is finished.
So I feel that it is ok to clean this as a best effort. Do you agree?
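A minimal sketch of what best-effort cleanup could look like. It uses `java.nio.file` rather than Hadoop's `FileSystem`, so that it runs without a cluster. `BestEffortCleanup` and `deleteQuietly` are hypothetical names, and a local temp directory stands in for the job's temp location. A missing path is treated as success, and failures are reported rather than thrown.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Comparator;
import java.util.stream.Stream;

// Hypothetical helper for illustration; not part of the PR.
public class BestEffortCleanup {

  // Recursively deletes dir. Returns true if the delete completed (or the path
  // was already gone), false if something could not be removed.
  static boolean deleteQuietly(Path dir) {
    if (Files.notExists(dir)) {
      return true;
    }
    try (Stream<Path> paths = Files.walk(dir)) {
      // Reverse order so that children are deleted before their parents.
      paths.sorted(Comparator.reverseOrder()).forEach(p -> {
        try {
          Files.deleteIfExists(p);
        } catch (IOException e) {
          throw new UncheckedIOException(e);
        }
      });
      return true;
    } catch (IOException | UncheckedIOException e) {
      // Best effort: report the problem and let the caller carry on.
      System.err.println("Failed to remove " + dir + " on cleanup: " + e);
      return false;
    }
  }

  public static void main(String[] args) throws IOException {
    Path tmp = Files.createTempDirectory("job-temp");
    Files.createFile(tmp.resolve("task-0.committed"));
    System.out.println("first delete ok: " + deleteQuietly(tmp));
    // Deleting again is harmless because a missing path counts as success.
    System.out.println("second delete ok: " + deleteQuietly(tmp));
  }
}
```

A runaway task could still recreate files after such a cleanup has run, which is why the thread treats it as best effort rather than a guarantee.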
Big thanks @rdblue for reviewing the PR even on a Saturday! Really appreciate it! Your comments were really useful, as usual. I have updated the PR, so if you have time again I would love to hear your thoughts. Thanks,
Looks like the last update accidentally included a few .crc files that are temp data from tests. Could you clean up the additions?
Of course. I have removed the extra files.
@pvary, it looks like this now includes commits you didn't intend to add. Can you take a look at your branch? I think it's about ready to merge (we can fix the remaining issues later).
905b99a to 80dd89d
Addressed other review comments as well.
…ctory instead Removed LocationHelper - added the methods to HiveIcebergOutputCommitter instead since the methods are not used elsewhere anymore
1d40559 to 1741c37
@rdblue: Cleaned up the commits, and rebased the patch. If you have time could you please check out this PR?
    .onFailure((file, exc) -> LOG.debug("Failed on to remove directory {} on cleanup job", file, exc))
    .run(file -> {
      Path toDelete = new Path(file);
      FileSystem fs = Util.getFs(toDelete, jobContext.getJobConf());
Nit: I would prefer if cleanup happened by removing the expected task commit files one-by-one rather than deleting a directory because it could use FileIO. I understand that this is intended to drop the folder as well for stores that track folders. Maybe a follow-up to add a deletePrefix to FileIO would fix it.
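The per-file alternative suggested here could look roughly like the sketch below. Everything in it is an assumption made for illustration: `FileDeleter` is a hypothetical, minimal stand-in for a `FileIO`-style abstraction with a single `deleteFile(String)` method, and the `task-<id>.committed` naming scheme is invented rather than taken from the PR.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch; the interface and the naming scheme are assumptions.
public class PerFileCleanup {

  // Minimal stand-in for a file-deleting abstraction.
  interface FileDeleter {
    void deleteFile(String path);
  }

  // Invented naming scheme for the per-task commit files.
  static String commitFileLocation(String jobLocation, int taskId) {
    return jobLocation + "/task-" + taskId + ".committed";
  }

  // Deletes the commit file of every expected task, attempting each one even
  // if an earlier delete fails. Returns the paths that could not be deleted.
  static List<String> cleanup(FileDeleter io, String jobLocation, int expectedFiles) {
    List<String> failed = new ArrayList<>();
    for (int taskId = 0; taskId < expectedFiles; taskId++) {
      String path = commitFileLocation(jobLocation, taskId);
      try {
        io.deleteFile(path);
      } catch (RuntimeException e) {
        failed.add(path);
      }
    }
    return failed;
  }

  public static void main(String[] args) {
    List<String> deleted = new ArrayList<>();
    // Simulated deleter that fails for task 1 and records the other deletes.
    FileDeleter io = path -> {
      if (path.endsWith("task-1.committed")) {
        throw new IllegalStateException("simulated failure");
      }
      deleted.add(path);
    };
    List<String> failed = cleanup(io, "/tmp/job-1", 3);
    System.out.println("deleted: " + deleted);
    System.out.println("failed: " + failed);
  }
}
```

This approach leaves the directory itself in place, which is the limitation the comment notes for stores that track folders.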
@pvary, looks great! I merged this. Thanks for separating these into smaller PRs, it really helps keep the reviews manageable.
Thanks for the review and the merge @rdblue! I have learned a lot during the review!
Separated out the HiveIcebergOutputCommitter-related changes from #1407 so they are easier to review. HiveIcebergRecordWriter was also needed since the two are tightly coupled.