From 991a40a81e049580f878bb89431cf838e4b45623 Mon Sep 17 00:00:00 2001 From: Kashif Faraz Date: Tue, 6 Dec 2022 10:55:58 +0530 Subject: [PATCH 1/4] Docs: Add changes for batched segment allocation --- docs/configuration/index.md | 2 ++ docs/ingestion/tasks.md | 21 +++++++++++++++++++ docs/operations/metrics.md | 10 +++++++-- .../actions/SegmentAllocationQueue.java | 2 +- .../overlord/config/TaskLockConfig.java | 6 +++--- .../actions/SegmentAllocationQueueTest.java | 2 +- .../common/actions/TaskActionTestKit.java | 2 +- 7 files changed, 37 insertions(+), 8 deletions(-) diff --git a/docs/configuration/index.md b/docs/configuration/index.md index 9582bbd99b20..4ef44a0eafbe 100644 --- a/docs/configuration/index.md +++ b/docs/configuration/index.md @@ -1112,6 +1112,8 @@ These Overlord static configurations can be defined in the `overlord/runtime.pro |`druid.indexer.storage.type`|Choices are "local" or "metadata". Indicates whether incoming tasks should be stored locally (in heap) or in metadata storage. "local" is mainly for internal testing while "metadata" is recommended in production because storing incoming tasks in metadata storage allows for tasks to be resumed if the Overlord should fail.|local| |`druid.indexer.storage.recentlyFinishedThreshold`|Duration of time to store task results. Default is 24 hours. If you have hundreds of tasks running in a day, consider increasing this threshold.|PT24H| |`druid.indexer.tasklock.forceTimeChunkLock`|_**Setting this to false is still experimental**_
If set, all tasks are enforced to use time chunk lock. If not set, each task automatically chooses a lock type to use. This configuration can be overwritten by setting `forceTimeChunkLock` in the [task context](../ingestion/tasks.md#context). See [Task Locking & Priority](../ingestion/tasks.md#context) for more details about locking in tasks.|true| +|`druid.indexer.tasklock.batchSegmentAllocation`| If set to true, segment allocate actions are performed in batches to improve the throughput and reduce the average `task/action/run/time`.|false| +|`druid.indexer.tasklock.batchAllocationWaitTime`|Milliseconds to wait between adding the first segment allocate action to a batch and executing that batch. The waiting time allows the batch to add more requests and thus improve the average segment allocation run time. This configuration takes effect only if `batchSegmentAllocation` is enabled.
This value should be decreased __only if__ there are failures while allocating segments due to metadata operations on a very large batch.|500| |`druid.indexer.task.default.context`|Default task context that is applied to all tasks submitted to the Overlord. Any default in this config does not override neither the context values the user provides nor `druid.indexer.tasklock.forceTimeChunkLock`.|empty context| |`druid.indexer.queue.maxSize`|Maximum number of active tasks at one time.|Integer.MAX_VALUE| |`druid.indexer.queue.startDelay`|Sleep this long before starting Overlord queue management. This can be useful to give a cluster time to re-orient itself after e.g. a widespread network issue.|PT1M| diff --git a/docs/ingestion/tasks.md b/docs/ingestion/tasks.md index c8a2e915d472..bffb215371b0 100644 --- a/docs/ingestion/tasks.md +++ b/docs/ingestion/tasks.md @@ -343,6 +343,27 @@ You can override the task priority by setting your priority in the task context "priority" : 100 } ``` + + +## Task actions + +These are various overlord actions performed by tasks during their lifecycle. Some typical actions are as follows: +- `lockAcquire`: acquires a time-chunk lock on an interval for the task +- `lockRelease`: releases a lock acquired by the task on an interval +- `segmentInsertion`: inserts segments into metadata store +- `segmentAllocate`: allocates pending segments to a task to write rows +- etc. + +### Batching `segmentAllocate` actions + +In a cluster with a large number of concurrent tasks (say > 1000), `segmentAllocate` actions on the overlord may take very long intervals of time to finish thus causing spikes in the `task/action/run/time`. This may result in lag building up while a task waits for a segment to get allocated. +The root causes of such spikes are: +- several tasks trying to allocate segments for the same datasource and interval +- large number of metadata calls made to the segments and pending segments tables +- concurrency limitations while acquiring a task lock required for allocating a segment + +Since the contention typically arises from tasks allocating segments for the same datasource and interval, the run times can be improved by batching the actions together. +Batched segment allocation can be enabled on the overlord by setting `druid.indexer.tasklock.batchSegmentAllocation=true`.See [overlord configuration](../configuration/index.md#overlord-operations) for more details. diff --git a/docs/operations/metrics.md b/docs/operations/metrics.md index 8822f3fea73d..3b3dc904be25 100644 --- a/docs/operations/metrics.md +++ b/docs/operations/metrics.md @@ -230,8 +230,14 @@ Note: If the JVM does not support CPU time measurement for the current thread, ` |------|-----------|------------------------------------------------------------|------------| |`task/run/time`|Milliseconds taken to run a task.| `dataSource`, `taskId`, `taskType`, `taskStatus`|Varies| |`task/pending/time`|Milliseconds taken for a task to wait for running.| `dataSource`, `taskId`, `taskType`|Varies| -|`task/action/log/time`|Milliseconds taken to log a task action to the audit log.| `dataSource`, `taskId`, `taskType`|< 1000 (subsecond)| -|`task/action/run/time`|Milliseconds taken to execute a task action.| `dataSource`, `taskId`, `taskType`|Varies from subsecond to a few seconds, based on action type.| +|`task/action/log/time`|Milliseconds taken to log a task action to the audit log.| `dataSource`, `taskId`, `taskType`, `taskActionType`|< 1000 (subsecond)| +|`task/action/run/time`|Milliseconds taken to execute a task action.| `dataSource`, `taskId`, `taskType`, `taskActionType`|Varies from subsecond to a few seconds, based on action type.| +|`task/action/success/count`|Number of task actions that were executed successfully during the emission period. Currently only being emitted for batched `segmentAllocate` actions.| `dataSource`, `taskId`, `taskType`, `taskActionType`|Varies| +|`task/action/failed/count`|Number of task actions that failed during the emission period. Currently only being emitted for batched `segmentAllocate` actions.| `dataSource`, `taskId`, `taskType`, `taskActionType`|Varies| +|`task/action/batch/queueTime`|Milliseconds spent by a batch of task actions in queue. Currently only being emitted for batched `segmentAllocate` actions.| `dataSource`, `taskActionType`, `interval`|Varies based on the `batchAllocationWaitTime` and number of batches in queue.| +|`task/action/batch/runTime`|Milliseconds taken to execute a batch of task actions. Currently only being emitted for batched `segmentAllocate` actions.| `dataSource`, `taskActionType`, `interval`|Varies from subsecond to a few seconds, based on action type and batch size.| +|`task/action/batch/size`|Number of task actions in a batch that was executed during the emission period. Currently only being emitted for batched `segmentAllocate` actions.| `dataSource`, `taskActionType`, `interval`|Varies based on number of concurrent task actions.| +|`task/action/batch/attempts`|Number of execution attempts for a single batch of task actions. Currently only being emitted for batched `segmentAllocate` actions.| `dataSource`, `taskActionType`, `interval`|1 if there are no failures or retries.| |`segment/added/bytes`|Size in bytes of new segments created.| `dataSource`, `taskId`, `taskType`, `interval`|Varies| |`segment/moved/bytes`|Size in bytes of segments moved/archived via the Move Task.| `dataSource`, `taskId`, `taskType`, `interval`|Varies| |`segment/nuked/bytes`|Size in bytes of segments deleted via the Kill Task.| `dataSource`, `taskId`, `taskType`, `interval`|Varies| diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentAllocationQueue.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentAllocationQueue.java index 9ed53d99faf5..ef8e27f1e132 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentAllocationQueue.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentAllocationQueue.java @@ -94,7 +94,7 @@ public SegmentAllocationQueue( this.emitter = emitter; this.taskLockbox = taskLockbox; this.metadataStorage = metadataStorage; - this.maxWaitTimeMillis = taskLockConfig.getBatchAllocationMaxWaitTime(); + this.maxWaitTimeMillis = taskLockConfig.getBatchAllocationWaitTime(); this.executor = taskLockConfig.isBatchSegmentAllocation() ? executorFactory.create(1, "SegmentAllocQueue-%s") : null; diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/config/TaskLockConfig.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/config/TaskLockConfig.java index acbc318baabc..c860e4e1d156 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/config/TaskLockConfig.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/config/TaskLockConfig.java @@ -34,7 +34,7 @@ public class TaskLockConfig private boolean batchSegmentAllocation = false; @JsonProperty - private long batchAllocationMaxWaitTime = 500L; + private long batchAllocationWaitTime = 500L; public boolean isForceTimeChunkLock() { @@ -46,8 +46,8 @@ public boolean isBatchSegmentAllocation() return batchSegmentAllocation; } - public long getBatchAllocationMaxWaitTime() + public long getBatchAllocationWaitTime() { - return batchAllocationMaxWaitTime; + return batchAllocationWaitTime; } } diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/SegmentAllocationQueueTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/SegmentAllocationQueueTest.java index 536e9ffac24c..1c829ccb20ce 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/SegmentAllocationQueueTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/SegmentAllocationQueueTest.java @@ -71,7 +71,7 @@ public boolean isBatchSegmentAllocation() } @Override - public long getBatchAllocationMaxWaitTime() + public long getBatchAllocationWaitTime() { return 0; } diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/TaskActionTestKit.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/TaskActionTestKit.java index 189d3fe8779c..eebf78a7ddcb 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/TaskActionTestKit.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/TaskActionTestKit.java @@ -112,7 +112,7 @@ public boolean isBatchSegmentAllocation() } @Override - public long getBatchAllocationMaxWaitTime() + public long getBatchAllocationWaitTime() { return 10L; } From e34b3e9fdd8f8e29f73636ba3f708d19b4b66861 Mon Sep 17 00:00:00 2001 From: Kashif Faraz Date: Tue, 6 Dec 2022 13:31:35 +0530 Subject: [PATCH 2/4] Add limit on allocation batch size --- docs/configuration/index.md | 2 +- docs/ingestion/tasks.md | 8 ++-- docs/operations/metrics.md | 12 +++--- .../druid/indexing/common/TaskToolbox.java | 8 +++- .../actions/SegmentAllocationQueue.java | 42 +++++++++++++++---- .../actions/SegmentAllocationQueueTest.java | 17 ++++++++ 6 files changed, 69 insertions(+), 20 deletions(-) diff --git a/docs/configuration/index.md b/docs/configuration/index.md index 4ef44a0eafbe..12b8b1f4628e 100644 --- a/docs/configuration/index.md +++ b/docs/configuration/index.md @@ -1112,7 +1112,7 @@ These Overlord static configurations can be defined in the `overlord/runtime.pro |`druid.indexer.storage.type`|Choices are "local" or "metadata". Indicates whether incoming tasks should be stored locally (in heap) or in metadata storage. "local" is mainly for internal testing while "metadata" is recommended in production because storing incoming tasks in metadata storage allows for tasks to be resumed if the Overlord should fail.|local| |`druid.indexer.storage.recentlyFinishedThreshold`|Duration of time to store task results. Default is 24 hours. If you have hundreds of tasks running in a day, consider increasing this threshold.|PT24H| |`druid.indexer.tasklock.forceTimeChunkLock`|_**Setting this to false is still experimental**_
If set, all tasks are enforced to use time chunk lock. If not set, each task automatically chooses a lock type to use. This configuration can be overwritten by setting `forceTimeChunkLock` in the [task context](../ingestion/tasks.md#context). See [Task Locking & Priority](../ingestion/tasks.md#context) for more details about locking in tasks.|true| -|`druid.indexer.tasklock.batchSegmentAllocation`| If set to true, segment allocate actions are performed in batches to improve the throughput and reduce the average `task/action/run/time`.|false| +|`druid.indexer.tasklock.batchSegmentAllocation`| If set to true, segment allocate actions are performed in batches to improve the throughput and reduce the average `task/action/run/time`. See [batching `segmentAllocate` actions](../ingestion/tasks.md#batching-segmentallocate-actions) for details.|false| |`druid.indexer.tasklock.batchAllocationWaitTime`|Milliseconds to wait between adding the first segment allocate action to a batch and executing that batch. The waiting time allows the batch to add more requests and thus improve the average segment allocation run time. This configuration takes effect only if `batchSegmentAllocation` is enabled.
This value should be decreased __only if__ there are failures while allocating segments due to metadata operations on a very large batch.|500| |`druid.indexer.task.default.context`|Default task context that is applied to all tasks submitted to the Overlord. Any default in this config does not override neither the context values the user provides nor `druid.indexer.tasklock.forceTimeChunkLock`.|empty context| |`druid.indexer.queue.maxSize`|Maximum number of active tasks at one time.|Integer.MAX_VALUE| diff --git a/docs/ingestion/tasks.md b/docs/ingestion/tasks.md index bffb215371b0..6b211a055632 100644 --- a/docs/ingestion/tasks.md +++ b/docs/ingestion/tasks.md @@ -348,17 +348,17 @@ You can override the task priority by setting your priority in the task context ## Task actions These are various overlord actions performed by tasks during their lifecycle. Some typical actions are as follows: -- `lockAcquire`: acquires a time-chunk lock on an interval for the task +- `lockAcquire`: acquires a time-chunk lock on an interval for the task - `lockRelease`: releases a lock acquired by the task on an interval -- `segmentInsertion`: inserts segments into metadata store +- `segmentTransactionalInsert`: publishes new segments created by a task and optionally overwrites and/or drops existing segments in a single transaction - `segmentAllocate`: allocates pending segments to a task to write rows - etc. ### Batching `segmentAllocate` actions -In a cluster with a large number of concurrent tasks (say > 1000), `segmentAllocate` actions on the overlord may take very long intervals of time to finish thus causing spikes in the `task/action/run/time`. This may result in lag building up while a task waits for a segment to get allocated. +In a cluster with several concurrent tasks, `segmentAllocate` actions on the overlord may take very long intervals of time to finish thus causing spikes in the `task/action/run/time`. This may result in lag building up while a task waits for a segment to get allocated. The root causes of such spikes are: -- several tasks trying to allocate segments for the same datasource and interval +- several concurrent tasks trying to allocate segments for the same datasource and interval - large number of metadata calls made to the segments and pending segments tables - concurrency limitations while acquiring a task lock required for allocating a segment diff --git a/docs/operations/metrics.md b/docs/operations/metrics.md index 3b3dc904be25..4e3c961dcb8d 100644 --- a/docs/operations/metrics.md +++ b/docs/operations/metrics.md @@ -232,12 +232,12 @@ Note: If the JVM does not support CPU time measurement for the current thread, ` |`task/pending/time`|Milliseconds taken for a task to wait for running.| `dataSource`, `taskId`, `taskType`|Varies| |`task/action/log/time`|Milliseconds taken to log a task action to the audit log.| `dataSource`, `taskId`, `taskType`, `taskActionType`|< 1000 (subsecond)| |`task/action/run/time`|Milliseconds taken to execute a task action.| `dataSource`, `taskId`, `taskType`, `taskActionType`|Varies from subsecond to a few seconds, based on action type.| -|`task/action/success/count`|Number of task actions that were executed successfully during the emission period. Currently only being emitted for batched `segmentAllocate` actions.| `dataSource`, `taskId`, `taskType`, `taskActionType`|Varies| -|`task/action/failed/count`|Number of task actions that failed during the emission period. Currently only being emitted for batched `segmentAllocate` actions.| `dataSource`, `taskId`, `taskType`, `taskActionType`|Varies| -|`task/action/batch/queueTime`|Milliseconds spent by a batch of task actions in queue. Currently only being emitted for batched `segmentAllocate` actions.| `dataSource`, `taskActionType`, `interval`|Varies based on the `batchAllocationWaitTime` and number of batches in queue.| -|`task/action/batch/runTime`|Milliseconds taken to execute a batch of task actions. Currently only being emitted for batched `segmentAllocate` actions.| `dataSource`, `taskActionType`, `interval`|Varies from subsecond to a few seconds, based on action type and batch size.| -|`task/action/batch/size`|Number of task actions in a batch that was executed during the emission period. Currently only being emitted for batched `segmentAllocate` actions.| `dataSource`, `taskActionType`, `interval`|Varies based on number of concurrent task actions.| -|`task/action/batch/attempts`|Number of execution attempts for a single batch of task actions. Currently only being emitted for batched `segmentAllocate` actions.| `dataSource`, `taskActionType`, `interval`|1 if there are no failures or retries.| +|`task/action/success/count`|Number of task actions that were executed successfully during the emission period. Currently only being emitted for [batched `segmentAllocate` actions](../ingestion/tasks.md#batching-segmentallocate-actions).| `dataSource`, `taskId`, `taskType`, `taskActionType`|Varies| +|`task/action/failed/count`|Number of task actions that failed during the emission period. Currently only being emitted for [batched `segmentAllocate` actions](../ingestion/tasks.md#batching-segmentallocate-actions).| `dataSource`, `taskId`, `taskType`, `taskActionType`|Varies| +|`task/action/batch/queueTime`|Milliseconds spent by a batch of task actions in queue. Currently only being emitted for [batched `segmentAllocate` actions](../ingestion/tasks.md#batching-segmentallocate-actions).| `dataSource`, `taskActionType`, `interval`|Varies based on the `batchAllocationWaitTime` and number of batches in queue.| +|`task/action/batch/runTime`|Milliseconds taken to execute a batch of task actions. Currently only being emitted for [batched `segmentAllocate` actions](../ingestion/tasks.md#batching-segmentallocate-actions).| `dataSource`, `taskActionType`, `interval`|Varies from subsecond to a few seconds, based on action type and batch size.| +|`task/action/batch/size`|Number of task actions in a batch that was executed during the emission period. Currently only being emitted for [batched `segmentAllocate` actions](../ingestion/tasks.md#batching-segmentallocate-actions).| `dataSource`, `taskActionType`, `interval`|Varies based on number of concurrent task actions.| +|`task/action/batch/attempts`|Number of execution attempts for a single batch of task actions. Currently only being emitted for [batched `segmentAllocate` actions](../ingestion/tasks.md#batching-segmentallocate-actions).| `dataSource`, `taskActionType`, `interval`|1 if there are no failures or retries.| |`segment/added/bytes`|Size in bytes of new segments created.| `dataSource`, `taskId`, `taskType`, `interval`|Varies| |`segment/moved/bytes`|Size in bytes of segments moved/archived via the Move Task.| `dataSource`, `taskId`, `taskType`, `interval`|Varies| |`segment/nuked/bytes`|Size in bytes of segments deleted via the Kill Task.| `dataSource`, `taskId`, `taskType`, `interval`|Varies| diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/TaskToolbox.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/TaskToolbox.java index 897db9750354..03a4849e602e 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/TaskToolbox.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/TaskToolbox.java @@ -32,7 +32,7 @@ import org.apache.druid.discovery.DataNodeService; import org.apache.druid.discovery.DruidNodeAnnouncer; import org.apache.druid.discovery.LookupNodeService; -import org.apache.druid.indexing.common.actions.SegmentInsertAction; +import org.apache.druid.indexing.common.actions.SegmentTransactionalInsertAction; import org.apache.druid.indexing.common.actions.TaskActionClient; import org.apache.druid.indexing.common.config.TaskConfig; import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexSupervisorTaskClientProvider; @@ -331,7 +331,11 @@ public void publishSegments(Iterable segments) throws IOException DataSegment::getInterval ); for (final Collection segmentCollection : segmentMultimap.asMap().values()) { - getTaskActionClient().submit(new SegmentInsertAction(ImmutableSet.copyOf(segmentCollection))); + getTaskActionClient().submit( + SegmentTransactionalInsertAction.appendAction( + ImmutableSet.copyOf(segmentCollection), null, null + ) + ); } } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentAllocationQueue.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentAllocationQueue.java index ef8e27f1e132..d60149d6245c 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentAllocationQueue.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentAllocationQueue.java @@ -70,6 +70,7 @@ public class SegmentAllocationQueue private static final Logger log = new Logger(SegmentAllocationQueue.class); private static final int MAX_QUEUE_SIZE = 2000; + private static final int MAX_BATCH_SIZE = 500; private final long maxWaitTimeMillis; @@ -173,7 +174,7 @@ public Future add(SegmentAllocateRequest request) throw new ISE("Batched segment allocation is disabled."); } - final AllocateRequestKey requestKey = new AllocateRequestKey(request, maxWaitTimeMillis); + final AllocateRequestKey requestKey = getKeyForAvailableBatch(request); final AtomicReference> futureReference = new AtomicReference<>(); // Possible race condition: @@ -198,6 +199,24 @@ public Future add(SegmentAllocateRequest request) return futureReference.get(); } + /** + * Returns the key for a batch that is not added to the queue yet and/or has + * available space. Throws an exception if the queue is already full and no + * batch has available capacity. + */ + private AllocateRequestKey getKeyForAvailableBatch(SegmentAllocateRequest request) + { + for (int batchIncrementalId = 0; batchIncrementalId < MAX_QUEUE_SIZE; ++batchIncrementalId) { + AllocateRequestKey nextKey = new AllocateRequestKey(request, maxWaitTimeMillis, batchIncrementalId); + AllocateRequestBatch nextBatch = keyToBatch.get(nextKey); + if (nextBatch == null || nextBatch.size() < MAX_BATCH_SIZE) { + return nextKey; + } + } + + throw new ISE("Allocation queue is at capacity, all batches are full."); + } + /** * Tries to add the given batch to the processing queue. Fails all the pending * requests in the batch if we are not leader or if the queue is full. @@ -616,6 +635,11 @@ synchronized int size() */ private static class AllocateRequestKey { + /** + * ID to distinguish between two batches for the same datasource, groupId, etc. + */ + private final int batchIncrementalId; + private long queueTimeMillis; private final long maxWaitTimeMillis; @@ -635,11 +659,12 @@ private static class AllocateRequestKey * Creates a new key for the given request. The batch for a unique key will * always contain a single request. */ - AllocateRequestKey(SegmentAllocateRequest request, long maxWaitTimeMillis) + AllocateRequestKey(SegmentAllocateRequest request, long maxWaitTimeMillis, int batchIncrementalId) { final SegmentAllocateAction action = request.getAction(); final Task task = request.getTask(); + this.batchIncrementalId = batchIncrementalId; this.dataSource = action.getDataSource(); this.groupId = task.getGroupId(); this.skipSegmentLineageCheck = action.isSkipSegmentLineageCheck(); @@ -651,10 +676,11 @@ private static class AllocateRequestKey .bucket(action.getTimestamp()); this.hash = Objects.hash( - skipSegmentLineageCheck, - useNonRootGenPartitionSpace, dataSource, groupId, + batchIncrementalId, + skipSegmentLineageCheck, + useNonRootGenPartitionSpace, preferredAllocationInterval, lockGranularity ); @@ -687,10 +713,11 @@ public boolean equals(Object o) return false; } AllocateRequestKey that = (AllocateRequestKey) o; - return skipSegmentLineageCheck == that.skipSegmentLineageCheck - && useNonRootGenPartitionSpace == that.useNonRootGenPartitionSpace - && dataSource.equals(that.dataSource) + return dataSource.equals(that.dataSource) && groupId.equals(that.groupId) + && batchIncrementalId == that.batchIncrementalId + && skipSegmentLineageCheck == that.skipSegmentLineageCheck + && useNonRootGenPartitionSpace == that.useNonRootGenPartitionSpace && preferredAllocationInterval.equals(that.preferredAllocationInterval) && lockGranularity == that.lockGranularity; } @@ -707,6 +734,7 @@ public String toString() return "{" + "ds='" + dataSource + '\'' + ", gr='" + groupId + '\'' + + ", incId=" + batchIncrementalId + ", lock=" + lockGranularity + ", invl=" + preferredAllocationInterval + ", slc=" + skipSegmentLineageCheck + diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/SegmentAllocationQueueTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/SegmentAllocationQueueTest.java index 1c829ccb20ce..974b3096f92d 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/SegmentAllocationQueueTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/SegmentAllocationQueueTest.java @@ -249,6 +249,23 @@ public void testFullAllocationQueue() ); } + @Test + public void testMaxBatchSize() + { + for (int i = 0; i < 500; ++i) { + SegmentAllocateRequest request = + allocateRequest().forTask(createTask(DS_WIKI, "group_1")).build(); + allocationQueue.add(request); + } + + // Verify that next request is added to a new batch + Assert.assertEquals(1, allocationQueue.size()); + SegmentAllocateRequest request = + allocateRequest().forTask(createTask(DS_WIKI, "group_1")).build(); + allocationQueue.add(request); + Assert.assertEquals(2, allocationQueue.size()); + } + @Test public void testMultipleRequestsForSameSegment() { From f3308da6d46ee5cc27d1de2c58f38279ed734791 Mon Sep 17 00:00:00 2001 From: Kashif Faraz Date: Tue, 6 Dec 2022 13:43:43 +0530 Subject: [PATCH 3/4] Minor fixes to docs --- docs/configuration/index.md | 2 +- docs/ingestion/tasks.md | 6 +++--- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/docs/configuration/index.md b/docs/configuration/index.md index 12b8b1f4628e..c8cec84bd664 100644 --- a/docs/configuration/index.md +++ b/docs/configuration/index.md @@ -1113,7 +1113,7 @@ These Overlord static configurations can be defined in the `overlord/runtime.pro |`druid.indexer.storage.recentlyFinishedThreshold`|Duration of time to store task results. Default is 24 hours. If you have hundreds of tasks running in a day, consider increasing this threshold.|PT24H| |`druid.indexer.tasklock.forceTimeChunkLock`|_**Setting this to false is still experimental**_
If set, all tasks are enforced to use time chunk lock. If not set, each task automatically chooses a lock type to use. This configuration can be overwritten by setting `forceTimeChunkLock` in the [task context](../ingestion/tasks.md#context). See [Task Locking & Priority](../ingestion/tasks.md#context) for more details about locking in tasks.|true| |`druid.indexer.tasklock.batchSegmentAllocation`| If set to true, segment allocate actions are performed in batches to improve the throughput and reduce the average `task/action/run/time`. See [batching `segmentAllocate` actions](../ingestion/tasks.md#batching-segmentallocate-actions) for details.|false| -|`druid.indexer.tasklock.batchAllocationWaitTime`|Milliseconds to wait between adding the first segment allocate action to a batch and executing that batch. The waiting time allows the batch to add more requests and thus improve the average segment allocation run time. This configuration takes effect only if `batchSegmentAllocation` is enabled.
This value should be decreased __only if__ there are failures while allocating segments due to metadata operations on a very large batch.|500| +|`druid.indexer.tasklock.batchAllocationWaitTime`|Milliseconds to wait between adding the first segment allocate action to a batch and executing that batch. The waiting time allows the batch to add more requests and thus improve the average segment allocation run time. This configuration takes effect only if `batchSegmentAllocation` is enabled.|500| |`druid.indexer.task.default.context`|Default task context that is applied to all tasks submitted to the Overlord. Any default in this config does not override neither the context values the user provides nor `druid.indexer.tasklock.forceTimeChunkLock`.|empty context| |`druid.indexer.queue.maxSize`|Maximum number of active tasks at one time.|Integer.MAX_VALUE| |`druid.indexer.queue.startDelay`|Sleep this long before starting Overlord queue management. This can be useful to give a cluster time to re-orient itself after e.g. a widespread network issue.|PT1M| diff --git a/docs/ingestion/tasks.md b/docs/ingestion/tasks.md index 6b211a055632..7757b4d1f928 100644 --- a/docs/ingestion/tasks.md +++ b/docs/ingestion/tasks.md @@ -350,15 +350,15 @@ You can override the task priority by setting your priority in the task context These are various overlord actions performed by tasks during their lifecycle. Some typical actions are as follows: - `lockAcquire`: acquires a time-chunk lock on an interval for the task - `lockRelease`: releases a lock acquired by the task on an interval -- `segmentTransactionalInsert`: publishes new segments created by a task and optionally overwrites and/or drops existing segments in a single transaction +- `segmentTransactionalInsert`: publishes new segments created by a task and optionally overwrites and/or drops existing segments in a single transaction - `segmentAllocate`: allocates pending segments to a task to write rows - etc. ### Batching `segmentAllocate` actions -In a cluster with several concurrent tasks, `segmentAllocate` actions on the overlord may take very long intervals of time to finish thus causing spikes in the `task/action/run/time`. This may result in lag building up while a task waits for a segment to get allocated. +In a cluster with several concurrent tasks, `segmentAllocate` actions on the overlord may take very long intervals of time to finish thus causing spikes in the `task/action/run/time`. This may result in ingestion lag building up while a task waits for a segment to get allocated. The root causes of such spikes are: -- several concurrent tasks trying to allocate segments for the same datasource and interval +- several concurrent tasks trying to allocate segments for the same datasource and interval - large number of metadata calls made to the segments and pending segments tables - concurrency limitations while acquiring a task lock required for allocating a segment From f58cc1e07b963de3f896d432ec2c170dcc471092 Mon Sep 17 00:00:00 2001 From: Kashif Faraz Date: Tue, 6 Dec 2022 21:58:47 +0530 Subject: [PATCH 4/4] Apply suggestions from code review Co-authored-by: Jill Osborne --- docs/configuration/index.md | 4 ++-- docs/ingestion/tasks.md | 11 +++++------ 2 files changed, 7 insertions(+), 8 deletions(-) diff --git a/docs/configuration/index.md b/docs/configuration/index.md index c8cec84bd664..a14b3beb56c7 100644 --- a/docs/configuration/index.md +++ b/docs/configuration/index.md @@ -1112,8 +1112,8 @@ These Overlord static configurations can be defined in the `overlord/runtime.pro |`druid.indexer.storage.type`|Choices are "local" or "metadata". Indicates whether incoming tasks should be stored locally (in heap) or in metadata storage. "local" is mainly for internal testing while "metadata" is recommended in production because storing incoming tasks in metadata storage allows for tasks to be resumed if the Overlord should fail.|local| |`druid.indexer.storage.recentlyFinishedThreshold`|Duration of time to store task results. Default is 24 hours. If you have hundreds of tasks running in a day, consider increasing this threshold.|PT24H| |`druid.indexer.tasklock.forceTimeChunkLock`|_**Setting this to false is still experimental**_
If set, all tasks are enforced to use time chunk lock. If not set, each task automatically chooses a lock type to use. This configuration can be overwritten by setting `forceTimeChunkLock` in the [task context](../ingestion/tasks.md#context). See [Task Locking & Priority](../ingestion/tasks.md#context) for more details about locking in tasks.|true| -|`druid.indexer.tasklock.batchSegmentAllocation`| If set to true, segment allocate actions are performed in batches to improve the throughput and reduce the average `task/action/run/time`. See [batching `segmentAllocate` actions](../ingestion/tasks.md#batching-segmentallocate-actions) for details.|false| -|`druid.indexer.tasklock.batchAllocationWaitTime`|Milliseconds to wait between adding the first segment allocate action to a batch and executing that batch. The waiting time allows the batch to add more requests and thus improve the average segment allocation run time. This configuration takes effect only if `batchSegmentAllocation` is enabled.|500| +|`druid.indexer.tasklock.batchSegmentAllocation`| If set to true, Druid performs segment allocate actions in batches to improve throughput and reduce the average `task/action/run/time`. See [batching `segmentAllocate` actions](../ingestion/tasks.md#batching-segmentallocate-actions) for details.|false| +|`druid.indexer.tasklock.batchAllocationWaitTime`|Number of milliseconds after Druid adds the first segment allocate action to a batch, until it executes the batch. Allows the batch to add more requests and improve the average segment allocation run time. This configuration takes effect only if `batchSegmentAllocation` is enabled.|500| |`druid.indexer.task.default.context`|Default task context that is applied to all tasks submitted to the Overlord. Any default in this config does not override neither the context values the user provides nor `druid.indexer.tasklock.forceTimeChunkLock`.|empty context| |`druid.indexer.queue.maxSize`|Maximum number of active tasks at one time.|Integer.MAX_VALUE| |`druid.indexer.queue.startDelay`|Sleep this long before starting Overlord queue management. This can be useful to give a cluster time to re-orient itself after e.g. a widespread network issue.|PT1M| diff --git a/docs/ingestion/tasks.md b/docs/ingestion/tasks.md index 7757b4d1f928..5afbadb3d43a 100644 --- a/docs/ingestion/tasks.md +++ b/docs/ingestion/tasks.md @@ -347,23 +347,22 @@ You can override the task priority by setting your priority in the task context ## Task actions -These are various overlord actions performed by tasks during their lifecycle. Some typical actions are as follows: +Task actions are overlord actions performed by tasks during their lifecycle. Some typical task actions are: - `lockAcquire`: acquires a time-chunk lock on an interval for the task - `lockRelease`: releases a lock acquired by the task on an interval - `segmentTransactionalInsert`: publishes new segments created by a task and optionally overwrites and/or drops existing segments in a single transaction - `segmentAllocate`: allocates pending segments to a task to write rows -- etc. ### Batching `segmentAllocate` actions -In a cluster with several concurrent tasks, `segmentAllocate` actions on the overlord may take very long intervals of time to finish thus causing spikes in the `task/action/run/time`. This may result in ingestion lag building up while a task waits for a segment to get allocated. -The root causes of such spikes are: +In a cluster with several concurrent tasks, `segmentAllocate` actions on the overlord can take a long time to finish, causing spikes in the `task/action/run/time`. This can result in ingestion lag building up while a task waits for a segment to be allocated. +The root cause of such spikes is likely to be one or more of the following: - several concurrent tasks trying to allocate segments for the same datasource and interval - large number of metadata calls made to the segments and pending segments tables - concurrency limitations while acquiring a task lock required for allocating a segment -Since the contention typically arises from tasks allocating segments for the same datasource and interval, the run times can be improved by batching the actions together. -Batched segment allocation can be enabled on the overlord by setting `druid.indexer.tasklock.batchSegmentAllocation=true`.See [overlord configuration](../configuration/index.md#overlord-operations) for more details. +Since the contention typically arises from tasks allocating segments for the same datasource and interval, you can improve the run times by batching the actions together. +To enable batched segment allocation on the overlord, set `druid.indexer.tasklock.batchSegmentAllocation` to `true`. See [overlord configuration](../configuration/index.md#overlord-operations) for more details.