Motivation
Currently, native batch ingestion has limited guardrails with respect to its memory consumption. On the positive side, it has a simple memory model that estimates heap utilization during the segment creation phase, and when it senses memory pressure it offloads some data to disk, freeing up heap (it persists the "on heap" indices -- these are the "intermediate" persists). However, native batch internally keeps creating data structures (such as Sinks and FireHydrants) roughly in proportion to the number of segments and intermediate persists for the input file that a particular peon is processing. Therefore, regardless of the intermediate persists, as the file size grows the memory consumption can grow without bound, eventually exhausting all available Java heap and making the ingestion task fail. The main ways to work around this today are to manage the input file (i.e. divide it into smaller pieces) or to change the segment granularity (so fewer Sinks and FireHydrants are created), but these workarounds are not always convenient. One big reason this situation exists is that the real time and native batch ingestion paths share the AppenderatorImpl class, causing the batch ingestion path to use data structures that it does not necessarily need. We propose to split the appenderator class into two classes, RealTimeAppenderator and BatchAppenderator, and to refactor BatchAppenderator so that data structures that are not required to be in memory can be garbage collected, thus removing the unbounded memory growth. This proposal would leave RealTimeAppenderator basically intact, same as before.
Proposed changes
We start by breaking up the current AppenderatorImpl into two new classes, RealTimeAppenderator and BatchAppenderator, both implementing the Appenderator interface and potentially deriving from a base class that gathers common code. RealTimeAppenderator contains basically the same code that exists today in AppenderatorImpl and will run unchanged for streaming (i.e. real time) tasks. BatchAppenderator will be modified as follows.
The main change for BatchAppenderator is straightforward to understand. First, code that exists only to support the real time use case (such as the VersionedIntervalTimeline) can be removed. More importantly, at a high level the main change is as follows. Today, there is code that persists hydrants when it detects (through a simple but effective memory use model) that heap pressure has reached a certain level. After the hydrants are persisted, some heap memory is released back into the system. However, stress tests and code inspection reveal that not all memory associated with the hydrants is released. The consequence is that memory grows unbounded in a single task as its number of segments/hydrants increases (the appendix shows results of tests that illustrate this). The proposed change is thus to release all memory consumed by Sinks and FireHydrants after they are persisted. The code also makes sure that Sinks and FireHydrants are released in the merge phase as soon as they are used (Sinks and FireHydrants are recovered from their persisted files just in time when they are merged, on a Sink by Sink basis). We have already written code demonstrating that this is feasible and that the memory savings are dramatic: in our tests, memory growth is minimal after this change (see the Appendix below for preliminary results).
Specific BatchAppenderator changes
There are four phases: creating in-memory segments, persisting segments to disk, merging persisted segments, and then pushing them. The basic insight of the refactoring is that the batch appenderator does not have to keep any segments in memory after they have been persisted; they can be recovered from persistent storage whenever needed (such as during merging). As soon as the current in-memory segments are persisted, their associated data structures (i.e. Sinks & FireHydrants) can immediately be released, thus becoming eligible for garbage collection. This is possible because batch ingestion does not have to support "real time" querying. Next we explain the changes in each phase.
Creating in memory segments
This part of the code has practically no change (other than removing data structures & code used for querying but not needed in batch). The regular process works as usual: as a record is added, Sinks are created or fetched from memory, and new FireHydrants with "on heap" indices are created & added to the Sinks as usual.
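The add() path described above can be sketched with toy stand-ins. This is a minimal illustration only: Sink and FireHydrant here mirror the names of the Druid classes, but the bodies are simplified stand-ins, not the actual implementation.

```java
import java.util.*;

// Illustrative sketch of the add() path: the Sink for a row's interval is
// fetched or created, and rows accumulate in its current open FireHydrant.
// Sink and FireHydrant are toy stand-ins for the Druid classes.
class AddPathSketch {
    static class FireHydrant { final List<String> rows = new ArrayList<>(); }
    static class Sink { final List<FireHydrant> hydrants = new ArrayList<>(); }

    final Map<String, Sink> sinks = new HashMap<>(); // keyed by segment interval

    void add(String interval, String row) {
        Sink sink = sinks.computeIfAbsent(interval, k -> new Sink()); // fetch or create
        if (sink.hydrants.isEmpty()) {
            sink.hydrants.add(new FireHydrant()); // open the first "on heap" index
        }
        sink.hydrants.get(sink.hydrants.size() - 1).rows.add(row); // append to the open hydrant
    }
}
```

Note that the number of Sinks grows with the number of distinct intervals (segments) in the input, which is exactly why they must not all stay on the heap for the lifetime of the task.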
Persisting segments on disk
The conditions for persisting segments are exactly as before: the memory estimation model determines when to do this according to its measurements and the parameters passed in the ingestion spec (i.e. maxBytesInMemory etc.). However, rather than asynchronously calling persistAll inside the add method, we now synchronously call a new method, persistAllAndClear(). This new method is straightforward and is mostly composed of previous logic. The critical difference is that it enforces that persistAll and a new method clear(boolean removeOnDiskData) (passing false for removeOnDiskData) are called strictly sequentially. That is, the code waits until all data has been persisted and only then clears it. persistHydrant, which is called from within persistAll, also no longer swaps the "on heap" memory index with a QueryableIndex, since that is no longer required. After all Sinks and their FireHydrants are persisted, clear(false) is immediately called. This function removes all Sinks and FireHydrants that were persisted, without removing their data from disk. Immediately after persistAllAndClear() returns, no Sinks and no FireHydrants remain in memory.
One critical aspect here is that the act of persisting a FireHydrant saves all necessary data and metadata to disk, and we keep a file reference to that local storage so the code is able to recover it later as needed.
After the current batch has been persisted, when add gets called again the cycle begins anew, until the next intermediate persist or the final merge & push.
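The persist-then-clear ordering above can be sketched as follows. This is an illustrative sketch: the method names mirror the proposal (persistAllAndClear, persistAll, clear), but the bodies are simplified stand-ins, not the actual Druid code.

```java
import java.util.*;

// Illustrative sketch of persistAllAndClear(): persistAll and clear(false)
// run strictly sequentially, and only a file reference per persisted Sink
// survives on the heap. Names mirror the proposal; bodies are stand-ins.
class PersistAllAndClearSketch {
    final Map<String, List<String>> sinks = new LinkedHashMap<>();    // in-memory Sinks (id -> rows)
    final Map<String, String> persistedFiles = new LinkedHashMap<>(); // id -> "file" reference kept for recovery

    void persistAllAndClear() {
        persistAll();  // 1) wait until every Sink/FireHydrant is on disk...
        clear(false);  // 2) ...then drop the heap copies, keeping the on-disk data
    }

    private void persistAll() {
        for (String segmentId : sinks.keySet()) {
            // Persist the hydrants and remember where they went; unlike before,
            // no QueryableIndex is swapped back into memory.
            persistedFiles.put(segmentId, "/tmp/persist/" + segmentId);
        }
    }

    private void clear(boolean removeOnDiskData) {
        sinks.clear(); // Sinks and FireHydrants become unreferenced -> eligible for GC
        if (removeOnDiskData) {
            persistedFiles.clear();
        }
    }
}
```

The point of making the call synchronous is exactly this ordering guarantee: clear(false) can never run while a persist is still in flight.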
Merging segments
After all data has been read from the input data source (or when a push is forced by the appenderator driver before the whole file is read because some condition, like maxRowsPerSegment, has been met), the segments will be merged and then pushed. When we reach this phase we first call persistAllAndClear() to make sure all data is on disk and no Sinks are in memory. Then, right before the merge, we recover each Sink and its FireHydrants from disk. We can do this because we know the base (local) directory where they have been stored and we know their metadata and data layout exactly. It is critical here that we recover Sinks and their associated FireHydrants one at a time: we merge each Sink and then delete it from memory again, avoiding the memory pressure that would result from recovering them all at once and then merging them.
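The Sink-by-Sink merge loop can be sketched as below. This is a simplified illustration under the assumption that each persisted Sink is a list of per-hydrant row batches; the real merge of course operates on segment files, not lists.

```java
import java.util.*;

// Illustrative sketch of the Sink-by-Sink merge: recover one Sink's persisted
// hydrants, merge them into a single segment, "push" it, and release the Sink
// before touching the next one, so at most one Sink is on the heap at a time.
class SinkBySinkMergeSketch {
    // persistedSinks: segment id -> list of per-hydrant row batches on "disk".
    static List<String> mergeAndPushAll(Map<String, List<List<String>>> persistedSinks) {
        List<String> pushedSegments = new ArrayList<>();
        Iterator<Map.Entry<String, List<List<String>>>> it = persistedSinks.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, List<List<String>>> sink = it.next();
            List<String> merged = new ArrayList<>();  // recover + merge this one Sink
            for (List<String> hydrant : sink.getValue()) {
                merged.addAll(hydrant);
            }
            pushedSegments.add(sink.getKey() + "(" + merged.size() + " rows)"); // "push" the merged segment
            it.remove(); // release this Sink before recovering the next one
        }
        return pushedSegments;
    }
}
```

Because each Sink is removed as soon as it is merged, peak memory during this phase is bounded by the largest single Sink rather than by the total number of segments.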
Pushing
After segments are merged they can be pushed as usual.
Rationale
An alternative that we considered is not splitting the class but rather introducing a flag. However, this would make the code hard to read and hard to maintain, since the changes, though simple to understand conceptually, are dramatic at the code level and would have to modify some areas of the concurrency model used in streaming. Therefore we recommend the proposal to split the appenderators.
The downside of splitting is that we now have a split code base for the Appenderator functionality. Initially it may make maintenance of the code slightly more difficult for people used to the old code, but eventually the advantages of the separation of concerns principle will kick in, in addition to the critical benefit of avoiding unbounded memory growth.
Operational impact
The code does not modify any external APIs, so no operational impact is expected (other than that the kind of out of memory issues addressed by this proposal will no longer happen). Nothing is deprecated or removed by this change, and hence there is no migration path that cluster operators need to be aware of. All of the features introduced will be on by default, but given the magnitude of the change a feature flag will be provided to roll back to the previous behavior, with no code changes, in case of suspected bugs.
Test plan
In addition to the usual unit test and integration test coverage, we have created a test file that demonstrates the unbounded memory growth as well as the elimination of that growth with the proposed changes.
Future Work
Appenderator interface redesign
The work proposed here is limited to refactoring the current handling of Sink and FireHydrant in order to minimize their memory utilization, so that we can ingest large files in batch ingestion without running out of memory. With this refactor we expect that the concurrency currently exposed by the Appenderator interface (i.e. the use of Future in some of its methods) is no longer required. However, we propose to leave the redesign of the Appenderator interface (i.e. potentially splitting it into concurrent/non-concurrent variants or eliminating it completely) as separate work, in order to accelerate delivery and to have more time to think about an appropriate design.
Do not persist all Sinks and FireHydrants at the same time
The design in this proposal assumes all Sinks and FireHydrants will be persisted at once and all their in-memory objects removed (i.e. left unreferenced so that GC can free them). A more optimal implementation might not persist some of them (for example, ones that are small and currently being appended to). We leave this for future work.
Use alternative file format for intermediate persist (for faster ingest)
While testing versions of the prototype code supporting this proposal we noticed that intermediate persists and merges may take a long time. For batch ingestion, since we do not query the segments in real time, we do not really need to use the segment format when persisting. There could be a more efficient format (e.g. one optimized for fast appends), and we could defer segment file creation until just after the "merge & push" phase.
References
PR for implementation of this design
Bound memory utilization for dynamic partitioning (i.e. memory growth is constant)
Original proposal for Appenderator design
Appenderators, DataSource metadata, KafkaIndexTask
Appendix
Test design
We created a synthetic CSV file designed to reflect a real scenario in which a Druid user's ingestion ran out of memory. The file has 1 million rows, each row with roughly 200 columns. The data represents many years of events and the segment granularity is set to DAY. There are roughly 90-100 events per day, so in the end over 10,000 segments are created. We used a peon with 2G of heap for the runs. The partition type is dynamic and we used defaults in all other areas of the ingestion spec (such as tuningConfig). The file is ordered by timestamp.
Test with current code (Apache/druid master)
The following diagram (obtained by processing the garbage collection log with gcplot) shows the memory utilization of the ingestion of the test file. You can see that the memory grows very quickly (both before and after GC) until it flattens and hits the 2G heap ceiling. This run ended in an out of memory error (OOM), so ingestion failed. Ingestion crashed in the "segment creation" phase of the Appenderator, never making it to the "merge" phase. Other aspects of the garbage collection log (not shown here) show that about 50% of the time (the entire flat curve at the right of the graph) is actually spent just doing garbage collection with almost no actual work (until ingestion finally crashes). Additional data (not shown) indicates that the run ran out of memory about halfway through segment creation (~5K of 10K segments had been created), before reaching the segment merge phase.

Test with code incorporating the proposed changes
This test is the same as the one above but with the new code (the code is not production ready yet, just enough to demonstrate the value). Now you see a very different picture. The "EMA" line is the exponential moving average of the heap utilization, and it clearly shows that memory usage is now very stable over both phases of ingestion, never getting close to the 2G heap limit. The left side of the graph with the "spikes" is the segment creation phase, where a spike occurs each time a segment gets persisted and all its memory is released back. The flat part on the right hand side is the merge phase, which clearly shows the effect of merging on a Sink by Sink basis and then releasing all the memory associated with the Sink and its FireHydrants after they have been merged. The new code fully processes the test file with no memory (or other) issues.
