Merged
3 changes: 1 addition & 2 deletions docs/content/development/extensions-core/mysql.md
@@ -169,8 +169,7 @@ The MySQL extension provides an implementation of an [SqlFirehose](../../ingesti
}
},
"tuningConfig": {
"type": "index",
"maxNumSubTasks": 1
"type": "index"
}
}
}
3 changes: 1 addition & 2 deletions docs/content/development/extensions-core/postgresql.md
@@ -146,8 +146,7 @@ The PostgreSQL extension provides an implementation of an [SqlFirehose](../../in
}
},
"tuningConfig": {
"type": "index",
"maxNumSubTasks": 1
"type": "index"
}
}
}
2 changes: 1 addition & 1 deletion docs/content/ingestion/hadoop-vs-native-batch.md
@@ -32,7 +32,7 @@ ingestion method.

| |Hadoop-based ingestion|Native parallel ingestion|Native local ingestion|
|---|----------------------|-------------------------|----------------------|
| Parallel indexing | Always parallel | Parallel if firehose is splittable <br/> & maxNumSubTasks > 1 in tuningConfig | Always sequential |
| Parallel indexing | Always parallel | Parallel if firehose is splittable <br/> & maxNumConcurrentSubTasks > 1 in tuningConfig | Always sequential |
| Supported indexing modes | Overwriting mode | Both appending and overwriting modes | Both appending and overwriting modes |
| External dependency | Hadoop (it internally submits Hadoop jobs) | No dependency | No dependency |
| Supported [rollup modes](./index.html#roll-up-modes) | Perfect rollup | Both perfect and best-effort rollup | Both perfect and best-effort rollup |
24 changes: 12 additions & 12 deletions docs/content/ingestion/native_tasks.md
@@ -55,7 +55,7 @@ where the first phase tasks ran. In the second phase, each sub task fetches
partitioned data from middleManagers or indexers and merges them to create the final segments.
As in the single phase execution, the created segments are reported to the supervisor task to publish at once.

To use this task, the `firehose` in `ioConfig` should be _splittable_ and `maxNumSubTasks` should be set to something larger than 1 in `tuningConfig`.
To use this task, the `firehose` in `ioConfig` should be _splittable_ and `maxNumConcurrentSubTasks` should be set to something larger than 1 in `tuningConfig`.
Otherwise, this task runs sequentially. Here is the list of currently splittable firehoses.

- [`LocalFirehose`](./firehose.html#localfirehose)
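The parallel-versus-sequential rule described above can be sketched as a small predicate. This is a hypothetical Python mirror of the check the supervisor task performs; the function name and signature are illustrative, not Druid's API:

```python
def is_parallel_mode(firehose_is_splittable: bool, max_num_concurrent_sub_tasks: int) -> bool:
    # Parallel only when the firehose is splittable AND
    # maxNumConcurrentSubTasks in tuningConfig is greater than 1;
    # otherwise the task runs sequentially.
    return firehose_is_splittable and max_num_concurrent_sub_tasks > 1
```

Both conditions are required: a splittable firehose with the default `maxNumConcurrentSubTasks` of 1 still runs sequentially.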
@@ -73,8 +73,8 @@ if one of them fails.

You may want to consider the following:

- The number of concurrent tasks running in parallel ingestion is determined by `maxNumSubTasks` in the `tuningConfig`.
The supervisor task checks the number of currently running sub tasks and creates more if it is smaller than `maxNumSubTasks`, no matter how many task slots are currently available.
- The number of concurrent tasks running in parallel ingestion is determined by `maxNumConcurrentSubTasks` in the `tuningConfig`.
The supervisor task checks the number of currently running sub tasks and creates more if it is smaller than `maxNumConcurrentSubTasks`, no matter how many task slots are currently available.
This may affect the performance of other ingestion tasks. See the [Capacity Planning](#capacity-planning) section below for more details.
- By default, batch ingestion replaces all data in any segment that it writes to. If you'd like to add to the segment
instead, set the `appendToExisting` flag in `ioConfig`. Note that it only replaces data in segments where it actively adds
@@ -151,7 +151,7 @@ An example ingestion spec is:
},
"tuningConfig": {
"type": "index_parallel",
"maxNumSubTasks": 2
"maxNumConcurrentSubTasks": 2
}
}
}
@@ -205,7 +205,7 @@ The tuningConfig is optional and default parameters will be used if no tuningCon
|reportParseExceptions|If true, exceptions encountered during parsing will be thrown and will halt ingestion; if false, unparseable rows and fields will be skipped.|false|no|
|pushTimeout|Milliseconds to wait for pushing segments. It must be >= 0, where 0 means to wait forever.|0|no|
|segmentWriteOutMediumFactory|Segment write-out medium to use when creating segments. See [SegmentWriteOutMediumFactory](#segmentWriteOutMediumFactory).|Not specified, the value from `druid.peon.defaultSegmentWriteOutMediumFactory.type` is used|no|
|maxNumSubTasks|Maximum number of tasks which can be run at the same time. The supervisor task would spawn worker tasks up to `maxNumSubTasks` regardless of the currently available task slots. If this value is set to 1, the supervisor task processes data ingestion on its own instead of spawning worker tasks. If this value is set too large, too many worker tasks can be created, which might block other ingestion. Check [Capacity Planning](#capacity-planning) for more details.|1|no|
|maxNumConcurrentSubTasks|Maximum number of tasks which can be run in parallel at the same time. The supervisor task would spawn worker tasks up to `maxNumConcurrentSubTasks` regardless of the currently available task slots. If this value is set to 1, the supervisor task processes data ingestion on its own instead of spawning worker tasks. If this value is set too large, too many worker tasks can be created, which might block other ingestion. Check [Capacity Planning](#capacity-planning) for more details.|1|no|
|maxRetry|Maximum number of retries on task failures.|3|no|
|maxNumSegmentsToMerge|Max limit for the number of segments that a single task can merge at the same time in the second phase. Used only when `forceGuaranteedRollup` is set.|100|no|
|totalNumMergeTasks|Total number of tasks to merge segments in the second phase when `forceGuaranteedRollup` is set.|10|no|
@@ -430,7 +430,7 @@ An example of the result is
"reportParseExceptions": false,
"pushTimeout": 0,
"segmentWriteOutMediumFactory": null,
"maxNumSubTasks": 4,
"maxNumConcurrentSubTasks": 4,
"maxRetry": 3,
"taskStatusCheckPeriodMs": 1000,
"chatHandlerTimeout": "PT10S",
@@ -468,22 +468,22 @@ Returns the task attempt history of the worker task spec of the given id, or HTT

### Capacity Planning

The supervisor task can create up to `maxNumSubTasks` worker tasks no matter how many task slots are currently available.
As a result, the total number of tasks which can run at the same time is `(maxNumSubTasks + 1)` (including the supervisor task).
The supervisor task can create up to `maxNumConcurrentSubTasks` worker tasks no matter how many task slots are currently available.
As a result, the total number of tasks which can run at the same time is `(maxNumConcurrentSubTasks + 1)` (including the supervisor task).
Please note that this can be even larger than the total number of task slots (the sum of the capacities of all workers).
If `maxNumConcurrentSubTasks` is larger than `n` (available task slots), then
`maxNumConcurrentSubTasks` tasks are created by the supervisor task, but only `n` tasks would be started.
The others will wait in the pending state until a running task finishes.
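The created/started/pending arithmetic can be illustrated with a short sketch. This is illustrative bookkeeping only, not Druid code; the function and field names are hypothetical:

```python
def slots_usage(max_num_concurrent_sub_tasks: int, available_task_slots: int) -> dict:
    # The supervisor creates all worker tasks up front, regardless of capacity.
    created = max_num_concurrent_sub_tasks
    # Only as many tasks as there are free slots actually start running.
    started = min(created, available_task_slots)
    # The remainder wait in the pending state until a running task finishes.
    pending = created - started
    return {"created": created, "started": started, "pending": pending}
```

For example, with `maxNumConcurrentSubTasks = 4` and only 3 free slots, 4 worker tasks are created but only 3 start; 1 stays pending.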

If you are using Parallel Index Tasks together with stream ingestion,
we recommend limiting the max capacity for batch ingestion to prevent
stream ingestion from being blocked by batch ingestion. Suppose you have
`t` Parallel Index Tasks to run at the same time, but want to limit
the max number of tasks for batch ingestion to `b`. Then, (the sum of `maxNumSubTasks`
the max number of tasks for batch ingestion to `b`. Then, (the sum of `maxNumConcurrentSubTasks`
of all Parallel Index Tasks + `t` (for supervisor tasks)) must be smaller than `b`.
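The constraint above can be written as a one-line check. This is a hypothetical helper for capacity planning, not part of Druid; it mirrors the rule as stated, with one supervisor slot per Parallel Index Task:

```python
def fits_batch_capacity(sub_task_counts: list, batch_capacity: int) -> bool:
    # sub_task_counts holds maxNumConcurrentSubTasks for each Parallel Index Task;
    # t (the number of tasks) accounts for one supervisor slot per task.
    t = len(sub_task_counts)
    # Per the guideline above: sum(maxNumConcurrentSubTasks) + t must be smaller than b.
    return sum(sub_task_counts) + t < batch_capacity
```

For instance, two Parallel Index Tasks with `maxNumConcurrentSubTasks` of 2 and 3 use at most 2 + 3 + 2 = 7 slots, which fits within a batch capacity of 10.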

If you have some tasks of a higher priority than others, you may set their
`maxNumSubTasks` to a higher value than that of the lower priority tasks.
`maxNumConcurrentSubTasks` to a higher value than that of the lower priority tasks.
This may help the higher priority tasks finish earlier than the lower priority ones
by assigning more task slots to them.

@@ -97,7 +97,7 @@ public abstract class ParallelIndexPhaseRunner<SubTaskType extends Task, SubTask
this.groupId = groupId;
this.tuningConfig = tuningConfig;
this.context = context;
this.maxNumConcurrentSubTasks = tuningConfig.getMaxNumSubTasks();
this.maxNumConcurrentSubTasks = tuningConfig.getMaxNumConcurrentSubTasks();
this.indexingServiceClient = Preconditions.checkNotNull(indexingServiceClient, "indexingServiceClient");
}

@@ -395,10 +395,10 @@ public TaskStatus runTask(TaskToolbox toolbox) throws Exception
"firehoseFactory[%s] is not splittable. Running sequentially.",
baseFirehoseFactory.getClass().getSimpleName()
);
} else if (ingestionSchema.getTuningConfig().getMaxNumSubTasks() == 1) {
} else if (ingestionSchema.getTuningConfig().getMaxNumConcurrentSubTasks() == 1) {
LOG.warn(
"maxNumSubTasks is 1. Running sequentially. "
+ "Please set maxNumSubTasks to something higher than 1 if you want to run in parallel ingestion mode."
"maxNumConcurrentSubTasks is 1. Running sequentially. "
+ "Please set maxNumConcurrentSubTasks to something higher than 1 if you want to run in parallel ingestion mode."
);
} else {
throw new ISE("Unknown reason for sequential mode. Failing this task.");
@@ -430,7 +430,7 @@ private void initializeSubTaskCleaner()

private boolean isParallelMode()
{
return baseFirehoseFactory.isSplittable() && ingestionSchema.getTuningConfig().getMaxNumSubTasks() > 1;
return baseFirehoseFactory.isSplittable() && ingestionSchema.getTuningConfig().getMaxNumConcurrentSubTasks() > 1;
}

/**
@@ -25,6 +25,7 @@
import com.google.common.base.Preconditions;
import org.apache.druid.indexer.partitions.PartitionsSpec;
import org.apache.druid.indexing.common.task.IndexTask.IndexTuningConfig;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.segment.IndexSpec;
import org.apache.druid.segment.writeout.SegmentWriteOutMediumFactory;
import org.joda.time.Duration;
@@ -36,7 +37,7 @@
@JsonTypeName("index_parallel")
public class ParallelIndexTuningConfig extends IndexTuningConfig
{
private static final int DEFAULT_MAX_NUM_BATCH_TASKS = 1;
private static final int DEFAULT_MAX_NUM_CONCURRENT_SUB_TASKS = 1;
private static final int DEFAULT_MAX_RETRY = 3;
private static final long DEFAULT_TASK_STATUS_CHECK_PERIOD_MS = 1000;

@@ -45,7 +46,7 @@ public class ParallelIndexTuningConfig extends IndexTuningConfig
private static final int DEFAULT_MAX_NUM_SEGMENTS_TO_MERGE = 100;
private static final int DEFAULT_TOTAL_NUM_MERGE_TASKS = 10;

private final int maxNumSubTasks;
private final int maxNumConcurrentSubTasks;
private final int maxRetry;
private final long taskStatusCheckPeriodMs;

@@ -92,6 +93,7 @@ static ParallelIndexTuningConfig defaultConfig()
null,
null,
null,
null,
null
);
}
@@ -112,7 +114,8 @@ public ParallelIndexTuningConfig(
@JsonProperty("reportParseExceptions") @Nullable Boolean reportParseExceptions,
@JsonProperty("pushTimeout") @Nullable Long pushTimeout,
@JsonProperty("segmentWriteOutMediumFactory") @Nullable SegmentWriteOutMediumFactory segmentWriteOutMediumFactory,
@JsonProperty("maxNumSubTasks") @Nullable Integer maxNumSubTasks,
@JsonProperty("maxNumSubTasks") @Deprecated @Nullable Integer maxNumSubTasks,
@JsonProperty("maxNumConcurrentSubTasks") @Nullable Integer maxNumConcurrentSubTasks,
@JsonProperty("maxRetry") @Nullable Integer maxRetry,
@JsonProperty("taskStatusCheckPeriodMs") @Nullable Integer taskStatusCheckPeriodMs,
@JsonProperty("chatHandlerTimeout") @Nullable Duration chatHandlerTimeout,
@@ -147,7 +150,15 @@ public ParallelIndexTuningConfig(
maxSavedParseExceptions
);

this.maxNumSubTasks = maxNumSubTasks == null ? DEFAULT_MAX_NUM_BATCH_TASKS : maxNumSubTasks;
if (maxNumSubTasks != null && maxNumConcurrentSubTasks != null) {
throw new IAE("Can't use both maxNumSubTasks and maxNumConcurrentSubTasks. Use maxNumConcurrentSubTasks instead");
}

if (maxNumConcurrentSubTasks == null) {
this.maxNumConcurrentSubTasks = maxNumSubTasks == null ? DEFAULT_MAX_NUM_CONCURRENT_SUB_TASKS : maxNumSubTasks;
} else {
this.maxNumConcurrentSubTasks = maxNumConcurrentSubTasks;
}
this.maxRetry = maxRetry == null ? DEFAULT_MAX_RETRY : maxRetry;
this.taskStatusCheckPeriodMs = taskStatusCheckPeriodMs == null ?
DEFAULT_TASK_STATUS_CHECK_PERIOD_MS :
@@ -166,15 +177,15 @@ public ParallelIndexTuningConfig(
? DEFAULT_TOTAL_NUM_MERGE_TASKS
: totalNumMergeTasks;

Preconditions.checkArgument(this.maxNumSubTasks > 0, "maxNumSubTasks must be positive");
Preconditions.checkArgument(this.maxNumConcurrentSubTasks > 0, "maxNumConcurrentSubTasks must be positive");
Preconditions.checkArgument(this.maxNumSegmentsToMerge > 0, "maxNumSegmentsToMerge must be positive");
Preconditions.checkArgument(this.totalNumMergeTasks > 0, "totalNumMergeTasks must be positive");
}

@JsonProperty
public int getMaxNumSubTasks()
public int getMaxNumConcurrentSubTasks()
{
return maxNumSubTasks;
return maxNumConcurrentSubTasks;
}

@JsonProperty
@@ -226,7 +237,7 @@ public boolean equals(Object o)
return false;
}
ParallelIndexTuningConfig that = (ParallelIndexTuningConfig) o;
return maxNumSubTasks == that.maxNumSubTasks &&
return maxNumConcurrentSubTasks == that.maxNumConcurrentSubTasks &&
maxRetry == that.maxRetry &&
taskStatusCheckPeriodMs == that.taskStatusCheckPeriodMs &&
chatHandlerNumRetries == that.chatHandlerNumRetries &&
@@ -240,7 +251,7 @@ public int hashCode()
{
return Objects.hash(
super.hashCode(),
maxNumSubTasks,
maxNumConcurrentSubTasks,
maxRetry,
taskStatusCheckPeriodMs,
chatHandlerTimeout,
@@ -162,6 +162,7 @@ public void testMissingNumShards()
null,
null,
null,
null,
2,
null,
null,
@@ -221,6 +222,7 @@ private ParallelIndexSupervisorTask newTask(
null,
null,
null,
null,
2,
null,
null,
@@ -200,6 +200,7 @@ private ParallelIndexSupervisorTask newTask(
null,
null,
null,
null,
numTotalSubTasks,
null,
null,
@@ -431,6 +431,7 @@ private TestSupervisorTask newTask(
null,
null,
null,
null,
NUM_SUB_TASKS,
null,
null,
@@ -141,6 +141,7 @@ private ParallelIndexSupervisorTask newTask(
null,
null,
null,
null,
2,
null,
null,