Using deep storage as intermediate store for shuffle tasks
Description
If autoscaling is enabled for MiddleManagers (MMs), the MM that generated the intermediate index may no longer be available because it has been scaled down. An option to use deep storage for intermediate data would avoid this problem.
Changes
For pushing partial segments
ShuffleDataSegmentPusher uses IntermediaryDataManager, which can be converted to an interface with the following methods:
long addSegment(String supervisorTaskId, String subTaskId, DataSegment segment, URI segmentLocation)
Optional<ByteSource> findPartitionFile(String supervisorTaskId, String subTaskId, Interval interval, int bucketId)
void deletePartitions(String supervisorTaskId)
The default implementation of IntermediaryDataManager can be LocalIntermediaryDataManager, which manages partial segments locally on the MM. Optional implementations can be added via extensions to support different deep storage systems or other locations.
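A minimal sketch of what the interface and its local default could look like. This is an illustration under stated assumptions, not the actual Druid code: PartialSegment is a hypothetical stand-in for DataSegment, Path replaces ByteSource, a plain String replaces Interval, and the on-disk layout is invented for the example.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.URI;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardCopyOption;
import java.util.Comparator;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical stand-in for Druid's DataSegment: only what is needed to locate a partition.
final class PartialSegment {
    final String interval; // stands in for Interval, e.g. "2020-01-01_2020-01-02"
    final int bucketId;

    PartialSegment(String interval, int bucketId) {
        this.interval = interval;
        this.bucketId = bucketId;
    }
}

// The three methods proposed above, with simplified parameter and return types.
interface IntermediaryDataManager {
    long addSegment(String supervisorTaskId, String subTaskId, PartialSegment segment, URI segmentLocation);

    Optional<Path> findPartitionFile(String supervisorTaskId, String subTaskId, String interval, int bucketId);

    void deletePartitions(String supervisorTaskId);
}

// Local default: keeps partial segments under a base directory on the MM.
final class LocalIntermediaryDataManager implements IntermediaryDataManager {
    private final Path baseDir;

    LocalIntermediaryDataManager(Path baseDir) {
        this.baseDir = baseDir;
    }

    // Assumed layout: baseDir/supervisorTaskId/interval/bucketId/subTaskId
    private Path partitionFile(String supervisorTaskId, String subTaskId, String interval, int bucketId) {
        return baseDir.resolve(supervisorTaskId).resolve(interval)
                      .resolve(String.valueOf(bucketId)).resolve(subTaskId);
    }

    @Override
    public long addSegment(String supervisorTaskId, String subTaskId, PartialSegment segment, URI segmentLocation) {
        Path target = partitionFile(supervisorTaskId, subTaskId, segment.interval, segment.bucketId);
        try {
            Files.createDirectories(target.getParent());
            // Assumes segmentLocation is a file: URI; a deep storage implementation would upload instead.
            Files.copy(Paths.get(segmentLocation), target, StandardCopyOption.REPLACE_EXISTING);
            return Files.size(target);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    @Override
    public Optional<Path> findPartitionFile(String supervisorTaskId, String subTaskId, String interval, int bucketId) {
        Path file = partitionFile(supervisorTaskId, subTaskId, interval, bucketId);
        return Files.isRegularFile(file) ? Optional.of(file) : Optional.empty();
    }

    @Override
    public void deletePartitions(String supervisorTaskId) {
        Path root = baseDir.resolve(supervisorTaskId);
        if (!Files.exists(root)) {
            return;
        }
        try (Stream<Path> walk = Files.walk(root)) {
            // Reverse order puts children before their parent directories.
            List<Path> deepestFirst = walk.sorted(Comparator.reverseOrder()).collect(Collectors.toList());
            for (Path path : deepestFirst) {
                Files.delete(path);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

With this shape, a deep storage extension would only need to provide another IntermediaryDataManager implementation; ShuffleDataSegmentPusher would depend on the interface alone.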
For pulling partial segments
ShuffleClient is already an interface, with HttpShuffleClient as its default implementation, so we only need to add implementations for other storage types. The interface method needs to be changed to File fetchSegmentFile(URI partitionDir, String supervisorTaskId, P location). We may also need to check whether a different implementation of PartitionLocation is needed.
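As a rough illustration of the pulling side, the sketch below pairs the changed method signature with a client that reads from a storage URI rather than from an MM. Everything except the fetchSegmentFile signature is an assumption: PartitionLocation here is a simplified stand-in for the Druid class, DeepStoragePartitionLocation and DeepStorageShuffleClient are hypothetical names, and partitionDir is assumed to be a local file: URI naming the destination directory.

```java
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.net.URI;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardCopyOption;

// Simplified stand-in for Druid's PartitionLocation.
interface PartitionLocation {
    String getSubTaskId();
}

// Hypothetical location type: points at a stored partial segment instead of an MM host and port.
final class DeepStoragePartitionLocation implements PartitionLocation {
    private final String subTaskId;
    private final URI segmentUri;

    DeepStoragePartitionLocation(String subTaskId, URI segmentUri) {
        this.subTaskId = subTaskId;
        this.segmentUri = segmentUri;
    }

    @Override
    public String getSubTaskId() {
        return subTaskId;
    }

    URI getSegmentUri() {
        return segmentUri;
    }
}

// The interface with the method signature proposed above.
interface ShuffleClient<P extends PartitionLocation> {
    File fetchSegmentFile(URI partitionDir, String supervisorTaskId, P location);
}

// Hypothetical client that copies the partial segment from its storage URI into partitionDir.
final class DeepStorageShuffleClient implements ShuffleClient<DeepStoragePartitionLocation> {
    @Override
    public File fetchSegmentFile(URI partitionDir, String supervisorTaskId, DeepStoragePartitionLocation location) {
        // Assumes partitionDir is a file: URI for a local directory.
        Path target = Paths.get(partitionDir).resolve(supervisorTaskId + "_" + location.getSubTaskId());
        // A real extension would call its storage SDK here; URL.openStream() covers file: URIs in this sketch.
        try (InputStream in = location.getSegmentUri().toURL().openStream()) {
            Files.createDirectories(target.getParent());
            Files.copy(in, target, StandardCopyOption.REPLACE_EXISTING);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return target.toFile();
    }
}
```

Carrying the storage URI inside the location object is one possible answer to the open question about PartitionLocation; other designs could derive the storage path from the supervisor task ID, interval, and bucket ID instead.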
Motivation
To make shuffle work with MM autoscaling.