
Add support for parallel native indexing with shuffle for perfect rollup#8257

Merged
jihoonson merged 28 commits into apache:master from jihoonson:superbatch-shuffle
Aug 16, 2019

Conversation

@jihoonson
Contributor

Part of #8061.

This PR is based on #8236.

Description

This PR adds support for parallel native indexing with shuffle for perfect rollup.

New configurations in tuningConfig for the parallel index task (an example sketch follows the list):

  • forceGuaranteedRollup: if set to true, parallel indexing is executed in two phases with shuffle.
  • maxNumSegmentsToMerge: maximum number of segments that a single task can merge at the same time in the second phase. Used only when forceGuaranteedRollup is set.
  • totalNumMergeTasks: total number of tasks to merge segments in the second phase when forceGuaranteedRollup is set.
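For illustration, a minimal tuningConfig sketch with the new options might look like the following JSON (values are just the defaults discussed later; the hash-partitioning settings that guaranteed rollup also requires are omitted, and the exact field set is an assumption rather than copied from the docs):

{
  "type": "index_parallel",
  "forceGuaranteedRollup": true,
  "maxNumSegmentsToMerge": 100,
  "totalNumMergeTasks": 10
}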

Refactoring

  • FiniteFirehoseProcessor is added to share the duplicated code for processing a firehose.
  • ParallelIndexPhaseRunner is added to share the duplicated code across implementations of ParallelIndexTaskRunner.
  • Added the SubTaskReport interface to support different types of sub task reports.

Multi-phase indexing

If forceGuaranteedRollup is set, ParallelIndexSupervisorTask runs PartialSegmentGenerateParallelIndexTaskRunner and PartialSegmentMergeParallelIndexTaskRunner for the first and second phases, respectively.
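A rough control-flow sketch of the two phases (the runner-creation calls and status handling below are simplified stand-ins, not the actual ParallelIndexSupervisorTask API):

// Illustrative only: phase construction and return types are simplified.
TaskState runGuaranteedRollup(TaskToolbox toolbox) throws Exception
{
  // Phase 1: each sub task partitions its input by segmentGranularity (primary
  // partition key) and the secondary partition key, writes partial segments
  // locally, and reports their locations to the supervisor task.
  TaskState generateState = createPartialSegmentGenerateRunner(toolbox).run();
  if (generateState != TaskState.SUCCESS) {
    return generateState;
  }

  // Phase 2: merge sub tasks fetch the partial segments assigned to them over
  // HTTP, merge each partition into final segments, and publish them.
  return createPartialSegmentMergeRunner(toolbox).run();
}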


This PR has:

  • been self-reviewed.
  • added documentation for new or modified features or behaviors.
  • added Javadocs for most classes and all non-trivial methods. Linked related entities via Javadoc links.
  • added unit tests or modified existing tests to cover new code paths.
  • been tested in a test Druid cluster.
    • tested by ingesting the lineitem data of TPC-H@100GB

Comment thread on docs/content/ingestion/native_tasks.md (outdated)
each sub task creates segments individually and reports them to the supervisor task.

If `forceGuaranteedRollup` = true, it's executed in two phases with data shuffle which is similar to [MapReduce](https://en.wikipedia.org/wiki/MapReduce).
In the first phase, each sub task partitions input data based on `segmentGranularity` (primary partition key) in `granaulritySpec`
Contributor

granaulritySpec -> granularitySpec

Contributor Author

Thanks, fixed.

static boolean isGuaranteedRollup(IndexIOConfig ioConfig, IndexTuningConfig tuningConfig)
{
Preconditions.checkState(
!tuningConfig.isForceGuaranteedRollup() || !ioConfig.isAppendToExisting(),
Contributor

hm, I wonder if this restriction is truly necessary (would it make sense if someone wants to append a perfectly-rolled up set of segments to a non-perfectly rolled up set?)

Contributor

Or, please add a comment explaining why, technically, we can't append new perfectly rolled-up segments to existing ones. Is this not possible, or just not enabled at this point?

Contributor Author

Hmm, good point. I think, technically, we can append a perfectly rolled-up set of segments to a non-perfectly rolled-up one, although some code would need to be fixed to support it (e.g., HashBasedNumberedShardSpec always assumes that the start partitionId in the perfectly rolled-up set is 0). I'm not sure how useful it is, but maybe it could help in some use cases. Added a javadoc.

Member

HashBasedNumberedShardSpec always assumes that the start partitionId in the perfectly-rolled up set is 0

Should you put some of the things that it would take to support it in the javadoc just in case it does become useful to save someone in the future some trouble?

Contributor Author

Added a comment.

GranularitySpec granularitySpec,
IndexIOConfig ioConfig,
IndexTuningConfig tuningConfig,
PartitionsSpec nonNullPartitionsSpec
Contributor

Suggest a @Nonnull annotation instead

Contributor Author

Added the annotation, but kept the name as it is since it doesn't seem harmful.


if (isGuaranteedRollup(ioConfig, tuningConfig)) {
// Overwrite mode, guaranteed rollup: shardSpecs must be known in advance.
assert nonNullPartitionsSpec instanceof HashedPartitionsSpec;
Contributor

Is this because we would otherwise need to run a job like DeterminePartitionsJob to figure out partitions, which would add another phase and is avoided right now?

Contributor Author

Hmm, the point about DeterminePartitionsJob is correct, but this assertion is here because the index task and the parallel index task currently only support the hashed partitions spec. The range partitions spec will be supported as well, and this assertion will be removed in the future.

Or, if you're asking about the comment, the index task already has a similar mode to determine partitions automatically, and this method is not called in that mode.

Contributor

I wasn't asking about the comment, but trying to understand what technically prevents us from using the dimension partitions spec. It seems my guess was correct; it would be nice to add that in the comment.

Contributor Author

Added a comment.


try {
if (eachSpec.isReady(toolbox.getTaskActionClient())) {
if (currentSubTaskHolder.setTask(eachSpec) && eachSpec.isReady(toolbox.getTaskActionClient())) {
Contributor

Should currentSubTaskHolder.setTask(eachSpec) be handled separately, where the prevSpec == SPECIAL_VALUE_STOPPED check happened? With this change, the log message about the task being intentionally stopped is gone, and this loop would continue checking all task specs.

Contributor Author

Thanks, fixed.

);
}
}
} else if (taskMonitor.getNumRunningTasks() < maxNumTasks) {
Contributor

What would trigger this if case? It looks like the subTaskSpecIterator would be exhausted before entering the while (isRunning()) loop, and all the sub tasks should've been submitted already.

Contributor Author

This can happen if the number of sub task specs to execute is larger than maxNumTasks, which is the max number of sub tasks that can be executed concurrently. Renamed it to maxNumConcurrentSubTasks and added a javadoc.
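A self-contained illustration of that scheduling rule (plain Java with standard-library types, not the actual runner code): at most maxNumConcurrentSubTasks specs run at once, and the remaining specs wait for a free slot.

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

public class BoundedSubmissionExample
{
  public static void main(String[] args) throws InterruptedException
  {
    final int maxNumConcurrentSubTasks = 3;
    final int totalSubTaskSpecs = 10;
    final ExecutorService exec = Executors.newFixedThreadPool(maxNumConcurrentSubTasks);
    final Semaphore slots = new Semaphore(maxNumConcurrentSubTasks);

    for (int i = 0; i < totalSubTaskSpecs; i++) {
      slots.acquire();                    // wait until a task slot is free
      final int spec = i;
      exec.submit(() -> {
        try {
          System.out.println("running sub task spec " + spec);
        }
        finally {
          slots.release();                // free the slot for the next spec
        }
      });
    }

    exec.shutdown();
    exec.awaitTermination(1, TimeUnit.MINUTES);
  }
}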

public void onFailure(Throwable t)
{
// this callback is called only when there were some problems in TaskMonitor.
LOG.error(t, "Error while running a task for subTaskSpec[%s]", spec);
Contributor

Suggest revising the error log message to mention that it indicates issues with TaskMonitor specifically

Contributor Author

Would you elaborate on what you think should be mentioned? The Future returned from taskMonitor.submit() indicates the result of processing a sub task spec after a certain number of task retries on failures.

Contributor
@jon-wei, Aug 14, 2019

I think the comments here could discuss how onFailure is triggered after task retries are exhausted across the set of subtasks, and onSuccess is called for individual subtask success

Contributor Author

Added a javadoc to TaskMonitor.submit().
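For reference, the semantics described above could be summarized roughly like this (a hedged sketch; the actual javadoc wording and the generic types on TaskMonitor are not reproduced here, and the ListenableFuture return type is an assumption):

// The Future returned by taskMonitor.submit(spec) tracks the whole sub task
// spec, including retries of failed task attempts:
// - onSuccess fires once the spec finishes, whether its final attempt
//   succeeded or failed (the complete event carries the final state).
// - onFailure fires only when TaskMonitor itself hits a problem, e.g. it
//   cannot submit or track the task.
ListenableFuture<SubTaskCompleteEvent<SubTaskType>> future = taskMonitor.submit(spec);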

@Override
public void onSuccess(SubTaskCompleteEvent<SubTaskType> completeEvent)
{
// this callback is called if a task completed wheter it succeeded or not.
Contributor

wheter -> whether

Contributor Author

Thanks, fixed.

// We have more subTasks to run
submitNewTask(taskMonitor, subTaskSpecIterator.next());
} else {
// We have more subTasks to run, but don't have enough available task slots
Contributor
@jon-wei, Aug 14, 2019

It doesn't have to be done now, but I think it could be nice to have a warning log for cases where tasks couldn't be scheduled for X loop iterations or some other threshold, indicating that there may be too much contention for free task slots in the cluster.

Contributor Author

👍
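A sketch of what that follow-up might look like in the submission loop (not implemented in this PR; the counter, threshold, and log message are illustrative, and the real loop sleeps between iterations):

int iterationsWithoutFreeSlot = 0;
final int warnThreshold = 100;  // illustrative threshold

while (subTaskSpecIterator.hasNext()) {
  if (taskMonitor.getNumRunningTasks() < maxNumConcurrentSubTasks) {
    submitNewTask(taskMonitor, subTaskSpecIterator.next());
    iterationsWithoutFreeSlot = 0;
  } else if (++iterationsWithoutFreeSlot >= warnThreshold) {
    LOG.warn(
        "Could not schedule new sub tasks for [%d] iterations; the cluster may not have enough free task slots",
        iterationsWithoutFreeSlot
    );
    iterationsWithoutFreeSlot = 0;
  }
}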

*
* @return the number of bytes copied
*/
public static <T> long fetch(
Contributor

nit: this could go into the FileUtils class because, as a dev, if I needed something like this, I would look in FileUtils first to see if it already exists.

Contributor Author

Moved 👍


@GET
@Path("/phase")
@Produces(MediaType.APPLICATION_ATOM_XML)
Contributor

Why is MediaType.APPLICATION_ATOM_XML used here?

Contributor Author

Oops, should be MediaType.APPLICATION_JSON. Fixed.
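The corrected endpoint declaration then looks like this (the method name, body, and currentPhaseName field are hypothetical stand-ins for the supervisor task's chat handler code):

@GET
@Path("/phase")
@Produces(MediaType.APPLICATION_JSON)
public Response getCurrentPhaseName()
{
  // currentPhaseName is a hypothetical field holding the running phase's name.
  return Response.ok(Collections.singletonMap("phase", currentPhaseName)).build();
}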

private static final Duration DEFAULT_CHAT_HANDLER_TIMEOUT = new Period("PT10S").toStandardDuration();
private static final int DEFAULT_CHAT_HANDLER_NUM_RETRIES = 5;
private static final int DEFAULT_MAX_NUM_SEGMENTS_TO_MERGE = 100;
private static final int DEFAULT_TOTAL_NUM_MERGE_TASKS = 10;
Contributor

Could consider making the default number of merge tasks the same as the effective value for maxNumSubTasks instead

Contributor Author

maxNumSubTasks is actually the max number of sub tasks that can run concurrently. Raised #8318.

{
return URI.create(
StringUtils.format(
"http://%s:%d/druid/worker/v1/shuffle/task/%s/%s/partition?startTime=%s&endTime=%s&partitionId=%d",
Contributor

What happens if TLS is enabled and plaintext port is disabled on the data server?

Contributor Author

Good point! Fixed to use https if TLS is enabled.
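A self-contained sketch of the fix (plain Java, not the actual code; which IDs fill the two path segments, and how host and ports are obtained, are assumptions here): choose the scheme and port based on whether TLS is enabled on the data server.

import java.net.URI;

public class PartitionUriExample
{
  static URI partitionUri(
      boolean tlsEnabled,
      String host,
      int plaintextPort,
      int tlsPort,
      String supervisorTaskId,
      String subTaskId,
      String startTime,
      String endTime,
      int partitionId
  )
  {
    final String scheme = tlsEnabled ? "https" : "http";
    final int port = tlsEnabled ? tlsPort : plaintextPort;
    return URI.create(
        String.format(
            "%s://%s:%d/druid/worker/v1/shuffle/task/%s/%s/partition?startTime=%s&endTime=%s&partitionId=%d",
            scheme,
            host,
            port,
            supervisorTaskId,
            subTaskId,
            startTime,
            endTime,
            partitionId
        )
    );
  }
}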


final List<PartialSegmentMergeIOConfig> assignedPartitionLocations = new ArrayList<>(numMergeTasks);
for (int i = 0; i < numMergeTasks - 1; i++) {
final List<PartitionLocation> assingedToSameTask = partitions
Contributor

assingedToSameTask -> assignedToSameTask, as well as below

Contributor Author

Thanks, fixed.
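An illustrative, self-contained version of that assignment (plain Java; the real code works with PartitionLocation objects and builds a PartialSegmentMergeIOConfig per group):

import java.util.ArrayList;
import java.util.List;

public class MergeTaskAssignmentExample
{
  // Split the partition locations into at most numMergeTasks groups of roughly
  // equal size; each group becomes the input of one merge sub task.
  static <T> List<List<T>> assign(List<T> partitions, int numMergeTasks)
  {
    final List<List<T>> assigned = new ArrayList<>(numMergeTasks);
    final int groupSize = (int) Math.ceil((double) partitions.size() / numMergeTasks);
    for (int i = 0; i < partitions.size(); i += groupSize) {
      assigned.add(
          new ArrayList<>(partitions.subList(i, Math.min(i + groupSize, partitions.size())))
      );
    }
    return assigned;
  }
}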

}
} else {
// If it's still running, update last access time.
supervisorTaskCheckTimes.put(supervisorTaskId, DateTimes.nowUtc());
Contributor

Should this and possibly other access time updates use DateTimes.nowUtc().plus(intermediaryPartitionTimeout) instead?

Contributor Author

👍 fixed.

@himanshug
Contributor

I read through this PR and it LGTM overall. Thumbs up for testing it with a large dataset.

Currently, the user needs to set totalNumMergeTasks, which by default is a static value of 10. As a follow-up, it would be great to compute the default dynamically based on the size of data, number of partitions, number of segment intervals, etc., so that the default behavior just works for most cases.

Member
@clintropolis left a comment

overall lgtm, 👍

/**
* Max number of segments to merge at the same time.
* Used only by {@link PartialSegmentMergeTask}.
* This configuration was temporally added to avoid using too much memory while merging segments,
Member

nit: I think maybe 'temporarily' is more appropriate word here

Contributor Author

Thanks, fixed.


final SegmentsAndMetadata pushed = driver.pushAllAndClear(pushTimeout);
log.info("Pushed segments[%s]", pushed.getSegments());
final FiniteFirehoseProcessor firehoseProcessor = new FiniteFirehoseProcessor(
Member

this is nice 👍


if (mergedFiles.size() == 1) {
return Pair.of(mergedFiles.get(0), Preconditions.checkNotNull(dimensionNames, "dimensionNames"));
} else {
return mergeSegmentsInSamePartition(
Member

I guess the sizes involved here make the recursion not an issue 😅?

Contributor Author

Yes, I think the level of recursion shouldn't be very high.
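For intuition, here is a self-contained toy version of that recursion (strings stand in for segment files; the real code merges actual segments and tracks dimension names): each pass merges groups of at most maxNumSegmentsToMerge, so the depth grows only logarithmically with the number of segments.

import java.util.ArrayList;
import java.util.List;

public class GroupedMergeExample
{
  // Stand-in for the real segment merge; it just joins the labels of a group.
  static String mergeGroup(List<String> group)
  {
    return String.join("+", group);
  }

  // Assumes a non-empty list and maxNumSegmentsToMerge >= 2.
  static String mergeAll(List<String> segments, int maxNumSegmentsToMerge)
  {
    if (segments.size() == 1) {
      return segments.get(0);
    }
    final List<String> merged = new ArrayList<>();
    for (int i = 0; i < segments.size(); i += maxNumSegmentsToMerge) {
      merged.add(mergeGroup(segments.subList(i, Math.min(i + maxNumSegmentsToMerge, segments.size()))));
    }
    // Recurse on the shrinking list of intermediate results.
    return mergeAll(merged, maxNumSegmentsToMerge);
  }
}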

@jihoonson
Contributor Author

Thank you for the review.

Currently, the user needs to set totalNumMergeTasks, which by default is a static value of 10. As a follow-up, it would be great to compute the default dynamically based on the size of data, number of partitions, number of segment intervals, etc., so that the default behavior just works for most cases.

Yes, the number of merge tasks can be computed automatically in the future. PartitionStat was added to collect some statistics that could be useful for such a computation. I think it would be pretty cool to support it, but I'm not sure when I can work on it yet. I'll try to do it before 0.17.

Member
@clintropolis left a comment

👍 after CI

Contributor
@jon-wei left a comment

+1 after CI

@jihoonson
Contributor Author

@jon-wei @himanshug @clintropolis thank you for the review!

