Skip to content

[Backport] MSQ: Rework memory management. (#17057)#17210

Merged
kfaraz merged 1 commit intoapache:31.0.0from
kfaraz:backport_17057
Oct 1, 2024
Merged

[Backport] MSQ: Rework memory management. (#17057)#17210
kfaraz merged 1 commit intoapache:31.0.0from
kfaraz:backport_17057

Conversation

@kfaraz
Copy link
Copy Markdown
Contributor

@kfaraz kfaraz commented Oct 1, 2024

  • MSQ: Rework memory management.

This patch reworks memory management to better support multi-threaded workers running in shared JVMs. There are two main changes.

First, processing buffers and threads are moved from a per-JVM model to a per-worker model. This enables queries to hold processing buffers without blocking other concurrently-running queries. Changes:

  • Introduce ProcessingBuffersSet and ProcessingBuffers to hold the per-worker and per-work-order processing buffers (respectively). On Peons, this is the JVM-wide processing pool. On Indexers, this is a per-worker pool of on-heap buffers. (This change fixes a bug on Indexers where excessive processing buffers could be used if MSQ tasks ran concurrently with realtime tasks.)

  • Add "bufferPool" argument to GroupingEngine#process so a per-worker pool can be passed in.

  • Add "druid.msq.task.memory.maxThreads" property, which controls the maximum number of processing threads to use per task. This allows usage of multiple processing buffers per task if admins desire.

  • IndexerWorkerContext acquires processingBuffers when creating the FrameContext for a work order, and releases them when closing the FrameContext.

  • Add "usesProcessingBuffers()" to FrameProcessorFactory so workers know how many sets of processing buffers are needed to run a given query.

Second, adjustments to how WorkerMemoryParameters slices up bundles, to favor more memory for sorting and segment generation. Changes:

  • Instead of using same-sized bundles for processing and for sorting, workers now use minimally-sized processing bundles (just enough to read inputs plus a little overhead). The rest is devoted to broadcast data buffering, sorting, and segment-building.

  • Segment-building is now limited to 1 concurrent segment per work order. This allows each segment-building action to use more memory. Note that segment-building is internally multi-threaded to a degree. (Build and persist can run concurrently.)

  • Simplify frame size calculations by removing the distinction between "standard" and "large" frames. The new default frame size is the same as the old "standard" frames, 1 MB. The original goal of of the large frames was to reduce the number of temporary files during sorting, but I think we can achieve the same thing by simply merging a larger number of standard frames at once.

  • Remove the small worker adjustment that was added in Fix memory calculations for WorkerMemoryParameters for machines with relatively less heap space #14117 to account for an extra frame involved in writing to durable storage. Instead, account for the extra frame whenever we are actually using durable storage.

  • Cap super-sorter parallelism using the number of output partitions, rather than using a hard coded cap at 4. Note that in practice, so far, this cap has not been relevant for tasks because they have only been using a single processing thread anyway.

  • Remove unused import.

  • Fix errorprone annotation.

  • Fixes for javadocs and inspections.

  • Additional test coverage.

  • Fix test.

Fixes #XXXX.

Description

Fixed the bug ...

Renamed the class ...

Added a forbidden-apis entry ...

Release note


Key changed/added classes in this PR
  • MyFoo
  • OurBar
  • TheirBaz

This PR has:

  • been self-reviewed.
  • added documentation for new or modified features or behaviors.
  • a release note entry in the PR description.
  • added Javadocs for most classes and all non-trivial methods. Linked related entities via Javadoc links.
  • added or updated version, license, or notice information in licenses.yaml
  • added comments explaining the "why" and the intent of the code wherever would not be obvious for an unfamiliar reader.
  • added unit tests or modified existing tests to cover new code paths, ensuring the threshold for code coverage is met.
  • added integration tests.
  • been tested in a test Druid cluster.

* MSQ: Rework memory management.

This patch reworks memory management to better support multi-threaded
workers running in shared JVMs. There are two main changes.

First, processing buffers and threads are moved from a per-JVM model to
a per-worker model. This enables queries to hold processing buffers
without blocking other concurrently-running queries. Changes:

- Introduce ProcessingBuffersSet and ProcessingBuffers to hold the
  per-worker and per-work-order processing buffers (respectively). On Peons,
  this is the JVM-wide processing pool. On Indexers, this is a per-worker
  pool of on-heap buffers. (This change fixes a bug on Indexers where
  excessive processing buffers could be used if MSQ tasks ran concurrently
  with realtime tasks.)

- Add "bufferPool" argument to GroupingEngine#process so a per-worker pool
  can be passed in.

- Add "druid.msq.task.memory.maxThreads" property, which controls the
  maximum number of processing threads to use per task. This allows usage of
  multiple processing buffers per task if admins desire.

- IndexerWorkerContext acquires processingBuffers when creating the FrameContext
  for a work order, and releases them when closing the FrameContext.

- Add "usesProcessingBuffers()" to FrameProcessorFactory so workers know
  how many sets of processing buffers are needed to run a given query.

Second, adjustments to how WorkerMemoryParameters slices up bundles, to
favor more memory for sorting and segment generation. Changes:

- Instead of using same-sized bundles for processing and for sorting,
  workers now use minimally-sized processing bundles (just enough to read
  inputs plus a little overhead). The rest is devoted to broadcast data
  buffering, sorting, and segment-building.

- Segment-building is now limited to 1 concurrent segment per work order.
  This allows each segment-building action to use more memory. Note that
  segment-building is internally multi-threaded to a degree. (Build and
  persist can run concurrently.)

- Simplify frame size calculations by removing the distinction between
  "standard" and "large" frames. The new default frame size is the same
  as the old "standard" frames, 1 MB. The original goal of of the large
  frames was to reduce the number of temporary files during sorting, but
  I think we can achieve the same thing by simply merging a larger number
  of standard frames at once.

- Remove the small worker adjustment that was added in apache#14117 to account
  for an extra frame involved in writing to durable storage. Instead,
  account for the extra frame whenever we are actually using durable storage.

- Cap super-sorter parallelism using the number of output partitions, rather
  than using a hard coded cap at 4. Note that in practice, so far, this cap
  has not been relevant for tasks because they have only been using a single
  processing thread anyway.

* Remove unused import.

* Fix errorprone annotation.

* Fixes for javadocs and inspections.

* Additional test coverage.

* Fix test.
@kfaraz kfaraz added the Backport label Oct 1, 2024
@kfaraz kfaraz added this to the 31.0.0 milestone Oct 1, 2024
@github-actions github-actions Bot added Area - Batch Ingestion Area - MSQ For multi stage queries - https://github.com/apache/druid/issues/12262 labels Oct 1, 2024
@kfaraz kfaraz merged commit 23b9039 into apache:31.0.0 Oct 1, 2024
@kfaraz kfaraz deleted the backport_17057 branch October 1, 2024 14:20
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

Area - Batch Ingestion Area - MSQ For multi stage queries - https://github.com/apache/druid/issues/12262 Backport

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants