Merged
2 changes: 2 additions & 0 deletions docs/configuration/index.md
@@ -1112,6 +1112,8 @@ These Overlord static configurations can be defined in the `overlord/runtime.pro
|`druid.indexer.storage.type`|Choices are "local" or "metadata". Indicates whether incoming tasks should be stored locally (in heap) or in metadata storage. "local" is mainly for internal testing while "metadata" is recommended in production because storing incoming tasks in metadata storage allows for tasks to be resumed if the Overlord should fail.|local|
|`druid.indexer.storage.recentlyFinishedThreshold`|Duration of time to store task results. Default is 24 hours. If you have hundreds of tasks running in a day, consider increasing this threshold.|PT24H|
|`druid.indexer.tasklock.forceTimeChunkLock`|_**Setting this to false is still experimental**_<br/> If set to true, all tasks are forced to use time chunk locking. If set to false, each task automatically chooses the lock type to use. This configuration can be overridden by setting `forceTimeChunkLock` in the [task context](../ingestion/tasks.md#context). See [Task Locking & Priority](../ingestion/tasks.md#context) for more details about locking in tasks.|true|
|`druid.indexer.tasklock.batchSegmentAllocation`|If set to true, Druid performs segment allocate actions in batches to improve throughput and reduce the average `task/action/run/time`. See [batching `segmentAllocate` actions](../ingestion/tasks.md#batching-segmentallocate-actions) for details.|false|
|`druid.indexer.tasklock.batchAllocationWaitTime`|Number of milliseconds to wait after the first segment allocate action is added to a batch before executing the batch. The wait allows more requests to join the batch, improving the average segment allocation run time. This configuration takes effect only if `batchSegmentAllocation` is enabled.|500|
|`druid.indexer.task.default.context`|Default task context that is applied to all tasks submitted to the Overlord. Defaults in this config override neither the context values provided by the user nor `druid.indexer.tasklock.forceTimeChunkLock`.|empty context|
|`druid.indexer.queue.maxSize`|Maximum number of active tasks at one time.|Integer.MAX_VALUE|
|`druid.indexer.queue.startDelay`|Sleep this long before starting Overlord queue management. This can be useful to give a cluster time to re-orient itself after e.g. a widespread network issue.|PT1M|
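
For reference, enabling batched segment allocation via the properties above might look like the following in `overlord/runtime.properties` (illustrative values, not defaults you must change):

```properties
# Enable batching of segmentAllocate actions on the Overlord
druid.indexer.tasklock.batchSegmentAllocation=true
# Wait up to 500 ms after the first action joins a batch before executing it
druid.indexer.tasklock.batchAllocationWaitTime=500
```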
20 changes: 20 additions & 0 deletions docs/ingestion/tasks.md
@@ -343,6 +343,26 @@ You can override the task priority by setting your priority in the task context
"priority" : 100
}
```
<a name="actions"></a>

## Task actions

Task actions are requests that tasks send to the Overlord during their lifecycle. Some typical task actions are:
- `lockAcquire`: acquires a time-chunk lock on an interval for the task
- `lockRelease`: releases a lock acquired by the task on an interval
- `segmentTransactionalInsert`: publishes new segments created by a task and, in the same transaction, optionally overwrites or drops existing segments
- `segmentAllocate`: allocates pending segments to a task so that it can write rows

### Batching `segmentAllocate` actions

In a cluster with several concurrent tasks, `segmentAllocate` actions on the Overlord can take a long time to finish, causing spikes in the `task/action/run/time` metric. This can result in ingestion lag building up while a task waits for a segment to be allocated.
The root cause of such spikes is likely to be one or more of the following:
- several concurrent tasks trying to allocate segments for the same datasource and interval
- a large number of metadata calls made to the segments and pending segments tables
- concurrency limitations while acquiring a task lock required for allocating a segment

Since the contention typically arises from tasks allocating segments for the same datasource and interval, you can improve run times by batching these actions together.
To enable batched segment allocation on the overlord, set `druid.indexer.tasklock.batchSegmentAllocation` to `true`. See [overlord configuration](../configuration/index.md#overlord-operations) for more details.
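
As a rough illustration of the batching idea, and not Druid's actual classes, the sketch below groups allocate requests that share a datasource and interval into one batch until the batch fills up, after which a new batch with an incremented id is started. The class name, key format, and `MAX_BATCH_SIZE` value are hypothetical simplifications of the behavior described above.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: requests with the same datasource and interval
// share a batch until it reaches MAX_BATCH_SIZE, after which a new batch
// with an incremented id is started.
public class AllocationBatcher {
    static final int MAX_BATCH_SIZE = 500;

    // Maps a batch key ("datasource/interval/incrementalId") to its requests.
    private final Map<String, List<String>> keyToBatch = new LinkedHashMap<>();

    // Adds a request and returns the key of the batch it joined.
    public String add(String dataSource, String interval, String requestId) {
        for (int incrementalId = 0; ; incrementalId++) {
            String key = dataSource + "/" + interval + "/" + incrementalId;
            List<String> batch = keyToBatch.computeIfAbsent(key, k -> new ArrayList<>());
            if (batch.size() < MAX_BATCH_SIZE) {
                batch.add(requestId);
                return key;
            }
        }
    }

    // Number of batches currently held.
    public int batchCount() {
        return keyToBatch.size();
    }
}
```

With this grouping, the Overlord can serve an entire batch with far fewer metadata calls than serving each request individually.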

<a name="context"></a>

10 changes: 8 additions & 2 deletions docs/operations/metrics.md
@@ -230,8 +230,14 @@ Note: If the JVM does not support CPU time measurement for the current thread, `
|------|-----------|------------------------------------------------------------|------------|
|`task/run/time`|Milliseconds taken to run a task.| `dataSource`, `taskId`, `taskType`, `taskStatus`|Varies|
|`task/pending/time`|Milliseconds taken for a task to wait for running.| `dataSource`, `taskId`, `taskType`|Varies|
|`task/action/log/time`|Milliseconds taken to log a task action to the audit log.| `dataSource`, `taskId`, `taskType`|< 1000 (subsecond)|
|`task/action/run/time`|Milliseconds taken to execute a task action.| `dataSource`, `taskId`, `taskType`|Varies from subsecond to a few seconds, based on action type.|
|`task/action/log/time`|Milliseconds taken to log a task action to the audit log.| `dataSource`, `taskId`, `taskType`, `taskActionType`|< 1000 (subsecond)|
|`task/action/run/time`|Milliseconds taken to execute a task action.| `dataSource`, `taskId`, `taskType`, `taskActionType`|Varies from subsecond to a few seconds, based on action type.|
|`task/action/success/count`|Number of task actions that were executed successfully during the emission period. Currently emitted only for [batched `segmentAllocate` actions](../ingestion/tasks.md#batching-segmentallocate-actions).| `dataSource`, `taskId`, `taskType`, `taskActionType`|Varies|
|`task/action/failed/count`|Number of task actions that failed during the emission period. Currently emitted only for [batched `segmentAllocate` actions](../ingestion/tasks.md#batching-segmentallocate-actions).| `dataSource`, `taskId`, `taskType`, `taskActionType`|Varies|
|`task/action/batch/queueTime`|Milliseconds spent by a batch of task actions in the queue. Currently emitted only for [batched `segmentAllocate` actions](../ingestion/tasks.md#batching-segmentallocate-actions).| `dataSource`, `taskActionType`, `interval`|Varies based on `batchAllocationWaitTime` and the number of batches in the queue.|
|`task/action/batch/runTime`|Milliseconds taken to execute a batch of task actions. Currently emitted only for [batched `segmentAllocate` actions](../ingestion/tasks.md#batching-segmentallocate-actions).| `dataSource`, `taskActionType`, `interval`|Varies from subsecond to a few seconds, based on the action type and batch size.|
|`task/action/batch/size`|Number of task actions in a batch that was executed during the emission period. Currently emitted only for [batched `segmentAllocate` actions](../ingestion/tasks.md#batching-segmentallocate-actions).| `dataSource`, `taskActionType`, `interval`|Varies based on the number of concurrent task actions.|
|`task/action/batch/attempts`|Number of execution attempts for a single batch of task actions. Currently emitted only for [batched `segmentAllocate` actions](../ingestion/tasks.md#batching-segmentallocate-actions).| `dataSource`, `taskActionType`, `interval`|1 if there are no failures or retries.|
|`segment/added/bytes`|Size in bytes of new segments created.| `dataSource`, `taskId`, `taskType`, `interval`|Varies|
|`segment/moved/bytes`|Size in bytes of segments moved/archived via the Move Task.| `dataSource`, `taskId`, `taskType`, `interval`|Varies|
|`segment/nuked/bytes`|Size in bytes of segments deleted via the Kill Task.| `dataSource`, `taskId`, `taskType`, `interval`|Varies|
@@ -32,7 +32,7 @@
import org.apache.druid.discovery.DataNodeService;
import org.apache.druid.discovery.DruidNodeAnnouncer;
import org.apache.druid.discovery.LookupNodeService;
import org.apache.druid.indexing.common.actions.SegmentInsertAction;
import org.apache.druid.indexing.common.actions.SegmentTransactionalInsertAction;
import org.apache.druid.indexing.common.actions.TaskActionClient;
import org.apache.druid.indexing.common.config.TaskConfig;
import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexSupervisorTaskClientProvider;
@@ -331,7 +331,11 @@ public void publishSegments(Iterable<DataSegment> segments) throws IOException
DataSegment::getInterval
);
for (final Collection<DataSegment> segmentCollection : segmentMultimap.asMap().values()) {
getTaskActionClient().submit(new SegmentInsertAction(ImmutableSet.copyOf(segmentCollection)));
getTaskActionClient().submit(
SegmentTransactionalInsertAction.appendAction(
ImmutableSet.copyOf(segmentCollection), null, null
)
);
}
}

@@ -70,6 +70,7 @@ public class SegmentAllocationQueue
private static final Logger log = new Logger(SegmentAllocationQueue.class);

private static final int MAX_QUEUE_SIZE = 2000;
private static final int MAX_BATCH_SIZE = 500;

private final long maxWaitTimeMillis;

@@ -94,7 +95,7 @@ public SegmentAllocationQueue(
this.emitter = emitter;
this.taskLockbox = taskLockbox;
this.metadataStorage = metadataStorage;
this.maxWaitTimeMillis = taskLockConfig.getBatchAllocationMaxWaitTime();
this.maxWaitTimeMillis = taskLockConfig.getBatchAllocationWaitTime();

this.executor = taskLockConfig.isBatchSegmentAllocation()
? executorFactory.create(1, "SegmentAllocQueue-%s") : null;
@@ -173,7 +174,7 @@ public Future<SegmentIdWithShardSpec> add(SegmentAllocateRequest request)
throw new ISE("Batched segment allocation is disabled.");
}

final AllocateRequestKey requestKey = new AllocateRequestKey(request, maxWaitTimeMillis);
final AllocateRequestKey requestKey = getKeyForAvailableBatch(request);
final AtomicReference<Future<SegmentIdWithShardSpec>> futureReference = new AtomicReference<>();

// Possible race condition:
@@ -198,6 +199,24 @@ public Future<SegmentIdWithShardSpec> add(SegmentAllocateRequest request)
return futureReference.get();
}

/**
* Returns the key for a batch that is not added to the queue yet and/or has
* available space. Throws an exception if the queue is already full and no
* batch has available capacity.
*/
private AllocateRequestKey getKeyForAvailableBatch(SegmentAllocateRequest request)
{
for (int batchIncrementalId = 0; batchIncrementalId < MAX_QUEUE_SIZE; ++batchIncrementalId) {
AllocateRequestKey nextKey = new AllocateRequestKey(request, maxWaitTimeMillis, batchIncrementalId);
AllocateRequestBatch nextBatch = keyToBatch.get(nextKey);
if (nextBatch == null || nextBatch.size() < MAX_BATCH_SIZE) {
return nextKey;
}
}

throw new ISE("Allocation queue is at capacity, all batches are full.");

Contributor:

> Is MAX_QUEUE_SIZE measured in batches or allocations? What happens if it gets hit? How likely is it that we will hit it?

Contributor (Author):

> MAX_QUEUE_SIZE applies to batches. I am using it in the loop above because that is the maximum possible number of batches, even for the same datasource, group id, etc.
>
> It is extremely unlikely to hit the max queue limit. It can happen if, say, we have 2000 datasources all doing streaming ingestion concurrently. If the queue is full, new incoming allocate actions are rejected.

Contributor:

> Got it. Sounds good to me.

}

/**
* Tries to add the given batch to the processing queue. Fails all the pending
* requests in the batch if we are not leader or if the queue is full.
@@ -616,6 +635,11 @@ synchronized int size()
*/
private static class AllocateRequestKey
{
/**
* ID to distinguish between two batches for the same datasource, groupId, etc.
*/
private final int batchIncrementalId;

private long queueTimeMillis;
private final long maxWaitTimeMillis;

@@ -635,11 +659,12 @@ private static class AllocateRequestKey
* Creates a new key for the given request. The batch for a unique key will
* always contain a single request.
*/
AllocateRequestKey(SegmentAllocateRequest request, long maxWaitTimeMillis)
AllocateRequestKey(SegmentAllocateRequest request, long maxWaitTimeMillis, int batchIncrementalId)
{
final SegmentAllocateAction action = request.getAction();
final Task task = request.getTask();

this.batchIncrementalId = batchIncrementalId;
this.dataSource = action.getDataSource();
this.groupId = task.getGroupId();
this.skipSegmentLineageCheck = action.isSkipSegmentLineageCheck();
@@ -651,10 +676,11 @@ private static class AllocateRequestKey
.bucket(action.getTimestamp());

this.hash = Objects.hash(
skipSegmentLineageCheck,
useNonRootGenPartitionSpace,
dataSource,
groupId,
batchIncrementalId,
skipSegmentLineageCheck,
useNonRootGenPartitionSpace,
preferredAllocationInterval,
lockGranularity
);
@@ -687,10 +713,11 @@ public boolean equals(Object o)
return false;
}
AllocateRequestKey that = (AllocateRequestKey) o;
return skipSegmentLineageCheck == that.skipSegmentLineageCheck
&& useNonRootGenPartitionSpace == that.useNonRootGenPartitionSpace
&& dataSource.equals(that.dataSource)
return dataSource.equals(that.dataSource)
&& groupId.equals(that.groupId)
&& batchIncrementalId == that.batchIncrementalId
&& skipSegmentLineageCheck == that.skipSegmentLineageCheck
&& useNonRootGenPartitionSpace == that.useNonRootGenPartitionSpace
&& preferredAllocationInterval.equals(that.preferredAllocationInterval)
&& lockGranularity == that.lockGranularity;
}
@@ -707,6 +734,7 @@ public String toString()
return "{" +
"ds='" + dataSource + '\'' +
", gr='" + groupId + '\'' +
", incId=" + batchIncrementalId +
", lock=" + lockGranularity +
", invl=" + preferredAllocationInterval +
", slc=" + skipSegmentLineageCheck +
@@ -34,7 +34,7 @@ public class TaskLockConfig
private boolean batchSegmentAllocation = false;

@JsonProperty
private long batchAllocationMaxWaitTime = 500L;
private long batchAllocationWaitTime = 500L;

public boolean isForceTimeChunkLock()
{
@@ -46,8 +46,8 @@ public boolean isBatchSegmentAllocation()
return batchSegmentAllocation;
}

public long getBatchAllocationMaxWaitTime()
public long getBatchAllocationWaitTime()
{
return batchAllocationMaxWaitTime;
return batchAllocationWaitTime;
}
}
@@ -71,7 +71,7 @@ public boolean isBatchSegmentAllocation()
}

@Override
public long getBatchAllocationMaxWaitTime()
public long getBatchAllocationWaitTime()
{
return 0;
}
@@ -249,6 +249,23 @@ public void testFullAllocationQueue()
);
}

@Test
public void testMaxBatchSize()
{
for (int i = 0; i < 500; ++i) {
SegmentAllocateRequest request =
allocateRequest().forTask(createTask(DS_WIKI, "group_1")).build();
allocationQueue.add(request);
}

// Verify that next request is added to a new batch
Assert.assertEquals(1, allocationQueue.size());
SegmentAllocateRequest request =
allocateRequest().forTask(createTask(DS_WIKI, "group_1")).build();
allocationQueue.add(request);
Assert.assertEquals(2, allocationQueue.size());
}

@Test
public void testMultipleRequestsForSameSegment()
{
@@ -112,7 +112,7 @@ public boolean isBatchSegmentAllocation()
}

@Override
public long getBatchAllocationMaxWaitTime()
public long getBatchAllocationWaitTime()
{
return 10L;
}