@@ -32,6 +32,7 @@
import org.apache.curator.shaded.com.google.common.base.Verify;
import org.apache.druid.client.coordinator.CoordinatorClient;
import org.apache.druid.client.indexing.ClientCompactionTaskQuery;
import org.apache.druid.data.input.InputSource;
import org.apache.druid.data.input.impl.DimensionSchema;
import org.apache.druid.data.input.impl.DimensionSchema.MultiValueHandling;
import org.apache.druid.data.input.impl.DimensionsSpec;
@@ -361,10 +362,14 @@ public TaskStatus runTask(TaskToolbox toolbox) throws Exception
// a new Appenderator on its own instead. As a result, they should use different sequence names to allocate
// new segmentIds properly. See IndexerSQLMetadataStorageCoordinator.allocatePendingSegments() for details.
// In this case, we use different fake IDs for each created index task.
final String subtaskId = tuningConfig == null || tuningConfig.getMaxNumConcurrentSubTasks() == 1
? createIndexTaskSpecId(i)
: getId();
return newTask(subtaskId, ingestionSpecs.get(i));
ParallelIndexIngestionSpec ingestionSpec = ingestionSpecs.get(i);
InputSource inputSource = ingestionSpec.getIOConfig().getNonNullInputSource(
ingestionSpec.getDataSchema().getParser()
);
final String subtaskId = ParallelIndexSupervisorTask.isParallelMode(inputSource, tuningConfig)
? getId()
: createIndexTaskSpecId(i);
return newTask(subtaskId, ingestionSpec);
})
.collect(Collectors.toList());

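The comment in runTask() above states the constraint driving this change: sub-tasks that run sequentially each create their own Appenderator and allocate their own segment ids, so each needs a distinct task id to derive a distinct sequence name. A minimal, self-contained sketch of that id-selection rule (the helper class and the id format are assumptions for illustration, not Druid code):

import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

class SubtaskIdSketch
{
  // Parallel mode: the supervisor allocates segments centrally, so one shared
  // id is safe. Sequential mode: every task allocates for itself, so ids must
  // differ or allocatePendingSegments() could hand two tasks the same segment.
  static String chooseId(String supervisorId, int i, boolean parallelMode)
  {
    return parallelMode ? supervisorId : supervisorId + "_" + i;
  }

  public static void main(String[] args)
  {
    List<String> ids = IntStream.range(0, 3)
        .mapToObj(i -> chooseId("compact_foo", i, false))
        .collect(Collectors.toList());
    System.out.println(ids); // [compact_foo_0, compact_foo_1, compact_foo_2]
  }
}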
@@ -466,18 +466,25 @@ private void initializeSubTaskCleaner()
registerResourceCloserOnAbnormalExit(currentSubTaskHolder);
}

private boolean isParallelMode()
public static boolean isParallelMode(InputSource inputSource, @Nullable ParallelIndexTuningConfig tuningConfig)
{
if (null == tuningConfig) {
return false;
}
boolean useRangePartitions = useRangePartitions(tuningConfig);
// Range partitioning is not implemented for runSequential() (but hash partitioning is)
int minRequiredNumConcurrentSubTasks = useRangePartitions() ? 1 : 2;
int minRequiredNumConcurrentSubTasks = useRangePartitions ? 1 : 2;
return inputSource.isSplittable() && tuningConfig.getMaxNumConcurrentSubTasks() >= minRequiredNumConcurrentSubTasks;
}

return baseInputSource.isSplittable()
&& ingestionSchema.getTuningConfig().getMaxNumConcurrentSubTasks() >= minRequiredNumConcurrentSubTasks;
private static boolean useRangePartitions(ParallelIndexTuningConfig tuningConfig)
{
return tuningConfig.getGivenOrDefaultPartitionsSpec() instanceof SingleDimensionPartitionsSpec;
}

private boolean useRangePartitions()
private boolean isParallelMode()
{
return ingestionSchema.getTuningConfig().getGivenOrDefaultPartitionsSpec() instanceof SingleDimensionPartitionsSpec;
return isParallelMode(baseInputSource, ingestionSchema.getTuningConfig());
}

/**
@@ -512,7 +519,7 @@ private TaskStatus runSinglePhaseParallel(TaskToolbox toolbox) throws Exception
*/
private TaskStatus runMultiPhaseParallel(TaskToolbox toolbox) throws Exception
{
return useRangePartitions()
return useRangePartitions(ingestionSchema.getTuningConfig())
? runRangePartitionMultiPhaseParallel(toolbox)
: runHashPartitionMultiPhaseParallel(toolbox);
}
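The new static isParallelMode() above is the heart of the patch; its inputs boil down to splittability, the sub-task limit, and the partitioning scheme. A standalone sketch of the same decision table (plain booleans and ints stand in for InputSource and ParallelIndexTuningConfig; this is an illustration, not the Druid method):

class ParallelModeSketch
{
  static boolean isParallelMode(boolean splittable, Integer maxNumConcurrentSubTasks, boolean rangePartitions)
  {
    if (maxNumConcurrentSubTasks == null) {
      // No tuning config at all: fall back to sequential mode.
      return false;
    }
    // Range partitioning has no sequential implementation, so it counts as
    // parallel from 1 sub-task; hash and dynamic partitioning need at least 2.
    int minRequired = rangePartitions ? 1 : 2;
    return splittable && maxNumConcurrentSubTasks >= minRequired;
  }

  public static void main(String[] args)
  {
    System.out.println(isParallelMode(true, 1, true));    // true:  range at a single sub-task
    System.out.println(isParallelMode(true, 1, false));   // false: hash/dynamic need >= 2
    System.out.println(isParallelMode(false, 5, true));   // false: unsplittable input source
    System.out.println(isParallelMode(true, null, true)); // false: no tuning config
  }
}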
@@ -63,6 +63,7 @@
import org.apache.druid.timeline.partition.SingleDimensionShardSpec;
import org.joda.time.Interval;
import org.junit.Assert;
import org.junit.Assume;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
@@ -152,7 +153,7 @@ public void testRunParallelWithDynamicPartitioningMatchCompactionState()
lockGranularity == LockGranularity.TIME_CHUNK ? NumberedShardSpec.class : NumberedOverwriteShardSpec.class,
segment.getShardSpec().getClass()
);
// Expecte compaction state to exist as store compaction state by default
// Expect compaction state to exist as store compaction state by default
Assert.assertEquals(expectedState, segment.getLastCompactionState());
}
}
@@ -161,9 +162,7 @@ public void testRunParallelWithDynamicPartitioningMatchCompactionState()
public void testRunParallelWithHashPartitioningMatchCompactionState()
{
// Hash partitioning is not supported with segment lock yet
if (lockGranularity == LockGranularity.SEGMENT) {
return;
}
Assume.assumeFalse(lockGranularity == LockGranularity.SEGMENT);
runIndexTask(null, true);

final Builder builder = new Builder(
@@ -182,7 +181,7 @@ public void testRunParallelWithHashPartitioningMatchCompactionState()
compactionTask.getTuningConfig().getIndexSpec().asMap(getObjectMapper())
);
for (DataSegment segment : compactedSegments) {
// Expecte compaction state to exist as store compaction state by default
// Expect compaction state to exist as store compaction state by default
Assert.assertSame(HashBasedNumberedShardSpec.class, segment.getShardSpec().getClass());
Assert.assertEquals(expectedState, segment.getLastCompactionState());
}
@@ -192,9 +191,7 @@ public void testRunParallelWithHashPartitioningMatchCompactionState()
public void testRunParallelWithRangePartitioning()
{
// Range partitioning is not supported with segment lock yet
if (lockGranularity == LockGranularity.SEGMENT) {
return;
}
Assume.assumeFalse(lockGranularity == LockGranularity.SEGMENT);
runIndexTask(null, true);

final Builder builder = new Builder(
@@ -213,7 +210,36 @@ public void testRunParallelWithRangePartitioning()
compactionTask.getTuningConfig().getIndexSpec().asMap(getObjectMapper())
);
for (DataSegment segment : compactedSegments) {
// Expecte compaction state to exist as store compaction state by default
// Expect compaction state to exist as store compaction state by default
Assert.assertSame(SingleDimensionShardSpec.class, segment.getShardSpec().getClass());
Assert.assertEquals(expectedState, segment.getLastCompactionState());
}
}

@Test
public void testRunParallelWithRangePartitioningWithSingleTask()
{
// Range partitioning is not supported with segment lock yet
Assume.assumeFalse(lockGranularity == LockGranularity.SEGMENT);
runIndexTask(null, true);

final Builder builder = new Builder(
DATA_SOURCE,
getSegmentLoaderFactory(),
RETRY_POLICY_FACTORY
);
final CompactionTask compactionTask = builder
.inputSpec(new CompactionIntervalSpec(INTERVAL_TO_INDEX, null))
.tuningConfig(newTuningConfig(new SingleDimensionPartitionsSpec(7, null, "dim", false), 1, true))
.build();

final Set<DataSegment> compactedSegments = runTask(compactionTask);
final CompactionState expectedState = new CompactionState(
new SingleDimensionPartitionsSpec(7, null, "dim", false),
compactionTask.getTuningConfig().getIndexSpec().asMap(getObjectMapper())
);
for (DataSegment segment : compactedSegments) {
// Expect compaction state to exist as store compaction state by default
Assert.assertSame(SingleDimensionShardSpec.class, segment.getShardSpec().getClass());
Assert.assertEquals(expectedState, segment.getLastCompactionState());
}
@@ -242,7 +268,7 @@ public void testRunCompactionStateNotStoreIfContextSetToFalse()
lockGranularity == LockGranularity.TIME_CHUNK ? NumberedShardSpec.class : NumberedOverwriteShardSpec.class,
segment.getShardSpec().getClass()
);
// Expecte compaction state to exist as store compaction state by default
// Expect compaction state to exist as store compaction state by default
Assert.assertEquals(null, segment.getLastCompactionState());
}
}
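Two things changed in the test class above: the early returns under segment lock became Assume.assumeFalse(), so JUnit reports those cases as skipped rather than vacuously passing, and the new testRunParallelWithRangePartitioningWithSingleTask() covers exactly the case the isParallelMode() change enables: range partitioning with maxNumConcurrentSubTasks = 1 now takes the parallel path. For reference, a sketch of the partitions spec that test submits, with constructor arguments named as understood from the call above (the argument names are an assumption):

import org.apache.druid.indexer.partitions.SingleDimensionPartitionsSpec;

class RangeSpecSketch
{
  static SingleDimensionPartitionsSpec specFromTestAbove()
  {
    return new SingleDimensionPartitionsSpec(
        7,      // targetRowsPerSegment: aim for roughly 7 rows per output segment
        null,   // maxRowsPerSegment: unset; mutually exclusive with the target
        "dim",  // partitionDimension: range-partition segments on this column
        false   // assumeGrouped: do not assume input rows are pre-grouped
    );
  }

  public static void main(String[] args)
  {
    System.out.println(specFromTestAbove().getPartitionDimension()); // dim
  }
}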
@@ -21,11 +21,14 @@

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.Ordering;
import org.apache.druid.data.input.InputSource;
import org.apache.druid.data.input.impl.DimensionsSpec;
import org.apache.druid.data.input.impl.InlineInputSource;
import org.apache.druid.data.input.impl.JsonInputFormat;
import org.apache.druid.data.input.impl.TimestampSpec;
import org.apache.druid.indexer.partitions.HashedPartitionsSpec;
import org.apache.druid.indexer.partitions.PartitionsSpec;
import org.apache.druid.indexer.partitions.SingleDimensionPartitionsSpec;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.segment.IndexSpec;
@@ -36,6 +39,7 @@
import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory;
import org.apache.druid.timeline.partition.BuildingHashBasedNumberedShardSpec;
import org.apache.druid.timeline.partition.HashPartitionFunction;
import org.easymock.EasyMock;
import org.hamcrest.Matchers;
import org.joda.time.Duration;
import org.joda.time.Interval;
@@ -55,6 +59,9 @@
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import static org.easymock.EasyMock.expect;
import static org.easymock.EasyMock.mock;

@RunWith(Enclosed.class)
public class ParallelIndexSupervisorTaskTest
{
@@ -241,4 +248,64 @@ public void testFailToConstructWhenBothAppendToExistingAndForceGuaranteedRollupA
);
}
}

public static class StaticUtilsTest
{
@Test
public void testIsParallelModeFalse_nullTuningConfig()
{
InputSource inputSource = mock(InputSource.class);
Assert.assertFalse(ParallelIndexSupervisorTask.isParallelMode(inputSource, null));
}

@Test
public void testIsParallelMode_rangePartition()
{
InputSource inputSource = mock(InputSource.class);
expect(inputSource.isSplittable()).andReturn(true).anyTimes();

ParallelIndexTuningConfig tuningConfig = mock(ParallelIndexTuningConfig.class);
expect(tuningConfig.getGivenOrDefaultPartitionsSpec()).andReturn(mock(SingleDimensionPartitionsSpec.class))
.anyTimes();
expect(tuningConfig.getMaxNumConcurrentSubTasks()).andReturn(0).andReturn(1).andReturn(2);
EasyMock.replay(inputSource, tuningConfig);

Assert.assertFalse(ParallelIndexSupervisorTask.isParallelMode(inputSource, tuningConfig));
Assert.assertTrue(ParallelIndexSupervisorTask.isParallelMode(inputSource, tuningConfig));
Assert.assertTrue(ParallelIndexSupervisorTask.isParallelMode(inputSource, tuningConfig));
}

@Test
public void testIsParallelMode_notRangePartition()
{
InputSource inputSource = mock(InputSource.class);
expect(inputSource.isSplittable()).andReturn(true).anyTimes();

ParallelIndexTuningConfig tuningConfig = mock(ParallelIndexTuningConfig.class);
expect(tuningConfig.getGivenOrDefaultPartitionsSpec()).andReturn(mock(PartitionsSpec.class))
.anyTimes();
expect(tuningConfig.getMaxNumConcurrentSubTasks()).andReturn(1).andReturn(2).andReturn(3);
EasyMock.replay(inputSource, tuningConfig);

Assert.assertFalse(ParallelIndexSupervisorTask.isParallelMode(inputSource, tuningConfig));
Assert.assertTrue(ParallelIndexSupervisorTask.isParallelMode(inputSource, tuningConfig));
Assert.assertTrue(ParallelIndexSupervisorTask.isParallelMode(inputSource, tuningConfig));
}

@Test
public void testIsParallelModeFalse_inputSourceNotSplittable()
{
InputSource inputSource = mock(InputSource.class);
expect(inputSource.isSplittable()).andReturn(false).anyTimes();

ParallelIndexTuningConfig tuningConfig = mock(ParallelIndexTuningConfig.class);
expect(tuningConfig.getGivenOrDefaultPartitionsSpec()).andReturn(mock(SingleDimensionPartitionsSpec.class))
.anyTimes();
expect(tuningConfig.getMaxNumConcurrentSubTasks()).andReturn(3);
EasyMock.replay(inputSource, tuningConfig);

Assert.assertFalse(ParallelIndexSupervisorTask.isParallelMode(inputSource, tuningConfig));
}
}

}
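Each test in StaticUtilsTest calls isParallelMode() several times because EasyMock consumes chained andReturn() values one per call, which is why a single expectation backs three assertions. A minimal standalone demonstration of that pattern (IntSupplier stands in for the tuning-config getter; this is an illustration, not test code from the PR):

import static org.easymock.EasyMock.expect;
import static org.easymock.EasyMock.mock;
import static org.easymock.EasyMock.replay;

import java.util.function.IntSupplier;

class ChainedReturnsDemo
{
  public static void main(String[] args)
  {
    IntSupplier subTaskLimit = mock(IntSupplier.class);
    // Each call to getAsInt() consumes the next queued return value, in order.
    expect(subTaskLimit.getAsInt()).andReturn(0).andReturn(1).andReturn(2);
    replay(subTaskLimit);
    System.out.println(subTaskLimit.getAsInt()); // 0
    System.out.println(subTaskLimit.getAsInt()); // 1
    System.out.println(subTaskLimit.getAsInt()); // 2
  }
}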