diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java index 62a9f26ba49b..ba2502f197af 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java @@ -32,6 +32,7 @@ import org.apache.curator.shaded.com.google.common.base.Verify; import org.apache.druid.client.coordinator.CoordinatorClient; import org.apache.druid.client.indexing.ClientCompactionTaskQuery; +import org.apache.druid.data.input.InputSource; import org.apache.druid.data.input.impl.DimensionSchema; import org.apache.druid.data.input.impl.DimensionSchema.MultiValueHandling; import org.apache.druid.data.input.impl.DimensionsSpec; @@ -361,10 +362,14 @@ public TaskStatus runTask(TaskToolbox toolbox) throws Exception // a new Appenderator on its own instead. As a result, they should use different sequence names to allocate // new segmentIds properly. See IndexerSQLMetadataStorageCoordinator.allocatePendingSegments() for details. // In this case, we use different fake IDs for each created index task. - final String subtaskId = tuningConfig == null || tuningConfig.getMaxNumConcurrentSubTasks() == 1 - ? createIndexTaskSpecId(i) - : getId(); - return newTask(subtaskId, ingestionSpecs.get(i)); + ParallelIndexIngestionSpec ingestionSpec = ingestionSpecs.get(i); + InputSource inputSource = ingestionSpec.getIOConfig().getNonNullInputSource( + ingestionSpec.getDataSchema().getParser() + ); + final String subtaskId = ParallelIndexSupervisorTask.isParallelMode(inputSource, tuningConfig) + ? getId() + : createIndexTaskSpecId(i); + return newTask(subtaskId, ingestionSpec); }) .collect(Collectors.toList()); diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java index dd0e75980c9c..4a218a0b26e0 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java @@ -466,18 +466,25 @@ private void initializeSubTaskCleaner() registerResourceCloserOnAbnormalExit(currentSubTaskHolder); } - private boolean isParallelMode() + public static boolean isParallelMode(InputSource inputSource, @Nullable ParallelIndexTuningConfig tuningConfig) { + if (null == tuningConfig) { + return false; + } + boolean useRangePartitions = useRangePartitions(tuningConfig); // Range partitioning is not implemented for runSequential() (but hash partitioning is) - int minRequiredNumConcurrentSubTasks = useRangePartitions() ? 1 : 2; + int minRequiredNumConcurrentSubTasks = useRangePartitions ? 1 : 2; + return inputSource.isSplittable() && tuningConfig.getMaxNumConcurrentSubTasks() >= minRequiredNumConcurrentSubTasks; + } - return baseInputSource.isSplittable() - && ingestionSchema.getTuningConfig().getMaxNumConcurrentSubTasks() >= minRequiredNumConcurrentSubTasks; + private static boolean useRangePartitions(ParallelIndexTuningConfig tuningConfig) + { + return tuningConfig.getGivenOrDefaultPartitionsSpec() instanceof SingleDimensionPartitionsSpec; } - private boolean useRangePartitions() + private boolean isParallelMode() { - return ingestionSchema.getTuningConfig().getGivenOrDefaultPartitionsSpec() instanceof SingleDimensionPartitionsSpec; + return isParallelMode(baseInputSource, ingestionSchema.getTuningConfig()); } /** @@ -512,7 +519,7 @@ private TaskStatus runSinglePhaseParallel(TaskToolbox toolbox) throws Exception */ private TaskStatus runMultiPhaseParallel(TaskToolbox toolbox) throws Exception { - return useRangePartitions() + return useRangePartitions(ingestionSchema.getTuningConfig()) ? runRangePartitionMultiPhaseParallel(toolbox) : runHashPartitionMultiPhaseParallel(toolbox); } diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskParallelRunTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskParallelRunTest.java index 616dd24978ec..7b7005bcbd60 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskParallelRunTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskParallelRunTest.java @@ -63,6 +63,7 @@ import org.apache.druid.timeline.partition.SingleDimensionShardSpec; import org.joda.time.Interval; import org.junit.Assert; +import org.junit.Assume; import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; @@ -152,7 +153,7 @@ public void testRunParallelWithDynamicPartitioningMatchCompactionState() lockGranularity == LockGranularity.TIME_CHUNK ? NumberedShardSpec.class : NumberedOverwriteShardSpec.class, segment.getShardSpec().getClass() ); - // Expecte compaction state to exist as store compaction state by default + // Expect compaction state to exist as store compaction state by default Assert.assertEquals(expectedState, segment.getLastCompactionState()); } } @@ -161,9 +162,7 @@ public void testRunParallelWithDynamicPartitioningMatchCompactionState() public void testRunParallelWithHashPartitioningMatchCompactionState() { // Hash partitioning is not supported with segment lock yet - if (lockGranularity == LockGranularity.SEGMENT) { - return; - } + Assume.assumeFalse(lockGranularity == LockGranularity.SEGMENT); runIndexTask(null, true); final Builder builder = new Builder( @@ -182,7 +181,7 @@ public void testRunParallelWithHashPartitioningMatchCompactionState() compactionTask.getTuningConfig().getIndexSpec().asMap(getObjectMapper()) ); for (DataSegment segment : compactedSegments) { - // Expecte compaction state to exist as store compaction state by default + // Expect compaction state to exist as store compaction state by default Assert.assertSame(HashBasedNumberedShardSpec.class, segment.getShardSpec().getClass()); Assert.assertEquals(expectedState, segment.getLastCompactionState()); } @@ -192,9 +191,7 @@ public void testRunParallelWithHashPartitioningMatchCompactionState() public void testRunParallelWithRangePartitioning() { // Range partitioning is not supported with segment lock yet - if (lockGranularity == LockGranularity.SEGMENT) { - return; - } + Assume.assumeFalse(lockGranularity == LockGranularity.SEGMENT); runIndexTask(null, true); final Builder builder = new Builder( @@ -213,7 +210,36 @@ public void testRunParallelWithRangePartitioning() compactionTask.getTuningConfig().getIndexSpec().asMap(getObjectMapper()) ); for (DataSegment segment : compactedSegments) { - // Expecte compaction state to exist as store compaction state by default + // Expect compaction state to exist as store compaction state by default + Assert.assertSame(SingleDimensionShardSpec.class, segment.getShardSpec().getClass()); + Assert.assertEquals(expectedState, segment.getLastCompactionState()); + } + } + + @Test + public void testRunParallelWithRangePartitioningWithSingleTask() + { + // Range partitioning is not supported with segment lock yet + Assume.assumeFalse(lockGranularity == LockGranularity.SEGMENT); + runIndexTask(null, true); + + final Builder builder = new Builder( + DATA_SOURCE, + getSegmentLoaderFactory(), + RETRY_POLICY_FACTORY + ); + final CompactionTask compactionTask = builder + .inputSpec(new CompactionIntervalSpec(INTERVAL_TO_INDEX, null)) + .tuningConfig(newTuningConfig(new SingleDimensionPartitionsSpec(7, null, "dim", false), 1, true)) + .build(); + + final Set compactedSegments = runTask(compactionTask); + final CompactionState expectedState = new CompactionState( + new SingleDimensionPartitionsSpec(7, null, "dim", false), + compactionTask.getTuningConfig().getIndexSpec().asMap(getObjectMapper()) + ); + for (DataSegment segment : compactedSegments) { + // Expect compaction state to exist as store compaction state by default Assert.assertSame(SingleDimensionShardSpec.class, segment.getShardSpec().getClass()); Assert.assertEquals(expectedState, segment.getLastCompactionState()); } @@ -242,7 +268,7 @@ public void testRunCompactionStateNotStoreIfContextSetToFalse() lockGranularity == LockGranularity.TIME_CHUNK ? NumberedShardSpec.class : NumberedOverwriteShardSpec.class, segment.getShardSpec().getClass() ); - // Expecte compaction state to exist as store compaction state by default + // Expect compaction state to exist as store compaction state by default Assert.assertEquals(null, segment.getLastCompactionState()); } } diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskTest.java index 60c286a4b29c..ef1db8a31468 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskTest.java @@ -21,11 +21,14 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.collect.Ordering; +import org.apache.druid.data.input.InputSource; import org.apache.druid.data.input.impl.DimensionsSpec; import org.apache.druid.data.input.impl.InlineInputSource; import org.apache.druid.data.input.impl.JsonInputFormat; import org.apache.druid.data.input.impl.TimestampSpec; import org.apache.druid.indexer.partitions.HashedPartitionsSpec; +import org.apache.druid.indexer.partitions.PartitionsSpec; +import org.apache.druid.indexer.partitions.SingleDimensionPartitionsSpec; import org.apache.druid.java.util.common.Intervals; import org.apache.druid.java.util.common.Pair; import org.apache.druid.segment.IndexSpec; @@ -36,6 +39,7 @@ import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory; import org.apache.druid.timeline.partition.BuildingHashBasedNumberedShardSpec; import org.apache.druid.timeline.partition.HashPartitionFunction; +import org.easymock.EasyMock; import org.hamcrest.Matchers; import org.joda.time.Duration; import org.joda.time.Interval; @@ -55,6 +59,9 @@ import java.util.stream.Collectors; import java.util.stream.IntStream; +import static org.easymock.EasyMock.expect; +import static org.easymock.EasyMock.mock; + @RunWith(Enclosed.class) public class ParallelIndexSupervisorTaskTest { @@ -241,4 +248,64 @@ public void testFailToConstructWhenBothAppendToExistingAndForceGuaranteedRollupA ); } } + + public static class staticUtilsTest + { + @Test + public void testIsParallelModeFalse_nullTuningConfig() + { + InputSource inputSource = mock(InputSource.class); + Assert.assertFalse(ParallelIndexSupervisorTask.isParallelMode(inputSource, null)); + } + + @Test + public void testIsParallelModeFalse_rangePartition() + { + InputSource inputSource = mock(InputSource.class); + expect(inputSource.isSplittable()).andReturn(true).anyTimes(); + + ParallelIndexTuningConfig tuningConfig = mock(ParallelIndexTuningConfig.class); + expect(tuningConfig.getGivenOrDefaultPartitionsSpec()).andReturn(mock(SingleDimensionPartitionsSpec.class)) + .anyTimes(); + expect(tuningConfig.getMaxNumConcurrentSubTasks()).andReturn(0).andReturn(1).andReturn(2); + EasyMock.replay(inputSource, tuningConfig); + + Assert.assertFalse(ParallelIndexSupervisorTask.isParallelMode(inputSource, tuningConfig)); + Assert.assertTrue(ParallelIndexSupervisorTask.isParallelMode(inputSource, tuningConfig)); + Assert.assertTrue(ParallelIndexSupervisorTask.isParallelMode(inputSource, tuningConfig)); + } + + @Test + public void testIsParallelModeFalse_notRangePartition() + { + InputSource inputSource = mock(InputSource.class); + expect(inputSource.isSplittable()).andReturn(true).anyTimes(); + + ParallelIndexTuningConfig tuningConfig = mock(ParallelIndexTuningConfig.class); + expect(tuningConfig.getGivenOrDefaultPartitionsSpec()).andReturn(mock(PartitionsSpec.class)) + .anyTimes(); + expect(tuningConfig.getMaxNumConcurrentSubTasks()).andReturn(1).andReturn(2).andReturn(3); + EasyMock.replay(inputSource, tuningConfig); + + Assert.assertFalse(ParallelIndexSupervisorTask.isParallelMode(inputSource, tuningConfig)); + Assert.assertTrue(ParallelIndexSupervisorTask.isParallelMode(inputSource, tuningConfig)); + Assert.assertTrue(ParallelIndexSupervisorTask.isParallelMode(inputSource, tuningConfig)); + } + + @Test + public void testIsParallelModeFalse_inputSourceNotSplittable() + { + InputSource inputSource = mock(InputSource.class); + expect(inputSource.isSplittable()).andReturn(false).anyTimes(); + + ParallelIndexTuningConfig tuningConfig = mock(ParallelIndexTuningConfig.class); + expect(tuningConfig.getGivenOrDefaultPartitionsSpec()).andReturn(mock(SingleDimensionPartitionsSpec.class)) + .anyTimes(); + expect(tuningConfig.getMaxNumConcurrentSubTasks()).andReturn(3); + EasyMock.replay(inputSource, tuningConfig); + + Assert.assertFalse(ParallelIndexSupervisorTask.isParallelMode(inputSource, tuningConfig)); + } + } + }