Faster batch segment allocation by reducing metadata IO #17420
AmatyaAvadhanula wants to merge 4 commits into apache:master
Conversation
private long batchAllocationWaitTime = 0L;

@JsonProperty
private boolean segmentAllocationReduceMetadataIO = false;
This is used only for batch segment allocation, IIUC. Let's rename it to batchAllocationReduceMetadataIO.
Once this has been tested thoroughly, we can remove the flag altogether and use the new approach for both regular and batch allocation.
Are you against its usage in the normal flow with the same feature flag?
It's behind a flag anyway, so I don't see a reason not to make the change there as well.
  boolean skipSegmentLineageCheck,
- Collection<SegmentAllocationHolder> holders
+ Collection<SegmentAllocationHolder> holders,
+ boolean skipSegmentPayloadFetchForAllocation
Please rename this argument everywhere.
You could call it reduceMetadataIO (same as the config), fetchRequiredSegmentsOnly, or something similar.
Suggested change:
- boolean skipSegmentPayloadFetchForAllocation
+ boolean reduceMetadataIO
private final ConcurrentHashMap<AllocateRequestKey, AllocateRequestBatch> keyToBatch = new ConcurrentHashMap<>();
private final BlockingDeque<AllocateRequestBatch> processingQueue = new LinkedBlockingDeque<>(MAX_QUEUE_SIZE);

private final boolean skipSegmentPayloadFetchForAllocation;
Please rename as suggested.
}

@Test
@Ignore
Let's remove this test if we are not running it as a UT. If it is meant to be a benchmark, please put it in a Benchmark class and share the details of a sample run in the PR description.
Understood, will remove it.
@Override
public boolean isSegmentAllocationReduceMetadataIO()
{
  return true;
Would be nice to run the tests in this class for both true and false and not just true.
Will do this
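For illustration, a minimal sketch of how a test class could be run for both flag values, assuming JUnit 4's Parameterized runner (the class and test names here are hypothetical, not taken from this PR):

```java
import java.util.Arrays;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

@RunWith(Parameterized.class)
public class SegmentAllocationFlagTest
{
  @Parameterized.Parameters(name = "reduceMetadataIO = {0}")
  public static Iterable<Object[]> flagValues()
  {
    // Run every test in this class once with the flag enabled and once with it disabled.
    return Arrays.asList(new Object[][]{{true}, {false}});
  }

  private final boolean reduceMetadataIO;

  public SegmentAllocationFlagTest(boolean reduceMetadataIO)
  {
    this.reduceMetadataIO = reduceMetadataIO;
  }

  @Test
  public void testAllocation()
  {
    // ... exercise batch segment allocation with the flag set to reduceMetadataIO ...
  }
}
```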
  );
}

// Populate the required segment info
Suggested change:
- // Populate the required segment info
+ // Create dummy segments for each segmentId with only the shard spec populated
    Segments.ONLY_VISIBLE
  )
);
return metadataStorage.getSegmentTimelineForAllocation(
Is it inefficient here if skipSegmentPayloadFetchForAllocation is true? We are getting segments from retrieveUsedSegmentsForAllocation, then creating a timeline via SegmentTimeline.forSegments, and then getting segments back again via findNonOvershadowedObjectsInInterval. Why do we even need to create a timeline?
Do you mean when it is false?
If it is true, we are simply getting all the used ids in the retrieval call, so we'd have to create a SegmentTimeline since we're interested only in the visible segment set.
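For context, a rough sketch of that step using the timeline APIs mentioned above (the wrapper class and method names are illustrative, not the PR's code). The timeline is what resolves overshadowing across versions, so even when only ids were fetched, the dummy segments are pushed through a timeline to keep just the visible set:

```java
import java.util.Collection;
import java.util.Set;

import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.Partitions;
import org.apache.druid.timeline.SegmentTimeline;
import org.joda.time.Interval;

public class VisibleSegmentsSketch
{
  /**
   * Reduces used segments (or dummy segments built from ids) to the segments that are
   * actually visible, i.e. not overshadowed by newer versions, in the allocation interval.
   */
  public static Set<DataSegment> findVisibleSegments(
      Collection<DataSegment> usedSegments,
      Interval allocationInterval
  )
  {
    // Building the timeline is what resolves overshadowing across versions.
    final SegmentTimeline timeline = SegmentTimeline.forSegments(usedSegments);
    return timeline.findNonOvershadowedObjectsInInterval(allocationInterval, Partitions.ONLY_COMPLETE);
  }
}
```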
// Retrieve the segments for the ids stored in the map to get the numCorePartitions
final Set<String> segmentIdsToRetrieve = new HashSet<>();
for (Map<Interval, SegmentId> itvlMap : versionIntervalToSmallestSegmentId.values())
Why do we want/use the Smallest here?
The idea is to get a consistent result irrespective of the order in which the metadata store returns results.
Can you add a comment on the use of the smallest id? Thanks.
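To illustrate the determinism point, here is a small sketch of keeping the smallest id per (version, interval); the class and method names are hypothetical, and only the map name mirrors the diff above:

```java
import java.util.HashMap;
import java.util.Map;

import org.apache.druid.timeline.SegmentId;
import org.joda.time.Interval;

public class SmallestSegmentIdTracker
{
  // version -> interval -> smallest SegmentId seen so far
  private final Map<String, Map<Interval, SegmentId>> versionIntervalToSmallestSegmentId = new HashMap<>();

  /**
   * Keep the smallest SegmentId per (version, interval) so that the single segment later
   * fetched for its numCorePartitions is deterministic, regardless of the order in which
   * the metadata store returns rows.
   */
  public void track(SegmentId segmentId)
  {
    versionIntervalToSmallestSegmentId
        .computeIfAbsent(segmentId.getVersion(), v -> new HashMap<>())
        .merge(
            segmentId.getInterval(),
            segmentId,
            (a, b) -> a.compareTo(b) <= 0 ? a : b
        );
  }
}
```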
  return retrieveSegmentIds(dataSource, Collections.singletonList(interval));
}

private Set<SegmentId> retrieveSegmentIds(
should this also be called retrieveUsedSegmentIds?
Yes, it should. Thanks, I'll make the change.
I've added a TaskLockPosse-level cache for segment allocations, based on the idea that the used segment set and pending segments are covered by an appending lock and cannot change unless altered by tasks holding these locks. There are 24 intervals, each with 2000 data segments, each with 1000 dimensions. Here are the results using a simple unit test:
@AmatyaAvadhanula, caching the segment state seems like a good idea. But it would be better to limit this PR to just the reduced metadata IO changes. The caching changes can be done as a follow-up to this PR.
Do we have a plan to implement a TaskLockPosse-level cache for segment allocations too? @AmatyaAvadhanula @kfaraz
@maytasm, I am currently working on caching segment metadata on the Overlord to speed up segment allocations.
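For context only, a very rough sketch of the lock-scoped caching idea discussed above, with entirely hypothetical names (neither this PR nor the planned follow-up necessarily looks like this): segment state loaded for a locked interval can be reused for as long as the appending lock is held, since only tasks holding that lock can change it.

```java
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

import org.apache.druid.timeline.SegmentId;
import org.joda.time.Interval;

// Hypothetical sketch: cache used-segment ids per locked interval while the lock is held.
public class LockScopedSegmentCache
{
  private final Map<Interval, Set<SegmentId>> usedSegmentIdsByInterval = new ConcurrentHashMap<>();

  /** Returns cached ids, or loads and caches them on the first allocation under this lock. */
  public Set<SegmentId> getOrLoad(Interval interval, Function<Interval, Set<SegmentId>> loader)
  {
    return usedSegmentIdsByInterval.computeIfAbsent(interval, loader);
  }

  /** Must be called when the lock covering this interval is released or revoked. */
  public void invalidate(Interval interval)
  {
    usedSegmentIdsByInterval.remove(interval);
  }
}
```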
Problem
Metadata IO is the main bottleneck for segment allocation times, which can contribute to lag significantly.
Currently, the metadata call to fetch used segments limits the allocation rate and contributes to allocation wait time, which can grow quite quickly. The sum of allocation times is a direct contributor to lag.
Idea
Segment allocation with time chunk locking requires only the segment ids and the number of core partitions to determine the next id. We can issue a much less expensive call to fetch segmentIds instead of segments when the segment payloads are large due to many dimensions and metrics.
The number of core partitions does not change for a given (datasource, interval, version), so we can fetch exactly one segment by id for each such combination. This reduces the load on the metadata store and also reduces the time taken to build DataSegment objects on the Overlord.
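A rough sketch of this idea in plain Java (the class and method names are hypothetical, not the PR's code): the next partition number can be derived from the existing segment ids alone, and the core-partition count is read once from a single segment fetched by id for that (datasource, interval, version).

```java
import java.util.Set;

import org.apache.druid.timeline.SegmentId;
import org.apache.druid.timeline.partition.NumberedShardSpec;

public class NextSegmentIdSketch
{
  /**
   * Derives the shard spec of the next segment for an (interval, version) from the ids of
   * existing segments plus a core-partition count fetched from a single segment payload.
   */
  public static NumberedShardSpec nextShardSpec(Set<SegmentId> existingIds, int numCorePartitions)
  {
    // Only the partition numbers of the existing ids are needed to pick the next id;
    // the full segment payloads (dimensions, metrics, etc.) are not required.
    final int maxPartitionNum =
        existingIds.stream().mapToInt(SegmentId::getPartitionNum).max().orElse(-1);
    return new NumberedShardSpec(maxPartitionNum + 1, numCorePartitions);
  }
}
```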
Usage
Adds a new feature flag druid.indexer.tasklock.segmentAllocationReduceMetadataIO with default value false.
Setting this flag to true allows segment allocation to fetch only the required segmentIds and fewer segment payloads from the metadata store.
At present, this flag is only applicable for TimeChunk locking with batch segment allocation.
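For example, assuming the flag keeps its current name (the review above suggests renaming it to batchAllocationReduceMetadataIO), it would be enabled in the Overlord's runtime.properties:

```properties
# Fetch only segment ids (plus one payload per (datasource, interval, version))
# during batch segment allocation. Defaults to false.
druid.indexer.tasklock.segmentAllocationReduceMetadataIO=true
```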