Add sequential sketch merging to MSQ#13205
Conversation
cryptoe
left a comment
There was a problem hiding this comment.
A partial review. Will try to get this finished tomorrow.
LakshSingla
left a comment
There was a problem hiding this comment.
Thanks for the contribution! Left a few comments on the same.
| */ | ||
| public class WorkerSketchFetcher | ||
| { | ||
| private static final int DEFAULT_THREAD_COUNT = 10; |
There was a problem hiding this comment.
Going through this again, with regards to memory usage, we should limit the threads in relation to the number of sketches the controller can maintain at a time in memory.
There was a problem hiding this comment.
good catch. Yeah, we need to limit the number of parallel threads here. Lets start with 4 ?
| */ | ||
| public class WorkerSketchFetcher | ||
| { | ||
| private static final int DEFAULT_THREAD_COUNT = 10; |
There was a problem hiding this comment.
good catch. Yeah, we need to limit the number of parallel threads here. Lets start with 4 ?
cryptoe
left a comment
There was a problem hiding this comment.
Left my final review. Thanks for waiting on this @adarshsanjeev .
cryptoe
left a comment
There was a problem hiding this comment.
Overall lgtm. Some comments here and there.
|
Thanks for the contribution @adarshsanjeev. |
* Add sketch fetching framework * Refactor code to support sequential merge * Update worker sketch fetcher * Refactor sketch fetcher * Refactor sketch fetcher * Add context parameter and threshold to trigger sequential merge * Fix test * Add integration test for non sequential merge * Address review comments * Address review comments * Address review comments * Resolve maxRetainedBytes * Add new classes * Renamed key statistics information class * Rename fetchStatisticsSnapshotForTimeChunk function * Address review comments * Address review comments * Update documentation and add comments * Resolve build issues * Resolve build issues * Change worker APIs to async * Address review comments * Resolve build issues * Add null time check * Update integration tests * Address review comments * Add log messages and comments * Resolve build issues * Add unit tests * Add unit tests * Fix timing issue in tests
* Add sketch fetching framework * Refactor code to support sequential merge * Update worker sketch fetcher * Refactor sketch fetcher * Refactor sketch fetcher * Add context parameter and threshold to trigger sequential merge * Fix test * Add integration test for non sequential merge * Address review comments * Address review comments * Address review comments * Resolve maxRetainedBytes * Add new classes * Renamed key statistics information class * Rename fetchStatisticsSnapshotForTimeChunk function * Address review comments * Address review comments * Update documentation and add comments * Resolve build issues * Resolve build issues * Change worker APIs to async * Address review comments * Resolve build issues * Add null time check * Update integration tests * Address review comments * Add log messages and comments * Resolve build issues * Add unit tests * Add unit tests * Fix timing issue in tests
Since apache#13205, a special deserializer module has no longer been necessary to read key collector snapshots. This patch removes the unnecessary code.
Since #13205, a special deserializer module has no longer been necessary to read key collector snapshots. This patch removes the unnecessary code.
Current Implementation
In the current MSQ implementation, each worker maintains a ClusterByStatisticsCollector. If we are clustering by time, one sketch is maintained for each time chunk. During the merge, we sent the entire ClusterByStatisticsCollector from the worker to the controller. These are merged together, downsampling if memory taken is too much and partitions are generated from this.
Potential Improvements
Taking all worker sketches into controller memory is an unneeded step, which leads to downsampling of sketches and generating partitions outside the targeted weight. Since we have a primary partitioning on time, we only need to maintain the sketch from all workers for a particular time partition, generate the partition and remove the sketches. This is the sequential sketch merging approach.
Since this increases time taken however, this has been moved to a separate WorkerSketchFetcher. This will use an executor service to avoid blocking the main controller thread. This will not remove the current functionality, it will only uses sequential merging under cases that it is likely to provide greater benefits. For small sketches, it continues to fetch the entire sketch for the speed.
Key changes
MERGING_STATISTICSand necessary transitionsContext changes
clusterStatisticsMergeMode. It controls whether to parallel or sequential merging of worker sketches. Can bePARALLEL,SEQUENTIALorAUTO. OnAUTOtries to find the best approach based on number of workers and size of input rows.Release note
/keyStatistics/{queryId}/{stageNumber}/{workerNumber}/partialKeyStatisticsInformation/{queryId}/{stageNumber}/{workerNumber}that accepts an object that is deserializable to PartialKeyStatisticsInformation.Key changed/added classes in this PR
ClusterByStatisticsCollectorWorkerClientControllerClientStageDefinitionWorkerSketchFetcherControllerStageTrackerClusterByStatisticsWorkerReportThis PR has: