Automatically determine numShards for parallel ingestion hash partitioning #10419

Merged
jon-wei merged 9 commits into apache:master from jon-wei:numshards on Sep 24, 2020

Conversation

@jon-wei (Contributor) commented Sep 22, 2020

This PR allows parallel batch ingestion to automatically determine numShards when forceGuaranteedRollup is used with hash partitioning.

This is accomplished with a new phase of subtasks (PartialDimensionCardinalityTask). These subtasks build a Map<Interval, HyperLogLogCollector>, where each HLL collector records the cardinality of the partitioning dimensions for one segment granularity interval in the input data.

The supervisor task aggregates the HLL collectors by interval and determines the highest cardinality across all the intervals. This max cardinality is divided by maxRowsPerSegment to automatically determine numShards.
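
For illustration, the supervisor-side computation looks roughly like this (a minimal sketch with assumed names, not this PR's exact code; a review thread below switches the collectors to DataSketches' HllSketch):

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.druid.hll.HyperLogLogCollector;
import org.joda.time.Interval;

public class NumShardsEstimator
{
  public static int estimateNumShards(
      List<Map<Interval, HyperLogLogCollector>> subtaskReports,
      int maxRowsPerSegment
  )
  {
    // Fold the per-subtask collectors into a single collector per interval.
    Map<Interval, HyperLogLogCollector> merged = new HashMap<>();
    for (Map<Interval, HyperLogLogCollector> report : subtaskReports) {
      report.forEach(
          (interval, collector) ->
              merged.computeIfAbsent(interval, i -> HyperLogLogCollector.makeLatestCollector())
                    .fold(collector)
      );
    }

    // Size shards from the interval with the highest partition-dimension cardinality.
    double maxCardinality = 0;
    for (HyperLogLogCollector collector : merged.values()) {
      maxCardinality = Math.max(maxCardinality, collector.estimateCardinality());
    }

    // Always create at least one shard, even for empty input.
    return Math.max(1, (int) Math.ceil(maxCardinality / maxRowsPerSegment));
  }
}
```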

This PR has:

  • been self-reviewed.
  • added documentation for new or modified features or behaviors.
  • added Javadocs for most classes and all non-trivial methods. Linked related entities via Javadoc links.
  • added or updated version, license, or notice information in licenses.yaml
  • added comments explaining the "why" and the intent of the code wherever it would not be obvious to an unfamiliar reader.
  • added unit tests or modified existing tests to cover new code paths, ensuring the threshold for code coverage is met.
  • added integration tests.
  • been tested in a test Druid cluster.


  @VisibleForTesting
- static List<Object> getGroupKey(final List<String> partitionDimensions, final long timestamp, final InputRow inputRow)
+ public static List<Object> getGroupKey(final List<String> partitionDimensions, final long timestamp, final InputRow inputRow)
Contributor

Does it make sense to keep the @VisibleForTesting since this is now public (so that it can be used in PartialDimensionCardinalityTask)?

Contributor Author

Removed the @VisibleForTesting

if (!(ingestionSchema.getTuningConfig().getPartitionsSpec() instanceof HashedPartitionsSpec)) {
  // only range and hash partitioning is supported for multiphase parallel ingestion, see runMultiPhaseParallel()
  throw new ISE(
      "forceGuaranteedRollup is set but partitionsSpec [%s] is not a ranged or hash partition spec.",
Contributor

Using "single_dim" instead of "ranged" in the error message maps better to the naming in the docs

Contributor Author

Changed the message to use "single_dim"

//noinspection OptionalGetWithoutIsPresent (InputRowIterator returns rows with present intervals)
Interval interval = granularitySpec.bucketInterval(timestamp).get();

LOG.info("TS: " + timestamp + " INTV: " + interval + " GSC: " + granularitySpec.getClass());
Contributor

What do you think about changing the logging level to reduce the logging amount since this is printed for each row?

Contributor Author

I'll remove this; it was a debugging-only message that shouldn't be retained.

Contributor Author

This has been removed


LOG.info("TS: " + timestamp + " INTV: " + interval + " GSC: " + granularitySpec.getClass());

HyperLogLogCollector hllCollector = intervalToCardinalities.computeIfAbsent(
Contributor

What do you think about using HllSketch instead of HyperLogLogCollector, since HllSketch provides much more accurate estimates when the cardinality does not exceed the sketch's k value: http://datasketches.apache.org/docs/HLL/HllSketchVsDruidHyperLogLogCollector.html. Using HllSketch will also be more accurate and faster when the partial HLLs are merged.

Using HllSketch would mean that the implementation for parallel ingestion is different from the one for sequential ingestion though.

Contributor Author

> Using HllSketch would mean that the implementation for parallel ingestion is different from the one for sequential ingestion though.

That was my reasoning for using HyperLogLogCollector, but I think it makes sense to change these to use HllSketch. I'll update this one to use HllSketch and a follow-on could be to do the same for IndexTask.

Contributor Author

I've updated this to use HllSketch instead
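
For reference, the HllSketch update-and-merge pattern that replaces the collector folding looks roughly like this (a self-contained sketch; the lgK value and names are assumptions, not the PR's settings):

```java
import org.apache.datasketches.hll.HllSketch;
import org.apache.datasketches.hll.Union;

public class HllSketchMergeDemo
{
  private static final int LG_K = 11; // 2^11 buckets; an assumed size

  public static void main(String[] args)
  {
    // Two "subtask" sketches with overlapping keys.
    HllSketch sketchA = new HllSketch(LG_K);
    HllSketch sketchB = new HllSketch(LG_K);
    for (int i = 0; i < 1000; i++) {
      sketchA.update("key-" + i);         // keys 0..999
      sketchB.update("key-" + (i + 500)); // keys 500..1499
    }

    // The supervisor-side merge: a Union deduplicates across sketches.
    Union union = new Union(LG_K);
    union.update(sketchA);
    union.update(sketchB);

    System.out.println("estimated cardinality: " + union.getResult().getEstimate()); // ~1500
  }
}
```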

Comment on lines +127 to +133
public boolean isReady(TaskActionClient taskActionClient) throws Exception
{
  return tryTimeChunkLock(
      taskActionClient,
      getIngestionSchema().getDataSchema().getGranularitySpec().inputIntervals()
  );
}
Contributor

I believe the logic in PartialHashSegmentGenerateTask.isReady() will need to be adjusted; otherwise if PartialDimensionCardinalityTask runs and grabs the locks here it'll get stuck. For example, PartialRangeSegmentGenerateTask.isReady() does not grab the locks since they're acquired earlier by PartialDimensionDistributionTask.

I think a good place to add a test would be in HashPartitionMultiPhaseParallelIndexingTest. Currently, its test cases all specify a value of 2 for numShards, so we can add cases with null.

Contributor Author

Thanks, will look into this

Contributor Author

I didn't see any failures when I was testing this on a cluster or in unit tests with the current locking (maybe related to the javadoc comment on isReady(): "This method must be idempotent, as it may be run multiple times per task."?).

I updated PartialHashSegmentGenerateTask.isReady() to skip the lock acquisition if the numShardsOverride is set (indicating that the cardinality phase ran).

Contributor

I think acquiring locks here should be fine since it's idempotent. The supervisor task and all its subtasks share the same lock based on their groupId. I actually think it's better to call tryTimeChunkLock() in every subtask since it will make the task fail early when its lock is revoked. Otherwise, the task will fail when it publishes segments which happens at the last stage in batch ingestion.

Contributor

I seem to remember running into the self deadlock issue when implementing #8925, which is why the locks are only acquired in the first phase of the range partitioning subtasks. I don't remember if I discovered this via RangePartitionMultiPhaseParallelIndexingTest or ITPerfectRollupParallelIndexTest, but if the problem isn't showing up in this PR's tests, then that's good.

Contributor

I wasn't able to reproduce the self deadlock with RangePartitionMultiPhaseParallelIndexingTest or ITPerfectRollupParallelIndexTest in master, so either I misremembered stuff or the issue has been fixed in the meantime.

  public String getForceGuaranteedRollupIncompatiblityReason()
  {
-   return getNumShards() == null ? NUM_SHARDS + " must be specified" : FORCE_GUARANTEED_ROLLUP_COMPATIBLE;
+   return FORCE_GUARANTEED_ROLLUP_COMPATIBLE;
@ccaominh (Contributor) commented Sep 22, 2020

https://github.com/apache/druid/blob/master/docs/ingestion/native-batch.md#hash-based-partitioning needs to be updated to say that numShards is no longer required and also to mention the new partial dimension cardinality task.

Contributor Author

Updated the docs to mark numShards as optional and added a description of the new cardinality scan phase.


final Integer numShardsOverride;
HashedPartitionsSpec partitionsSpec = (HashedPartitionsSpec) ingestionSchema.getTuningConfig().getPartitionsSpec();
if (ingestionSchema.getTuningConfig().isForceGuaranteedRollup() && partitionsSpec.getNumShards() == null) {
Contributor

Checking isForceGuaranteedRollup() is probably redundant here

Contributor Author

Removed the isForceGuaranteedRollup check here

import java.util.Map;

@RunWith(Enclosed.class)
public class PartialDimensionCardinalityTaskTest
Contributor

I like the extensive set of tests!

});

// determine the highest cardinality in any interval
long maxCardinality = Long.MIN_VALUE;
Contributor

how about using 0 instead here?

Contributor Author

Changed this to 0
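
The loop then reads roughly as follows (a sketch with assumed names). Starting from 0 also behaves sanely if the interval map is empty, whereas Long.MIN_VALUE would propagate a nonsense value into the shard count:

```java
// determine the highest cardinality in any interval; 0 is a safe floor for an empty map
long maxCardinality = 0;
for (HllSketch sketch : intervalToCardinalities.values()) {
  maxCardinality = Math.max(maxCardinality, (long) Math.ceil(sketch.getEstimate()));
}
```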


try {
  hllSketch.update(
      IndexTask.HASH_FUNCTION.hashBytes(jsonMapper.writeValueAsBytes(groupKey)).asBytes()
Contributor

Out of curiosity, what is the effect if we pass jsonMapper.writeValueAsBytes(groupKey) directly to hllSketch? Does it affect performance or accuracy in any way? hllSketch is already doing hashing on the byte array input.

Contributor Author

Good point, I got rid of the first level of hashing there; it should be more accurate this way.
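
The updated call then passes the serialized key straight through (a sketch assuming the surrounding try/catch shape stays in place):

```java
try {
  // HllSketch hashes its byte[] input internally, so the serialized group key
  // can be passed in directly; pre-hashing just stacks a second hash on top.
  hllSketch.update(jsonMapper.writeValueAsBytes(groupKey));
}
catch (JsonProcessingException jpe) {
  throw new RuntimeException(jpe);
}
```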

Comment on docs/ingestion/native-batch.md (outdated)
 |property|description|default|required?|
 |--------|-----------|-------|---------|
 |type|This should always be `hashed`|none|yes|
-|numShards|Directly specify the number of shards to create. If this is specified and `intervals` is specified in the `granularitySpec`, the index task can skip the determine intervals/partitions pass through the data.|null|yes|
+|numShards|Directly specify the number of shards to create. If this is specified and `intervals` is specified in the `granularitySpec`, the index task can skip the determine intervals/partitions pass through the data.|null|no|
Contributor

Please add the new parameter and its description.

Contributor Author

I don't think I added any new parameters to HashedPartitionsSpec, were you referring to something else?

Contributor

Oh, I should have been more specific. I meant the newly supported parameter, maxRowsPerSegment (or targetRowsPerSegment, which I suggested below). Neither of them is described here since we didn't support them before. Maybe https://github.com/apache/druid/blob/master/docs/ingestion/hadoop.md#hash-based-partitioning helps.

Contributor Author

Added docs for targetRowsPerSegment

Comment on docs/ingestion/native-batch.md (outdated)
The Parallel task splits the input data and assigns them to worker tasks based on the split hint spec.
Each worker task (type `partial_dimension_cardinality`) gathers estimates of partitioning dimensions cardinality for
each time chunk. The Parallel task will aggregate these estimates from the worker tasks and determine the highest
cardinality across all of the time chunks in the input data, dividing this cardinality by `maxRowsPerSegment` to
Contributor

It seems like this will guarantee that you will not have more segments than the computed numShards, but it will not guarantee that the number of rows per segment doesn't exceed maxRowsPerSegment, since the partition dimensions can be skewed. Is this correct? If so, I suggest targetRowsPerSegment since it's not a hard limit.

Contributor Author

I went with targetRowsPerSegment

  return Math.toIntExact(numShards);
}
catch (ArithmeticException ae) {
  return Integer.MAX_VALUE;
Contributor

Hmm.. Should we fail instead? Since the timeline in the coordinator and the broker will explode if you have this many segments per interval.

Contributor Author

I changed this to throw an exception now
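
The revised overflow handling is roughly the following (the helper shape and message are assumptions, not the exact PR code):

```java
import org.apache.druid.java.util.common.ISE;

static int computedNumShardsToInt(long numShards)
{
  try {
    return Math.toIntExact(numShards);
  }
  catch (ArithmeticException ae) {
    // Silently clamping to Integer.MAX_VALUE would create enough segments per
    // interval to overwhelm the coordinator and broker timelines, so fail instead.
    throw new ISE("numShards [%d] exceeds Integer.MAX_VALUE; raise targetRowsPerSegment or set numShards explicitly", numShards);
  }
}
```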

Comment on docs/ingestion/native-batch.md (outdated)

 The Parallel task with hash-based partitioning is similar to [MapReduce](https://en.wikipedia.org/wiki/MapReduce).
-The task runs in 2 phases, i.e., `partial segment generation` and `partial segment merge`.
+The task runs in up to 3 phases: `partial_dimension_cardinality`, `partial segment generation` and `partial segment merge`.
Contributor

When I wrote this, my intention was to show the names of the phases (e.g., https://github.com/apache/druid/pull/10419/files#diff-05bbc55d565a3d9462353d9b4771cb09R34). The phase name is not shown anywhere currently, but will be available in the task live reports and metrics after #10352. How about removing the underscores from here too?

Contributor Author

Removed the underscores here

);
List<Object> groupKey = HashBasedNumberedShardSpec.getGroupKey(
    partitionDimensions,
    interval.getStartMillis(),
Contributor

Contributor Author

Ah, thanks, I fixed this and updated PartialDimensionCardinalityTaskTest.sendsCorrectReportWithMultipleIntervalsInData() to test that case

Map<Interval, HllSketch> intervalToCardinalities = new HashMap<>();
while (inputRowIterator.hasNext()) {
  InputRow inputRow = inputRowIterator.next();
  if (inputRow == null) {
Contributor

nit: inputRow is safely non-null since FilteringCloseableInputRowIterator filters out all null rows. See AbstractBatchIndexTask.defaultRowFilter().

Contributor Author

Removed the null check

    parseExceptionHandler
);
HandlingInputRowIterator iterator =
    new DefaultIndexTaskInputRowIteratorBuilder()
Contributor

nit: DefaultIndexTaskInputRowIteratorBuilder effectively does nothing here since its core functionality has been moved to FilteringCloseableInputRowIterator in #10336. I haven't cleaned up this interface yet. Now, it's only useful in range partitioning as some more useful inputRowHandlers are appended to the default builder.

Contributor Author

I changed this to just use the iterator above directly.

@jon-wei (Contributor Author) commented Sep 24, 2020

I added the locking back on PartialHashSegmentGenerateTask to address #10419 (comment)

@jihoonson (Contributor) left a comment

LGTM. Thanks @jon-wei!

@jihoonson (Contributor)

One thing to note: after this PR, the parallel task can compute the number of partitions automatically, but the same number will be applied to all intervals. I think it will be better to compute and apply different numbers of partitions per interval to handle potential data skew between intervals which is what the simple task (IndexTask) does. But I'm OK with doing this improvement as a follow-up.

@jon-wei merged commit cb30b1f into apache:master on Sep 24, 2020
ccaominh added a commit to ccaominh/druid that referenced this pull request Sep 26, 2020
Specifying `numShards` for hashed partitions is no longer required after
apache#10419. Update the UI to make
`numShards` an optional field for hash partitions.
vogievetsky pushed a commit that referenced this pull request Sep 29, 2020
* Compaction config UI optional numShards

Specifying `numShards` for hashed partitions is no longer required after
#10419. Update the UI to make
`numShards` an optional field for hash partitions.

* Update snapshot
@jon-wei added this to the 0.20.0 milestone on Sep 30, 2020
JulianJaffePinterest pushed a commit to JulianJaffePinterest/druid that referenced this pull request Jan 22, 2021
Automatically determine numShards for parallel ingestion hash partitioning (apache#10419)

* Automatically determine numShards for parallel ingestion hash partitioning

* Fix inspection, tests, coverage

* Docs and some PR comments

* Adjust locking

* Use HllSketch instead of HyperLogLogCollector

* Fix tests

* Address some PR comments

* Fix granularity bug

* Small doc fix
JulianJaffePinterest pushed a commit to JulianJaffePinterest/druid that referenced this pull request Jan 22, 2021
* Compaction config UI optional numShards

Specifying `numShards` for hashed partitions is no longer required after
apache#10419. Update the UI to make
`numShards` an optional field for hash partitions.

* Update snapshot