From 61c22c929d9b0e20ae0b4000448d54af0f1d14a8 Mon Sep 17 00:00:00 2001 From: Jihoon Son Date: Thu, 21 Nov 2019 14:42:24 -0800 Subject: [PATCH 1/4] Fix the potential race SplittableInputSource.getNumSplits() and SplittableInputSource.createSplits() in TaskMonitor --- .../FirehoseFactoryToInputSourceAdaptor.java | 2 +- .../druid/data/input/SplitHintSpec.java | 2 +- .../data/input/impl/HttpInputSource.java | 2 +- .../data/input/impl/LocalInputSource.java | 2 +- .../input/impl/SplittableInputSource.java | 16 +++-- .../google/GoogleCloudStorageInputSource.java | 2 +- .../inputsource/hdfs/HdfsInputSource.java | 2 +- .../inputsource/hdfs/HdfsInputSourceTest.java | 4 +- ...putSourceSplitParallelIndexTaskRunner.java | 4 +- .../parallel/ParallelIndexPhaseRunner.java | 38 +++++++++-- ...shSegmentMergeParallelIndexTaskRunner.java | 2 +- .../SinglePhaseParallelIndexTaskRunner.java | 4 +- .../task/batch/parallel/TaskMonitor.java | 65 +++++++++++-------- .../ParallelIndexSupervisorTaskKillTest.java | 4 +- ...rallelIndexSupervisorTaskResourceTest.java | 2 +- 15 files changed, 96 insertions(+), 55 deletions(-) diff --git a/core/src/main/java/org/apache/druid/data/input/FirehoseFactoryToInputSourceAdaptor.java b/core/src/main/java/org/apache/druid/data/input/FirehoseFactoryToInputSourceAdaptor.java index 37ef3ca9eee9..35322c2a0fdf 100644 --- a/core/src/main/java/org/apache/druid/data/input/FirehoseFactoryToInputSourceAdaptor.java +++ b/core/src/main/java/org/apache/druid/data/input/FirehoseFactoryToInputSourceAdaptor.java @@ -68,7 +68,7 @@ public Stream createSplits(InputFormat inputFormat, @Nullable SplitH } @Override - public int getNumSplits(InputFormat inputFormat, @Nullable SplitHintSpec splitHintSpec) throws IOException + public int estimateNumSplits(InputFormat inputFormat, @Nullable SplitHintSpec splitHintSpec) throws IOException { if (firehoseFactory.isSplittable()) { return firehoseFactory.getNumSplits(splitHintSpec); diff --git a/core/src/main/java/org/apache/druid/data/input/SplitHintSpec.java b/core/src/main/java/org/apache/druid/data/input/SplitHintSpec.java index 2d63f0ac08b8..be4a07274e69 100644 --- a/core/src/main/java/org/apache/druid/data/input/SplitHintSpec.java +++ b/core/src/main/java/org/apache/druid/data/input/SplitHintSpec.java @@ -33,7 +33,7 @@ * @see FiniteFirehoseFactory#getSplits(SplitHintSpec) * @see FiniteFirehoseFactory#getNumSplits(SplitHintSpec) * @see SplittableInputSource#createSplits - * @see SplittableInputSource#getNumSplits + * @see SplittableInputSource#estimateNumSplits */ @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type") @JsonSubTypes(value = { diff --git a/core/src/main/java/org/apache/druid/data/input/impl/HttpInputSource.java b/core/src/main/java/org/apache/druid/data/input/impl/HttpInputSource.java index 7a822a8e9618..99780caf0408 100644 --- a/core/src/main/java/org/apache/druid/data/input/impl/HttpInputSource.java +++ b/core/src/main/java/org/apache/druid/data/input/impl/HttpInputSource.java @@ -86,7 +86,7 @@ public Stream> createSplits(InputFormat inputFormat, @Nullable S } @Override - public int getNumSplits(InputFormat inputFormat, @Nullable SplitHintSpec splitHintSpec) + public int estimateNumSplits(InputFormat inputFormat, @Nullable SplitHintSpec splitHintSpec) { return uris.size(); } diff --git a/core/src/main/java/org/apache/druid/data/input/impl/LocalInputSource.java b/core/src/main/java/org/apache/druid/data/input/impl/LocalInputSource.java index 54e56136fcfa..c6447a5842d6 100644 --- a/core/src/main/java/org/apache/druid/data/input/impl/LocalInputSource.java +++ b/core/src/main/java/org/apache/druid/data/input/impl/LocalInputSource.java @@ -77,7 +77,7 @@ public Stream> createSplits(InputFormat inputFormat, @Nullable } @Override - public int getNumSplits(InputFormat inputFormat, @Nullable SplitHintSpec splitHintSpec) + public int estimateNumSplits(InputFormat inputFormat, @Nullable SplitHintSpec splitHintSpec) { return Iterators.size(getFileIterator()); } diff --git a/core/src/main/java/org/apache/druid/data/input/impl/SplittableInputSource.java b/core/src/main/java/org/apache/druid/data/input/impl/SplittableInputSource.java index ed7de80d5469..00d0660fa431 100644 --- a/core/src/main/java/org/apache/druid/data/input/impl/SplittableInputSource.java +++ b/core/src/main/java/org/apache/druid/data/input/impl/SplittableInputSource.java @@ -44,23 +44,27 @@ default boolean isSplittable() /** * Creates a {@link Stream} of {@link InputSplit}s. The returned stream is supposed to be evaluated lazily to avoid * consuming too much memory. - * Note that this interface also has {@link #getNumSplits} which is related to this method. The implementations + * Note that this interface also has {@link #estimateNumSplits} which is related to this method. The implementations * should be careful to NOT cache the created splits in memory. * * Implementations can consider {@link InputFormat#isSplittable()} and {@link SplitHintSpec} to create splits - * in the same way with {@link #getNumSplits}. + * in the same way with {@link #estimateNumSplits}. */ Stream> createSplits(InputFormat inputFormat, @Nullable SplitHintSpec splitHintSpec) throws IOException; /** - * Returns the total number of splits to be created via {@link #createSplits}. - * This method can be expensive since it needs to iterate all directories or whatever substructure - * to find all input objects. + * Returns an estimated total number of splits to be created via {@link #createSplits}. The estimated number of splits + * doesn't have to be accurate and can be different to the actual number of InputSplits returned from + * {@link #createSplits}. This will be used to estimate the progress of a phase in parallel indexing. + * See TaskMonitor for more details of the progress estimation. + * + * This method can be expensive if an implementation iterates all directories or whatever substructure + * to find all input entities. * * Implementations can consider {@link InputFormat#isSplittable()} and {@link SplitHintSpec} to find splits * in the same way with {@link #createSplits}. */ - int getNumSplits(InputFormat inputFormat, @Nullable SplitHintSpec splitHintSpec) throws IOException; + int estimateNumSplits(InputFormat inputFormat, @Nullable SplitHintSpec splitHintSpec) throws IOException; /** * Helper method for ParallelIndexSupervisorTask. diff --git a/extensions-core/google-extensions/src/main/java/org/apache/druid/data/input/google/GoogleCloudStorageInputSource.java b/extensions-core/google-extensions/src/main/java/org/apache/druid/data/input/google/GoogleCloudStorageInputSource.java index 1bc99ad8b4ba..dc68e3e54196 100644 --- a/extensions-core/google-extensions/src/main/java/org/apache/druid/data/input/google/GoogleCloudStorageInputSource.java +++ b/extensions-core/google-extensions/src/main/java/org/apache/druid/data/input/google/GoogleCloudStorageInputSource.java @@ -69,7 +69,7 @@ public Stream> createSplits(InputFormat inputFormat, @Nullable S } @Override - public int getNumSplits(InputFormat inputFormat, @Nullable SplitHintSpec splitHintSpec) + public int estimateNumSplits(InputFormat inputFormat, @Nullable SplitHintSpec splitHintSpec) { return uris.size(); } diff --git a/extensions-core/hdfs-storage/src/main/java/org/apache/druid/inputsource/hdfs/HdfsInputSource.java b/extensions-core/hdfs-storage/src/main/java/org/apache/druid/inputsource/hdfs/HdfsInputSource.java index 007e7e0d3af2..1409aa61aeb3 100644 --- a/extensions-core/hdfs-storage/src/main/java/org/apache/druid/inputsource/hdfs/HdfsInputSource.java +++ b/extensions-core/hdfs-storage/src/main/java/org/apache/druid/inputsource/hdfs/HdfsInputSource.java @@ -174,7 +174,7 @@ public Stream> createSplits(InputFormat inputFormat, @Nullable } @Override - public int getNumSplits(InputFormat inputFormat, @Nullable SplitHintSpec splitHintSpec) throws IOException + public int estimateNumSplits(InputFormat inputFormat, @Nullable SplitHintSpec splitHintSpec) throws IOException { cachePathsIfNeeded(); return cachedPaths.size(); diff --git a/extensions-core/hdfs-storage/src/test/java/org/apache/druid/inputsource/hdfs/HdfsInputSourceTest.java b/extensions-core/hdfs-storage/src/test/java/org/apache/druid/inputsource/hdfs/HdfsInputSourceTest.java index b0cfa20450f9..dae315c0b7cb 100644 --- a/extensions-core/hdfs-storage/src/test/java/org/apache/druid/inputsource/hdfs/HdfsInputSourceTest.java +++ b/extensions-core/hdfs-storage/src/test/java/org/apache/druid/inputsource/hdfs/HdfsInputSourceTest.java @@ -255,7 +255,7 @@ public void hasCorrectSplits() throws IOException @Test public void hasCorrectNumberOfSplits() throws IOException { - int numSplits = target.getNumSplits(null, null); + int numSplits = target.estimateNumSplits(null, null); Assert.assertEquals(NUM_FILE, numSplits); } } @@ -295,7 +295,7 @@ public void hasCorrectSplits() throws IOException @Test public void hasCorrectNumberOfSplits() throws IOException { - int numSplits = target.getNumSplits(null, null); + int numSplits = target.estimateNumSplits(null, null); Assert.assertEquals(0, numSplits); } } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/InputSourceSplitParallelIndexTaskRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/InputSourceSplitParallelIndexTaskRunner.java index c526ba7b8647..166692cf5837 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/InputSourceSplitParallelIndexTaskRunner.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/InputSourceSplitParallelIndexTaskRunner.java @@ -74,9 +74,9 @@ Iterator> subTaskSpecIterator() throws IOException } @Override - final int getTotalNumSubTasks() throws IOException + final int estimateTotalNumSubTasks() throws IOException { - return baseInputSource.getNumSplits( + return baseInputSource.estimateNumSplits( ingestionSchema.getIOConfig().getInputFormat(), getTuningConfig().getSplitHintSpec() ); diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexPhaseRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexPhaseRunner.java index ac7305e92d23..4cd37ab4304e 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexPhaseRunner.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexPhaseRunner.java @@ -48,6 +48,7 @@ import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.NoSuchElementException; import java.util.Set; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ConcurrentHashMap; @@ -112,23 +113,23 @@ public abstract class ParallelIndexPhaseRunner> subTaskSpecIterator = subTaskSpecIterator(); final long taskStatusCheckingPeriod = tuningConfig.getTaskStatusCheckPeriodMs(); taskMonitor = new TaskMonitor<>( Preconditions.checkNotNull(indexingServiceClient, "indexingServiceClient"), tuningConfig.getMaxRetry(), - getTotalNumSubTasks() + estimateTotalNumSubTasks() ); TaskState state = TaskState.RUNNING; @@ -165,7 +166,7 @@ public TaskState run() throws Exception // We have no more subTasks to run if (taskMonitor.getNumRunningTasks() == 0 && taskCompleteEvents.isEmpty()) { subTaskScheduleAndMonitorStopped = true; - if (taskMonitor.isSucceeded()) { + if (subTaskSpecIterator.count == taskMonitor.getNumSucceededTasks()) { // Succeeded state = TaskState.SUCCESS; } else { @@ -226,6 +227,33 @@ public TaskState run() throws Exception return state; } + private class CountingSubTaskSpecIterator implements Iterator> + { + private final Iterator> delegate; + private int count; + + private CountingSubTaskSpecIterator(Iterator> delegate) + { + this.delegate = delegate; + } + + @Override + public boolean hasNext() + { + return delegate.hasNext(); + } + + @Override + public SubTaskSpec next() + { + if (!delegate.hasNext()) { + throw new NoSuchElementException(); + } + count++; + return delegate.next(); + } + } + private boolean isRunning() { return !subTaskScheduleAndMonitorStopped && !Thread.currentThread().isInterrupted(); diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialHashSegmentMergeParallelIndexTaskRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialHashSegmentMergeParallelIndexTaskRunner.java index 16b54ab71ffb..36cf8d1979bc 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialHashSegmentMergeParallelIndexTaskRunner.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialHashSegmentMergeParallelIndexTaskRunner.java @@ -70,7 +70,7 @@ Iterator> subTaskSpecIterator() } @Override - int getTotalNumSubTasks() + int estimateTotalNumSubTasks() { return mergeIOConfigs.size(); } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseParallelIndexTaskRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseParallelIndexTaskRunner.java index da8aed91068e..c98ec2c99648 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseParallelIndexTaskRunner.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseParallelIndexTaskRunner.java @@ -90,9 +90,9 @@ Iterator> subTaskSpecIterator() throws IOExcepti } @Override - int getTotalNumSubTasks() throws IOException + int estimateTotalNumSubTasks() throws IOException { - return baseInputSource.getNumSplits( + return baseInputSource.estimateNumSplits( ingestionSchema.getIOConfig().getInputFormat(), getTuningConfig().getSplitHintSpec() ); diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/TaskMonitor.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/TaskMonitor.java index 8e21b68ee60d..8e214f9dfb81 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/TaskMonitor.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/TaskMonitor.java @@ -24,6 +24,7 @@ import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.SettableFuture; +import net.jcip.annotations.GuardedBy; import org.apache.druid.client.indexing.IndexingServiceClient; import org.apache.druid.client.indexing.TaskStatusResponse; import org.apache.druid.indexer.TaskState; @@ -80,26 +81,32 @@ public class TaskMonitor // overlord client private final IndexingServiceClient indexingServiceClient; private final int maxRetry; - private final int expectedNumSucceededTasks; + private final int estimatedNumSucceededTasks; + @GuardedBy("taskCountLock") private int numRunningTasks; + @GuardedBy("taskCountLock") private int numSucceededTasks; + @GuardedBy("taskCountLock") private int numFailedTasks; - // This metric is used only for unit tests because the current taskStatus system doesn't track the killed task status. - // Currently, this metric only represents # of killed tasks by ParallelIndexTaskRunner. - // See killAllRunningTasks(), SinglePhaseParallelIndexTaskRunner.run(), and - // SinglePhaseParallelIndexTaskRunner.stopGracefully() + /** + * This metric is used only for unit tests because the current task status system doesn't track the killed task + * status. Currently, this metric only represents number of killed tasks by {@link ParallelIndexTaskRunner}. + * See {@link #stop()}, {@link ParallelIndexPhaseRunner#run()}, and + * {@link ParallelIndexPhaseRunner#stopGracefully()}. + */ private int numKilledTasks; + @GuardedBy("startStopLock") private boolean running = false; - TaskMonitor(IndexingServiceClient indexingServiceClient, int maxRetry, int expectedNumSucceededTasks) + TaskMonitor(IndexingServiceClient indexingServiceClient, int maxRetry, int estimatedNumSucceededTasks) { this.indexingServiceClient = Preconditions.checkNotNull(indexingServiceClient, "indexingServiceClient"); this.maxRetry = maxRetry; - this.expectedNumSucceededTasks = expectedNumSucceededTasks; + this.estimatedNumSucceededTasks = estimatedNumSucceededTasks; - log.info("TaskMonitor is initialized with expectedNumSucceededTasks[%d]", expectedNumSucceededTasks); + log.info("TaskMonitor is initialized with estimatedNumSucceededTasks[%d]", estimatedNumSucceededTasks); } public void start(long taskStatusCheckingPeriod) @@ -187,23 +194,25 @@ public void stop() running = false; taskStatusChecker.shutdownNow(); - if (numRunningTasks > 0) { - final Iterator iterator = runningTasks.values().iterator(); - while (iterator.hasNext()) { - final MonitorEntry entry = iterator.next(); - iterator.remove(); - final String taskId = entry.runningTask.getId(); - log.info("Request to kill subtask[%s]", taskId); - indexingServiceClient.killTask(taskId); - numRunningTasks--; - numKilledTasks++; - } - + synchronized (taskCountLock) { if (numRunningTasks > 0) { - log.warn( - "Inconsistent state: numRunningTasks[%d] is still not zero after trying to kill all running tasks.", - numRunningTasks - ); + final Iterator iterator = runningTasks.values().iterator(); + while (iterator.hasNext()) { + final MonitorEntry entry = iterator.next(); + iterator.remove(); + final String taskId = entry.runningTask.getId(); + log.info("Request to kill subtask[%s]", taskId); + indexingServiceClient.killTask(taskId); + numRunningTasks--; + numKilledTasks++; + } + + if (numRunningTasks > 0) { + log.warn( + "Inconsistent state: numRunningTasks[%d] is still not zero after trying to kill all running tasks.", + numRunningTasks + ); + } } } @@ -307,7 +316,7 @@ private void incrementNumSucceededTasks() synchronized (taskCountLock) { numRunningTasks--; numSucceededTasks++; - log.info("[%d/%d] tasks succeeded", numSucceededTasks, expectedNumSucceededTasks); + log.info("[%d/%d] tasks succeeded", numSucceededTasks, estimatedNumSucceededTasks); } } @@ -319,10 +328,10 @@ private void incrementNumFailedTasks() } } - boolean isSucceeded() + int getNumSucceededTasks() { synchronized (taskCountLock) { - return numSucceededTasks == expectedNumSucceededTasks; + return numSucceededTasks; } } @@ -348,7 +357,7 @@ SinglePhaseParallelIndexingProgress getProgress() numFailedTasks, numSucceededTasks + numFailedTasks, numRunningTasks + numSucceededTasks + numFailedTasks, - expectedNumSucceededTasks + estimatedNumSucceededTasks ); } } diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskKillTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskKillTest.java index 9c87b5d8970b..7564f8cecc4e 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskKillTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskKillTest.java @@ -168,7 +168,7 @@ private ParallelIndexSupervisorTask newTask( ) { final TestInputSource inputSource = (TestInputSource) ioConfig.getInputSource(); - final int numTotalSubTasks = inputSource.getNumSplits(new NoopInputFormat(), null); + final int numTotalSubTasks = inputSource.estimateNumSplits(new NoopInputFormat(), null); // set up ingestion spec final ParallelIndexIngestionSpec ingestionSpec = new ParallelIndexIngestionSpec( new DataSchema( @@ -264,7 +264,7 @@ public Stream> createSplits(InputFormat inputFormat, @Null } @Override - public int getNumSplits(InputFormat inputFormat, @Nullable SplitHintSpec splitHintSpec) + public int estimateNumSplits(InputFormat inputFormat, @Nullable SplitHintSpec splitHintSpec) { return splits.size(); } diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskResourceTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskResourceTest.java index 413e99a39a50..d3cd5645df5a 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskResourceTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskResourceTest.java @@ -486,7 +486,7 @@ public Stream> createSplits(InputFormat inputFormat, @Nullab } @Override - public int getNumSplits(InputFormat inputFormat, @Nullable SplitHintSpec splitHintSpec) + public int estimateNumSplits(InputFormat inputFormat, @Nullable SplitHintSpec splitHintSpec) { return ids.size(); } From 12776052d435af900952c69a273e1e00b95bffcd Mon Sep 17 00:00:00 2001 From: Jihoon Son Date: Thu, 21 Nov 2019 17:57:00 -0800 Subject: [PATCH 2/4] Fix docs and javadoc --- .../input/impl/SplittableInputSource.java | 2 +- docs/ingestion/native-batch.md | 2 +- .../parallel/ParallelIndexPhaseRunner.java | 10 +++---- .../parallel/ParallelIndexTaskRunner.java | 4 +-- ...ava => ParallelIndexingPhaseProgress.java} | 24 ++++++++-------- .../parallel/ParallelIndexingProgress.java | 28 ------------------- .../task/batch/parallel/TaskMonitor.java | 4 +-- ...rallelIndexSupervisorTaskResourceTest.java | 22 +++++++-------- 8 files changed, 35 insertions(+), 61 deletions(-) rename indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/{SinglePhaseParallelIndexingProgress.java => ParallelIndexingPhaseProgress.java} (72%) delete mode 100644 indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexingProgress.java diff --git a/core/src/main/java/org/apache/druid/data/input/impl/SplittableInputSource.java b/core/src/main/java/org/apache/druid/data/input/impl/SplittableInputSource.java index 00d0660fa431..c2172b890429 100644 --- a/core/src/main/java/org/apache/druid/data/input/impl/SplittableInputSource.java +++ b/core/src/main/java/org/apache/druid/data/input/impl/SplittableInputSource.java @@ -54,7 +54,7 @@ default boolean isSplittable() /** * Returns an estimated total number of splits to be created via {@link #createSplits}. The estimated number of splits - * doesn't have to be accurate and can be different to the actual number of InputSplits returned from + * doesn't have to be accurate and can be different from the actual number of InputSplits returned from * {@link #createSplits}. This will be used to estimate the progress of a phase in parallel indexing. * See TaskMonitor for more details of the progress estimation. * diff --git a/docs/ingestion/native-batch.md b/docs/ingestion/native-batch.md index 428df3eff2b1..b463820aebf4 100644 --- a/docs/ingestion/native-batch.md +++ b/docs/ingestion/native-batch.md @@ -274,7 +274,7 @@ Returns the name of the current phase if the task running in the parallel mode. * `http://{PEON_IP}:{PEON_PORT}/druid/worker/v1/chat/{SUPERVISOR_TASK_ID}/progress` -Returns the current progress if the supervisor task is running in the parallel mode. +Returns the estimated progress of the current phase if the supervisor task is running in the parallel mode. An example of the result is diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexPhaseRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexPhaseRunner.java index 4cd37ab4304e..931e97a2a977 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexPhaseRunner.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexPhaseRunner.java @@ -171,10 +171,10 @@ public TaskState run() throws Exception state = TaskState.SUCCESS; } else { // Failed - final SinglePhaseParallelIndexingProgress monitorStatus = taskMonitor.getProgress(); + final ParallelIndexingPhaseProgress monitorStatus = taskMonitor.getProgress(); throw new ISE( - "Expected for [%d] tasks to succeed, but we got [%d] succeeded tasks and [%d] failed tasks", - monitorStatus.getExpectedSucceeded(), + "Expected [%d] tasks to succeed, but we got [%d] succeeded tasks and [%d] failed tasks", + subTaskSpecIterator.count, monitorStatus.getSucceeded(), monitorStatus.getFailed() ); @@ -349,9 +349,9 @@ public Map getReports() } @Override - public ParallelIndexingProgress getProgress() + public ParallelIndexingPhaseProgress getProgress() { - return taskMonitor == null ? SinglePhaseParallelIndexingProgress.notRunning() : taskMonitor.getProgress(); + return taskMonitor == null ? ParallelIndexingPhaseProgress.notRunning() : taskMonitor.getProgress(); } @Override diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexTaskRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexTaskRunner.java index c4ebef5aea86..49b2b48ecd61 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexTaskRunner.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexTaskRunner.java @@ -71,9 +71,9 @@ public interface ParallelIndexTaskRunner getReports(); /** - * Returns the current {@link ParallelIndexingProgress}. + * Returns the current {@link ParallelIndexingPhaseProgress}. */ - ParallelIndexingProgress getProgress(); + ParallelIndexingPhaseProgress getProgress(); /** * Returns the IDs of current running tasks. diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseParallelIndexingProgress.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexingPhaseProgress.java similarity index 72% rename from indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseParallelIndexingProgress.java rename to indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexingPhaseProgress.java index 28fd7c851387..82a46b1045a8 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseParallelIndexingProgress.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexingPhaseProgress.java @@ -23,9 +23,10 @@ import com.fasterxml.jackson.annotation.JsonProperty; /** - * Represents the current progress of {@link SinglePhaseParallelIndexTaskRunner}. + * Represents the current progress of a single phase in parallel indexing. + * All subclasses of {@link ParallelIndexPhaseRunner} can use this class to indicate their progress. */ -class SinglePhaseParallelIndexingProgress implements ParallelIndexingProgress +class ParallelIndexingPhaseProgress { /** * Number of running tasks. @@ -53,23 +54,24 @@ class SinglePhaseParallelIndexingProgress implements ParallelIndexingProgress private final int total; /** - * Number of succeeded tasks for {@link SinglePhaseParallelIndexTaskRunner} to succeed. + * Estimated number of succeeded tasks for {@link SinglePhaseParallelIndexTaskRunner} to succeed. + * This can be different from the actual number of tasks to succeed. */ - private final int expectedSucceeded; + private final int estimatedExpectedSucceeded; - static SinglePhaseParallelIndexingProgress notRunning() + static ParallelIndexingPhaseProgress notRunning() { - return new SinglePhaseParallelIndexingProgress(0, 0, 0, 0, 0, -1); + return new ParallelIndexingPhaseProgress(0, 0, 0, 0, 0, -1); } @JsonCreator - SinglePhaseParallelIndexingProgress( + ParallelIndexingPhaseProgress( @JsonProperty("running") int running, @JsonProperty("succeeded") int succeeded, @JsonProperty("failed") int failed, @JsonProperty("complete") int complete, @JsonProperty("total") int total, - @JsonProperty("expectedSucceeded") int expectedSucceeded + @JsonProperty("estimatedExpectedSucceeded") int estimatedExpectedSucceeded ) { this.running = running; @@ -77,7 +79,7 @@ static SinglePhaseParallelIndexingProgress notRunning() this.failed = failed; this.complete = complete; this.total = total; - this.expectedSucceeded = expectedSucceeded; + this.estimatedExpectedSucceeded = estimatedExpectedSucceeded; } @JsonProperty @@ -111,8 +113,8 @@ public int getTotal() } @JsonProperty - public int getExpectedSucceeded() + public int getEstimatedExpectedSucceeded() { - return expectedSucceeded; + return estimatedExpectedSucceeded; } } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexingProgress.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexingProgress.java deleted file mode 100644 index 336bbb04bcae..000000000000 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexingProgress.java +++ /dev/null @@ -1,28 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.druid.indexing.common.task.batch.parallel; - -/** - * Represents the current progress of {@link ParallelIndexSupervisorTask}. Implementations can be different depending on - * the distributed indexing algorithm. - */ -interface ParallelIndexingProgress -{ -} diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/TaskMonitor.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/TaskMonitor.java index 8e214f9dfb81..c5b8bd138e79 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/TaskMonitor.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/TaskMonitor.java @@ -348,10 +348,10 @@ int getNumKilledTasks() return numKilledTasks; } - SinglePhaseParallelIndexingProgress getProgress() + ParallelIndexingPhaseProgress getProgress() { synchronized (taskCountLock) { - return new SinglePhaseParallelIndexingProgress( + return new ParallelIndexingPhaseProgress( numRunningTasks, numSucceededTasks, numFailedTasks, diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskResourceTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskResourceTest.java index d3cd5645df5a..0281afe5a0cc 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskResourceTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskResourceTest.java @@ -165,12 +165,12 @@ public void testAPIs() throws Exception Assert.assertEquals(200, response.getStatus()); Assert.assertEquals( NUM_SUB_TASKS, - ((SinglePhaseParallelIndexingProgress) response.getEntity()).getExpectedSucceeded() + ((ParallelIndexingPhaseProgress) response.getEntity()).getEstimatedExpectedSucceeded() ); // Since taskMonitor works based on polling, it's hard to use a fancier way to check its state. // We use polling to check the state of taskMonitor in this test. - while (getNumSubTasks(SinglePhaseParallelIndexingProgress::getRunning) < NUM_SUB_TASKS) { + while (getNumSubTasks(ParallelIndexingPhaseProgress::getRunning) < NUM_SUB_TASKS) { Thread.sleep(100); } @@ -188,7 +188,7 @@ public void testAPIs() throws Exception runningTasks.get(0).setState(TaskState.SUCCESS); } - while (getNumSubTasks(SinglePhaseParallelIndexingProgress::getSucceeded) < succeededTasks) { + while (getNumSubTasks(ParallelIndexingPhaseProgress::getSucceeded) < succeededTasks) { Thread.sleep(100); } @@ -205,7 +205,7 @@ public void testAPIs() throws Exception } // Wait for new tasks to be started - while (getNumSubTasks(SinglePhaseParallelIndexingProgress::getFailed) < failedTasks + while (getNumSubTasks(ParallelIndexingPhaseProgress::getFailed) < failedTasks || runningTasks.size() < NUM_SUB_TASKS - succeededTasks) { Thread.sleep(100); } @@ -222,7 +222,7 @@ public void testAPIs() throws Exception runningTasks.get(0).setState(TaskState.SUCCESS); } - while (getNumSubTasks(SinglePhaseParallelIndexingProgress::getSucceeded) < succeededTasks) { + while (getNumSubTasks(ParallelIndexingPhaseProgress::getSucceeded) < succeededTasks) { Thread.sleep(100); } @@ -241,10 +241,10 @@ public void testAPIs() throws Exception // Test one more failure runningTasks.get(0).setState(TaskState.FAILED); failedTasks++; - while (getNumSubTasks(SinglePhaseParallelIndexingProgress::getFailed) < failedTasks) { + while (getNumSubTasks(ParallelIndexingPhaseProgress::getFailed) < failedTasks) { Thread.sleep(100); } - while (getNumSubTasks(SinglePhaseParallelIndexingProgress::getRunning) < 1) { + while (getNumSubTasks(ParallelIndexingPhaseProgress::getRunning) < 1) { Thread.sleep(100); } @@ -257,7 +257,7 @@ public void testAPIs() throws Exception runningTasks.get(0).setState(TaskState.SUCCESS); succeededTasks++; - while (getNumSubTasks(SinglePhaseParallelIndexingProgress::getSucceeded) < succeededTasks) { + while (getNumSubTasks(ParallelIndexingPhaseProgress::getSucceeded) < succeededTasks) { Thread.sleep(100); } @@ -265,11 +265,11 @@ public void testAPIs() throws Exception } @SuppressWarnings({"ConstantConditions"}) - private int getNumSubTasks(Function func) + private int getNumSubTasks(Function func) { final Response response = task.getProgress(newRequest()); Assert.assertEquals(200, response.getStatus()); - return func.apply((SinglePhaseParallelIndexingProgress) response.getEntity()); + return func.apply((ParallelIndexingPhaseProgress) response.getEntity()); } private Map buildStateMap() @@ -297,7 +297,7 @@ private void checkState( { Response response = task.getProgress(newRequest()); Assert.assertEquals(200, response.getStatus()); - final SinglePhaseParallelIndexingProgress monitorStatus = (SinglePhaseParallelIndexingProgress) response.getEntity(); + final ParallelIndexingPhaseProgress monitorStatus = (ParallelIndexingPhaseProgress) response.getEntity(); // numRunningTasks Assert.assertEquals(runningTasks.size(), monitorStatus.getRunning()); From 434800a5b556312f4ce33061ce875fc5b70b9494 Mon Sep 17 00:00:00 2001 From: Jihoon Son Date: Fri, 22 Nov 2019 11:45:50 -0800 Subject: [PATCH 3/4] Add unit tests for large or small estimated num splits --- docs/ingestion/native-batch.md | 2 +- .../task/batch/parallel/TaskMonitor.java | 2 +- ...stractParallelIndexSupervisorTaskTest.java | 28 +++ .../ParallelIndexPhaseRunnerTest.java | 237 ++++++++++++++++++ .../SinglePhaseParallelIndexingTest.java | 31 +-- 5 files changed, 269 insertions(+), 31 deletions(-) create mode 100644 indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexPhaseRunnerTest.java diff --git a/docs/ingestion/native-batch.md b/docs/ingestion/native-batch.md index b463820aebf4..03d298f5e7b8 100644 --- a/docs/ingestion/native-batch.md +++ b/docs/ingestion/native-batch.md @@ -285,7 +285,7 @@ An example of the result is "failed":0, "complete":0, "total":10, - "expectedSucceeded":10 + "estimatedExpectedSucceeded":10 } ``` diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/TaskMonitor.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/TaskMonitor.java index c5b8bd138e79..f4a7889c51f3 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/TaskMonitor.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/TaskMonitor.java @@ -24,7 +24,7 @@ import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.SettableFuture; -import net.jcip.annotations.GuardedBy; +import com.google.errorprone.annotations.concurrent.GuardedBy; import org.apache.druid.client.indexing.IndexingServiceClient; import org.apache.druid.client.indexing.TaskStatusResponse; import org.apache.druid.indexer.TaskState; diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractParallelIndexSupervisorTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractParallelIndexSupervisorTaskTest.java index 20b6548275ca..480522bc33bb 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractParallelIndexSupervisorTaskTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractParallelIndexSupervisorTaskTest.java @@ -94,6 +94,34 @@ public class AbstractParallelIndexSupervisorTaskTest extends IngestionTestBase 0 ); static final InputFormat DEFAULT_INPUT_FORMAT = DEFAULT_PARSE_SPEC.toInputFormat(); + static final ParallelIndexTuningConfig DEFAULT_TUNING_CONFIG_FOR_PARALLEL_INDEXING = new ParallelIndexTuningConfig( + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + 2, + null, + null, + null, + null, + null, + null, + null, + null, + null + ); protected TestLocalTaskActionClient actionClient; protected LocalIndexingServiceClient indexingServiceClient; diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexPhaseRunnerTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexPhaseRunnerTest.java new file mode 100644 index 000000000000..acdf77904e49 --- /dev/null +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexPhaseRunnerTest.java @@ -0,0 +1,237 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.indexing.common.task.batch.parallel; + +import org.apache.druid.client.indexing.IndexingServiceClient; +import org.apache.druid.data.input.InputSplit; +import org.apache.druid.indexer.TaskState; +import org.apache.druid.indexer.TaskStatus; +import org.apache.druid.indexing.common.TaskToolbox; +import org.apache.druid.indexing.common.task.NoopTask; +import org.apache.druid.java.util.common.StringUtils; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.io.File; +import java.io.IOException; +import java.io.Writer; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.util.Collections; +import java.util.Iterator; +import java.util.Map; + +public class ParallelIndexPhaseRunnerTest extends AbstractParallelIndexSupervisorTaskTest +{ + private File inputDir; + + @Before + public void setup() throws IOException + { + inputDir = temporaryFolder.newFolder("data"); + // set up data + for (int i = 0; i < 5; i++) { + try (final Writer writer = + Files.newBufferedWriter(new File(inputDir, "test_" + i).toPath(), StandardCharsets.UTF_8)) { + writer.write(StringUtils.format("2017-12-%d,%d th test file\n", 24 + i, i)); + writer.write(StringUtils.format("2017-12-%d,%d th test file\n", 25 + i, i)); + } + } + + for (int i = 0; i < 5; i++) { + try (final Writer writer = + Files.newBufferedWriter(new File(inputDir, "filtered_" + i).toPath(), StandardCharsets.UTF_8)) { + writer.write(StringUtils.format("2017-12-%d,%d th test file\n", 25 + i, i)); + } + } + + indexingServiceClient = new LocalIndexingServiceClient(); + localDeepStorage = temporaryFolder.newFolder("localStorage"); + } + + @After + public void tearDown() + { + indexingServiceClient.shutdown(); + temporaryFolder.delete(); + } + + @Test + public void testLargeEstimatedNumSplits() throws Exception + { + final TestPhaseRunner runner = new TestPhaseRunner( + toolbox, + "supervisorTaskId", + "groupId", + AbstractParallelIndexSupervisorTaskTest.DEFAULT_TUNING_CONFIG_FOR_PARALLEL_INDEXING, + Collections.emptyMap(), + indexingServiceClient, + 10, + 12 + ); + Assert.assertEquals(TaskState.SUCCESS, runner.run()); + } + + @Test + public void testSmallEstimatedNumSplits() throws Exception + { + final TestPhaseRunner runner = new TestPhaseRunner( + toolbox, + "supervisorTaskId", + "groupId", + AbstractParallelIndexSupervisorTaskTest.DEFAULT_TUNING_CONFIG_FOR_PARALLEL_INDEXING, + Collections.emptyMap(), + indexingServiceClient, + 10, + 8 + ); + Assert.assertEquals(TaskState.SUCCESS, runner.run()); + } + + private static class TestPhaseRunner extends ParallelIndexPhaseRunner + { + private final int actualNumSubTasks; + private final int estimatedNumSubTasks; + + TestPhaseRunner( + TaskToolbox toolbox, + String supervisorTaskId, + String groupId, + ParallelIndexTuningConfig tuningConfig, + Map context, + IndexingServiceClient indexingServiceClient, + int actualNumSubTasks, + int estimatedNumSubTasks + ) + { + super( + toolbox, + supervisorTaskId, + groupId, + tuningConfig, + context, + indexingServiceClient + ); + this.actualNumSubTasks = actualNumSubTasks; + this.estimatedNumSubTasks = estimatedNumSubTasks; + } + + @Override + Iterator> subTaskSpecIterator() + { + return new Iterator>() + { + int subTaskCount = 0; + + @Override + public boolean hasNext() + { + return subTaskCount < actualNumSubTasks; + } + + @Override + public SubTaskSpec next() + { + return new TestSubTaskSpec( + "specId_" + subTaskCount++, + getGroupId(), + getTaskId(), + getContext(), + new InputSplit<>(new Object()), + TestPhaseRunner.this + ); + } + }; + } + + @Override + int estimateTotalNumSubTasks() + { + return estimatedNumSubTasks; + } + + @Override + public String getName() + { + return "TestPhaseRunner"; + } + } + + private static class TestSubTaskSpec extends SubTaskSpec + { + private final TestPhaseRunner phaseRunner; + + private TestSubTaskSpec( + String id, + String groupId, + String supervisorTaskId, + Map context, + InputSplit inputSplit, + TestPhaseRunner phaseRunner + ) + { + super(id, groupId, supervisorTaskId, context, inputSplit); + this.phaseRunner = phaseRunner; + } + + @Override + public ReportingNoopTask newSubTask(int numAttempts) + { + return new ReportingNoopTask(getGroupId(), phaseRunner); + } + } + + private static class EmptySubTaskReport implements SubTaskReport + { + private final String taskId; + + private EmptySubTaskReport(String taskId) + { + this.taskId = taskId; + } + + @Override + public String getTaskId() + { + return taskId; + } + } + + private static class ReportingNoopTask extends NoopTask + { + private final TestPhaseRunner phaseRunner; + + private ReportingNoopTask(String groupId, TestPhaseRunner phaseRunner) + { + super(null, groupId, null, 10, 0, null, null, null); + this.phaseRunner = phaseRunner; + } + + @Override + public TaskStatus run(TaskToolbox toolbox) throws Exception + { + final TaskStatus result = super.run(toolbox); + phaseRunner.collectReport(new EmptySubTaskReport(getId())); + return result; + } + } +} diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseParallelIndexingTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseParallelIndexingTest.java index 0b12fbba6a40..0bc1b93de477 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseParallelIndexingTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseParallelIndexingTest.java @@ -334,34 +334,7 @@ private ParallelIndexSupervisorTask newTask( segmentGranularity, appendToExisting, splittableInputSource, - new ParallelIndexTuningConfig( - null, - null, - null, - null, - null, - null, - null, - null, - null, - null, - null, - null, - null, - null, - null, - null, - 2, - null, - null, - null, - null, - null, - null, - null, - null, - null - ) + AbstractParallelIndexSupervisorTaskTest.DEFAULT_TUNING_CONFIG_FOR_PARALLEL_INDEXING ); } @@ -473,7 +446,7 @@ public static class TestSinglePhaseRunner extends TestSinglePhaseParallelIndexTa TestSinglePhaseRunner( TaskToolbox toolbox, ParallelIndexSupervisorTask supervisorTask, - @Nullable IndexingServiceClient indexingServiceClient + IndexingServiceClient indexingServiceClient ) { super( From 23fb2c76a5a301badaa6e7b0120613be01a8e126 Mon Sep 17 00:00:00 2001 From: Jihoon Son Date: Fri, 22 Nov 2019 13:24:23 -0800 Subject: [PATCH 4/4] add override --- .../common/task/batch/parallel/ParallelIndexPhaseRunnerTest.java | 1 + 1 file changed, 1 insertion(+) diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexPhaseRunnerTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexPhaseRunnerTest.java index acdf77904e49..868e2d4116ad 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexPhaseRunnerTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexPhaseRunnerTest.java @@ -68,6 +68,7 @@ public void setup() throws IOException localDeepStorage = temporaryFolder.newFolder("localStorage"); } + @Override @After public void tearDown() {