
Make SegmentAllocationQueue multithreaded#18098

Merged
gianm merged 6 commits into apache:master from kfaraz:seg_alloc_queue_threads on Jun 11, 2025

Conversation

@kfaraz (Contributor) commented Jun 9, 2025

Description

Follow up to #17390

Once we start maintaining a TaskLockbox for each datasource, the single-threaded design of the SegmentAllocationQueue will become a bottleneck.

This patch makes the SegmentAllocationQueue multithreaded so that allocations for multiple datasources can happen in parallel.

Non-batch segment allocation is already multithreaded, as each allocation runs on its own Jetty thread.

Changes

  • Add config druid.indexer.tasklock.batchAllocationNumThreads with default value 5
  • Use worker threads in SegmentAllocationQueue to perform segment allocation
  • Add a manager thread which polls the allocation queue and submits jobs to workers
  • Skip a job if another job for the same datasource is already in progress
  • Emit metric task/action/batch/submitted for the count of submitted jobs
  • Emit metric task/action/batch/skipped for the count of skipped jobs
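The changes above can be sketched as follows. This is a minimal illustration of the manager-thread skip logic, not the PR's actual code: the class and method names (AllocationQueueSketch, pollNext, onBatchFinished) are hypothetical.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Sketch: a manager thread polls the queue and hands batches to workers,
// skipping any datasource that already has an allocation in flight.
public class AllocationQueueSketch
{
  private final Set<String> runningDatasources = ConcurrentHashMap.newKeySet();
  private final Queue<String> processingQueue = new ArrayDeque<>();

  public void submit(String dataSource)
  {
    processingQueue.add(dataSource);
  }

  /** Returns the datasource handed to a worker, or null if empty or skipped. */
  public String pollNext()
  {
    final String dataSource = processingQueue.peek();
    if (dataSource == null || runningDatasources.contains(dataSource)) {
      // Queue is empty, or another batch for this datasource is in progress.
      return null;
    }
    processingQueue.remove();
    runningDatasources.add(dataSource);
    return dataSource;
  }

  /** Called by a worker thread when its batch completes. */
  public void onBatchFinished(String dataSource)
  {
    runningDatasources.remove(dataSource);
  }
}
```

In this sketch, a second batch for a datasource stays queued until the worker running the first batch calls onBatchFinished.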

Release note

Add config druid.indexer.tasklock.batchAllocationNumThreads (default 5) to control the number of segment allocation threads. This allows segment allocations for different datasources to run concurrently.
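As a sketch, the new property would be set in the Overlord runtime properties; the property name is from this PR, while the value shown is purely illustrative:

```properties
# Number of worker threads used by the batch segment allocation queue.
# Default is 5; raise cautiously, since each thread queries the metadata store.
druid.indexer.tasklock.batchAllocationNumThreads=8
```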

Note that setting this config to a very large value will put undue strain on the metadata store and is likely to hamper, rather than improve, performance.


This PR has:

  • been self-reviewed.
  • added documentation for new or modified features or behaviors.
  • a release note entry in the PR description.
  • added Javadocs for most classes and all non-trivial methods. Linked related entities via Javadoc links.
  • added or updated version, license, or notice information in licenses.yaml
  • added comments explaining the "why" and the intent of the code wherever would not be obvious for an unfamiliar reader.
  • added unit tests or modified existing tests to cover new code paths, ensuring the threshold for code coverage is met.
  • added integration tests.
  • been tested in a test Druid cluster.

private boolean batchAllocationReduceMetadataIO = true;

@JsonProperty
private int batchAllocationNumThreads = 5;
Contributor

Should be documented.

Contributor Author


Added.

/**
* Thread-safe list of datasources for which a segment allocation is currently in-progress.
*/
private final List<String> runningDatasources = Collections.synchronizedList(new ArrayList<>());
Contributor


contains and remove run on this list. Could it be a Set?

Contributor Author


Updated.
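The Set-based version might look like the sketch below (names are illustrative, not the PR's actual code). A concurrent Set gives O(1) contains/remove instead of O(n) scans over a synchronized list, and add() doubles as an atomic check-and-insert.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Sketch: tracking datasources with in-flight allocations via a concurrent Set.
public class RunningDatasources
{
  private final Set<String> running = ConcurrentHashMap.newKeySet();

  /** Returns false if an allocation for this datasource is already running. */
  public boolean tryAcquire(String dataSource)
  {
    // add() is atomic, so there is no check-then-act race between
    // contains() and add() as there would be with two separate calls.
    return running.add(dataSource);
  }

  /** Called when the allocation for this datasource finishes. */
  public void release(String dataSource)
  {
    running.remove(dataSource);
  }
}
```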

final String dataSource = nextBatch.key.dataSource;
if (nextBatch.isDue()) {
  if (runningDatasources.contains(dataSource)) {
    // Skip this batch as another batch for the same datasource is in progress
Contributor


Will this cause a busy loop where we keep retrying this skipped batch over and over? I would think that since it remains in processingQueue, we'll do a scheduleQueuePoll at the end of this function, and the default maxWaitTimeMillis is zero (meaning another immediate poll).

Contributor Author


Yes, thanks for catching this! This was a mental TODO but I forgot to mark it.
I will check what we can do here.

Contributor Author


Updated. Added a small delay of 5 millis in case anything was skipped or if all threads are busy.

Contributor


Hmm. I think this would still lead to a ton of task/action/batch/skipped metrics being emitted if we have a long-running allocation in flight with another queued up. Fixing that by extending the min wait time would be bad, because that would slow down our responsiveness to allocation requests. Delays are undesirable anyway; we want everything to be as reactive as possible.

Is there an alternate approach you could go with? Maybe when we skip a batch, put it into a separate data structure keyed by datasource. Then when the current batch finishes, the worker thread running that batch could move the skipped batches back to the main queue.

Contributor Author


Thanks for the suggestion, @gianm!

I have updated the approach in the PR, with some modifications that seemed to adhere better to the current design of the class.

  • Remove the delay
  • When skipping a batch, mark it as "skipped" and emit the metric. Do not emit metric again if already skipped.
  • Do not reschedule queue poll if all workers are busy OR if queue is empty OR if all batches were skipped.
  • When a worker finishes, schedule a queue poll.
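The rescheduling rule in the bullets above can be sketched as a single predicate. This is a hypothetical illustration, not the PR's actual code; the parameter names mirror the variables mentioned in this thread.

```java
// Sketch: poll the queue again only while a worker is free and at least one
// queued batch has not been skipped. An empty queue needs no special case,
// since numSkippedBatches >= queueSize already holds when both are zero.
public class PollScheduleSketch
{
  public static boolean shouldPollAgain(int queueSize, int numSkippedBatches, boolean allWorkersBusy)
  {
    return !allWorkersBusy && numSkippedBatches < queueSize;
  }
}
```

When all batches are skipped, the poll is not rescheduled; instead, the worker that finishes the in-flight batch triggers the next poll, avoiding both busy-waiting and fixed delays.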

  // All remaining entries in the queue were skipped
  log.debug("Not scheduling again since datasources are already being processed.");
} else if (processingQueue.isEmpty()) {
  log.debug("Not scheduling again since queue is empty.");
Contributor


I think this would be caught by the previous line: if the queue is empty, numSkippedBatches and processingQueue.size() are both zero, and 0 >= 0. Consider collapsing them both into a single block with a message like "Not scheduling again since there are no eligible batches (skipped[%d])".

Contributor Author


Collapsed the condition but retained the check on processingQueue.isEmpty(), since it keeps the compiler happy; otherwise it warns about a potential NPE.

[Screenshot: IDE warning about a potential NPE]

@gianm gianm merged commit 608abc6 into apache:master Jun 11, 2025
134 of 138 checks passed
jtuglu1 pushed a commit to jtuglu1/druid that referenced this pull request Jun 17, 2025
* Make SegmentAllocationQueue multithreaded

* Do not run multiple jobs for the same datasource

* Add docs, min schedule delay to avoid busy waiting

* Trigger queue poll when worker finishes

* Emit skip metric once per queued batch

* Simplify scheduling condition
@capistrant capistrant added this to the 34.0.0 milestone Jul 22, 2025