From dbda34be64b709f3c5dd55c30ac4fe78daf05907 Mon Sep 17 00:00:00 2001 From: Abhishek Agarwal <1477457+abhishekagarwal87@users.noreply.github.com> Date: Mon, 28 Sep 2020 21:58:41 +0530 Subject: [PATCH 1/3] Fix the task id creation in CompactionTask --- .../indexing/common/task/CompactionTask.java | 11 ++- .../parallel/ParallelIndexSupervisorTask.java | 15 +++-- .../task/CompactionTaskParallelRunTest.java | 31 +++++++++ .../ParallelIndexSupervisorTaskTest.java | 67 +++++++++++++++++++ 4 files changed, 117 insertions(+), 7 deletions(-) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java index 62a9f26ba49b..15cc1d8af029 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java @@ -32,6 +32,7 @@ import org.apache.curator.shaded.com.google.common.base.Verify; import org.apache.druid.client.coordinator.CoordinatorClient; import org.apache.druid.client.indexing.ClientCompactionTaskQuery; +import org.apache.druid.data.input.InputSource; import org.apache.druid.data.input.impl.DimensionSchema; import org.apache.druid.data.input.impl.DimensionSchema.MultiValueHandling; import org.apache.druid.data.input.impl.DimensionsSpec; @@ -361,9 +362,13 @@ public TaskStatus runTask(TaskToolbox toolbox) throws Exception // a new Appenderator on its own instead. As a result, they should use different sequence names to allocate // new segmentIds properly. See IndexerSQLMetadataStorageCoordinator.allocatePendingSegments() for details. // In this case, we use different fake IDs for each created index task. - final String subtaskId = tuningConfig == null || tuningConfig.getMaxNumConcurrentSubTasks() == 1 - ? createIndexTaskSpecId(i) - : getId(); + ParallelIndexIngestionSpec ingestionSpec = ingestionSpecs.get(i); + InputSource inputSource = ingestionSpec.getIOConfig().getNonNullInputSource( + ingestionSpec.getDataSchema().getParser() + ); + final String subtaskId = ParallelIndexSupervisorTask.isParallelMode(inputSource, tuningConfig) + ? getId() + : createIndexTaskSpecId(i); return newTask(subtaskId, ingestionSpecs.get(i)); }) .collect(Collectors.toList()); diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java index dd0e75980c9c..5192b55c29bc 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java @@ -466,13 +466,20 @@ private void initializeSubTaskCleaner() registerResourceCloserOnAbnormalExit(currentSubTaskHolder); } - private boolean isParallelMode() + public static boolean isParallelMode(InputSource inputSource, @Nullable ParallelIndexTuningConfig tuningConfig) { + if (null == tuningConfig) { + return false; + } + boolean useRangePartitions = tuningConfig.getGivenOrDefaultPartitionsSpec() instanceof SingleDimensionPartitionsSpec; // Range partitioning is not implemented for runSequential() (but hash partitioning is) - int minRequiredNumConcurrentSubTasks = useRangePartitions() ? 1 : 2; + int minRequiredNumConcurrentSubTasks = useRangePartitions ? 1 : 2; + return inputSource.isSplittable() && tuningConfig.getMaxNumConcurrentSubTasks() >= minRequiredNumConcurrentSubTasks; + } - return baseInputSource.isSplittable() - && ingestionSchema.getTuningConfig().getMaxNumConcurrentSubTasks() >= minRequiredNumConcurrentSubTasks; + private boolean isParallelMode() + { + return isParallelMode(baseInputSource, ingestionSchema.getTuningConfig()); } private boolean useRangePartitions() diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskParallelRunTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskParallelRunTest.java index 616dd24978ec..fc18f9a35cff 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskParallelRunTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskParallelRunTest.java @@ -219,6 +219,37 @@ public void testRunParallelWithRangePartitioning() } } + @Test + public void testRunParallelWithRangePartitioningWithSingleTask() + { + // Range partitioning is not supported with segment lock yet + if (lockGranularity == LockGranularity.SEGMENT) { + return; + } + runIndexTask(null, true); + + final Builder builder = new Builder( + DATA_SOURCE, + getSegmentLoaderFactory(), + RETRY_POLICY_FACTORY + ); + final CompactionTask compactionTask = builder + .inputSpec(new CompactionIntervalSpec(INTERVAL_TO_INDEX, null)) + .tuningConfig(newTuningConfig(new SingleDimensionPartitionsSpec(7, null, "dim", false), 1, true)) + .build(); + + final Set compactedSegments = runTask(compactionTask); + final CompactionState expectedState = new CompactionState( + new SingleDimensionPartitionsSpec(7, null, "dim", false), + compactionTask.getTuningConfig().getIndexSpec().asMap(getObjectMapper()) + ); + for (DataSegment segment : compactedSegments) { + // Expecte compaction state to exist as store compaction state by default + Assert.assertSame(SingleDimensionShardSpec.class, segment.getShardSpec().getClass()); + Assert.assertEquals(expectedState, segment.getLastCompactionState()); + } + } + @Test public void testRunCompactionStateNotStoreIfContextSetToFalse() { diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskTest.java index 60c286a4b29c..ef1db8a31468 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskTest.java @@ -21,11 +21,14 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.collect.Ordering; +import org.apache.druid.data.input.InputSource; import org.apache.druid.data.input.impl.DimensionsSpec; import org.apache.druid.data.input.impl.InlineInputSource; import org.apache.druid.data.input.impl.JsonInputFormat; import org.apache.druid.data.input.impl.TimestampSpec; import org.apache.druid.indexer.partitions.HashedPartitionsSpec; +import org.apache.druid.indexer.partitions.PartitionsSpec; +import org.apache.druid.indexer.partitions.SingleDimensionPartitionsSpec; import org.apache.druid.java.util.common.Intervals; import org.apache.druid.java.util.common.Pair; import org.apache.druid.segment.IndexSpec; @@ -36,6 +39,7 @@ import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory; import org.apache.druid.timeline.partition.BuildingHashBasedNumberedShardSpec; import org.apache.druid.timeline.partition.HashPartitionFunction; +import org.easymock.EasyMock; import org.hamcrest.Matchers; import org.joda.time.Duration; import org.joda.time.Interval; @@ -55,6 +59,9 @@ import java.util.stream.Collectors; import java.util.stream.IntStream; +import static org.easymock.EasyMock.expect; +import static org.easymock.EasyMock.mock; + @RunWith(Enclosed.class) public class ParallelIndexSupervisorTaskTest { @@ -241,4 +248,64 @@ public void testFailToConstructWhenBothAppendToExistingAndForceGuaranteedRollupA ); } } + + public static class staticUtilsTest + { + @Test + public void testIsParallelModeFalse_nullTuningConfig() + { + InputSource inputSource = mock(InputSource.class); + Assert.assertFalse(ParallelIndexSupervisorTask.isParallelMode(inputSource, null)); + } + + @Test + public void testIsParallelModeFalse_rangePartition() + { + InputSource inputSource = mock(InputSource.class); + expect(inputSource.isSplittable()).andReturn(true).anyTimes(); + + ParallelIndexTuningConfig tuningConfig = mock(ParallelIndexTuningConfig.class); + expect(tuningConfig.getGivenOrDefaultPartitionsSpec()).andReturn(mock(SingleDimensionPartitionsSpec.class)) + .anyTimes(); + expect(tuningConfig.getMaxNumConcurrentSubTasks()).andReturn(0).andReturn(1).andReturn(2); + EasyMock.replay(inputSource, tuningConfig); + + Assert.assertFalse(ParallelIndexSupervisorTask.isParallelMode(inputSource, tuningConfig)); + Assert.assertTrue(ParallelIndexSupervisorTask.isParallelMode(inputSource, tuningConfig)); + Assert.assertTrue(ParallelIndexSupervisorTask.isParallelMode(inputSource, tuningConfig)); + } + + @Test + public void testIsParallelModeFalse_notRangePartition() + { + InputSource inputSource = mock(InputSource.class); + expect(inputSource.isSplittable()).andReturn(true).anyTimes(); + + ParallelIndexTuningConfig tuningConfig = mock(ParallelIndexTuningConfig.class); + expect(tuningConfig.getGivenOrDefaultPartitionsSpec()).andReturn(mock(PartitionsSpec.class)) + .anyTimes(); + expect(tuningConfig.getMaxNumConcurrentSubTasks()).andReturn(1).andReturn(2).andReturn(3); + EasyMock.replay(inputSource, tuningConfig); + + Assert.assertFalse(ParallelIndexSupervisorTask.isParallelMode(inputSource, tuningConfig)); + Assert.assertTrue(ParallelIndexSupervisorTask.isParallelMode(inputSource, tuningConfig)); + Assert.assertTrue(ParallelIndexSupervisorTask.isParallelMode(inputSource, tuningConfig)); + } + + @Test + public void testIsParallelModeFalse_inputSourceNotSplittable() + { + InputSource inputSource = mock(InputSource.class); + expect(inputSource.isSplittable()).andReturn(false).anyTimes(); + + ParallelIndexTuningConfig tuningConfig = mock(ParallelIndexTuningConfig.class); + expect(tuningConfig.getGivenOrDefaultPartitionsSpec()).andReturn(mock(SingleDimensionPartitionsSpec.class)) + .anyTimes(); + expect(tuningConfig.getMaxNumConcurrentSubTasks()).andReturn(3); + EasyMock.replay(inputSource, tuningConfig); + + Assert.assertFalse(ParallelIndexSupervisorTask.isParallelMode(inputSource, tuningConfig)); + } + } + } From 7322fbee6dc13d6013408a34e20ee1dd0ccb83e0 Mon Sep 17 00:00:00 2001 From: Abhishek Agarwal <1477457+abhishekagarwal87@users.noreply.github.com> Date: Wed, 30 Sep 2020 12:34:51 +0530 Subject: [PATCH 2/3] review comments --- .../druid/indexing/common/task/CompactionTask.java | 2 +- .../batch/parallel/ParallelIndexSupervisorTask.java | 12 ++++++------ 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java index 15cc1d8af029..ba2502f197af 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java @@ -369,7 +369,7 @@ public TaskStatus runTask(TaskToolbox toolbox) throws Exception final String subtaskId = ParallelIndexSupervisorTask.isParallelMode(inputSource, tuningConfig) ? getId() : createIndexTaskSpecId(i); - return newTask(subtaskId, ingestionSpecs.get(i)); + return newTask(subtaskId, ingestionSpec); }) .collect(Collectors.toList()); diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java index 5192b55c29bc..4a218a0b26e0 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java @@ -471,20 +471,20 @@ public static boolean isParallelMode(InputSource inputSource, @Nullable Parallel if (null == tuningConfig) { return false; } - boolean useRangePartitions = tuningConfig.getGivenOrDefaultPartitionsSpec() instanceof SingleDimensionPartitionsSpec; + boolean useRangePartitions = useRangePartitions(tuningConfig); // Range partitioning is not implemented for runSequential() (but hash partitioning is) int minRequiredNumConcurrentSubTasks = useRangePartitions ? 1 : 2; return inputSource.isSplittable() && tuningConfig.getMaxNumConcurrentSubTasks() >= minRequiredNumConcurrentSubTasks; } - private boolean isParallelMode() + private static boolean useRangePartitions(ParallelIndexTuningConfig tuningConfig) { - return isParallelMode(baseInputSource, ingestionSchema.getTuningConfig()); + return tuningConfig.getGivenOrDefaultPartitionsSpec() instanceof SingleDimensionPartitionsSpec; } - private boolean useRangePartitions() + private boolean isParallelMode() { - return ingestionSchema.getTuningConfig().getGivenOrDefaultPartitionsSpec() instanceof SingleDimensionPartitionsSpec; + return isParallelMode(baseInputSource, ingestionSchema.getTuningConfig()); } /** @@ -519,7 +519,7 @@ private TaskStatus runSinglePhaseParallel(TaskToolbox toolbox) throws Exception */ private TaskStatus runMultiPhaseParallel(TaskToolbox toolbox) throws Exception { - return useRangePartitions() + return useRangePartitions(ingestionSchema.getTuningConfig()) ? runRangePartitionMultiPhaseParallel(toolbox) : runHashPartitionMultiPhaseParallel(toolbox); } From b6056de793d119e4ab83daf7fc6c67030e2c61eb Mon Sep 17 00:00:00 2001 From: Abhishek Agarwal <1477457+abhishekagarwal87@users.noreply.github.com> Date: Thu, 1 Oct 2020 12:58:15 +0530 Subject: [PATCH 3/3] Ignore test for range partitioning and segment lock --- .../task/CompactionTaskParallelRunTest.java | 23 ++++++++----------- 1 file changed, 9 insertions(+), 14 deletions(-) diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskParallelRunTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskParallelRunTest.java index fc18f9a35cff..7b7005bcbd60 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskParallelRunTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskParallelRunTest.java @@ -63,6 +63,7 @@ import org.apache.druid.timeline.partition.SingleDimensionShardSpec; import org.joda.time.Interval; import org.junit.Assert; +import org.junit.Assume; import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; @@ -152,7 +153,7 @@ public void testRunParallelWithDynamicPartitioningMatchCompactionState() lockGranularity == LockGranularity.TIME_CHUNK ? NumberedShardSpec.class : NumberedOverwriteShardSpec.class, segment.getShardSpec().getClass() ); - // Expecte compaction state to exist as store compaction state by default + // Expect compaction state to exist as store compaction state by default Assert.assertEquals(expectedState, segment.getLastCompactionState()); } } @@ -161,9 +162,7 @@ public void testRunParallelWithDynamicPartitioningMatchCompactionState() public void testRunParallelWithHashPartitioningMatchCompactionState() { // Hash partitioning is not supported with segment lock yet - if (lockGranularity == LockGranularity.SEGMENT) { - return; - } + Assume.assumeFalse(lockGranularity == LockGranularity.SEGMENT); runIndexTask(null, true); final Builder builder = new Builder( @@ -182,7 +181,7 @@ public void testRunParallelWithHashPartitioningMatchCompactionState() compactionTask.getTuningConfig().getIndexSpec().asMap(getObjectMapper()) ); for (DataSegment segment : compactedSegments) { - // Expecte compaction state to exist as store compaction state by default + // Expect compaction state to exist as store compaction state by default Assert.assertSame(HashBasedNumberedShardSpec.class, segment.getShardSpec().getClass()); Assert.assertEquals(expectedState, segment.getLastCompactionState()); } @@ -192,9 +191,7 @@ public void testRunParallelWithHashPartitioningMatchCompactionState() public void testRunParallelWithRangePartitioning() { // Range partitioning is not supported with segment lock yet - if (lockGranularity == LockGranularity.SEGMENT) { - return; - } + Assume.assumeFalse(lockGranularity == LockGranularity.SEGMENT); runIndexTask(null, true); final Builder builder = new Builder( @@ -213,7 +210,7 @@ public void testRunParallelWithRangePartitioning() compactionTask.getTuningConfig().getIndexSpec().asMap(getObjectMapper()) ); for (DataSegment segment : compactedSegments) { - // Expecte compaction state to exist as store compaction state by default + // Expect compaction state to exist as store compaction state by default Assert.assertSame(SingleDimensionShardSpec.class, segment.getShardSpec().getClass()); Assert.assertEquals(expectedState, segment.getLastCompactionState()); } @@ -223,9 +220,7 @@ public void testRunParallelWithRangePartitioning() public void testRunParallelWithRangePartitioningWithSingleTask() { // Range partitioning is not supported with segment lock yet - if (lockGranularity == LockGranularity.SEGMENT) { - return; - } + Assume.assumeFalse(lockGranularity == LockGranularity.SEGMENT); runIndexTask(null, true); final Builder builder = new Builder( @@ -244,7 +239,7 @@ public void testRunParallelWithRangePartitioningWithSingleTask() compactionTask.getTuningConfig().getIndexSpec().asMap(getObjectMapper()) ); for (DataSegment segment : compactedSegments) { - // Expecte compaction state to exist as store compaction state by default + // Expect compaction state to exist as store compaction state by default Assert.assertSame(SingleDimensionShardSpec.class, segment.getShardSpec().getClass()); Assert.assertEquals(expectedState, segment.getLastCompactionState()); } @@ -273,7 +268,7 @@ public void testRunCompactionStateNotStoreIfContextSetToFalse() lockGranularity == LockGranularity.TIME_CHUNK ? NumberedShardSpec.class : NumberedOverwriteShardSpec.class, segment.getShardSpec().getClass() ); - // Expecte compaction state to exist as store compaction state by default + // Expect compaction state to exist as store compaction state by default Assert.assertEquals(null, segment.getLastCompactionState()); } }