Avoid memory mapping hydrants after they are persisted & after they are merged for native batch ingestion #11123
Conversation
clintropolis left a comment
I think it would be useful to add some tests to show that this has the intended effect (maybe also easier/possible after modifying the appenderator heap usage calculation to consider whether or not the segment has been mapped).
I'm not sure this is true. I think realtime queries still use the pre-merge intermediary segments, and we do not do any sort of swap-and-replace to the merged segment.
It turns out that queries can still happen for realtime after hydrants are merged... I just added code to deal with this case. For realtime ingestion, memory mappings must remain after merge.
I wonder if there is a way to limit the supplier to being part of the FireHydrant instead of doing this modification here?
While I can't think of any ill side effect this might cause, since these suppliers shouldn't be called many times in the scheme of things, this change also has a huge surface area because it happens here instead of being limited to ingestion.
I rolled back the memoization code....
Hmm, I think the memory overhead calculation checks are going to end up triggering the supplier, which will map the segment. Specifically calculateMMappedHydrantMemoryInUsed, which is going to try to get the storage adapter to count the number of columns. I think FireHydrant is going to need some way to track whether or not the segment has been mapped (related to my earlier thread on why the supplier might also be more suitable to live in FireHydrant somehow, so that we can have some side effect to let the hydrant know the mapping has happened).
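The suggestion above — a supplier with a side effect so the hydrant knows whether mapping has happened — could look roughly like the following. This is only an illustrative sketch; `TrackedSupplier` and `hasBeenCalled` are hypothetical names, not Druid's actual API.

```java
import java.util.function.Supplier;

public class TrackingSupplierSketch
{
  // Wrap the lazy segment supplier so the owning hydrant can observe whether
  // the (expensive, memory-mapping) get() has actually been invoked yet.
  static final class TrackedSupplier<T> implements Supplier<T>
  {
    private final Supplier<T> delegate;
    private volatile boolean called = false;

    TrackedSupplier(Supplier<T> delegate)
    {
      this.delegate = delegate;
    }

    @Override
    public T get()
    {
      called = true; // side effect: the hydrant now knows the mapping occurred
      return delegate.get();
    }

    boolean hasBeenCalled()
    {
      return called;
    }
  }
}
```

With this shape, a memory-overhead check could consult `hasBeenCalled()` first and avoid triggering the mapping as a side effect of merely measuring it.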
Good catch... I did not see memory being mapped in my tests but it was because I had "skipBytesInMemoryOverheadCheck": true in the ingestion spec....
I refactored the memory calculations a little in my last commit, enough to make it work, but they still need more work (judging from my brief look at the heap dumps)
…ure tracking calculations
f063436 to c5f2840
```java
if (sink.swappable()) {
  // It is swappable. Get the old one to persist it and create a new one:
  indexesToPersist.add(Pair.of(sink.swap(), identifier));
  totalHydrantsPersisted.addAndGet(1);
```
Why is the hydrant count increased by 1 here?
Because sink.swap() creates a new hydrant (which should not be counted) but returns the old hydrant (which needs to be counted)
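The swap semantics described in the answer above can be modeled in a few lines. This is an illustrative model only, not Druid's actual `Sink` class: `swap()` appends a fresh hydrant and returns the old current one, which is why the persisted-hydrants counter increments by exactly one per swap.

```java
import java.util.ArrayList;
import java.util.List;

public class SinkSwapSketch
{
  // Toy stand-in for a sink holding hydrants (represented as strings here).
  static final class Sink
  {
    final List<String> hydrants = new ArrayList<>(List.of("hydrant-0"));

    String swap()
    {
      String old = hydrants.get(hydrants.size() - 1); // the hydrant to persist
      hydrants.add("hydrant-" + hydrants.size());     // new hydrant, not counted as persisted
      return old;                                     // old hydrant, headed for persist
    }
  }
}
```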
```java
final List<Pair<FireHydrant, SegmentIdWithShardSpec>> indexesToPersist = new ArrayList<>();
int numPersistedRows = 0;
long bytesPersisted = 0L;
AtomicLong totalHydrantsCount = new AtomicLong();
```
What's the reason for using AtomicLong here?
Because it is used inside the lambda, the variable needs to be effectively final. Since its value is set more than once, it cannot be a primitive type.
I forgot about SettableSupplier; I could use it rather than AtomicLong since there is no multi-threaded access to that variable. But because it is a counter, the built-in add in AtomicLong is convenient in this case.
Please do not use concurrent data structures where there is no concurrent access from multiple threads. It makes code very confusing.
I guess you can use MutableLong instead.
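The constraint the thread is discussing — a captured local must be effectively final, so counting inside a lambda needs a mutable holder — can be illustrated with a minimal sketch. `countWithHolder` uses a plain `long[]` as the single-threaded alternative (commons-lang's MutableLong would read similarly); `countWithAtomic` mirrors the AtomicLong version from the PR.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;

public class LambdaCounterExample
{
  // Single-threaded holder: the array reference is effectively final while its
  // content stays mutable. Reassigning a plain `long count` inside the lambda
  // would not compile.
  static long countWithHolder(List<String> items)
  {
    final long[] count = {0L};
    items.forEach(item -> count[0]++);
    return count[0];
  }

  // AtomicLong also works, but it signals concurrent access to readers of the
  // code, which is the reviewer's objection when no such access exists.
  static long countWithAtomic(List<String> items)
  {
    final AtomicLong count = new AtomicLong();
    items.forEach(item -> count.addAndGet(1));
    return count.get();
  }
}
```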
```java
  }
}

private static SegmentIdWithShardSpec si(String interval, String version, int partitionNum)
```
Can you rename this to something more readable?
That is copied from the existing unit test for the realtime appenderator... but sure, I can rename it.
```java
);
}

static InputRow ir(String ts, String dim, Object met)
```
Can you rename this to something more readable?
```java
);
}

private static <T> List<T> sorted(final List<T> xs)
```
Is this method required? Since SegmentIdWithShardSpec and DataSegment implement Comparable, is calling Collections.sort on the list directly possible?
Removed the custom comparator, using stream sort instead (fix in next push).
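For reference, sorting a list of `Comparable` elements with the Stream API, as the review suggests, needs no custom comparator and leaves the input list untouched. A minimal sketch of that replacement helper:

```java
import java.util.List;
import java.util.stream.Collectors;

public class StreamSortExample
{
  // Natural-order sort via streams: works for any Comparable element type
  // (SegmentIdWithShardSpec and DataSegment both qualify per the review),
  // and returns a new list rather than mutating the argument.
  static <T extends Comparable<T>> List<T> sorted(List<T> xs)
  {
    return xs.stream().sorted().collect(Collectors.toList());
  }
}
```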
```java
if (!isRealTime()) {
  // sanity:
  if (fireHydrant.getPersistedFile() == null) {
    throw new ISE("Persisted file for batch hydrant is null!");
```
Would you want any other info in this message to help debug the exception? It's OK if not.
That should never happen... it is just a sanity check. Can you think of anything else to say?
Added the hydrant (toString); a little more, but not a lot more.
```java
 * @return The persisted segment id. This is needed to recreate mapped files before merging.
 *         It will be null for real time hydrants
 */
public @Nullable SegmentId getPersistedSegmentId()
```
Can you also add a note about why persistedSegmentId is required in addition to getSegmentId()? I understand that the latter will throw an NPE?
Having persistedSegmentId in addition to segmentId is kind of ugly, but I was trying to avoid touching the existing code as much as possible. In order to use only the segment id, I would have to introduce a "segmentId" private member that is sometimes set by the adapter and sometimes set by the "persisted segment id"; I am on the fence on whether doing this would be even more confusing. In any case, our discussion was that the next step is to split Appenderator into two implementations: one for batch and one for real time. The realtime appenderator would be practically the same implementation as before these changes. The batch appenderator would take the current incremental direction to its logical conclusion: avoid keeping in memory all data structures that are not needed for native batch (which would remove OOMs caused by the relation of data size to memory consumption). So this code is temporary, and we can have a cleaner implementation in the BatchAppenderatorImpl.
I don't understand what persistedSegmentId is. Is it ever different from segmentId?
The persistedSegmentId is the same as segmentId. However, segmentId is accessed indirectly through the adapter. Closing the QueryableIndex nullifies the reference to it inside the adapter, so after this action segmentId is no longer accessible. I decided to add a new data member and a new method to store the segmentId after the queryable index is closed. The segmentId is required to re-open the QueryableIndex in the merge phase.
In order to make the code less confusing, I can still keep the persistedSegmentId reference but modify the getSegmentId method as follows:

```java
public SegmentId getSegmentId()
{
  if (adapter.get() != null) {
    return adapter.get().getId();
  } else {
    return persistedSegmentId;
  }
}
```

Would you prefer this instead? Personally I prefer the explicit way to obtain that reference, but the fact that there are so many questions already makes it clear that it is confusing, so I am fine with an alternative. Let me know whether the original way or the above works, or whether you have a better idea.
Moved persisted metadata to AppenderatorImpl as a map. Added comments to the map explaining why it is necessary. I think this way is much cleaner. In next push.
…emoved superfluous differences and fix comment typo. Removed custom comparator
```java
// in order to facilitate the mapping of the QueryableIndex associated with a given hydrant
// at merge time. This is necessary since batch appenderator will not map the QueryableIndex
// at persist time in order to minimize its memory footprint.
private final Map<FireHydrant, Pair<File, SegmentId>> persistedHydrantMetadata = new HashMap<>();
```
Does FireHydrant have a hashCode implementation? Maybe using IdentityHashMap makes more sense here. Also, do we ever need to clear this map, e.g. when org.apache.druid.segment.realtime.appenderator.Appenderator#clear or org.apache.druid.segment.realtime.appenderator.Appenderator#drop is called?
Yeah, IdentityHashMap makes it explicit that the key is a Java reference. Also, clearing the map in the places you referenced above.
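The difference being discussed — HashMap keys on `equals()`/`hashCode()`, IdentityHashMap on reference identity — is easy to demonstrate with a small example. The `Key` class below is a hypothetical stand-in for a class like FireHydrant if it ever gained a value-based `equals()`:

```java
import java.util.HashMap;
import java.util.IdentityHashMap;
import java.util.Map;

public class IdentityMapExample
{
  // A key type whose equals() treats same-id instances as equal.
  static final class Key
  {
    final int id;

    Key(int id)
    {
      this.id = id;
    }

    @Override
    public boolean equals(Object o)
    {
      return o instanceof Key && ((Key) o).id == id;
    }

    @Override
    public int hashCode()
    {
      return id;
    }
  }

  // HashMap collapses equals()-equal keys into one entry.
  static int hashMapSize()
  {
    Map<Key, String> m = new HashMap<>();
    m.put(new Key(1), "a");
    m.put(new Key(1), "b"); // replaces the first entry
    return m.size();
  }

  // IdentityHashMap keeps one entry per distinct reference, which is the
  // behavior the metadata map actually wants: one entry per hydrant instance.
  static int identityMapSize()
  {
    Map<Key, String> m = new IdentityHashMap<>();
    m.put(new Key(1), "a");
    m.put(new Key(1), "b"); // distinct references, distinct entries
    return m.size();
  }
}
```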
… fact that keys are Java references. Maintain persisted metadata when dropping/closing segments.
…t this to "true" make code fallback to previous code path.
```java
 * in order to facilitate the mapping of the QueryableIndex associated with a given hydrant
 * at merge time. This is necessary since batch appenderator will not map the QueryableIndex
 * at persist time in order to minimize its memory footprint. This has to be synchronized since the
 * map may be accessed from multiple threads.
```
```java
  totalRows.addAndGet(-sink.getNumRows());
}
// count hydrants for stats:
pushedHydrantsCount.addAndGet(IterableUtils.size(sink));
```
You can use Iterables.size instead, so you don't have to add a new dependency on commons-collections4.
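For context, Guava's `Iterables.size` (already on Druid's classpath) counts an `Iterable`, short-circuiting to `Collection.size()` when possible. A JDK-only sketch of the same idea, for readers without Guava at hand:

```java
import java.util.Collection;
import java.util.stream.StreamSupport;

public class IterableSizeExample
{
  // Equivalent in spirit to Guava's Iterables.size(iterable): use the O(1)
  // Collection shortcut when available, otherwise walk the elements.
  static int sizeOf(Iterable<?> iterable)
  {
    if (iterable instanceof Collection) {
      return ((Collection<?>) iterable).size();
    }
    return (int) StreamSupport.stream(iterable.spliterator(), false).count();
  }
}
```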
|`druid.peon.mode`|Choices are "local" and "remote". Setting this to local means you intend to run the peon as a standalone process (Not recommended).|remote|
|`druid.indexer.task.baseDir`|Base temporary working directory.|`System.getProperty("java.io.tmpdir")`|
|`druid.indexer.task.baseTaskDir`|Base temporary working directory for tasks.|`${druid.indexer.task.baseDir}/persistent/task`|
|`druid.indexer.task.batchMemoryMappedIndex`|If false, native batch ingestion will not map indexes, thus saving heap space. This does not apply to streaming ingestion, just to batch.|`false`|
Can you add a comment on how this is used, what its purpose is, and why a user would need to set it to true?
```xml
<version>1.3</version>
<scope>test</scope>
</dependency>
<dependency>
```
Not needed if we use Iterables.size instead.
…a dependency), and fixing a typo in a comment.
Description
Currently native batch ingestion may run out of memory while ingesting files with lots of logical segments (i.e. Sinks) and multiple physical segments (i.e. FireHydrants) per sink. Memory profiling indicates that one source of memory consumption is the references to memory-mapped files created as FireHydrants are created. This change avoids memory mapping FireHydrants during segment creation for native batch ingestion and drops the memory mapping right after they are merged.
The fix is relatively simple. Introduce a flag when the Appenderator is created to indicate whether it is working on a real-time or a batch task. When it is working on a batch task, use the flag to avoid mapping the segments. In addition, drop the QueryableIndexSegment from the fire hydrant just after a given sink is merged.

I also added metrics to track sinks & hydrants periodically (when persisting and at the end of the segment creation phase). This is to have some information for debugging, given that these data structures are the heaviest consumers of memory, and even though hydrants are no longer mapped during batch ingestion, their references still accumulate.
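The flag-guarded lifecycle the description outlines can be sketched as follows. This is a purely illustrative model, not Druid's actual code: `isRealTime`, `HydrantModel`, and the boolean `mapped` (standing in for holding a memory-mapped QueryableIndex) are all hypothetical names.

```java
public class AppenderatorSketch
{
  // Toy stand-in for a hydrant; `mapped` models holding a memory-mapped index.
  static final class HydrantModel
  {
    boolean mapped;
  }

  private final boolean isRealTime;

  AppenderatorSketch(boolean isRealTime)
  {
    this.isRealTime = isRealTime;
  }

  void persist(HydrantModel hydrant)
  {
    // Real-time tasks keep serving queries from intermediary segments, so they
    // map at persist time; batch tasks skip the mapping to save heap.
    hydrant.mapped = isRealTime;
  }

  void merge(HydrantModel hydrant)
  {
    hydrant.mapped = true; // merging itself needs the index mapped
    if (!isRealTime) {
      hydrant.mapped = false; // batch drops the mapping right after merge
    }
    // real-time keeps it: queries may still hit pre-merge hydrants
  }
}
```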
This PR has: