Parallel indexing single dim partitions #8925
Conversation
Implements single dimension range partitioning for native parallel batch indexing as described in apache#8769. This initial version requires the druid-datasketches extension to be loaded. The algorithm has 5 phases that are orchestrated by the supervisor in `ParallelIndexSupervisorTask#runRangePartitionMultiPhaseParallel()`. These phases and the main classes involved are described below:

1. In parallel, determine the distribution of dimension values for each input source split. `PartialDimensionDistributionTask` uses `StringSketch` to generate the approximate distribution of dimension values for each input source split. If the rows are ungrouped, `PartialDimensionDistributionTask.UngroupedRowDimensionValueFilter` uses a Bloom filter to skip rows that would be grouped. The final distribution is sent back to the supervisor via `DimensionDistributionReport`.
2. The range partitions are determined. In `ParallelIndexSupervisorTask#determineAllRangePartitions()`, the supervisor uses `StringSketchMerger` to merge the individual `StringSketch`es created in the preceding phase. The merged sketch is then used to create the range partitions.
3. In parallel, generate partial range-partitioned segments. `PartialRangeSegmentGenerateTask` uses the range partitions determined in the preceding phase and `RangePartitionCachingLocalSegmentAllocator` to generate `SingleDimensionShardSpec`s. The partition information is sent back to the supervisor via `GeneratedGenericPartitionsReport`.
4. The partial range segments are grouped. In `ParallelIndexSupervisorTask#groupGenericPartitionLocationsPerPartition()`, the supervisor creates the `PartialGenericSegmentMergeIOConfig`s necessary for the next phase.
5. In parallel, merge partial range-partitioned segments. `PartialGenericSegmentMergeTask` uses `GenericPartitionLocation` to retrieve the partial range-partitioned segments generated earlier and then merges and publishes them.
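As a rough illustration of phases 1 and 2, the hedged Java sketch below shows how a merged quantiles sketch can be cut into range-partition boundaries. It assumes the DataSketches quantiles API (`org.apache.datasketches.quantiles`, formerly `com.yahoo.sketches.quantiles`); the class name, the `targetRowsPerSegment` handling, and the demo data are illustrative, not the PR's exact code.

```java
import java.util.Comparator;
import org.apache.datasketches.quantiles.ItemsSketch;

// Hedged sketch, not the PR's code: subtasks feed dimension values into quantile
// sketches; the supervisor merges them and cuts the merged sketch into ranges of
// roughly equal row counts.
class RangePartitionBoundariesExample
{
  static String[] computeBoundaries(ItemsSketch<String> mergedSketch, int targetRowsPerSegment)
  {
    int numPartitions = (int) Math.max(1, mergedSketch.getN() / targetRowsPerSegment);
    // getQuantiles(n) returns n evenly spaced quantiles, including the min and max
    // values seen, so numPartitions ranges need numPartitions + 1 boundaries.
    return mergedSketch.getQuantiles(numPartitions + 1);
  }

  public static void main(String[] args)
  {
    ItemsSketch<String> sketch = ItemsSketch.getInstance(1 << 12, Comparator.naturalOrder());
    for (int i = 0; i < 100_000; i++) {
      sketch.update(String.format("host%05d", i % 1000));  // synthetic dimension values
    }
    System.out.println(String.join(", ", computeBoundaries(sketch, 25_000)));
  }
}
```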
| For perfect rollup, you should use either `hashed` (partitioning based on the hash of dimensions in each row) or
| `single_dim` (based on ranges of a single dimension). For best-effort rollup, you should use `dynamic`.

| Hashed partitioning is recommended in most cases, as it will improve indexing performance and create more uniformly
I'm not sure how hashed partitioning can improve indexing performance or create more uniformly sized data segments relative to dynamic partitioning. With dynamic partitioning, the parallel indexing task will run in single-phase mode, whereas hash-based partitioning requires two-phase mode. Also, the uniformity in segment size with hashed partitioning will depend on the partition key distribution, whereas dynamic partitioning guarantees a max size for segments. Am I missing something?
I've reworded it to only recommend hash partitioning over single dim partitioning for perfect rollup.
| private TaskStatus runRangePartitionMultiPhaseParallel(TaskToolbox toolbox) throws Exception
| {
|   assertDataSketchesAvailable();
This method will be called after the task is assigned to a middleManager or an indexer, which could waste resources and time if the datasketches extension is not loaded. I think it's uncommon to load datasketches in middleManagers but not in the overlord. Can we check this in `isReady()`?
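A minimal sketch of that suggestion, assuming the check can be hoisted into the overlord-side readiness check; `useRangePartitions()` is a hypothetical guard and the existing readiness logic is elided:

```java
@Override
public boolean isReady(TaskActionClient taskActionClient) throws Exception
{
  if (useRangePartitions()) {       // hypothetical guard: only single_dim needs sketches
    assertDataSketchesAvailable();  // now fails fast on the overlord, before task assignment
  }
  // ... existing readiness / lock acquisition logic unchanged ...
  return true;
}
```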
| return TaskStatus.failure(getId());
| }

| Map<Interval, String[]> intervalToPartitions =
Suggest creating a concrete class rather than using a Map. It will be more intuitive and easier to understand.
I think it makes more sense to add a class wrapping `String[]` rather than the map.
I've created a class `Partitions` and used it to replace usages of `String[]`.
| <groupId>org.apache.logging.log4j</groupId>
| <artifactId>log4j-api</artifactId>
| </dependency>
| <dependency>
Would you please add a comment noting where datasketches are used?
| metricsNames
| ),
| inputFormat,
| null
Shouldn't be null. `toolbox.getIndexingTmpDir()`?
| String minDimensionValue = intervalToMinDimensionValue.get(interval);
| if (minDimensionValue == null || dimensionValue.compareTo(minDimensionValue) < 0) {
|   intervalToMinDimensionValue.put(interval, dimensionValue);
| }
super nit: you can use `compute()`. It's better in the sense that it computes the hash code only once, but I don't believe that matters here:

```java
intervalToMinDimensionValue.compute(
    interval,
    (intervalKey, currentMinValue) -> {
      if (currentMinValue == null || dimensionValue.compareTo(currentMinValue) < 0) {
        return dimensionValue;
      } else {
        return currentMinValue;
      }
    }
);
```
Changed this method and `updateMaxDimensionValue` to use `Map.compute()`.
| {
|   Map<Interval, StringDistribution> intervalToDistribution = new HashMap<>();
|   DimensionValueFilter dimValueFilter =
|       isAssumeGrouped
Looks like it should also check whether rollup is enabled.
Added a check for `granularitySpec.isRollup()`.
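The guard might end up looking roughly like the hedged sketch below; `PassthroughRowDimensionValueFilter` is a hypothetical name for the no-op branch and the constructor arguments are elided:

```java
// Hedged sketch, not the PR's exact code: the Bloom-filter dedup only makes sense
// when rollup is enabled and the input is not already grouped.
DimensionValueFilter dimValueFilter =
    granularitySpec.isRollup() && !isAssumeGrouped
    ? new UngroupedRowDimensionValueFilter(/* ... */)
    : new PassthroughRowDimensionValueFilter();  // hypothetical no-op filter
```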
| }

| @VisibleForTesting
| static class UngroupedRowDimensionValueFilter implements DimensionValueFilter
Would you please add a javadoc describing which rows this will filter out?
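A hedged example of the kind of javadoc being requested, pieced together from the PR description (the wording that landed in the PR may differ):

```java
/**
 * Filter that skips rows whose dimension value has already been seen for the same
 * rolled-up row, i.e., rows that would be grouped together during rollup. A Bloom
 * filter tracks previously seen values, so a small fraction of first occurrences may
 * be skipped as false positives; this is acceptable because the downstream
 * distribution sketch is approximate anyway.
 */
@VisibleForTesting
static class UngroupedRowDimensionValueFilter implements DimensionValueFilter
```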
| public interface StringDistribution
| {
|   /**
|    * Record occurence of {@link String}
typo: occurence -> occurrence
| @@ -63,19 +66,21 @@ interface IntervalToSegmentIdsCreator
| CachingLocalSegmentAllocator(
Hmm, maybe this should be renamed to something else, since it's a bit strange that callers don't extend this class directly, though I think I see why now.
Renamed to `CachingLocalSegmentAllocatorHelper`.
| //noinspection ResultOfObjectAllocationIgnored
| new StringSketch();
| }
| catch (Throwable t) {
I think this should catch the particular type of error. I guess it's `ClassNotFoundException` to be caught?
Changed to catch `NoClassDefFoundError`.
| return Collections.emptyList();
| }

| String[] uniquePartitions = Arrays.stream(partitions).distinct().toArray(String[]::new);
Can partitions have duplicate values? They seem to come from `StringSketch.getEventPartitionsByCount()`, which shouldn't return duplicates. If this shouldn't have duplicates, I would suggest adding a sanity check instead of deduplicating them.
Currently, `StringSketch.getEventPartitionsByCount()` can return duplicate values since it's a wrapper for `ItemsSketch.getQuantiles()` (i.e., if the distribution has many duplicates, adjacent quantiles may have the same value).
I'll also fix the typo in "getEventPartitionsByCount".
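A hedged demo of that behavior, calling the DataSketches quantiles API directly (package name and `getQuantiles(int)` signature as of the 1.x line):

```java
import java.util.Arrays;
import java.util.Comparator;
import org.apache.datasketches.quantiles.ItemsSketch;

// For a heavily skewed distribution, adjacent quantiles collapse to the same value,
// which is why the caller deduplicates before building shard specs.
public class DuplicateQuantilesDemo
{
  public static void main(String[] args)
  {
    ItemsSketch<String> sketch = ItemsSketch.getInstance(1 << 12, Comparator.naturalOrder());
    for (int i = 0; i < 1_000_000; i++) {
      sketch.update("common");
    }
    sketch.update("rare");
    // Likely prints [common, common, common, common, rare].
    System.out.println(Arrays.toString(sketch.getQuantiles(5)));
  }
}
```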
| if (isLastPartitionOnlyMaxValue(partitions)) {
|   // The last partition only contains the max value. A shard that just contains the max value is likely to be
|   // small, so combine it with the second to last one.
Hmm, I'm not sure this assumption makes sense. The indexing might be a bit better if the assumption holds, but if not, the indexing speed could get significantly worse due to the skewed data distribution. I think we shouldn't depend on this kind of assumption. Instead, we can utilize `PartitionStat` once we collect it properly in the future.
Or probably we should also collect the number of elements in each partition. Does the sketch provide such functionality?
The sketch does not have an API that does that. I've removed the logic to combine the last two partitions.
| uniquePartitions[i + 1],
| i
| ))
| .collect(Collectors.toCollection(ArrayList::new));
Can be simplified into `Collectors.toList()`.
The javadoc for `Collectors.toList()` states:
> There are no guarantees on the type, mutability, serializability, or thread-safety of the List returned
Since the returned List needs to be mutated, I specified the list implementation used by the Collector.
| return intervalToSegmentIds;
| }

| private List<SegmentIdWithShardSpec> translatePartitions(
Please add a Javadoc since it's not easy to guess what this method does from its name.
| *
| * @see PartialHashSegmentMergeParallelIndexTaskRunner
| */
| class PartialRangeSegmentGenerateParallelIndexTaskRunner
Similarly, suggest `PartialRangePartitionedSegmentGenerateRunner`.
I'll leave the current name for now.
Do you mean you're planning to do it later? Would you please open an issue for it then?
| {
|   static final String NAME = "sketch";
|   static final int SKETCH_K = 1 << 12; // smallest value with normalized rank error < 0.1%; retain up to ~86k elements
|   static final Comparator<String> SKETCH_COMPARATOR = Comparator.naturalOrder();
nit: this doesn't necessarily have to be done in this PR, but it would be nice if it supported different orderings for the string type.
I like the idea, but think it'll be best to do it in a subsequent PR.
| public abstract class IngestionTestBase
| {
|   static {
|     NullHandling.initializeForTests();
Please extend `InitializedNullHandlingTest` instead of this.
| @Override
| ShardSpec createShardSpec(TaskToolbox toolbox, Interval interval, int partitionNum)
| {
|   return createIntervalAndIntegerToShardSpec.get(interval, partitionNum);
This should fail if there is no shardSpec for the given interval and partitionNum.
Added a `Preconditions` check.
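For instance (a hedged sketch; the exact message in the PR may differ), using Guava's templated overload:

```java
ShardSpec shardSpec = createIntervalAndIntegerToShardSpec.get(interval, partitionNum);
Preconditions.checkNotNull(
    shardSpec,
    "no shardSpec for interval[%s] and partitionNum[%s]",  // Guava templates use %s
    interval,
    partitionNum
);
return shardSpec;
```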
| }

| @Override
| ShardSpec createShardSpec(TaskToolbox toolbox, Interval interval, int partitionNum)
Suggest using `partitionId` instead of `partitionNum` because I think it's clearer. Same for other places.
| List<String> metricsNames = Arrays.stream(dataSchema.getAggregators())
|     .map(AggregatorFactory::getName)
|     .collect(Collectors.toList());
| InputFormat inputFormat = ParallelIndexSupervisorTask.getInputFormat(ingestionSchema);
Should be `inputSource.needsFormat() ? ParallelIndexSupervisorTask.getInputFormat(ingestionSchema) : null`.
jihoonson left a comment
A task with range partitioning can fail if the set of input files changes between the distribution investigation phase and the partial segment generation phase, because `CachingLocalSegmentAllocatorHelper.allocate()` will return null for unknown sequence names. Even though it would also be good to have an option to continue indexing instead of failing, I think it's OK for now to always fail. Would you please call this out in the documentation as well?
| For perfect rollup, you should use either `hashed` (partitioning based on the hash of dimensions in each row) or
| `single_dim` (based on ranges of a single dimension). For best-effort rollup, you should use `dynamic`.

| For perfect rollup, `hashed` partitioning is recommended in most cases, as it will improve indexing
I think it's worth clearly mentioning the pros/cons of using each partitionsSpec instead of promoting hashed partitioning:

- With `dynamic` partitioning, you can expect the fastest ingestion speed compared to the other partitionsSpecs. It also always guarantees a well-balanced distribution in segment size.
- With `hashed`, your wording is correct.
- With `single_dim`, the partitioning can be skewed depending on the partition key, but the broker can use the partition information to prune the segments to query earlier. If the query has a filter on the partition key column, the broker can filter out segments which have only values not satisfying the filter.
Added a section describing the pros/cons.
| > Single-dimension range partitioning currently requires the
| > [druid-datasketches](../development/extensions-core/datasketches-extension.md)
| > extension to be added to the classpath.
How about "extension to be [loaded](https://druid.apache.org/docs/0.16.0-incubating/development/extensions.html#loading-extensions)"?
Added a link to loading the extension from the classpath: https://druid.apache.org/docs/latest/development/extensions.html#loading-extensions-from-the-classpath
Also added a warning here about possible errors if the input changes during the two passes over the input.
| int numUniquePartition = uniquePartitions.length;

| // First partition starts with null (see StringPartitionChunk.isStart())
| uniquePartitions[0] = null;
It looks like the last value also needs to be null. Also, please add a comment explaining why it's OK to update those values.
And probably it would be better to move this logic into `Partitions`.
The last partition is handled by `createLastSegmentIdWithShardSpec()`. I've moved the dedup/first/last logic into `Partitions` (which is renamed to `PartitionBoundaries`).
| /**
|  * Convenience wrapper to make code more readable.
|  */
| public class Partitions extends ForwardingList<String> implements List<String>
I think the name of the class is a bit confusing. Should it be `PartitionBoundaries`?
| new StringSketch();
| }
| catch (NoClassDefFoundError e) {
|   throw new ISE(e, "DataSketches is unvailable. Try adding the druid-datasketches extension to the classpath.");
How about "Try loading the druid-datasketches extension in the overlord and middleManagers/indexers"?
Adjusted the wording in the error message.
| ParallelIndexTuningConfig tuningConfig = ingestionSchema.getTuningConfig();

| SingleDimensionPartitionsSpec partitionsSpec = (SingleDimensionPartitionsSpec) tuningConfig.getPartitionsSpec();
| Preconditions.checkNotNull(partitionsSpec);
It would be nice to print that `partitionsSpec` in `tuningConfig` is null if it's null.
Added an error message.
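For example (a hedged sketch; the message text in the PR may differ), using the `checkNotNull` overload that takes an error message:

```java
SingleDimensionPartitionsSpec partitionsSpec = (SingleDimensionPartitionsSpec) tuningConfig.getPartitionsSpec();
Preconditions.checkNotNull(partitionsSpec, "partitionsSpec required in tuningConfig");
```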
| }
| }

| // UngroupedDimValueFilter may not accept the min/max dimensionValue. If needed, add the min/max
Please update the doc here.
Updated the class name.
| * partition key). The {@link ShardSpec} is later used by {@link PartialGenericSegmentMergeTask} to merge the partial
| * segments.
| */
| public class PartitionMetadata extends PartitionStat<ShardSpec>
Do you want to rename `PartitionStat` as well? I'm fine with doing it as a follow-up.
`GeneratedHashPartitionsReport` has a JSON field name of `partitionStats`, which needs to be preserved for backward compatibility. I think keeping the "Stat" suffix is nice for having "HashPartitionStats" match the JSON. To make things symmetric, I've renamed `PartitionMetadata` back to `GenericPartitionStats`.
| partitionBoundaries.set(0, null);

| // Last partition ends with null (see StringPartitionChunk.isEnd())
| partitionBoundaries.add(null);
Is there a reason to handle the first and last partition differently? Looks like the last partition will be `(max, null)`, which could be empty or very small.
The last partition will never be empty because it'll have at least one row with the max value.
Previously, I had logic to combine it with the second-to-last partition when it was small: #8925 (comment)
If we still don't want logic that decides whether to combine it, then it needs to either always be combined or never be combined.
My question is why the null is added at the end instead of replacing the max value, as is done for the first partition, like below:

```java
// First partition starts with null (see StringPartitionChunk.isStart())
partitionBoundaries.set(0, null);

// Last partition ends with null (see StringPartitionChunk.isEnd())
partitionBoundaries.set(partitionBoundaries.size() - 1, null);
```

What is the assumption behind handling the first partition and the last one differently?
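To make the resulting data shape concrete, here is a hedged sketch (not the PR's exact code) of how the boundaries list looks after the dedup/first/last handling discussed above:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Given sorted quantiles from the merged sketch, e.g. [a, b, b, c, max]:
//   1) dedup              -> [a, b, c, max]
//   2) set first to null  -> [null, b, c, max]       (first range unbounded below)
//   3) append null        -> [null, b, c, max, null] (last range [max, null) keeps
//                                                     rows equal to the max value)
class PartitionBoundariesExample
{
  static List<String> build(String[] sortedQuantiles)
  {
    List<String> boundaries = Arrays.stream(sortedQuantiles)
                                    .distinct()
                                    .collect(Collectors.toCollection(ArrayList::new));
    boundaries.set(0, null);
    boundaries.add(null);
    return boundaries;
  }
}
```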
| @Test(groups = TestNGGroup.BATCH_INDEX)
| @Guice(moduleFactory = DruidTestModuleFactory.class)
| - public class ITParallelIndexTest extends AbstractITBatchIndexTest
| + public class ITImperfectRollupParallelIndexTest extends AbstractITBatchIndexTest
`ITBestEffortRollupParallelIndexTest`? (https://druid.apache.org/docs/latest/ingestion/index.html#best-effort-rollup)
| Iterables.getOnlyElement(inputRow.getDimension(partitionDimension))
| );

| if (dimensionValue != null) {
What happens here if the row actually contained a null for the dimension? Does this need to distinguish that case?
I can change the behavior of `DimensionValueFilter.accept()` to handle that case.
| TaskState distributionState = runNextPhase(distributionRunner);
| if (distributionState.isFailure()) {
|   return TaskStatus.failure(getId());
Suggest adding an error message to the failure status indicating which phase failed.
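A hedged sketch of that suggestion; it assumes a `TaskStatus.failure(taskId, errorMsg)` overload (or equivalent) for carrying a reason, and the phase name string is illustrative:

```java
TaskState distributionState = runNextPhase(distributionRunner);
if (distributionState.isFailure()) {
  return TaskStatus.failure(
      getId(),
      StringUtils.format("Failed in phase[%s]. See task logs for details.", "dimension distribution")
  );
}
```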
jon-wei left a comment

The design LGTM, didn't completely review the code.
Description

Implements single dimension range partitioning for native parallel batch indexing as described in #8769. This initial version requires the druid-datasketches extension to be added to the classpath.

This PR has:
Key changed/added classes in this PR
- `PartialDimensionDistributionTask` and `PartialDimensionDistributionTaskTest`
- `RangePartitionCachingLocalSegmentAllocator` and `RangePartitionCachingLocalSegmentAllocatorTest`
- `RangePartitionMultiPhaseParallelIndexingTest`