release mmap immediately after merge indexes #6699
kaijianding wants to merge 3 commits into apache:master from
Conversation
```java
  return indexIO.loadIndex(input);
}
catch (IOException e) {
  Throwables.propagate(e);
```
Throwables.propagate() must not be used without throw before it. See https://github.com/google/guava/wiki/Why-we-deprecated-Throwables.propagate#propagate-is-magic. This method must not be used in new code. (Note: there are already 39 occurrences of this bug in Druid, #6701)
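The fix the linked wiki page recommends is to rethrow explicitly rather than call Throwables.propagate() bare. A minimal self-contained sketch of the pattern; loadIndex here is a hypothetical stand-in, not Druid's real IndexIO.loadIndex():

```java
import java.io.IOException;
import java.io.UncheckedIOException;

public class LoadIndexExample {
  // Hypothetical stand-in for indexIO.loadIndex(input); not the real Druid API.
  static String loadIndex(String input) throws IOException {
    if (input == null) {
      throw new IOException("no input");
    }
    return "index:" + input;
  }

  // The diff calls Throwables.propagate(e) without `throw`, which does not
  // rethrow anything from the caller's point of view. The recommended
  // replacement is to wrap and rethrow the checked exception explicitly.
  static String load(String input) {
    try {
      return loadIndex(input);
    }
    catch (IOException e) {
      // Instead of Throwables.propagate(e):
      throw new UncheckedIOException(e);
    }
  }

  public static void main(String[] args) {
    System.out.println(load("segment-dir"));
  }
}
```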
clintropolis left a comment
Hi @kaijianding, could you please explain a bit more clearly how this helps to resolve the issue you've encountered? It isn't obvious to me from the description or added code how this change would help; all that is clear is that it puts a (probably insignificant) bit more pressure onto the heap during merge time by having a 2nd set of QueryableIndex objects. Thinking about what is going on here, the only thing I can come up with is that this patch would ensure that any 64k decompression buffers allocated to read the segment columns during a merge would be surrendered back to the compression pool after the merge is complete, if they aren't already (I'm not certain of this detail, investigating). When your process got killed by the OOM killer, did you have a chance to look at the size of the direct buffer objects in a dump, as well as the composition in general?
Overall though, I don't think the amount of direct memory usage required would change significantly (unless the buffers from the compression pool are in fact held until handoff), since the mechanics of how the merge is done are not really modified. I also don't believe the mmap footprint would be impacted in any way, afaik, because the OS would share the same physical memory locations for mapped pages that are in use by both query and merge, and query would still need those segments until handoff is completed to be able to keep serving the data. This part shouldn't be playing a role directly in the OOM killer anyway, though thrashing on disk, if free space grows too small from ballooning heap or direct memory usage, could be slowing things down overall, causing further pressure on heap or direct memory as things pile up.
Thanks!
@clintropolis here is how things happen.
Direct memory is not a problem here; direct memory is released after each merge success/fail.
The added code in this PR closes the QueryableIndex after each merge success/fail to explicitly release mmap and other side-effect objects. These QueryableIndexes are different objects from the ones used by queries, so it's safe.
```java
  @Nullable SegmentWriteOutMediumFactory segmentWriteOutMediumFactory
) throws IOException;

File mergeSegmentFiles(
```
Please add javadoc with the rationale for this method, and the difference from the most closely related one, mergeQueryableIndex().
```java
  return mergeQueryableIndex(indexes, rollup, metricAggs, outDir, indexSpec, new BaseProgressIndicator(), segmentWriteOutMediumFactory);
}
finally {
  for (QueryableIndex index : indexes) {
```
Use Closer to close many objects.
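For reference, the pattern Guava's Closer enables looks roughly like the sketch below. MiniCloser is a simplified stand-in for com.google.common.io.Closer so the example is self-contained, and FakeIndex stands in for a real mmapped QueryableIndex; neither is a real Druid or Guava class:

```java
import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

public class CloserExample {
  // Simplified stand-in for Guava's com.google.common.io.Closer: resources are
  // registered as they are created and all closed together, keeping the first
  // failure and suppressing the rest.
  static final class MiniCloser implements Closeable {
    private final Deque<Closeable> stack = new ArrayDeque<>();

    <C extends Closeable> C register(C closeable) {
      stack.push(closeable);
      return closeable;
    }

    @Override
    public void close() throws IOException {
      IOException first = null;
      while (!stack.isEmpty()) {
        try {
          stack.pop().close();
        }
        catch (IOException e) {
          if (first == null) { first = e; } else { first.addSuppressed(e); }
        }
      }
      if (first != null) { throw first; }
    }
  }

  // Stand-in for a mmapped QueryableIndex.
  static final class FakeIndex implements Closeable {
    boolean closed;
    @Override public void close() { closed = true; }
  }

  // Register every index with the closer, then close them all in one finally
  // block instead of writing a manual for-loop over the list.
  static List<FakeIndex> loadMergeAndRelease() throws IOException {
    MiniCloser closer = new MiniCloser();
    List<FakeIndex> indexes = new ArrayList<>();
    try {
      for (int i = 0; i < 3; i++) {
        indexes.add(closer.register(new FakeIndex()));
      }
      // ... merge would happen here ...
    }
    finally {
      closer.close();
    }
    return indexes;
  }
}
```

The real Closer additionally offers rethrow() for propagating exceptions from the try block; the sketch only shows the register/close lifecycle the review comment refers to.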
```java
mergedFile = indexMerger.mergeQueryableIndex(
    indexes,

File[] hydrantDirs = persistDir.listFiles(
```
listFiles() is an unsafe API, it may return null. Use Files.list() instead. Due to this, it's also more convenient to change the new IndexMerger's new method parameter type to List<Path>.
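A sketch of the suggested replacement, assuming the goal is to collect the hydrant directories while skipping the "merged" dir seen in the diff. Files.list() throws an IOException instead of returning null, so the failure case can't be silently ignored; hydrantDirs is a hypothetical helper name:

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class ListDirExample {
  // File.listFiles() returns null on I/O error or when the path is not a
  // directory; Files.list() throws a checked IOException instead, forcing the
  // caller to handle the failure. The stream must be closed, hence try-with-resources.
  static List<Path> hydrantDirs(Path persistDir) throws IOException {
    try (Stream<Path> entries = Files.list(persistDir)) {
      return entries
          .filter(Files::isDirectory)
          .filter(p -> !p.getFileName().toString().equals("merged"))
          .collect(Collectors.toList());
    }
  }

  public static void main(String[] args) throws IOException {
    Path tmp = Files.createTempDirectory("persist");
    Files.createDirectory(tmp.resolve("0"));
    Files.createDirectory(tmp.resolve("merged"));
    System.out.println(hydrantDirs(tmp)); // only the "0" hydrant dir remains
  }
}
```

Returning List<Path> here also matches the review's suggestion to change the new IndexMerger method's parameter type to List<Path>.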
```java
@Override
public boolean accept(File dir, String fileName)
{
  // To avoid reading and listing of "merged" dir
```
The comment is unclear, please elaborate
"hydrantFiles" maybe? or those files are actually directories? Then IndexMerger.mergeSegmentFiles() should be called mergeSegmentDirs().
```java
mergedFile = indexMerger.mergeQueryableIndex(
    indexes,

File[] hydrantDirs = persistDir.listFiles(
    new FilenameFilter()
```
Consider extracting the repeated fragment as a method
```java
}

mergedFile = indexMerger.mergeQueryableIndex(
    indexes,
```
@kaijianding I agree with @clintropolis, maybe this PR is incomplete and you planned to free those indexes? In its current form the PR doesn't free any extra resources that were (probably unnecessarily) held before by the worker process.
The QueryableIndexes are always new objects loaded from file, and closed when IndexMerger.mergeSegmentFiles() is done. This behavior frees the mmap usage that grows during the merge process.
Currently we use indexMerger.mergeQueryableIndex(indexes) in RealtimePlumber; the indexes are not closed to explicitly release the increased mmap because they are still used by queries. So the mmap usage always increases until abandonSegment() is called, which only happens when handoff succeeds.
If handoff is slow or the coordinator is not working properly, the mmap usage will keep increasing, which is a problem.
This PR separates the QueryableIndexes used by queries from the ones used by the merge process, so we can close the QueryableIndexes used by the merge to release mmap, and leave the QueryableIndexes used by queries untouched.
This PR is verified in my production environment; it indeed controls the mmap usage.
Again, it is beyond me how just creating (and later closing) some new objects, without changing how any other objects are created, could improve anything, unless some code around here is lazy, and that doesn't seem to be the case.
If this is about avoiding refreshing some memory-mapped files in memory (although I don't see the mechanism for how it helps either), at the very least the surrounding try {} block should be refactored, because currently it couples the creation of the indexes list and mergeSegmentFiles() for no apparent reason.
Will address these comments.
The mechanism is like this:
- mmap usage will increase after indexes are merged. mmap can only be released when close() is called on the indexes.
- currently, close() on the indexes is not called until abandonSegment(), which can be delayed for a very long time if handoff is slow. Before handoff succeeds, the mmap will keep increasing.
- to avoid this increase, I create new QueryableIndex objects and close them after each merge success/fail, so the mmap added during the merge process is released.
- I think this works because even if a file is mmapped multiple times, the mmap usage is accounted separately for each mmap action, and can be un-mmapped separately by each un-mmap action. In this case, close() on a QueryableIndex is the un-mmap action.
I hope this explains the mechanism clearly. Though this PR is very simple, it indeed helps with the mmap usage.
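The load-then-close pattern described above can be sketched as follows. QueryableIndex, loadIndex(), and the openMappings counter are simplified stand-ins for Druid's real classes and for the OS's per-mapping accounting, not the actual implementation:

```java
import java.io.Closeable;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class MergeAndReleaseExample {
  // Hypothetical stand-in for Druid's QueryableIndex; the real class mmaps
  // segment files and close() unmaps them.
  interface QueryableIndex extends Closeable {}

  static final class MmapIndex implements QueryableIndex {
    static int openMappings = 0;            // crude stand-in for mmap footprint
    MmapIndex() { openMappings++; }         // "mmap" on load
    @Override public void close() { openMappings--; }  // "munmap" on close
  }

  static QueryableIndex loadIndex(File dir) {
    return new MmapIndex();                 // pretend to mmap the segment dir
  }

  // The pattern the PR describes: load a *fresh* set of indexes just for the
  // merge (the query path keeps its own copies), and close them in a finally
  // block so the mmap added by the merge is released on success or failure.
  static void mergeSegmentDirs(List<File> hydrantDirs) throws IOException {
    List<QueryableIndex> indexes = new ArrayList<>();
    try {
      for (File dir : hydrantDirs) {
        indexes.add(loadIndex(dir));
      }
      // ... merge indexes into one segment file here ...
    }
    finally {
      for (QueryableIndex index : indexes) {
        index.close();  // each mapping is released independently of the query path's mappings
      }
    }
  }

  public static void main(String[] args) throws IOException {
    mergeSegmentDirs(Arrays.asList(new File("h0"), new File("h1")));
    System.out.println("open mappings after merge: " + MmapIndex.openMappings);
  }
}
```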
There is an alternative solution to recycle mmap: load the mergedFile as a QueryableIndex, swap all the hydrants' small QueryableIndexes in the Sink for this single big QueryableIndex, then close all the hydrants' small QueryableIndexes and delete all the hydrant segments to release mmap (the mmap usage has already increased by the time the merge process is done).
We are on the same page now; your understanding is totally correct.
In the worst case, where the complete set of data is queried (all rows/all columns), the footprint is exactly the same as the merge required in the first place.
But usually (as I noticed in my production environment), very little mmap is added by queries; many columns are ingested but not queried at all. That part of mmap is not a problem. And if we do the final swap after the merge, that part of mmap will decrease, because the new merged segment has no mmap footprint yet. In most cases users tend to query the latest data, so when handoff is slow, it's better to do the final swap to reduce that part of mmap for the earlier hydrant segments.
Back to this PR: it releases mmap after the merge is done, so the mmap usage is under control and won't grow to a huge number when handoff is slow, which would otherwise let the process be killed by YARN.
Cool, glad we could sort out what is going on, and apologies it took so long for me to understand what the point was 👍
Since this doesn't really seem to particularly have any negative consequences, and since it is doing something useful in some cases at least, I'll have another look at this PR.
Also in jdk 10+, I believe it will be possible to open files with O_DIRECT, which is probably what we really want here if we want merge to be done out of the query path and not have a significant impact on page cache usage, though it would potentially lower merge performance. I think it would be worth putting a note and adding this link so our future selves remember to consider if we ever make it out of java 8.
@clintropolis it's possible to use O_DIRECT already: obtain a raw pointer from mmap() and wrap it as https://github.com/DataSketches/memory.
So my understanding is that this is useful in the case when, on a realtime process, a very small subset of the columns/data in the intermediate persisted segments is being read by queries.
Some commentary in the code along those lines would be great.
Overall, this patch looks useful to me.
"Swap after merge" is further useful to reduce the inode count, but the swap would be tricky to implement as there might be queries underway on the indexes just merged.
I think O_DIRECT is a whole other beast and would require significant performance regression testing in this case.
Hmm, this got marked stale, but I also think it would be ok to merge if @leventov's comments were addressed and some additional javadocs explaining what was going on and linking to this discussion were added. Do you have any interest in finishing this @kaijianding?
Apologies that it got stalled in review for so long.
I'm untagging milestone since this issue is not necessarily a release blocker. Feel free to let me know if you think this should be.

This pull request has been marked as stale due to 60 days of inactivity. It will be closed in 1 week if no further activity occurs. If you think that's incorrect or this pull request should instead be reviewed, please simply write any comment. Even if closed, you can still revive the PR at any time or discuss it on the dev@druid.apache.org list. Thank you for your contributions.

This pull request/issue is no longer marked as stale.
This pull request has been marked as stale due to 60 days of inactivity. It will be closed in 4 weeks if no further activity occurs. If you think that's incorrect or this pull request should instead be reviewed, please simply write any comment. Even if closed, you can still revive the PR at any time or discuss it on the dev@druid.apache.org list. Thank you for your contributions.

This pull request/issue has been closed due to lack of activity. If you think that is incorrect, or the pull request requires review, you can revive the PR at any time.
When the coordinator is down or loading segments slowly, or a realtime node populates segments too fast (like pulling data from a long time ago), there will be many merged files, and many hydrant segments will be loaded as mmap during the merge process. There will also be many intermediate objects, like DirectByteBufferR, while these hydrant segments' QueryableIndexes can't be unloaded immediately after the merge process because they may be used by a query.
As a result, both the mmap and heap size grow to a large number, and the process is finally killed by the container (YARN in my case) or by OOM (this indeed happened in my production environment).
A better approach is to separate the query and merge processes: always load indexes from the hydrant files, and close the loaded indexes after each merge success/fail to release mmap and intermediate objects.