Motivation
The MiddleManager currently runs each task in a separate JVM, with identical configuration across all tasks spawned by a given MM. This model has some drawbacks:
- Complexity of configuration: There is a level of indirection in the configurations (druid.indexer.runner.javaOpts), and to understand total resource usage the user must consider both the MM config (num workers) and the individual sub-JVM task configurations.
- Easy to overprovision resources: On a single MM, all tasks will have identical sizing. However, not all tasks have the same resource requirements. This can be addressed to some extent with worker affinity, for cases where task load levels can be distinctly categorized based on datasource. However, even for a single datasource, not all tasks necessarily have equal resource requirements.
- Redundant resource provisioning: Some in-memory information, lookup maps in particular, is common across tasks, but separate task JVMs require a separate copy of this information in each JVM. With large lookup maps, this can consume a large amount of memory and considerably limit how many tasks an MM can run.
Proposed changes
A new process type, Indexer (tentative name), will be added. This new process type is an alternative to the MiddleManager that runs all tasks in the single Indexer JVM instead of forking new processes.
Query processing
- All queries made to tasks managed by the Indexer will be executed using shared processing buffers and threads (the standard druid.processing.numThreads and related configs).
- Appenderator should be modified to support external segment management (i.e., the Indexer, rather than the individual tasks, will take care of announcing and serving segments).
  - The sinkTimeline could be provided externally by the Indexer and used to back the QueryRunner instances created by the Indexer.
- Each task would still maintain its own Appenderator instance for per-task memory tracking.
Task API resources
The Indexer will have a single ChatHandlerResource at /druid/worker/v1/chat/{id}, serving requests for all tasks managed by the Indexer.
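Serving every task from one resource implies a lookup from the {id} path segment to that task's handler. The Java sketch below is framework-free (no JAX-RS) and purely illustrative: `ChatRouter`, `TaskChatHandler`, `register`, and `route` are hypothetical names, not existing Druid APIs.

```java
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical per-task handler interface (not a Druid API).
interface TaskChatHandler {
  String handle(String subPath, String body);
}

// Hypothetical router standing in for the single resource at /druid/worker/v1/chat/{id}.
class ChatRouter {
  private static final String PREFIX = "/druid/worker/v1/chat/";
  private final Map<String, TaskChatHandler> handlers = new ConcurrentHashMap<>();

  // Called when a task starts on this Indexer.
  void register(String taskId, TaskChatHandler handler) {
    handlers.put(taskId, handler);
  }

  // Called when a task exits.
  void unregister(String taskId) {
    handlers.remove(taskId);
  }

  // Extracts {id} from the path and dispatches the remainder to that task's handler.
  // An empty result would map to a 404 in a real HTTP layer.
  Optional<String> route(String path, String body) {
    if (!path.startsWith(PREFIX)) {
      return Optional.empty();
    }
    String rest = path.substring(PREFIX.length());
    int slash = rest.indexOf('/');
    String taskId = slash < 0 ? rest : rest.substring(0, slash);
    String subPath = slash < 0 ? "" : rest.substring(slash);
    TaskChatHandler handler = handlers.get(taskId);
    return handler == null ? Optional.empty() : Optional.of(handler.handle(subPath, body));
  }
}
```

Because tasks register and unregister concurrently with incoming requests, the handler map in this sketch is a ConcurrentHashMap.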
Task Resource Management
The Indexer will have a configurable global heap limit globalIngestionHeapLimitBytes across all tasks that applies to ingestion workloads (what maxBytesInMemory is currently estimating) and merging workloads (related to ).
globalIngestionHeapLimitBytes should be lower than the total JVM heap size, to leave space for query workloads and to account for the fuzziness of the memory estimates being used.
When the sum of ingestion memory usage and merging memory usage across all tasks reaches globalIngestionHeapLimitBytes, the Indexer will trigger a global persist, causing each managed task to persist its in-heap segment data.
Per-task ingestion heap memory usage will continue to be tracked using the same mechanisms that support maxBytesInMemory.
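The global persist trigger amounts to summing per-task ingestion and merging estimates and persisting every managed task once the sum reaches globalIngestionHeapLimitBytes. The Java sketch below assumes a hypothetical `GlobalHeapTracker` class and `PersistableTask` callback (neither is an existing Druid API), and it assumes a persist drops a task's in-heap ingestion estimate to zero while leaving merge reservations in place.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical task-side hook: persist in-heap segment data to disk.
interface PersistableTask {
  void persist();
}

// Hypothetical Indexer-side accounting for globalIngestionHeapLimitBytes.
class GlobalHeapTracker {
  private final long globalIngestionHeapLimitBytes;
  private final Map<String, PersistableTask> tasks = new HashMap<>();
  private final Map<String, Long> ingestionBytes = new HashMap<>();
  private final Map<String, Long> mergingBytes = new HashMap<>();
  private int globalPersists = 0;

  GlobalHeapTracker(long globalIngestionHeapLimitBytes) {
    this.globalIngestionHeapLimitBytes = globalIngestionHeapLimitBytes;
  }

  synchronized void register(String taskId, PersistableTask task) {
    tasks.put(taskId, task);
    ingestionBytes.put(taskId, 0L);
    mergingBytes.put(taskId, 0L);
  }

  // Called whenever a task's maxBytesInMemory-style estimate changes.
  synchronized void updateIngestionBytes(String taskId, long bytes) {
    ingestionBytes.put(taskId, bytes);
    if (totalBytes() >= globalIngestionHeapLimitBytes) {
      globalPersist();
    }
  }

  // Called when a merge reservation for a task is recorded or released.
  synchronized void setMergingBytes(String taskId, long bytes) {
    mergingBytes.put(taskId, bytes);
  }

  synchronized long totalBytes() {
    long sum = 0;
    for (long b : ingestionBytes.values()) {
      sum += b;
    }
    for (long b : mergingBytes.values()) {
      sum += b;
    }
    return sum;
  }

  synchronized int globalPersistCount() {
    return globalPersists;
  }

  // Every managed task persists; its in-heap ingestion usage drops to zero.
  private void globalPersist() {
    for (Map.Entry<String, PersistableTask> e : tasks.entrySet()) {
      e.getValue().persist();
      ingestionBytes.put(e.getKey(), 0L);
    }
    globalPersists++;
  }
}
```

A real implementation would likely run persists asynchronously instead of holding a lock while tasks write to disk; the synchronous form here only keeps the accounting easy to follow.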
To track per-task merging memory usage, the task flow will change slightly:
- Before attempting to merge segments, a task will make a request to the Indexer, indicating that it is about to merge.
- When a merge notice is received, the Indexer will attempt to record two memory allocations in its memory usage tracking state:
  - A fixed amount of heap. This amount can scale based on some proportion of globalIngestionHeapLimitBytes.
  - A fixed amount of direct memory, needed for decompression buffers and dictionary conversions. This amount can scale based on the size in bytes of the fuzzy +1 factor in the familiar (druid.processing.numThreads + druid.processing.numMergeBuffers + 1) * druid.processing.buffer.sizeBytes formula, that is, one druid.processing.buffer.sizeBytes.
- Note that this will require changes to segment merging to allow it to respect memory bounds.
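The merge notice handling can be sketched as an admission check that records both allocations only if each fits its budget. This Java sketch is hypothetical throughout: `MergeMemoryReservations`, the `mergeHeapFraction` knob, and the separate direct-memory budget for merges are assumptions made for illustration, not settings or classes the proposal defines. The per-merge direct allocation is taken to be one druid.processing.buffer.sizeBytes, matching the +1 term of the sizing formula.

```java
// Hypothetical Indexer-side bookkeeping for merge notices (not a Druid API).
class MergeMemoryReservations {
  private final long globalIngestionHeapLimitBytes;
  private final double mergeHeapFraction;   // assumed knob: share of the global heap limit per merge
  private final long directBudgetBytes;     // assumed direct-memory budget set aside for merges
  private final long perMergeDirectBytes;   // the "+1" term: one druid.processing.buffer.sizeBytes
  private long reservedHeapBytes = 0;
  private long reservedDirectBytes = 0;

  MergeMemoryReservations(long globalIngestionHeapLimitBytes, double mergeHeapFraction,
                          long directBudgetBytes, long bufferSizeBytes) {
    this.globalIngestionHeapLimitBytes = globalIngestionHeapLimitBytes;
    this.mergeHeapFraction = mergeHeapFraction;
    this.directBudgetBytes = directBudgetBytes;
    this.perMergeDirectBytes = bufferSizeBytes;
  }

  long perMergeHeapBytes() {
    return (long) (globalIngestionHeapLimitBytes * mergeHeapFraction);
  }

  // Handles a merge notice; currentIngestionBytes is the summed per-task ingestion estimate.
  // Returns false (reserving nothing) if either allocation would exceed its budget; the
  // caller could then trigger a global persist and retry.
  synchronized boolean tryReserve(long currentIngestionBytes) {
    long heap = perMergeHeapBytes();
    if (currentIngestionBytes + reservedHeapBytes + heap > globalIngestionHeapLimitBytes) {
      return false;
    }
    if (reservedDirectBytes + perMergeDirectBytes > directBudgetBytes) {
      return false;
    }
    reservedHeapBytes += heap;
    reservedDirectBytes += perMergeDirectBytes;
    return true;
  }

  // Called when a task reports that its merge has finished.
  synchronized void release() {
    reservedHeapBytes = Math.max(0L, reservedHeapBytes - perMergeHeapBytes());
    reservedDirectBytes = Math.max(0L, reservedDirectBytes - perMergeDirectBytes);
  }

  synchronized long heapReserved() {
    return reservedHeapBytes;
  }

  synchronized long directReserved() {
    return reservedDirectBytes;
  }

  // The familiar direct-memory sizing formula, for reference.
  static long processingDirectMemoryBytes(int numThreads, int numMergeBuffers, long bufferSizeBytes) {
    return (numThreads + numMergeBuffers + 1L) * bufferSizeBytes;
  }
}
```

For example, with numThreads = 7, numMergeBuffers = 2, and a 500,000,000-byte buffer, the formula gives (7 + 2 + 1) * 500,000,000 = 5,000,000,000 bytes, of which the +1 term accounts for 500,000,000.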
The Indexer will impose a resource management model that only considers byte-based memory estimates and limits. Row-based limits in any task specs will be ignored (rewritten to whatever represents "unlimited" for a given property).
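Ignoring row-based limits could be done by rewriting the task's tuning configuration before the task starts. In this Java sketch the tuningConfig is modeled as a plain map, and `RowLimitRewriter` is a hypothetical name. Which properties count as row-based limits, and which value means "unlimited" for each (for example, Integer.MAX_VALUE for maxRowsInMemory), are assumptions supplied by the caller, not values taken from Druid.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical rewrite step (not a Druid API): each row-based limit in a
// tuningConfig, modeled as a plain map, is overwritten with an "unlimited" value.
class RowLimitRewriter {
  // Assumed mapping from row-based property name to the value treated as unlimited.
  private final Map<String, Object> unlimitedValues;

  RowLimitRewriter(Map<String, Object> unlimitedValues) {
    this.unlimitedValues = Map.copyOf(unlimitedValues);
  }

  // Returns a rewritten copy and leaves the input unchanged. Properties are set even when
  // absent, so a default row limit cannot apply; byte-based properties pass through as-is.
  Map<String, Object> rewrite(Map<String, Object> tuningConfig) {
    Map<String, Object> rewritten = new HashMap<>(tuningConfig);
    rewritten.putAll(unlimitedValues);
    return rewritten;
  }
}
```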
By default, tasks will not have fixed individual memory limits; only the global limit is applied. The Indexer will also have a mode where the global limit is divided evenly across the number of worker slots.
The Indexer will allow optional per-task maxBytesInMemory configurations. If an individual maxBytesInMemory limit is hit, that task will persist individually. This helps address potential memory starvation when a subset of tasks has significantly higher data generation rates than other tasks: the per-task limit can be used to constrain tasks that are known ahead of time to have disparately high data generation rates.
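The two modes and the optional per-task limit reduce to computing one effective byte limit per task. In this hypothetical Java sketch (`PerTaskLimits` and its `Mode` values are invented names), the default mode has no per-task limit, the equal-share mode uses the global limit divided by the number of worker slots, and an explicit maxBytesInMemory applies in either mode. Letting the stricter limit win when both apply is an assumption; the proposal does not specify how they combine.

```java
// Hypothetical helper (not a Druid API) for the per-task limit modes.
class PerTaskLimits {
  enum Mode { GLOBAL_ONLY, EQUAL_SHARE }

  // Returns the byte limit at which a task persists on its own, or Long.MAX_VALUE
  // when only the global limit applies. Assumption: if both an equal share and an
  // explicit maxBytesInMemory apply, the stricter (smaller) one wins.
  static long effectiveTaskLimit(Mode mode, long globalLimitBytes, int workerSlots, Long taskMaxBytesInMemory) {
    long limit = Long.MAX_VALUE;
    if (mode == Mode.EQUAL_SHARE) {
      limit = globalLimitBytes / workerSlots;
    }
    if (taskMaxBytesInMemory != null) {
      limit = Math.min(limit, taskMaxBytesInMemory);
    }
    return limit;
  }

  // An individual persist is triggered once the task's own estimate reaches its limit.
  static boolean shouldPersistIndividually(long taskBytesInMemory, long effectiveLimit) {
    return taskBytesInMemory >= effectiveLimit;
  }
}
```

For example, a 1,000-byte global limit across 4 slots gives each task a 250-byte share in equal-share mode, while in the default mode the same task has no individual limit unless maxBytesInMemory is set.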
Task Assignments
In the default mode, where tasks do not have individual maxBytesInMemory limits, different tasks in the Indexer can consume different amounts of resources. This is a significant change from the existing MM model, where all tasks receive identical resource allocations.
While this can result in better resource utilization when a combination of small and big tasks run together, it opens potential for skew in assigned workloads (e.g. a majority of high-resource consumption tasks happen to be assigned at the same time to a single Indexer).
To address this initially, task assignments to Indexer processes can be done mostly randomly (with some consideration for how many tasks have already been assigned to an Indexer), combined with deployment guidance in the docs that instructs users to limit task durations when using the Indexer in this mode.
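One way to make assignment "mostly random" while still considering existing load is the power-of-two-choices technique: sample two Indexers at random and pick the one with fewer assigned tasks. This is a swapped-in technique, not something the proposal prescribes, and `TwoChoiceAssigner` and its parameters, including the per-Indexer capacity check, are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Random;

// Hypothetical assigner (not a Druid API) using power-of-two-choices.
class TwoChoiceAssigner {
  private final Random random;

  TwoChoiceAssigner(Random random) {
    this.random = random;
  }

  // Returns the chosen Indexer id, or null if every Indexer is at capacity.
  String pick(List<String> indexers, Map<String, Integer> assignedCounts, int capacityPerIndexer) {
    List<String> candidates = new ArrayList<>();
    for (String id : indexers) {
      if (assignedCounts.getOrDefault(id, 0) < capacityPerIndexer) {
        candidates.add(id);
      }
    }
    if (candidates.isEmpty()) {
      return null;
    }
    // Sample two candidates uniformly at random (possibly the same one)...
    String first = candidates.get(random.nextInt(candidates.size()));
    String second = candidates.get(random.nextInt(candidates.size()));
    // ...and keep whichever currently has fewer assigned tasks.
    int firstCount = assignedCounts.getOrDefault(first, 0);
    int secondCount = assignedCounts.getOrDefault(second, 0);
    return firstCount <= secondCount ? first : second;
  }
}
```

Compared with always choosing the least-loaded Indexer, this keeps placement random enough that bursts of heavy tasks are not all steered to the same process, while still favoring less-loaded Indexers.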
If the Indexer is running in the mode where each task has an equal share of the global heap limit, then the traditional MM task assignment algorithm can be used.
Rationale
To address the motivating concerns, other approaches are possible:
- To simplify MM configuration, we could have the MM take total heap/direct memory settings and divide that across the tasks. This would still have the potential for overprovisioning though.
- To avoid overprovisioning, we could have the MM spawn each task with unlimited heap/direct memory and apply a similar memory limiting protocol across the MM and tasks as described in this proposal. However, doing this memory management within a single process is simpler and more reliable than in a multi-process system.
- To avoid redundant provisioning (where lookups are the main concern presently), we could memory-map the lookups from files on disk and share the page cache across tasks. However, the lookup maps could potentially be evicted from the page cache, and since lookups are in the hot path of queries, this is likely undesirable.
The proposed approach was chosen because it addresses all three concerns.
The primary drawback of the proposed approach is the loss of fault isolation from having separate task processes. For this reason, this proposal suggests adding a new process type instead of replacing the MM.
The loss of fault isolation means that the Indexer will need supporting patches to function well, described in the future work section.
Operational impact
- This proposal will not deprecate anything.
- The Indexer should not be deployed until a user has fully upgraded their cluster to a version supporting Indexer. Older versions will not recognize the new process type.
- If a user is downgrading, they should replace their Indexer instances with MiddleManagers first.
Test plan (optional)
The Indexer will be deployed in large clusters that are currently using MiddleManagers, completely replacing the MiddleManagers, and the correctness and stability of ingestion will be verified over time.
Future work
For the proposed approach to work well, we will need to adjust the segment merging sequence such that its memory usage is more bounded: currently, ingestion tasks can be prone to OOM during index merging, and the impact of such failures would be amplified in the single-process Indexer.
We will also need to address the memory estimation issues with growable aggregators, described in issue #6743, although this is a less crucial supporting patch compared to memory bounded index merging.
The initial implementation of Indexer will not be exposed to users in documentation: until the bounded memory segment merging is supported, Indexers will not be stable enough to be a practical choice.
Other potential future work:
- GC watchdog: A watchdog that monitors GC counts/times and triggers spills when a threshold is exceeded could help ensure that the Indexer does not die unnecessarily.
- Intelligent task assignment: Some tasks, like Hadoop tasks or the parallel batch indexing supervisor task, have light resource requirements, while realtime tasks support querying. The task assignment could be enhanced to balance tasks based on these considerations.
- Consider adding a mechanism for controlling the distribution of CPU usage across tasks within a process (perhaps a priority-based system).
- Consider adding disk-usage limits as a sanity check for tasks.
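A GC watchdog could poll the JVM's cumulative GC time and fire a spill when GC consumes too large a fraction of wall time. This Java sketch uses the standard java.lang.management API (`ManagementFactory.getGarbageCollectorMXBeans()` and `GarbageCollectorMXBean.getCollectionTime()`). The `GcWatchdog` class, the fraction threshold, and the callback are hypothetical, and GC counts could be tracked the same way via `getCollectionCount()`.

```java
import java.lang.management.GarbageCollectorMXBean;
import java.lang.management.ManagementFactory;
import java.util.function.LongSupplier;

// Hypothetical GC watchdog (not a Druid API): on each poll it compares the
// growth in cumulative GC time against elapsed wall time and runs a callback,
// such as a global persist, when the GC fraction exceeds a threshold.
class GcWatchdog {
  private final LongSupplier gcTimeMillis;
  private final double maxGcFraction;
  private final Runnable onThresholdExceeded;
  private long lastGcMillis;
  private long lastPollMillis;

  GcWatchdog(LongSupplier gcTimeMillis, double maxGcFraction, Runnable onThresholdExceeded, long nowMillis) {
    this.gcTimeMillis = gcTimeMillis;
    this.maxGcFraction = maxGcFraction;
    this.onThresholdExceeded = onThresholdExceeded;
    this.lastGcMillis = gcTimeMillis.getAsLong();
    this.lastPollMillis = nowMillis;
  }

  // Cumulative GC time across all collectors in this JVM; getCollectionTime()
  // returns -1 when undefined, so only positive values are summed.
  static long jvmGcMillis() {
    long total = 0;
    for (GarbageCollectorMXBean bean : ManagementFactory.getGarbageCollectorMXBeans()) {
      long t = bean.getCollectionTime();
      if (t > 0) {
        total += t;
      }
    }
    return total;
  }

  // Intended to be called periodically, e.g. from a scheduled executor.
  // Returns true if the callback was run during this poll.
  boolean poll(long nowMillis) {
    long gcMillis = gcTimeMillis.getAsLong();
    long elapsed = nowMillis - lastPollMillis;
    boolean fired = false;
    if (elapsed > 0 && (double) (gcMillis - lastGcMillis) / elapsed > maxGcFraction) {
      onThresholdExceeded.run();
      fired = true;
    }
    lastGcMillis = gcMillis;
    lastPollMillis = nowMillis;
    return fired;
  }
}
```

In an Indexer, the supplier would be `GcWatchdog::jvmGcMillis` and the callback would trigger a global persist. Injecting the time source keeps the threshold logic testable without forcing real GC activity.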
Related Patches
Initial process and task runner PR: #8107