Merged
46 changes: 38 additions & 8 deletions docs/content/ingestion/native_tasks.md
@@ -54,7 +54,17 @@ which specifies a split and submits worker tasks using those specs. As a result,
the implementation of splittable firehoses. Please note that multiple tasks can be created for the same worker task spec
if one of them fails.

Since this task doesn't shuffle intermediate data, it isn't available for [perfect rollup](../ingestion/index.html#roll-up-modes).
You may want to consider the following points:
- Since this task doesn't shuffle intermediate data, it isn't available for [perfect rollup](../ingestion/index.html#roll-up-modes).
- The number of tasks for parallel ingestion is determined by `maxNumSubTasks` in the tuningConfig.
Since the supervisor task creates up to `maxNumSubTasks` worker tasks regardless of the available task slots,
it may affect the performance of other ingestion tasks. As a result, it's important to set `maxNumSubTasks` properly.
See the [Capacity Planning](#capacity-planning) section below for more details.
- By default, batch ingestion replaces all data in any segment that it writes to. If you'd like to add to the segment
instead, set the appendToExisting flag in ioConfig. Note that it only replaces data in segments where it actively adds
data: if there are segments in your granularitySpec's intervals that have no data written by this task, they will be
left alone.
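
The append behavior described above is controlled entirely in the ioConfig. A minimal sketch of an ioConfig with `appendToExisting` enabled is shown below; the firehose values mirror the example spec in this document and are illustrative only:

```json
{
  "ioConfig": {
    "type": "index_parallel",
    "firehose": {
      "type": "local",
      "baseDir": "examples/indexing/",
      "filter": "wikipedia_index_data*"
    },
    "appendToExisting": true
  }
}
```

With `appendToExisting` omitted or set to `false`, the task replaces all data in the segments it writes to.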


An example ingestion spec is:

@@ -122,16 +132,15 @@ An example ingestion spec is:
"baseDir": "examples/indexing/",
"filter": "wikipedia_index_data*"
}
},
"tuningConfig": {
"type": "index_parallel",
"maxNumSubTasks": 2
}
}
}
```

By default, batch ingestion replaces all data in any segment that it writes to. If you'd like to add to the segment
instead, set the appendToExisting flag in ioConfig. Note that it only replaces data in segments where it actively adds
data: if there are segments in your granularitySpec's intervals that have no data written by this task, they will be
left alone.

#### Task Properties

|property|description|required?|
@@ -181,7 +190,7 @@ The tuningConfig is optional and default parameters will be used if no tuningCon
|reportParseExceptions|If true, exceptions encountered during parsing will be thrown and will halt ingestion; if false, unparseable rows and fields will be skipped.|false|no|
|pushTimeout|Milliseconds to wait for pushing segments. It must be >= 0, where 0 means to wait forever.|0|no|
|segmentWriteOutMediumFactory|Segment write-out medium to use when creating segments. See [SegmentWriteOutMediumFactory](#segmentWriteOutMediumFactory).|Not specified, the value from `druid.peon.defaultSegmentWriteOutMediumFactory.type` is used|no|
|maxNumSubTasks|Maximum number of tasks which can be run at the same time.|Integer.MAX_VALUE|no|
|maxNumSubTasks|Maximum number of worker tasks that can run at the same time. The supervisor task spawns up to `maxNumSubTasks` worker tasks regardless of the available task slots. If this value is set to 1, the supervisor task processes data ingestion on its own instead of spawning worker tasks. If this value is set too large, too many worker tasks may be created, which can block other ingestion tasks. See [Capacity Planning](#capacity-planning) for more details.|1|no|
|maxRetry|Maximum number of retries on task failures.|3|no|
|taskStatusCheckPeriodMs|Polling period in milliseconds to check running task statuses.|1000|no|
|chatHandlerTimeout|Timeout for reporting the pushed segments in worker tasks.|PT10S|no|
@@ -372,7 +381,7 @@ An example of the result is
"reportParseExceptions": false,
"pushTimeout": 0,
"segmentWriteOutMediumFactory": null,
"maxNumSubTasks": 2147483647,
"maxNumSubTasks": 4,
"maxRetry": 3,
"taskStatusCheckPeriodMs": 1000,
"chatHandlerTimeout": "PT10S",
@@ -408,6 +417,27 @@

Returns the task attempt history of the worker task spec of the given id, or an HTTP 404 Not Found error if the supervisor task is running in sequential mode.

### Capacity Planning

The supervisor task can create up to `maxNumSubTasks` worker tasks no matter how many task slots are currently available.
As a result, the total number of tasks that can run at the same time is `(maxNumSubTasks + 1)`, including the supervisor task.
Please note that this can be even larger than the total number of task slots (the sum of the capacities of all workers).
If `maxNumSubTasks` is larger than `n`, the number of available task slots, then
`maxNumSubTasks` worker tasks are created by the supervisor task, but only `n` of them start running.
The others wait in the pending state until a running task finishes.

If you are using the Parallel Index Task together with stream ingestion,
we recommend limiting the maximum capacity for batch ingestion to prevent
stream ingestion from being blocked by batch ingestion. Suppose you have
`t` Parallel Index Tasks to run at the same time, but want to limit
the maximum number of tasks for batch ingestion to `b`. Then, the sum of `maxNumSubTasks`
across all Parallel Index Tasks, plus `t` (for the supervisor tasks), must be smaller than `b`.

If some tasks have a higher priority than others, you can set their
`maxNumSubTasks` to a higher value than that of the lower-priority tasks.
This may help the higher-priority tasks finish earlier than the lower-priority ones
by assigning more task slots to them.
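
As a worked example of the guidance above: on a hypothetical cluster with 10 task slots where batch ingestion should be capped at `b = 5` slots and a single Parallel Index Task (`t = 1`) runs at a time, setting `maxNumSubTasks` to 3 keeps the total at 4 tasks (3 workers plus 1 supervisor), which stays below the cap. A sketch of such a tuningConfig (the value is illustrative, not a recommendation):

```json
{
  "tuningConfig": {
    "type": "index_parallel",
    "maxNumSubTasks": 3
  }
}
```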

Local Index Task
----------------

@@ -405,7 +405,16 @@ public TaskStatus run(final TaskToolbox toolbox)
try {
if (chatHandlerProvider.isPresent()) {
log.info("Found chat handler of class[%s]", chatHandlerProvider.get().getClass().getName());
chatHandlerProvider.get().register(getId(), this, false);

if (chatHandlerProvider.get().get(getId()).isPresent()) {
// This is a workaround for ParallelIndexSupervisorTask to avoid double registering when it runs in the
// sequential mode. See ParallelIndexSupervisorTask.runSequential().
// Note that all HTTP endpoints are not available in this case. This works only for
// ParallelIndexSupervisorTask because it doesn't support APIs for live ingestion reports.
log.warn("Chat handler is already registered. Skipping chat handler registration.");
} else {
chatHandlerProvider.get().register(getId(), this, false);
}
} else {
log.warn("No chat handler detected");
}
@@ -256,13 +256,23 @@ public TaskStatus run(TaskToolbox toolbox) throws Exception
chatHandlerProvider.register(getId(), this, false);

try {
if (baseFirehoseFactory.isSplittable()) {
if (isParallelMode()) {
return runParallel(toolbox);
} else {
log.warn(
"firehoseFactory[%s] is not splittable. Running sequentially",
baseFirehoseFactory.getClass().getSimpleName()
);
if (!baseFirehoseFactory.isSplittable()) {
log.warn(
"firehoseFactory[%s] is not splittable. Running sequentially.",
baseFirehoseFactory.getClass().getSimpleName()
);
} else if (ingestionSchema.getTuningConfig().getMaxNumSubTasks() == 1) {
log.warn(
"maxNumSubTasks is 1. Running sequentially. "
+ "Please set maxNumSubTasks to a value greater than 1 to run in parallel ingestion mode."
);
} else {
throw new ISE("Unknown reason for sequential mode. Failing this task.");
}

return runSequential(toolbox);
}
}
@@ -271,6 +281,15 @@
}
}

private boolean isParallelMode()
{
return baseFirehoseFactory.isSplittable() && ingestionSchema.getTuningConfig().getMaxNumSubTasks() > 1;
}

@VisibleForTesting
void setToolbox(TaskToolbox toolbox)
{
@@ -280,7 +299,7 @@ void setToolbox(TaskToolbox toolbox)
private TaskStatus runParallel(TaskToolbox toolbox) throws Exception
{
createRunner(toolbox);
return TaskStatus.fromCode(getId(), runner.run());
return TaskStatus.fromCode(getId(), Preconditions.checkNotNull(runner, "runner").run());
}

private TaskStatus runSequential(TaskToolbox toolbox)
@@ -479,11 +498,7 @@ public Response report(
public Response getMode(@Context final HttpServletRequest req)
{
IndexTaskUtils.datasourceAuthorizationCheck(req, Action.READ, getDataSource(), authorizerMapper);
if (runner == null) {
return Response.status(Response.Status.SERVICE_UNAVAILABLE).entity("task is not running yet").build();
} else {
return Response.ok(baseFirehoseFactory.isSplittable() ? "parallel" : "sequential").build();
}
return Response.ok(isParallelMode() ? "parallel" : "sequential").build();
}

@GET
@@ -22,6 +22,7 @@
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonTypeName;
import com.google.common.base.Preconditions;
import org.apache.druid.indexing.common.task.IndexTask.IndexTuningConfig;
import org.apache.druid.segment.IndexSpec;
import org.apache.druid.segment.writeout.SegmentWriteOutMediumFactory;
@@ -34,7 +35,7 @@
@JsonTypeName("index_parallel")
public class ParallelIndexTuningConfig extends IndexTuningConfig
{
private static final int DEFAULT_MAX_NUM_BATCH_TASKS = Integer.MAX_VALUE; // unlimited
private static final int DEFAULT_MAX_NUM_BATCH_TASKS = 1;
private static final int DEFAULT_MAX_RETRY = 3;
private static final long DEFAULT_TASK_STATUS_CHECK_PERIOD_MS = 1000;

@@ -131,6 +132,8 @@ public ParallelIndexTuningConfig(

this.chatHandlerTimeout = DEFAULT_CHAT_HANDLER_TIMEOUT;
this.chatHandlerNumRetries = DEFAULT_CHAT_HANDLER_NUM_RETRIES;

Preconditions.checkArgument(this.maxNumSubTasks > 0, "maxNumSubTasks must be positive");
}

@JsonProperty
@@ -294,22 +294,6 @@ public Authorizer getAuthorizer(String name)
new DropwizardRowIngestionMetersFactory()
);
}

@Override
public TaskStatus run(TaskToolbox toolbox) throws Exception
{
return TaskStatus.fromCode(
getId(),
new TestParallelIndexTaskRunner(
toolbox,
getId(),
getGroupId(),
getIngestionSchema(),
getContext(),
new NoopIndexingServiceClient()
).run()
);
}
}

static class TestParallelIndexTaskRunner extends SinglePhaseParallelIndexTaskRunner
@@ -281,20 +281,16 @@ private TestSupervisorTask(
}

@Override
public TaskStatus run(TaskToolbox toolbox) throws Exception
ParallelIndexTaskRunner createRunner(TaskToolbox toolbox)
{
setToolbox(toolbox);
setRunner(
new TestRunner(
toolbox,
this,
indexingServiceClient
)
);
return TaskStatus.fromCode(
getId(),
getRunner().run()
);
return getRunner();
}
}

@@ -500,7 +500,7 @@ private class TestSupervisorTask extends TestParallelIndexSupervisorTask
}

@Override
public TaskStatus run(TaskToolbox toolbox) throws Exception
ParallelIndexTaskRunner createRunner(TaskToolbox toolbox)
{
setRunner(
new TestRunner(
@@ -509,10 +509,7 @@ public TaskStatus run(TaskToolbox toolbox) throws Exception
indexingServiceClient
)
);
return TaskStatus.fromCode(
getId(),
getRunner().run()
);
return getRunner();
}
}
