From 9776b48391d9dc0eabd636ceeb3cee3f38537e07 Mon Sep 17 00:00:00 2001 From: Jihoon Son Date: Fri, 1 Mar 2019 23:44:10 -0800 Subject: [PATCH 1/7] Reduce # of max subTasks to 2 --- docs/content/ingestion/native_tasks.md | 2 +- .../common/task/batch/parallel/ParallelIndexTuningConfig.java | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/docs/content/ingestion/native_tasks.md b/docs/content/ingestion/native_tasks.md index 837574cb5894..698c9ea6f188 100644 --- a/docs/content/ingestion/native_tasks.md +++ b/docs/content/ingestion/native_tasks.md @@ -181,7 +181,7 @@ The tuningConfig is optional and default parameters will be used if no tuningCon |reportParseExceptions|If true, exceptions encountered during parsing will be thrown and will halt ingestion; if false, unparseable rows and fields will be skipped.|false|no| |pushTimeout|Milliseconds to wait for pushing segments. It must be >= 0, where 0 means to wait forever.|0|no| |segmentWriteOutMediumFactory|Segment write-out medium to use when creating segments. See [SegmentWriteOutMediumFactory](#segmentWriteOutMediumFactory).|Not specified, the value from `druid.peon.defaultSegmentWriteOutMediumFactory.type` is used|no| -|maxNumSubTasks|Maximum number of tasks which can be run at the same time.|Integer.MAX_VALUE|no| +|maxNumSubTasks|Maximum number of tasks which can be run at the same time. The supervisor task would spawn sub tasks up to `maxNumSubTasks` regardless of the reamining task slots. 
If this value is set to too large, too many sub tasks can be created which might block other ingestion.|2|no| |maxRetry|Maximum number of retries on task failures.|3|no| |taskStatusCheckPeriodMs|Polling period in milleseconds to check running task statuses.|1000|no| |chatHandlerTimeout|Timeout for reporting the pushed segments in worker tasks.|PT10S|no| diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexTuningConfig.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexTuningConfig.java index 85929dbd880c..c44bcfbad8b8 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexTuningConfig.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexTuningConfig.java @@ -34,7 +34,7 @@ @JsonTypeName("index_parallel") public class ParallelIndexTuningConfig extends IndexTuningConfig { - private static final int DEFAULT_MAX_NUM_BATCH_TASKS = Integer.MAX_VALUE; // unlimited + private static final int DEFAULT_MAX_NUM_BATCH_TASKS = 2; private static final int DEFAULT_MAX_RETRY = 3; private static final long DEFAULT_TASK_STATUS_CHECK_PERIOD_MS = 1000; From 65258f825055da922960f133479ef41af19e1ef7 Mon Sep 17 00:00:00 2001 From: Jihoon Son Date: Sun, 3 Mar 2019 21:26:48 -0800 Subject: [PATCH 2/7] fix typo and add more doc --- docs/content/ingestion/native_tasks.md | 42 +++++++++++++++++++++----- 1 file changed, 35 insertions(+), 7 deletions(-) diff --git a/docs/content/ingestion/native_tasks.md b/docs/content/ingestion/native_tasks.md index 698c9ea6f188..476a46b54ae2 100644 --- a/docs/content/ingestion/native_tasks.md +++ b/docs/content/ingestion/native_tasks.md @@ -54,7 +54,17 @@ which specifies a split and submits worker tasks using those specs. As a result, the implementation of splittable firehoses. 
Please note that multiple tasks can be created for the same worker task spec if one of them fails.

-Since this task doesn't shuffle intermediate data, it isn't available for [perfect rollup](../ingestion/index.html#roll-up-modes).
+You may want to consider the following points:
+- Since this task doesn't shuffle intermediate data, it isn't available for [perfect rollup](../ingestion/index.html#roll-up-modes).
+- The number of tasks for parallel ingestion is decided by `maxNumSubTasks` in the tuningConfig.
+  Since the supervisor task creates up to `maxNumSubTasks` worker tasks regardless of the available task slots,
+  it may affect other ingestion performance. As a result, it's important to set `maxNumSubTasks` properly.
+  See the [Capacity Planning](#capacity-planning) section below for more details.
+- By default, batch ingestion replaces all data in any segment that it writes to. If you'd like to add to the segment
+  instead, set the appendToExisting flag in ioConfig. Note that it only replaces data in segments where it actively adds
+  data: if there are segments in your granularitySpec's intervals that have no data written by this task, they will be
+  left alone.
+

An example ingestion spec is:

@@ -122,16 +132,15 @@ An example ingestion spec is:
         "baseDir": "examples/indexing/",
         "filter": "wikipedia_index_data*"
       }
+    },
+    "tuningConfig": {
+        "type": "index_parallel",
+        "maxNumSubTasks": 2
     }
   }
 }
 ```

-By default, batch ingestion replaces all data in any segment that it writes to. If you'd like to add to the segment
-instead, set the appendToExisting flag in ioConfig. Note that it only replaces data in segments where it actively adds
-data: if there are segments in your granularitySpec's intervals that have no data written by this task, they will be
-left alone.
- #### Task Properties |property|description|required?| @@ -181,7 +190,7 @@ The tuningConfig is optional and default parameters will be used if no tuningCon |reportParseExceptions|If true, exceptions encountered during parsing will be thrown and will halt ingestion; if false, unparseable rows and fields will be skipped.|false|no| |pushTimeout|Milliseconds to wait for pushing segments. It must be >= 0, where 0 means to wait forever.|0|no| |segmentWriteOutMediumFactory|Segment write-out medium to use when creating segments. See [SegmentWriteOutMediumFactory](#segmentWriteOutMediumFactory).|Not specified, the value from `druid.peon.defaultSegmentWriteOutMediumFactory.type` is used|no| -|maxNumSubTasks|Maximum number of tasks which can be run at the same time. The supervisor task would spawn sub tasks up to `maxNumSubTasks` regardless of the reamining task slots. If this value is set to too large, too many sub tasks can be created which might block other ingestion.|2|no| +|maxNumSubTasks|Maximum number of tasks which can be run at the same time. The supervisor task would spawn worker tasks up to `maxNumSubTasks` regardless of the available task slots. If this value is set to too large, too many worker tasks can be created which might block other ingestion.|2|no| |maxRetry|Maximum number of retries on task failures.|3|no| |taskStatusCheckPeriodMs|Polling period in milleseconds to check running task statuses.|1000|no| |chatHandlerTimeout|Timeout for reporting the pushed segments in worker tasks.|PT10S|no| @@ -408,6 +417,25 @@ An example of the result is Returns the task attempt history of the worker task spec of the given id, or HTTP 404 Not Found error if the supervisor task is running in the sequential mode. +### Capacity Planning + +The supervisor task can create up to `maxNumSubTasks` worker tasks no matter how many task slots are currently available. 
+As a result, total number of tasks which can be run at the same time is `(maxNumSubTasks + 1)` (including the supervisor task). +Please note that this can be even larger than total number of task slots (sum of the capacity of all workers). +If `maxNumSubTasks` is larger than `n (available task slots)`, then +`maxNumSubTasks` tasks are created by the supervisor task, but only `n` tasks would be started. +Others will wait in the pending state until any running task is finished. + +If you are using the Parallel Index Task with stream ingestion together, +we would recommend to limit the max capacity for batch ingestion to prevent +stream ingestion from being blocked by batch ingestion. Suppose you have +`t` Parallel Index Tasks to run at the same time, but want to limit +the max number of tasks for batch ingestion to `b`. Then, (sum of `maxNumSubTasks` +of all Parallel Index Tasks + `t` (for supervisor tasks)) must be smaller than `b`. + +If you have tasks of a higher priority than others, you may set their +`maxNumSubTasks` to a higher value than other tasks of a lower priority. + Local Index Task ---------------- From d7e41827825e83221691ed28ac5fcad4be55d548 Mon Sep 17 00:00:00 2001 From: Jihoon Son Date: Mon, 4 Mar 2019 10:35:46 -0800 Subject: [PATCH 3/7] add more doc and link --- docs/content/ingestion/native_tasks.md | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/docs/content/ingestion/native_tasks.md b/docs/content/ingestion/native_tasks.md index 476a46b54ae2..000292840ee8 100644 --- a/docs/content/ingestion/native_tasks.md +++ b/docs/content/ingestion/native_tasks.md @@ -190,7 +190,7 @@ The tuningConfig is optional and default parameters will be used if no tuningCon |reportParseExceptions|If true, exceptions encountered during parsing will be thrown and will halt ingestion; if false, unparseable rows and fields will be skipped.|false|no| |pushTimeout|Milliseconds to wait for pushing segments. 
It must be >= 0, where 0 means to wait forever.|0|no| |segmentWriteOutMediumFactory|Segment write-out medium to use when creating segments. See [SegmentWriteOutMediumFactory](#segmentWriteOutMediumFactory).|Not specified, the value from `druid.peon.defaultSegmentWriteOutMediumFactory.type` is used|no| -|maxNumSubTasks|Maximum number of tasks which can be run at the same time. The supervisor task would spawn worker tasks up to `maxNumSubTasks` regardless of the available task slots. If this value is set to too large, too many worker tasks can be created which might block other ingestion.|2|no| +|maxNumSubTasks|Maximum number of tasks which can be run at the same time. The supervisor task would spawn worker tasks up to `maxNumSubTasks` regardless of the available task slots. If this value is set to too large, too many worker tasks can be created which might block other ingestion. Check [Capacity Planning](#capacity-planning) for more details.|2|no| |maxRetry|Maximum number of retries on task failures.|3|no| |taskStatusCheckPeriodMs|Polling period in milleseconds to check running task statuses.|1000|no| |chatHandlerTimeout|Timeout for reporting the pushed segments in worker tasks.|PT10S|no| @@ -433,8 +433,10 @@ stream ingestion from being blocked by batch ingestion. Suppose you have the max number of tasks for batch ingestion to `b`. Then, (sum of `maxNumSubTasks` of all Parallel Index Tasks + `t` (for supervisor tasks)) must be smaller than `b`. -If you have tasks of a higher priority than others, you may set their -`maxNumSubTasks` to a higher value than other tasks of a lower priority. +If you have some tasks of a higher priority than others, you may set their +`maxNumSubTasks` to a higher value than lower priority tasks. +This may help the higher priority tasks to finish earlier than lower priority tasks +by assigning more task slots to them. 
Local Index Task ---------------- From 409a9d15f266b80e7451e2cd095b2e0722eeb828 Mon Sep 17 00:00:00 2001 From: Jihoon Son Date: Mon, 4 Mar 2019 16:44:38 -0800 Subject: [PATCH 4/7] change default and add warning --- .../task/batch/parallel/ParallelIndexSupervisorTask.java | 8 +++++++- .../task/batch/parallel/ParallelIndexTuningConfig.java | 2 +- 2 files changed, 8 insertions(+), 2 deletions(-) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java index 90e359df17dd..9c30bc91b68a 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java @@ -258,9 +258,15 @@ public TaskStatus run(TaskToolbox toolbox) throws Exception try { if (baseFirehoseFactory.isSplittable()) { return runParallel(toolbox); + } else if (ingestionSchema.getTuningConfig().getMaxNumSubTasks() == 1) { + log.warn( + "maxNumSubTasks is 1. Running sequentially. " + + "Please set maxNumSubTasks to something higher than 1 if you want to run in parallel ingestion mode." + ); + return runSequential(toolbox); } else { log.warn( - "firehoseFactory[%s] is not splittable. Running sequentially", + "firehoseFactory[%s] is not splittable. 
Running sequentially.", baseFirehoseFactory.getClass().getSimpleName() ); return runSequential(toolbox); diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexTuningConfig.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexTuningConfig.java index c44bcfbad8b8..27eaecb80143 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexTuningConfig.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexTuningConfig.java @@ -34,7 +34,7 @@ @JsonTypeName("index_parallel") public class ParallelIndexTuningConfig extends IndexTuningConfig { - private static final int DEFAULT_MAX_NUM_BATCH_TASKS = 2; + private static final int DEFAULT_MAX_NUM_BATCH_TASKS = 1; private static final int DEFAULT_MAX_RETRY = 3; private static final long DEFAULT_TASK_STATUS_CHECK_PERIOD_MS = 1000; From b264a4327b9314d11e6eb2263b274d31e5bac3b4 Mon Sep 17 00:00:00 2001 From: Jihoon Son Date: Mon, 4 Mar 2019 20:36:14 -0800 Subject: [PATCH 5/7] fix doc --- docs/content/ingestion/native_tasks.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/docs/content/ingestion/native_tasks.md b/docs/content/ingestion/native_tasks.md index 000292840ee8..4ecaccf53ac3 100644 --- a/docs/content/ingestion/native_tasks.md +++ b/docs/content/ingestion/native_tasks.md @@ -190,7 +190,7 @@ The tuningConfig is optional and default parameters will be used if no tuningCon |reportParseExceptions|If true, exceptions encountered during parsing will be thrown and will halt ingestion; if false, unparseable rows and fields will be skipped.|false|no| |pushTimeout|Milliseconds to wait for pushing segments. It must be >= 0, where 0 means to wait forever.|0|no| |segmentWriteOutMediumFactory|Segment write-out medium to use when creating segments. 
See [SegmentWriteOutMediumFactory](#segmentWriteOutMediumFactory).|Not specified, the value from `druid.peon.defaultSegmentWriteOutMediumFactory.type` is used|no| -|maxNumSubTasks|Maximum number of tasks which can be run at the same time. The supervisor task would spawn worker tasks up to `maxNumSubTasks` regardless of the available task slots. If this value is set to too large, too many worker tasks can be created which might block other ingestion. Check [Capacity Planning](#capacity-planning) for more details.|2|no| +|maxNumSubTasks|Maximum number of tasks which can be run at the same time. The supervisor task would spawn worker tasks up to `maxNumSubTasks` regardless of the available task slots. If this value is set to 1, the supervisor task processes data ingestion on its own instead of spawning worker tasks. If this value is set to too large, too many worker tasks can be created which might block other ingestion. Check [Capacity Planning](#capacity-planning) for more details.|1|no| |maxRetry|Maximum number of retries on task failures.|3|no| |taskStatusCheckPeriodMs|Polling period in milleseconds to check running task statuses.|1000|no| |chatHandlerTimeout|Timeout for reporting the pushed segments in worker tasks.|PT10S|no| @@ -381,7 +381,7 @@ An example of the result is "reportParseExceptions": false, "pushTimeout": 0, "segmentWriteOutMediumFactory": null, - "maxNumSubTasks": 2147483647, + "maxNumSubTasks": 4, "maxRetry": 3, "taskStatusCheckPeriodMs": 1000, "chatHandlerTimeout": "PT10S", From ae4c1d0a9b4720c4ef134f818ddbf851a7788c16 Mon Sep 17 00:00:00 2001 From: Jihoon Son Date: Tue, 5 Mar 2019 13:20:37 -0800 Subject: [PATCH 6/7] add test --- .../parallel/ParallelIndexSupervisorTask.java | 18 +-- ...stractParallelIndexSupervisorTaskTest.java | 16 --- .../ParallelIndexSupervisorTaskKillTest.java | 8 +- ...rallelIndexSupervisorTaskResourceTest.java | 7 +- .../ParallelIndexSupervisorTaskTest.java | 108 +++++++++++++----- 5 files changed, 93 insertions(+), 64 
deletions(-) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java index 9c30bc91b68a..3fc5f66e0570 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java @@ -257,13 +257,15 @@ public TaskStatus run(TaskToolbox toolbox) throws Exception try { if (baseFirehoseFactory.isSplittable()) { - return runParallel(toolbox); - } else if (ingestionSchema.getTuningConfig().getMaxNumSubTasks() == 1) { - log.warn( - "maxNumSubTasks is 1. Running sequentially. " - + "Please set maxNumSubTasks to something higher than 1 if you want to run in parallel ingestion mode." - ); - return runSequential(toolbox); + if (ingestionSchema.getTuningConfig().getMaxNumSubTasks() == 1) { + log.warn( + "maxNumSubTasks is 1. Running sequentially. " + + "Please set maxNumSubTasks to something higher than 1 if you want to run in parallel ingestion mode." + ); + return runSequential(toolbox); + } else { + return runParallel(toolbox); + } } else { log.warn( "firehoseFactory[%s] is not splittable. 
Running sequentially.", @@ -286,7 +288,7 @@ void setToolbox(TaskToolbox toolbox) private TaskStatus runParallel(TaskToolbox toolbox) throws Exception { createRunner(toolbox); - return TaskStatus.fromCode(getId(), runner.run()); + return TaskStatus.fromCode(getId(), Preconditions.checkNotNull(runner, "runner").run()); } private TaskStatus runSequential(TaskToolbox toolbox) diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractParallelIndexSupervisorTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractParallelIndexSupervisorTaskTest.java index 7dbdc133f7d4..8792956bc95a 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractParallelIndexSupervisorTaskTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractParallelIndexSupervisorTaskTest.java @@ -294,22 +294,6 @@ public Authorizer getAuthorizer(String name) new DropwizardRowIngestionMetersFactory() ); } - - @Override - public TaskStatus run(TaskToolbox toolbox) throws Exception - { - return TaskStatus.fromCode( - getId(), - new TestParallelIndexTaskRunner( - toolbox, - getId(), - getGroupId(), - getIngestionSchema(), - getContext(), - new NoopIndexingServiceClient() - ).run() - ); - } } static class TestParallelIndexTaskRunner extends SinglePhaseParallelIndexTaskRunner diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskKillTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskKillTest.java index 6e71656fe2b9..c7c8d0b897e3 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskKillTest.java +++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskKillTest.java @@ -281,9 +281,8 @@ private TestSupervisorTask( } @Override - public TaskStatus run(TaskToolbox toolbox) throws Exception + ParallelIndexTaskRunner createRunner(TaskToolbox toolbox) { - setToolbox(toolbox); setRunner( new TestRunner( toolbox, @@ -291,10 +290,7 @@ public TaskStatus run(TaskToolbox toolbox) throws Exception indexingServiceClient ) ); - return TaskStatus.fromCode( - getId(), - getRunner().run() - ); + return getRunner(); } } diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskResourceTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskResourceTest.java index 668c8ec19856..04aa5a7a1b07 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskResourceTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskResourceTest.java @@ -500,7 +500,7 @@ private class TestSupervisorTask extends TestParallelIndexSupervisorTask } @Override - public TaskStatus run(TaskToolbox toolbox) throws Exception + ParallelIndexTaskRunner createRunner(TaskToolbox toolbox) { setRunner( new TestRunner( @@ -509,10 +509,7 @@ public TaskStatus run(TaskToolbox toolbox) throws Exception indexingServiceClient ) ); - return TaskStatus.fromCode( - getId(), - getRunner().run() - ); + return getRunner(); } } diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskTest.java index 241e9f57fae5..efc1fc493bf2 100644 --- 
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskTest.java @@ -24,7 +24,6 @@ import org.apache.druid.data.input.InputSplit; import org.apache.druid.data.input.impl.StringInputRowParser; import org.apache.druid.indexer.TaskState; -import org.apache.druid.indexer.TaskStatus; import org.apache.druid.indexing.common.TaskToolbox; import org.apache.druid.indexing.common.actions.TaskActionClient; import org.apache.druid.indexing.common.task.TaskResource; @@ -229,33 +228,55 @@ public void testPublishEmptySegments() throws Exception Assert.assertEquals(TaskState.SUCCESS, task.run(toolbox).getStatusCode()); } + @Test + public void testWith1MaxNumSubTasks() throws Exception + { + final ParallelIndexSupervisorTask task = newTask( + Intervals.of("2017/2018"), + new ParallelIndexIOConfig( + new LocalFirehoseFactory(inputDir, "test_*", null), + false + ), + new ParallelIndexTuningConfig( + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + 1, + null, + null, + null, + null, + null, + null, + null + ) + ); + actionClient = createActionClient(task); + toolbox = createTaskToolbox(task); + + prepareTaskForLocking(task); + Assert.assertTrue(task.isReady(actionClient)); + Assert.assertEquals(TaskState.SUCCESS, task.run(toolbox).getStatusCode()); + Assert.assertNull("Runner must be null if the task was in the sequential mode", task.getRunner()); + } + private ParallelIndexSupervisorTask newTask( Interval interval, ParallelIndexIOConfig ioConfig ) { - // set up ingestion spec - final ParallelIndexIngestionSpec ingestionSpec = new ParallelIndexIngestionSpec( - new DataSchema( - "dataSource", - getObjectMapper().convertValue( - new StringInputRowParser( - DEFAULT_PARSE_SPEC, - null - ), - Map.class - ), - new AggregatorFactory[]{ - new 
LongSumAggregatorFactory("val", "val") - }, - new UniformGranularitySpec( - Granularities.DAY, - Granularities.MINUTE, - interval == null ? null : Collections.singletonList(interval) - ), - null, - getObjectMapper() - ), + return newTask( + interval, ioConfig, new ParallelIndexTuningConfig( null, @@ -281,6 +302,39 @@ private ParallelIndexSupervisorTask newTask( null ) ); + } + + private ParallelIndexSupervisorTask newTask( + Interval interval, + ParallelIndexIOConfig ioConfig, + ParallelIndexTuningConfig tuningConfig + ) + { + // set up ingestion spec + final ParallelIndexIngestionSpec ingestionSpec = new ParallelIndexIngestionSpec( + new DataSchema( + "dataSource", + getObjectMapper().convertValue( + new StringInputRowParser( + DEFAULT_PARSE_SPEC, + null + ), + Map.class + ), + new AggregatorFactory[]{ + new LongSumAggregatorFactory("val", "val") + }, + new UniformGranularitySpec( + Granularities.DAY, + Granularities.MINUTE, + interval == null ? null : Collections.singletonList(interval) + ), + null, + getObjectMapper() + ), + ioConfig, + tuningConfig + ); // set up test tools return new TestSupervisorTask( @@ -315,9 +369,8 @@ private static class TestSupervisorTask extends TestParallelIndexSupervisorTask } @Override - public TaskStatus run(TaskToolbox toolbox) throws Exception + ParallelIndexTaskRunner createRunner(TaskToolbox toolbox) { - setToolbox(toolbox); setRunner( new TestRunner( toolbox, @@ -325,10 +378,7 @@ public TaskStatus run(TaskToolbox toolbox) throws Exception indexingServiceClient ) ); - return TaskStatus.fromCode( - getId(), - getRunner().run() - ); + return getRunner(); } } From a051f7e4547692efbb44e0a64e209864030c5c70 Mon Sep 17 00:00:00 2001 From: Jihoon Son Date: Tue, 5 Mar 2019 15:16:01 -0800 Subject: [PATCH 7/7] fix it test --- .../druid/indexing/common/task/IndexTask.java | 11 +++++- .../parallel/ParallelIndexSupervisorTask.java | 35 +++++++++++-------- .../parallel/ParallelIndexTuningConfig.java | 3 ++ 3 files changed, 34 insertions(+), 
15 deletions(-) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java index d0e083a2b6ed..689ff1af5513 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java @@ -405,7 +405,16 @@ public TaskStatus run(final TaskToolbox toolbox) try { if (chatHandlerProvider.isPresent()) { log.info("Found chat handler of class[%s]", chatHandlerProvider.get().getClass().getName()); - chatHandlerProvider.get().register(getId(), this, false); + + if (chatHandlerProvider.get().get(getId()).isPresent()) { + // This is a workaround for ParallelIndexSupervisorTask to avoid double registering when it runs in the + // sequential mode. See ParallelIndexSupervisorTask.runSequential(). + // Note that all HTTP endpoints are not available in this case. This works only for + // ParallelIndexSupervisorTask because it doesn't support APIs for live ingestion reports. + log.warn("Chat handler is already registered. 
Skipping chat handler registration.");
+        } else {
+          chatHandlerProvider.get().register(getId(), this, false);
+        }
       } else {
         log.warn("No chat handler detected");
       }
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java
index 3fc5f66e0570..a9a8066a0f13 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java
@@ -256,21 +256,23 @@ public TaskStatus run(TaskToolbox toolbox) throws Exception
     chatHandlerProvider.register(getId(), this, false);

     try {
-      if (baseFirehoseFactory.isSplittable()) {
-        if (ingestionSchema.getTuningConfig().getMaxNumSubTasks() == 1) {
+      if (isParallelMode()) {
+        return runParallel(toolbox);
+      } else {
+        if (!baseFirehoseFactory.isSplittable()) {
+          log.warn(
+              "firehoseFactory[%s] is not splittable. Running sequentially.",
+              baseFirehoseFactory.getClass().getSimpleName()
+          );
+        } else if (ingestionSchema.getTuningConfig().getMaxNumSubTasks() == 1) {
           log.warn(
               "maxNumSubTasks is 1. Running sequentially. "
               + "Please set maxNumSubTasks to something higher than 1 if you want to run in parallel ingestion mode."
           );
-          return runSequential(toolbox);
         } else {
-          return runParallel(toolbox);
+          throw new ISE("Unknown reason for sequential mode. Failing this task.");
         }
-      } else {
-        log.warn(
-          "firehoseFactory[%s] is not splittable. 
Running sequentially.", - baseFirehoseFactory.getClass().getSimpleName() - ); + return runSequential(toolbox); } } @@ -279,6 +281,15 @@ public TaskStatus run(TaskToolbox toolbox) throws Exception } } + private boolean isParallelMode() + { + if (baseFirehoseFactory.isSplittable() && ingestionSchema.getTuningConfig().getMaxNumSubTasks() > 1) { + return true; + } else { + return false; + } + } + @VisibleForTesting void setToolbox(TaskToolbox toolbox) { @@ -487,11 +498,7 @@ public Response report( public Response getMode(@Context final HttpServletRequest req) { IndexTaskUtils.datasourceAuthorizationCheck(req, Action.READ, getDataSource(), authorizerMapper); - if (runner == null) { - return Response.status(Response.Status.SERVICE_UNAVAILABLE).entity("task is not running yet").build(); - } else { - return Response.ok(baseFirehoseFactory.isSplittable() ? "parallel" : "sequential").build(); - } + return Response.ok(isParallelMode() ? "parallel" : "sequential").build(); } @GET diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexTuningConfig.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexTuningConfig.java index 27eaecb80143..c0e93704d257 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexTuningConfig.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexTuningConfig.java @@ -22,6 +22,7 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.annotation.JsonTypeName; +import com.google.common.base.Preconditions; import org.apache.druid.indexing.common.task.IndexTask.IndexTuningConfig; import org.apache.druid.segment.IndexSpec; import org.apache.druid.segment.writeout.SegmentWriteOutMediumFactory; @@ -131,6 +132,8 @@ public ParallelIndexTuningConfig( 
this.chatHandlerTimeout = DEFAULT_CHAT_HANDLER_TIMEOUT; this.chatHandlerNumRetries = DEFAULT_CHAT_HANDLER_NUM_RETRIES; + + Preconditions.checkArgument(this.maxNumSubTasks > 0, "maxNumSubTasks must be positive"); } @JsonProperty