explicitly unmap hydrant files when abandonSegment to recycle mmap memory#4341
leventov merged 4 commits into apache:master from
Conversation
```java
serverProperties.put("zookeeper.connect", zkTestServer.getConnectString() + zkKafkaPath);
serverProperties.put("zookeeper.session.timeout.ms", "10000");
serverProperties.put("zookeeper.sync.time.ms", "200");
serverProperties.put("port", String.valueOf(new Random().nextInt(9999) + 10000));
```
Please use ThreadLocalRandom.current()
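The suggested change can be sketched as follows (a minimal, hypothetical example; the class name and the exact port bounds are illustrative, not from the PR). `ThreadLocalRandom.current()` avoids allocating a fresh `Random` on every call and avoids contention on a shared seed:

```java
import java.util.concurrent.ThreadLocalRandom;

public class RandomPortDemo {
  public static void main(String[] args) {
    // Before: new Random().nextInt(9999) + 10000  — a new Random per call,
    // yielding a port in roughly the 10000..19998 range.
    // Suggested: ThreadLocalRandom with an explicit bounded range.
    int port = ThreadLocalRandom.current().nextInt(10000, 20000); // 10000..19999
    System.out.println(port >= 10000 && port < 20000);
  }
}
```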
```java
    mergedFile,
    sink.getSegment().withDimensions(Lists.newArrayList(index.getAvailableDimensions()))
);
index.close();
```
It doesn't seem that creating index is even needed here, IndexMerger.getMergedDimensions(indexes) could be used
```java
);
for (FireHydrant hydrant : sink) {
  cache.close(SinkQuerySegmentWalker.makeHydrantCacheIdentifier(hydrant));
  hydrant.getSegment().close();
```
Despite its documentation, abandonSegment() is called not only from mergeExecutor, so races between persistAndMerge() and abandonSegment() are possible. This should be resolved before merging this change, because it may lead to JVM crashes.
could you explain more on the race condition? @leventov
abandonSegment() is called in FlushingPlumber, not from mergeExecutor. It probably doesn't lead to a race, because in the context of FlushingPlumber the mergeExecutor is not used at all, but it's a dangerous situation. Could you please refactor RealtimePlumber/FlushingPlumber by extracting the logic and fields used by both into a superclass, and make RealtimePlumber and FlushingPlumber subclasses of that class, so that FlushingPlumber doesn't have unused Executor fields?
I got your point. You'd like to move the merge and handoff code out of the parent class and into RealtimePlumber, which is the only class that really needs to care about merge and handoff.
I tried to refactor as you described, but it produces a huge diff and I think it needs to be carefully tested. I think it's too big a risk to do in this PR.
Since FlushingPlumber never actually starts the mergeExecutor, there is no race condition here. Should we do the refactor in another PR? @leventov
IMO, another PR is ok. Or maybe even skipping it altogether, and instead, migrating users of Plumbers to Appenderator (which is meant to be an improved replacement).
The flushing plumber isn't really meant to be used in production anyway (I think it's not even documented). It was meant to be a way to set up some realtime demos that just throw away data after a period of time.
gianm
left a comment
@kaijianding, could you make a similar change in AppenderatorImpl as well, in the "abandonSegment" method?
I think there's no need to refactor flushing/realtime plumber, since:
- Plumbers could be rewritten in the future in terms of Appenderators, or alternatively replaced with Appenderators, which are more flexible (see original description of #2220)
- The flushing plumber is not expected to be used in prod anyway.
```java
// We are materializing the list for performance reasons. Lists.transform
// only creates a "view" of the original list, meaning the function gets
// applied every time you access an element.
return Lists.transform(
```
The behavior is different here. The list used to be materialized (Lists.newArrayList) but now it's a view (Lists.transform). The comment says it should be materialized, so I think the old code was right.
Also it's better to use Stream API for new code, instead of Guava
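The two suggestions can be combined in one sketch (hypothetical names and data; not the PR's actual code). Guava's `Lists.transform` returns a lazy view that re-applies the function on every element access, whereas collecting a `Stream` materializes the result once, which is what the original comment asks for:

```java
import java.util.List;
import java.util.stream.Collectors;

public class MaterializeDemo {
  public static void main(String[] args) {
    // Illustrative dimension names, not taken from the PR.
    List<String> dims = List.of("page", "language", "user");

    // Lists.transform(dims, fn) would be a view: fn runs on each get().
    // A collected Stream applies the mapping exactly once, up front.
    List<String> upper = dims.stream()
        .map(String::toUpperCase)
        .collect(Collectors.toList());

    System.out.println(upper); // [PAGE, LANGUAGE, USER]
  }
}
```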
Got an error from the TeamCity build: I clicked "Run" in its UI to see if it will work when run again.
Ah, got it.
I noticed the RSS memory grew to an extremely large number on a realtime node in one of my environments. The memory usage grows after index merge and never goes down after handoff, unlike in other environments.
Usually `FileUtils.deleteDirectory(target);` can recycle mmap memory, but it doesn't work in some environments. We should explicitly unmap hydrant files when abandonSegment to recycle mmap memory.
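Some background on why an explicit unmap helps: Java has no public API to release a `MappedByteBuffer`'s mapping eagerly; the mapping is freed only when the buffer is garbage-collected, so deleting the segment files alone can leave RSS inflated. Below is a minimal, hypothetical sketch of eager unmapping on Java 9+ via `sun.misc.Unsafe.invokeCleaner` (Druid's actual close logic lives in its own utilities and differs from this):

```java
import java.io.RandomAccessFile;
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;

public class UnmapDemo {
  // Hypothetical helper: eagerly release a mapped buffer's native memory.
  static void unmap(MappedByteBuffer buffer) throws Exception {
    Class<?> unsafeClass = Class.forName("sun.misc.Unsafe");
    Field f = unsafeClass.getDeclaredField("theUnsafe");
    f.setAccessible(true);
    Object unsafe = f.get(null);
    Method invokeCleaner =
        unsafeClass.getMethod("invokeCleaner", java.nio.ByteBuffer.class);
    invokeCleaner.invoke(unsafe, buffer);
  }

  public static void main(String[] args) throws Exception {
    Path tmp = Files.createTempFile("hydrant", ".bin");
    try (RandomAccessFile raf = new RandomAccessFile(tmp.toFile(), "rw")) {
      raf.setLength(4096);
      MappedByteBuffer mapped =
          raf.getChannel().map(FileChannel.MapMode.READ_WRITE, 0, 4096);
      mapped.put(0, (byte) 42);
      System.out.println("first byte = " + mapped.get(0));
      // Without this, the mapping stays resident until the buffer is GC'd,
      // even after the backing file is deleted.
      unmap(mapped);
    }
    Files.delete(tmp);
    System.out.println("unmapped and deleted");
  }
}
```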
Also includes a test fix in TestKafkaExtractionCluster to use a random port, like in KafkaSupervisorTest. The test failed if Kafka was already started in the environment (my environment is not as clean as the Travis environment).