diff --git a/core/src/main/java/org/apache/druid/indexer/partitions/DimensionBasedPartitionsSpec.java b/core/src/main/java/org/apache/druid/indexer/partitions/DimensionBasedPartitionsSpec.java
index 58c0189ee729..9ab48213dc5d 100644
--- a/core/src/main/java/org/apache/druid/indexer/partitions/DimensionBasedPartitionsSpec.java
+++ b/core/src/main/java/org/apache/druid/indexer/partitions/DimensionBasedPartitionsSpec.java
@@ -19,6 +19,7 @@
 package org.apache.druid.indexer.partitions;

+import javax.annotation.Nullable;
 import java.util.List;

 /**
@@ -33,4 +34,7 @@ public interface DimensionBasedPartitionsSpec extends PartitionsSpec
   String TARGET_PARTITION_SIZE = "targetPartitionSize";

   List<String> getPartitionDimensions();
+
+  @Nullable
+  Integer getTargetRowsPerSegment();
 }
diff --git a/core/src/main/java/org/apache/druid/indexer/partitions/HashedPartitionsSpec.java b/core/src/main/java/org/apache/druid/indexer/partitions/HashedPartitionsSpec.java
index f9f639681586..c97086ce130e 100644
--- a/core/src/main/java/org/apache/druid/indexer/partitions/HashedPartitionsSpec.java
+++ b/core/src/main/java/org/apache/druid/indexer/partitions/HashedPartitionsSpec.java
@@ -54,9 +54,9 @@ public HashedPartitionsSpec(
       // Deprecated properties preserved for backward compatibility:
       @Deprecated @JsonProperty(DimensionBasedPartitionsSpec.TARGET_PARTITION_SIZE) @Nullable
-      Integer targetPartitionSize,
+      Integer targetPartitionSize,  // prefer targetRowsPerSegment
       @Deprecated @JsonProperty(PartitionsSpec.MAX_ROWS_PER_SEGMENT) @Nullable
-      Integer maxRowsPerSegment
+      Integer maxRowsPerSegment  // prefer targetRowsPerSegment
   )
   {
     Integer adjustedTargetRowsPerSegment = PartitionsSpec.resolveHistoricalNullIfNeeded(targetRowsPerSegment);
@@ -112,6 +112,13 @@ public HashedPartitionsSpec(
     this(null, numShards, partitionDimensions, null, maxRowsPerSegment);
   }

+  @Nullable
+  @Override
+  public Integer getTargetRowsPerSegment()
+  {
+    return null;
+  }
+
   @Nullable
   @Override
   @JsonProperty
diff --git a/core/src/main/java/org/apache/druid/indexer/partitions/PartitionsSpec.java b/core/src/main/java/org/apache/druid/indexer/partitions/PartitionsSpec.java
index 8555ddb3c3a3..11f9ec81212a 100644
--- a/core/src/main/java/org/apache/druid/indexer/partitions/PartitionsSpec.java
+++ b/core/src/main/java/org/apache/druid/indexer/partitions/PartitionsSpec.java
@@ -30,8 +30,7 @@
 @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type", defaultImpl = HashedPartitionsSpec.class)
 @JsonSubTypes(value = {
     @JsonSubTypes.Type(name = SingleDimensionPartitionsSpec.NAME, value = SingleDimensionPartitionsSpec.class),
-    @JsonSubTypes.Type(name = SingleDimensionPartitionsSpec.OLD_NAME, value = SingleDimensionPartitionsSpec.class),
-    // for backward compatibility
+    @JsonSubTypes.Type(name = SingleDimensionPartitionsSpec.OLD_NAME, value = SingleDimensionPartitionsSpec.class),  // for backward compatibility
     @JsonSubTypes.Type(name = HashedPartitionsSpec.NAME, value = HashedPartitionsSpec.class),
     @JsonSubTypes.Type(name = DynamicPartitionsSpec.NAME, value = DynamicPartitionsSpec.class)
 })
diff --git a/core/src/main/java/org/apache/druid/indexer/partitions/SingleDimensionPartitionsSpec.java b/core/src/main/java/org/apache/druid/indexer/partitions/SingleDimensionPartitionsSpec.java
index 3c8e506b56c5..05fc5a473466 100644
--- a/core/src/main/java/org/apache/druid/indexer/partitions/SingleDimensionPartitionsSpec.java
+++ b/core/src/main/java/org/apache/druid/indexer/partitions/SingleDimensionPartitionsSpec.java
@@ -127,6 +127,7 @@ private static int resolveMaxRowsPerSegment(Property<Integer> target, Property<Integer>
diff --git a/indexing-hadoop/src/main/java/org/apache/druid/indexer/DeterminePartitionsJob.java b/indexing-hadoop/src/main/java/org/apache/druid/indexer/DeterminePartitionsJob.java
-          if (currentDimPartition.rows > 0 && currentDimPartition.rows + dvc.numRows >= config.getTargetPartitionSize()) {
+          if (currentDimPartition.rows > 0 && currentDimPartition.rows + dvc.numRows > config.getTargetPartitionSize()) {
            final ShardSpec shardSpec = new SingleDimensionShardSpec(
                currentDimPartitions.dim,
                currentDimPartitionStart,
diff --git a/indexing-hadoop/src/main/java/org/apache/druid/indexer/HadoopDruidIndexerConfig.java b/indexing-hadoop/src/main/java/org/apache/druid/indexer/HadoopDruidIndexerConfig.java
index e46f727a3451..b5777e917bbc 100644
--- a/indexing-hadoop/src/main/java/org/apache/druid/indexer/HadoopDruidIndexerConfig.java
+++ b/indexing-hadoop/src/main/java/org/apache/druid/indexer/HadoopDruidIndexerConfig.java
@@ -84,15 +84,15 @@ public class HadoopDruidIndexerConfig
 {
   private static final Injector INJECTOR;

-  public static final String CONFIG_PROPERTY = "druid.indexer.config";
-  public static final Charset JAVA_NATIVE_CHARSET = Charset.forName("Unicode");
-  public static final Splitter TAB_SPLITTER = Splitter.on("\t");
-  public static final Joiner TAB_JOINER = Joiner.on("\t");
+  static final String CONFIG_PROPERTY = "druid.indexer.config";
+  static final Charset JAVA_NATIVE_CHARSET = Charset.forName("Unicode");
+  static final Splitter TAB_SPLITTER = Splitter.on("\t");
+  static final Joiner TAB_JOINER = Joiner.on("\t");
   public static final ObjectMapper JSON_MAPPER;
   public static final IndexIO INDEX_IO;
-  public static final IndexMerger INDEX_MERGER_V9;
-  public static final HadoopKerberosConfig HADOOP_KERBEROS_CONFIG;
-  public static final DataSegmentPusher DATA_SEGMENT_PUSHER;
+  static final IndexMerger INDEX_MERGER_V9;
+  static final HadoopKerberosConfig HADOOP_KERBEROS_CONFIG;
+  static final DataSegmentPusher DATA_SEGMENT_PUSHER;

   private static final String DEFAULT_WORKING_PATH = "/tmp/druid-indexing";

@@ -133,7 +133,7 @@ public static HadoopDruidIndexerConfig fromSpec(HadoopIngestionSpec spec)
     return new HadoopDruidIndexerConfig(spec);
   }

-  public static HadoopDruidIndexerConfig fromMap(Map<String, Object> argSpec)
+  private static HadoopDruidIndexerConfig fromMap(Map<String, Object> argSpec)
   {
     // Eventually PathSpec needs to get rid of its Hadoop dependency, then maybe this can be ingested directly without
     // the Map<> intermediary
@@ -226,14 +226,7 @@ public HadoopDruidIndexerConfig(
       shardSpecLookups.put(
           entry.getKey(), actualSpec.getLookup(
               Lists.transform(
-                  entry.getValue(), new Function<HadoopyShardSpec, ShardSpec>()
-                  {
-                    @Override
-                    public ShardSpec apply(HadoopyShardSpec input)
-                    {
-                      return input.getActualSpec();
-                    }
-                  }
+                  entry.getValue(), HadoopyShardSpec::getActualSpec
               )
           )
       );
@@ -298,7 +291,7 @@ public IndexSpec getIndexSpecForIntermediatePersists()
     return schema.getTuningConfig().getIndexSpecForIntermediatePersists();
   }

-  public boolean isOverwriteFiles()
+  boolean isOverwriteFiles()
   {
     return schema.getTuningConfig().isOverwriteFiles();
   }
@@ -313,24 +306,30 @@ public Optional<List<Interval>> getIntervals()
   {
     Optional<SortedSet<Interval>> setOptional = schema.getDataSchema().getGranularitySpec().bucketIntervals();
     if (setOptional.isPresent()) {
-      return Optional.of((List<Interval>) JodaUtils.condenseIntervals(setOptional.get()));
+      return Optional.of(JodaUtils.condenseIntervals(setOptional.get()));
     } else {
       return Optional.absent();
     }
   }

-  public boolean isDeterminingPartitions()
+  boolean isDeterminingPartitions()
   {
     return schema.getTuningConfig().getPartitionsSpec().needsDeterminePartitions(true);
   }

   public int getTargetPartitionSize()
   {
-    final Integer targetPartitionSize = schema.getTuningConfig().getPartitionsSpec().getMaxRowsPerSegment();
+    DimensionBasedPartitionsSpec spec = schema.getTuningConfig().getPartitionsSpec();
+
+    if (spec.getTargetRowsPerSegment() != null) {
+      return spec.getTargetRowsPerSegment();
+    }
+
+    final Integer targetPartitionSize = spec.getMaxRowsPerSegment();
     return targetPartitionSize == null ? -1 : targetPartitionSize;
   }

-  public boolean isForceExtendableShardSpecs()
+  boolean isForceExtendableShardSpecs()
   {
     return schema.getTuningConfig().isForceExtendableShardSpecs();
   }
@@ -355,7 +354,7 @@ public HadoopyShardSpec getShardSpec(Bucket bucket)
     return schema.getTuningConfig().getShardSpecs().get(bucket.time.getMillis()).get(bucket.partitionNum);
   }

-  public int getShardSpecCount(Bucket bucket)
+  int getShardSpecCount(Bucket bucket)
   {
     return schema.getTuningConfig().getShardSpecs().get(bucket.time.getMillis()).size();
   }
@@ -370,18 +369,18 @@ public int getMaxParseExceptions()
     return schema.getTuningConfig().getMaxParseExceptions();
   }

-  public boolean isUseYarnRMJobStatusFallback()
+  boolean isUseYarnRMJobStatusFallback()
   {
     return schema.getTuningConfig().isUseYarnRMJobStatusFallback();
   }

-  public void setHadoopJobIdFileName(String hadoopJobIdFileName)
+  void setHadoopJobIdFileName(String hadoopJobIdFileName)
   {
     this.hadoopJobIdFileName = hadoopJobIdFileName;
   }

-  public String getHadoopJobIdFileName()
+  String getHadoopJobIdFileName()
   {
     return hadoopJobIdFileName;
   }
@@ -390,9 +389,6 @@ public String getHadoopJobIdFileName()
    * Job instance should have Configuration set (by calling {@link #addJobProperties(Job)}
    * or via injected system properties) before this method is called. The {@link PathSpec} may
    * create objects which depend on the values of these configurations.
-   * @param job
-   * @return
-   * @throws IOException
    */
   public Job addInputPaths(Job job) throws IOException
   {
@@ -410,7 +406,7 @@ public Job addInputPaths(Job job) throws IOException
    *
    * @return the Bucket that this row belongs to
    */
-  public Optional<Bucket> getBucket(InputRow inputRow)
+  Optional<Bucket> getBucket(InputRow inputRow)
   {
     final Optional<Interval> timeBucket = schema.getDataSchema().getGranularitySpec().bucketInterval(
         DateTimes.utc(inputRow.getTimestampFromEpoch())
@@ -436,13 +432,13 @@ public Optional<Bucket> getBucket(InputRow inputRow)

   }

-  public Optional<Set<Interval>> getSegmentGranularIntervals()
+  Optional<Set<Interval>> getSegmentGranularIntervals()
   {
     return Optional.fromNullable(
-        (Set<Interval>) schema.getDataSchema()
-                              .getGranularitySpec()
-                              .bucketIntervals()
-                              .orNull()
+        schema.getDataSchema()
+              .getGranularitySpec()
+              .bucketIntervals()
+              .orNull()
     );
   }

@@ -453,40 +449,35 @@ public List<Interval> getInputIntervals()
                  .inputIntervals();
   }

-  public Optional<Iterable<Bucket>> getAllBuckets()
+  Optional<Iterable<Bucket>> getAllBuckets()
   {
     Optional<Set<Interval>> intervals = getSegmentGranularIntervals();
     if (intervals.isPresent()) {
       return Optional.of(
-          (Iterable<Bucket>) FunctionalIterable
+          FunctionalIterable
              .create(intervals.get())
              .transformCat(
-                  new Function<Interval, Iterable<Bucket>>()
-                  {
-                    @Override
-                    public Iterable<Bucket> apply(Interval input)
-                    {
-                      final DateTime bucketTime = input.getStart();
-                      final List<HadoopyShardSpec> specs = schema.getTuningConfig().getShardSpecs().get(bucketTime.getMillis());
-                      if (specs == null) {
-                        return ImmutableList.of();
-                      }
-
-                      return FunctionalIterable
-                          .create(specs)
-                          .transform(
-                              new Function<HadoopyShardSpec, Bucket>()
-                              {
-                                int i = 0;
+                  input -> {
+                    final DateTime bucketTime = input.getStart();
+                    final List<HadoopyShardSpec> specs = schema.getTuningConfig().getShardSpecs().get(bucketTime.getMillis());
+                    if (specs == null) {
+                      return ImmutableList.of();
+                    }
+
+                    return FunctionalIterable
+                        .create(specs)
+                        .transform(
+                            new Function<HadoopyShardSpec, Bucket>()
+                            {
+                              int i = 0;

-                                @Override
-                                public Bucket apply(HadoopyShardSpec input)
-                                {
-                                  return new Bucket(input.getShardNum(), bucketTime, i++);
-                                }
+                              @Override
+                              public Bucket apply(HadoopyShardSpec input)
+                              {
+                                return new Bucket(input.getShardNum(), bucketTime, i++);
                               }
-                          );
-                    }
+                              }
+                        );
                   }
              )
          );
@@ -511,7 +502,7 @@ public String getWorkingPath()
    * @return the intermediate path for this job run.
    */
-  public Path makeIntermediatePath()
+  Path makeIntermediatePath()
   {
     return new Path(
         StringUtils.format(
@@ -524,7 +515,7 @@ public Path makeIntermediatePath()
     );
   }

-  public Path makeSegmentPartitionInfoPath(Interval bucketInterval)
+  Path makeSegmentPartitionInfoPath(Interval bucketInterval)
   {
     return new Path(
         StringUtils.format(
@@ -536,7 +527,7 @@ public Path makeSegmentPartitionInfoPath(Interval bucketInterval)
     );
   }

-  public Path makeIntervalInfoPath()
+  Path makeIntervalInfoPath()
   {
     return new Path(
         StringUtils.format(
@@ -546,27 +537,27 @@ public Path makeIntervalInfoPath()
     );
   }

-  public Path makeDescriptorInfoDir()
+  Path makeDescriptorInfoDir()
   {
     return new Path(makeIntermediatePath(), "segmentDescriptorInfo");
   }

-  public Path makeGroupedDataDir()
+  Path makeGroupedDataDir()
   {
     return new Path(makeIntermediatePath(), "groupedData");
   }

-  public Path makeDescriptorInfoPath(DataSegment segment)
+  Path makeDescriptorInfoPath(DataSegment segment)
   {
     return new Path(makeDescriptorInfoDir(), StringUtils.removeChar(segment.getId() + ".json", ':'));
   }

-  public void addJobProperties(Job job)
+  void addJobProperties(Job job)
   {
     addJobProperties(job.getConfiguration());
   }

-  public void addJobProperties(Configuration conf)
+  void addJobProperties(Configuration conf)
   {
     for (final Map.Entry<String, String> entry : schema.getTuningConfig().getJobProperties().entrySet()) {
       conf.set(entry.getKey(), entry.getValue());
@@ -597,7 +588,7 @@ public void verify()
     Preconditions.checkNotNull(schema.getTuningConfig().getVersion(), "version");
   }

-  public List<String> getAllowedHadoopPrefix()
+  List<String> getAllowedHadoopPrefix()
   {
     return allowedHadoopPrefix;
   }
diff --git a/indexing-hadoop/src/test/java/org/apache/druid/indexer/DeterminePartitionsJobTest.java b/indexing-hadoop/src/test/java/org/apache/druid/indexer/DeterminePartitionsJobTest.java
index 89b7fde43d61..6abc2e651edb 100644
--- a/indexing-hadoop/src/test/java/org/apache/druid/indexer/DeterminePartitionsJobTest.java
+++ b/indexing-hadoop/src/test/java/org/apache/druid/indexer/DeterminePartitionsJobTest.java
@@ -41,6 +41,7 @@
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;

+import javax.annotation.Nullable;
 import java.io.File;
 import java.io.IOException;
 import java.util.Arrays;
@@ -51,6 +52,11 @@
 @RunWith(Parameterized.class)
 public class DeterminePartitionsJobTest
 {
+  @Nullable
+  private static final Long NO_TARGET_ROWS_PER_SEGMENT = null;
+  @Nullable
+  private static final Long NO_MAX_ROWS_PER_SEGMENT = null;
+
   private final HadoopDruidIndexerConfig config;
   private final int expectedNumOfSegments;
   private final int[] expectedNumOfShardsForEachSegment;
@@ -59,19 +65,51 @@ public class DeterminePartitionsJobTest
   private final File tmpDir;

   @Parameterized.Parameters(name = "assumeGrouped={0}, "
-                                   + "maxRowsPerSegment={1}, "
-                                   + "interval={2}"
-                                   + "expectedNumOfSegments={3}, "
-                                   + "expectedNumOfShardsForEachSegment={4}, "
-                                   + "expectedStartEndForEachShard={5}, "
-                                   + "data={6}")
+                                   + "targetRowsPerSegment={1}, "
+                                   + "maxRowsPerSegment={2}, "
+                                   + "interval={3}"
+                                   + "expectedNumOfSegments={4}, "
+                                   + "expectedNumOfShardsForEachSegment={5}, "
+                                   + "expectedStartEndForEachShard={6}, "
"expectedStartEndForEachShard={6}, " + + "data={7}") public static Collection constructFeed() { return Arrays.asList( new Object[][]{ { + // Test partitoning by targetRowsPerSegment true, - 3, + 2, + NO_MAX_ROWS_PER_SEGMENT, + "2014-10-22T00:00:00Z/P1D", + 1, + new int[]{5}, + new String[][][]{ + { + {null, "c.example.com"}, + {"c.example.com", "e.example.com"}, + {"e.example.com", "g.example.com"}, + {"g.example.com", "i.example.com"}, + {"i.example.com", null} + } + }, + ImmutableList.of( + "2014102200,a.example.com,CN,100", + "2014102200,b.example.com,US,50", + "2014102200,c.example.com,US,200", + "2014102200,d.example.com,US,250", + "2014102200,e.example.com,US,123", + "2014102200,f.example.com,US,567", + "2014102200,g.example.com,US,11", + "2014102200,h.example.com,US,251", + "2014102200,i.example.com,US,963", + "2014102200,j.example.com,US,333" + ) + }, + { + true, + NO_TARGET_ROWS_PER_SEGMENT, + 2, "2014-10-22T00:00:00Z/P1D", 1, new int[]{5}, @@ -86,7 +124,7 @@ public static Collection constructFeed() }, ImmutableList.of( "2014102200,a.example.com,CN,100", - "2014102200,b.exmaple.com,US,50", + "2014102200,b.example.com,US,50", "2014102200,c.example.com,US,200", "2014102200,d.example.com,US,250", "2014102200,e.example.com,US,123", @@ -99,7 +137,8 @@ public static Collection constructFeed() }, { false, - 3, + NO_TARGET_ROWS_PER_SEGMENT, + 2, "2014-10-20T00:00:00Z/P1D", 1, new int[]{5}, @@ -115,8 +154,8 @@ public static Collection constructFeed() ImmutableList.of( "2014102000,a.example.com,CN,100", "2014102000,a.example.com,CN,100", - "2014102000,b.exmaple.com,US,50", - "2014102000,b.exmaple.com,US,50", + "2014102000,b.example.com,US,50", + "2014102000,b.example.com,US,50", "2014102000,c.example.com,US,200", "2014102000,c.example.com,US,200", "2014102000,d.example.com,US,250", @@ -137,7 +176,8 @@ public static Collection constructFeed() }, { true, - 6, + NO_TARGET_ROWS_PER_SEGMENT, + 5, "2014-10-20T00:00:00Z/P3D", 3, new int[]{2, 2, 2}, @@ -157,7 +197,7 @@ public static Collection constructFeed() }, ImmutableList.of( "2014102000,a.example.com,CN,100", - "2014102000,b.exmaple.com,CN,50", + "2014102000,b.example.com,CN,50", "2014102000,c.example.com,CN,200", "2014102000,d.example.com,US,250", "2014102000,e.example.com,US,123", @@ -166,9 +206,8 @@ public static Collection constructFeed() "2014102000,h.example.com,US,251", "2014102000,i.example.com,US,963", "2014102000,j.example.com,US,333", - "2014102000,k.example.com,US,555", "2014102100,a.example.com,CN,100", - "2014102100,b.exmaple.com,CN,50", + "2014102100,b.example.com,CN,50", "2014102100,c.example.com,CN,200", "2014102100,d.example.com,US,250", "2014102100,e.example.com,US,123", @@ -177,9 +216,8 @@ public static Collection constructFeed() "2014102100,h.example.com,US,251", "2014102100,i.example.com,US,963", "2014102100,j.example.com,US,333", - "2014102100,k.example.com,US,555", "2014102200,a.example.com,CN,100", - "2014102200,b.exmaple.com,CN,50", + "2014102200,b.example.com,CN,50", "2014102200,c.example.com,CN,200", "2014102200,d.example.com,US,250", "2014102200,e.example.com,US,123", @@ -187,12 +225,12 @@ public static Collection constructFeed() "2014102200,g.example.com,US,11", "2014102200,h.example.com,US,251", "2014102200,i.example.com,US,963", - "2014102200,j.example.com,US,333", - "2014102200,k.example.com,US,555" + "2014102200,j.example.com,US,333" ) }, { true, + NO_TARGET_ROWS_PER_SEGMENT, 1000, "2014-10-22T00:00:00Z/P1D", 1, @@ -204,7 +242,7 @@ public static Collection constructFeed() }, ImmutableList.of( 
"2014102200,a.example.com,CN,100", - "2014102200,b.exmaple.com,US,50", + "2014102200,b.example.com,US,50", "2014102200,c.example.com,US,200", "2014102200,d.example.com,US,250", "2014102200,e.example.com,US,123", @@ -221,6 +259,7 @@ public static Collection constructFeed() public DeterminePartitionsJobTest( boolean assumeGrouped, + @Nullable Integer targetRowsPerSegment, Integer maxRowsPerSegment, String interval, int expectedNumOfSegments, @@ -284,7 +323,7 @@ public DeterminePartitionsJobTest( new HadoopTuningConfig( tmpDir.getCanonicalPath(), null, - new SingleDimensionPartitionsSpec(null, maxRowsPerSegment, null, assumeGrouped), + new SingleDimensionPartitionsSpec(targetRowsPerSegment, maxRowsPerSegment, null, assumeGrouped), null, null, null, diff --git a/indexing-hadoop/src/test/java/org/apache/druid/indexer/HadoopDruidIndexerConfigTest.java b/indexing-hadoop/src/test/java/org/apache/druid/indexer/HadoopDruidIndexerConfigTest.java index 47b430780c6c..e3e357c3955f 100644 --- a/indexing-hadoop/src/test/java/org/apache/druid/indexer/HadoopDruidIndexerConfigTest.java +++ b/indexing-hadoop/src/test/java/org/apache/druid/indexer/HadoopDruidIndexerConfigTest.java @@ -24,6 +24,10 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import org.apache.druid.data.input.MapBasedInputRow; +import org.apache.druid.indexer.partitions.DimensionBasedPartitionsSpec; +import org.apache.druid.indexer.partitions.HashedPartitionsSpec; +import org.apache.druid.indexer.partitions.PartitionsSpec; +import org.apache.druid.indexer.partitions.SingleDimensionPartitionsSpec; import org.apache.druid.jackson.DefaultObjectMapper; import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.java.util.common.Intervals; @@ -36,13 +40,13 @@ import org.junit.Assert; import org.junit.Test; +import javax.annotation.Nullable; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.List; +import java.util.Map; -/** - */ public class HadoopDruidIndexerConfigTest { private static final ObjectMapper JSON_MAPPER; @@ -55,56 +59,18 @@ public class HadoopDruidIndexerConfigTest @Test public void testHashedBucketSelection() { - List specs = new ArrayList<>(); + List shardSpecs = new ArrayList<>(); final int partitionCount = 10; for (int i = 0; i < partitionCount; i++) { - specs.add(new HadoopyShardSpec( + shardSpecs.add(new HadoopyShardSpec( new HashBasedNumberedShardSpec(i, partitionCount, null, new DefaultObjectMapper()), i )); } - HadoopIngestionSpec spec = new HadoopIngestionSpec( - new DataSchema( - "foo", - null, - new AggregatorFactory[0], - new UniformGranularitySpec( - Granularities.MINUTE, - Granularities.MINUTE, - ImmutableList.of(Intervals.of("2010-01-01/P1D")) - ), - null, - JSON_MAPPER - ), - new HadoopIOConfig(ImmutableMap.of("paths", "bar", "type", "static"), null, null), - new HadoopTuningConfig( - null, - null, - null, - ImmutableMap.of(DateTimes.of("2010-01-01T01:00:00").getMillis(), specs), - null, - null, - null, - null, - false, - false, - false, - false, - null, - false, - false, - null, - null, - null, - false, - false, - null, - null, - null, - null - ) - ); + HadoopIngestionSpec spec = new HadoopIngestionSpecBuilder() + .shardSpecs(ImmutableMap.of(DateTimes.of("2010-01-01T01:00:00").getMillis(), shardSpecs)) + .build(); HadoopDruidIndexerConfig config = HadoopDruidIndexerConfig.fromSpec(spec); final List dims = Arrays.asList("diM1", "dIM2"); final ImmutableMap values = ImmutableMap.of( @@ -133,57 +99,21 @@ 
   @Test
   public void testNoneShardSpecBucketSelection()
   {
-    HadoopIngestionSpec spec = new HadoopIngestionSpec(
-        new DataSchema(
-            "foo",
-            null,
-            new AggregatorFactory[0],
-            new UniformGranularitySpec(
-                Granularities.MINUTE,
-                Granularities.MINUTE,
-                ImmutableList.of(Intervals.of("2010-01-01/P1D"))
-            ),
-            null,
-            JSON_MAPPER
-        ),
-        new HadoopIOConfig(ImmutableMap.of("paths", "bar", "type", "static"), null, null),
-        new HadoopTuningConfig(
-            null,
-            null,
-            null,
-            ImmutableMap.of(DateTimes.of("2010-01-01T01:00:00").getMillis(),
-                            Collections.singletonList(new HadoopyShardSpec(
-                                NoneShardSpec.instance(),
-                                1
-                            )),
-                            DateTimes.of("2010-01-01T02:00:00").getMillis(),
-                            Collections.singletonList(new HadoopyShardSpec(
-                                NoneShardSpec.instance(),
-                                2
-                            ))
-            ),
-            null,
-            null,
-            null,
-            null,
-            false,
-            false,
-            false,
-            false,
-            null,
-            false,
-            false,
-            null,
-            null,
-            null,
-            false,
-            false,
-            null,
-            null,
-            null,
-            null
-        )
+    Map<Long, List<HadoopyShardSpec>> shardSpecs = ImmutableMap.of(
+        DateTimes.of("2010-01-01T01:00:00").getMillis(),
+        Collections.singletonList(new HadoopyShardSpec(
+            NoneShardSpec.instance(),
+            1
+        )),
+        DateTimes.of("2010-01-01T02:00:00").getMillis(),
+        Collections.singletonList(new HadoopyShardSpec(
+            NoneShardSpec.instance(),
+            2
+        ))
     );
+    HadoopIngestionSpec spec = new HadoopIngestionSpecBuilder()
+        .shardSpecs(shardSpecs)
+        .build();
     HadoopDruidIndexerConfig config = HadoopDruidIndexerConfig.fromSpec(spec);
     final List<String> dims = Arrays.asList("diM1", "dIM2");
     final ImmutableMap values = ImmutableMap.of(
@@ -197,10 +127,132 @@ public void testNoneShardSpecBucketSelection()
         "4"
     );
     final long ts1 = DateTimes.of("2010-01-01T01:00:01").getMillis();
-    Assert.assertEquals(config.getBucket(new MapBasedInputRow(ts1, dims, values)).get().getShardNum(), 1);
+    Assert.assertEquals(1, config.getBucket(new MapBasedInputRow(ts1, dims, values)).get().getShardNum());
     final long ts2 = DateTimes.of("2010-01-01T02:00:01").getMillis();
-    Assert.assertEquals(config.getBucket(new MapBasedInputRow(ts2, dims, values)).get().getShardNum(), 2);
+    Assert.assertEquals(2, config.getBucket(new MapBasedInputRow(ts2, dims, values)).get().getShardNum());
+  }
+
+  @Test
+  public void testGetTargetPartitionSizeWithHashedPartitions()
+  {
+    HadoopIngestionSpec spec = new HadoopIngestionSpecBuilder()
+        .partitionsSpec(HashedPartitionsSpec.defaultSpec())
+        .build();
+    HadoopDruidIndexerConfig config = new HadoopDruidIndexerConfig(spec);
+    int targetPartitionSize = config.getTargetPartitionSize();
+    Assert.assertEquals(PartitionsSpec.DEFAULT_MAX_ROWS_PER_SEGMENT, targetPartitionSize);
+  }
+
+  @Test
+  public void testGetTargetPartitionSizeWithSingleDimensionPartitionsTargetRowsPerSegment()
+  {
+    int targetRowsPerSegment = 123;
+    SingleDimensionPartitionsSpec partitionsSpec = new SingleDimensionPartitionsSpec(
+        targetRowsPerSegment,
+        null,
+        null,
+        false
+
+    );
+    HadoopIngestionSpec spec = new HadoopIngestionSpecBuilder()
+        .partitionsSpec(partitionsSpec)
+        .build();
+    HadoopDruidIndexerConfig config = new HadoopDruidIndexerConfig(spec);
+    int targetPartitionSize = config.getTargetPartitionSize();
+    Assert.assertEquals(targetRowsPerSegment, targetPartitionSize);
+  }
+
+  @Test
+  public void testGetTargetPartitionSizeWithSingleDimensionPartitionsMaxRowsPerSegment()
+  {
+    int maxRowsPerSegment = 456;
+    SingleDimensionPartitionsSpec partitionsSpec = new SingleDimensionPartitionsSpec(
+        null,
+        maxRowsPerSegment,
+        null,
+        false
+    );
+    HadoopIngestionSpec spec = new HadoopIngestionSpecBuilder()
+        .partitionsSpec(partitionsSpec)
+        .build();
+    HadoopDruidIndexerConfig config = new HadoopDruidIndexerConfig(spec);
+    int targetPartitionSize = config.getTargetPartitionSize();
+    Assert.assertEquals(maxRowsPerSegment, targetPartitionSize);
+  }
+
+  private static class HadoopIngestionSpecBuilder
+  {
+    private static final DataSchema DATA_SCHEMA = new DataSchema(
+        "foo",
+        null,
+        new AggregatorFactory[0],
+        new UniformGranularitySpec(
+            Granularities.MINUTE,
+            Granularities.MINUTE,
+            ImmutableList.of(Intervals.of("2010-01-01/P1D"))
+        ),
+        null,
+        HadoopDruidIndexerConfigTest.JSON_MAPPER
+    );
+
+    private static final HadoopIOConfig HADOOP_IO_CONFIG = new HadoopIOConfig(
+        ImmutableMap.of("paths", "bar", "type", "static"),
+        null,
+        null
+    );
+
+    @Nullable
+    private DimensionBasedPartitionsSpec partitionsSpec = null;
+    private Map<Long, List<HadoopyShardSpec>> shardSpecs = Collections.emptyMap();
+
+    HadoopIngestionSpecBuilder partitionsSpec(DimensionBasedPartitionsSpec partitionsSpec)
+    {
+      this.partitionsSpec = partitionsSpec;
+      return this;
+    }
+
+    HadoopIngestionSpecBuilder shardSpecs(Map<Long, List<HadoopyShardSpec>> shardSpecs)
+    {
+      this.shardSpecs = shardSpecs;
+      return this;
+    }
+
+    HadoopIngestionSpec build()
+    {
+      HadoopTuningConfig hadoopTuningConfig = new HadoopTuningConfig(
+          null,
+          null,
+          partitionsSpec,
+          shardSpecs,
+          null,
+          null,
+          null,
+          null,
+          false,
+          false,
+          false,
+          false,
+          null,
+          false,
+          false,
+          null,
+          null,
+          null,
+          false,
+          false,
+          null,
+          null,
+          null,
+          null
+      );
+
+      return new HadoopIngestionSpec(
+          DATA_SCHEMA,
+          HADOOP_IO_CONFIG,
+          hadoopTuningConfig
+      );
+    }
+  }
 }
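For readers skimming the diff, the behavioral change in HadoopDruidIndexerConfig#getTargetPartitionSize reduces to a small precedence rule: an explicitly configured targetRowsPerSegment wins, the deprecated maxRowsPerSegment is used as a fallback, and -1 signals that neither is set. The standalone sketch below mirrors that rule in plain Java; it is illustrative only (the class and method names are made up for this note and are not Druid's actual API).

// Illustrative stand-in, not Druid code: mirrors the precedence applied by the
// patched getTargetPartitionSize() and exercised by the new tests above.
class TargetPartitionSizeExample
{
  // Either argument may be null, meaning "not configured".
  static int resolveTargetPartitionSize(Integer targetRowsPerSegment, Integer maxRowsPerSegment)
  {
    if (targetRowsPerSegment != null) {
      return targetRowsPerSegment;                             // new property takes precedence
    }
    return maxRowsPerSegment == null ? -1 : maxRowsPerSegment; // legacy fallback; -1 = unset
  }

  public static void main(String[] args)
  {
    System.out.println(resolveTargetPartitionSize(123, null));  // 123
    System.out.println(resolveTargetPartitionSize(null, 456));  // 456
    System.out.println(resolveTargetPartitionSize(null, null)); // -1
  }
}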