diff --git a/core/src/main/java/org/apache/druid/segment/SegmentUtils.java b/core/src/main/java/org/apache/druid/segment/SegmentUtils.java index 6f273486287b..8b54f72867a2 100644 --- a/core/src/main/java/org/apache/druid/segment/SegmentUtils.java +++ b/core/src/main/java/org/apache/druid/segment/SegmentUtils.java @@ -29,6 +29,7 @@ import org.apache.druid.java.util.common.IOE; import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.timeline.DataSegment; +import org.joda.time.Interval; import javax.annotation.Nullable; import java.io.File; @@ -36,9 +37,12 @@ import java.io.IOException; import java.io.InputStream; import java.nio.charset.StandardCharsets; +import java.util.ArrayList; import java.util.Collection; import java.util.Collections; +import java.util.HashMap; import java.util.List; +import java.util.Map; /** * Utility methods useful for implementing deep storage extensions. @@ -93,6 +97,15 @@ public static Object commaSeparatedIdentifiers(@Nullable final Collection<DataSegment> segments) + public static Map<Interval, List<DataSegment>> groupSegmentsByInterval(Collection<DataSegment> segments) + { + final Map<Interval, List<DataSegment>> intervalToSegments = new HashMap<>(); + segments.forEach( + segment -> intervalToSegments.computeIfAbsent(segment.getInterval(), k -> new ArrayList<>()).add(segment) + ); + return intervalToSegments; + } + private SegmentUtils() { // no instantiation diff --git a/core/src/main/java/org/apache/druid/timeline/DataSegment.java b/core/src/main/java/org/apache/druid/timeline/DataSegment.java index 07c6ae7f689c..18486639ee96 100644 --- a/core/src/main/java/org/apache/druid/timeline/DataSegment.java +++ b/core/src/main/java/org/apache/druid/timeline/DataSegment.java @@ -168,7 +168,7 @@ public DataSegment( List<String> dimensions, List<String> metrics, ShardSpec shardSpec, - CompactionState lastCompactionState, + @Nullable CompactionState lastCompactionState, Integer binaryVersion, long size ) diff --git a/core/src/main/java/org/apache/druid/timeline/partition/BucketNumberedShardSpec.java b/core/src/main/java/org/apache/druid/timeline/partition/BucketNumberedShardSpec.java index d692dad113da..fba7c9eb6d16 100644 --- a/core/src/main/java/org/apache/druid/timeline/partition/BucketNumberedShardSpec.java +++ b/core/src/main/java/org/apache/druid/timeline/partition/BucketNumberedShardSpec.java @@ -97,10 +97,4 @@ default boolean possibleInDomain(Map<String, RangeSet<String>> domain) { throw new UnsupportedOperationException(); } - - @Override - default boolean isCompatible(Class<? extends ShardSpec> other) - { - throw new UnsupportedOperationException(); - } } diff --git a/core/src/main/java/org/apache/druid/timeline/partition/BuildingShardSpec.java b/core/src/main/java/org/apache/druid/timeline/partition/BuildingShardSpec.java index 973fdf4d2a7d..b7ad78b3a928 100644 --- a/core/src/main/java/org/apache/druid/timeline/partition/BuildingShardSpec.java +++ b/core/src/main/java/org/apache/druid/timeline/partition/BuildingShardSpec.java @@ -94,10 +94,4 @@ default boolean possibleInDomain(Map<String, RangeSet<String>> domain) { throw new UnsupportedOperationException(); } - - @Override - default boolean isCompatible(Class<? extends ShardSpec> other) - { - throw new UnsupportedOperationException(); - } } diff --git a/core/src/main/java/org/apache/druid/timeline/partition/HashBasedNumberedPartialShardSpec.java b/core/src/main/java/org/apache/druid/timeline/partition/HashBasedNumberedPartialShardSpec.java index 0e32ee04bcf1..22f254322357 100644 --- a/core/src/main/java/org/apache/druid/timeline/partition/HashBasedNumberedPartialShardSpec.java +++ b/core/src/main/java/org/apache/druid/timeline/partition/HashBasedNumberedPartialShardSpec.java @@ -68,17 +68,11 @@ public
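For orientation, a minimal sketch of how the new SegmentUtils.groupSegmentsByInterval utility can be consumed (the printGroups helper below is hypothetical, not part of this patch). The method returns a plain HashMap, so callers that need deterministic ordering must sort the keys themselves:

```java
import java.util.Collection;
import java.util.List;
import java.util.Map;
import org.apache.druid.segment.SegmentUtils;
import org.apache.druid.timeline.DataSegment;
import org.joda.time.Interval;

public class GroupingSketch
{
  // Hypothetical helper: report how many segments share each interval.
  static void printGroups(Collection<DataSegment> segments)
  {
    final Map<Interval, List<DataSegment>> byInterval = SegmentUtils.groupSegmentsByInterval(segments);
    for (Map.Entry<Interval, List<DataSegment>> entry : byInterval.entrySet()) {
      System.out.println(entry.getKey() + " -> " + entry.getValue().size() + " segment(s)");
    }
  }
}
```

TaskLockHelper, later in this patch, switches to this shared utility in place of its private copy.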
int getNumBuckets() } @Override - public ShardSpec complete(ObjectMapper objectMapper, @Nullable ShardSpec specOfPreviousMaxPartitionId) + public ShardSpec complete(ObjectMapper objectMapper, int partitionId, int numCorePartitions) { - // The shardSpec is created by the Overlord. - // For batch tasks, this code can be executed only with segment locking (forceTimeChunkLock = false). - // In this mode, you can have 2 or more tasks concurrently ingesting into the same time chunk of - // the same datasource. Since there is no restriction for those tasks in segment allocation, the - // allocated IDs for each task can interleave. As a result, the core partition set cannot be - // represented as a range. We always set 0 for the core partition set size if this is an initial segment. return new HashBasedNumberedShardSpec( - specOfPreviousMaxPartitionId == null ? 0 : specOfPreviousMaxPartitionId.getPartitionNum() + 1, - specOfPreviousMaxPartitionId == null ? 0 : specOfPreviousMaxPartitionId.getNumCorePartitions(), + partitionId, + numCorePartitions, bucketId, numBuckets, partitionDimensions, @@ -86,12 +80,6 @@ public ShardSpec complete(ObjectMapper objectMapper, @Nullable ShardSpec specOfP ); } - @Override - public ShardSpec complete(ObjectMapper objectMapper, int partitionId) - { - return new HashBasedNumberedShardSpec(partitionId, 0, bucketId, numBuckets, partitionDimensions, objectMapper); - } - @Override public Class getShardSpecClass() { diff --git a/core/src/main/java/org/apache/druid/timeline/partition/HashBasedNumberedShardSpec.java b/core/src/main/java/org/apache/druid/timeline/partition/HashBasedNumberedShardSpec.java index 23cdb4ef3868..721eb678b248 100644 --- a/core/src/main/java/org/apache/druid/timeline/partition/HashBasedNumberedShardSpec.java +++ b/core/src/main/java/org/apache/druid/timeline/partition/HashBasedNumberedShardSpec.java @@ -54,7 +54,7 @@ public class HashBasedNumberedShardSpec extends NumberedShardSpec @JsonCreator public HashBasedNumberedShardSpec( - @JsonProperty("partitionNum") int partitionNum, // partitionId, hash bucketId + @JsonProperty("partitionNum") int partitionNum, // partitionId @JsonProperty("partitions") int partitions, // core partition set size @JsonProperty("bucketId") @Nullable Integer bucketId, // nullable for backward compatibility @JsonProperty("numBuckets") @Nullable Integer numBuckets, // nullable for backward compatibility @@ -90,12 +90,6 @@ public List getPartitionDimensions() return partitionDimensions; } - @Override - public boolean isCompatible(Class other) - { - return other == HashBasedNumberedShardSpec.class; - } - @Override public boolean isInChunk(long timestamp, InputRow inputRow) { diff --git a/core/src/main/java/org/apache/druid/timeline/partition/LinearPartialShardSpec.java b/core/src/main/java/org/apache/druid/timeline/partition/LinearPartialShardSpec.java index 82a8f34c8d12..5abf2cf46306 100644 --- a/core/src/main/java/org/apache/druid/timeline/partition/LinearPartialShardSpec.java +++ b/core/src/main/java/org/apache/druid/timeline/partition/LinearPartialShardSpec.java @@ -21,8 +21,6 @@ import com.fasterxml.jackson.databind.ObjectMapper; -import javax.annotation.Nullable; - public class LinearPartialShardSpec implements PartialShardSpec { private static final LinearPartialShardSpec INSTANCE = new LinearPartialShardSpec(); @@ -37,16 +35,9 @@ private LinearPartialShardSpec() } @Override - public ShardSpec complete(ObjectMapper objectMapper, @Nullable ShardSpec specOfPreviousMaxPartitionId) - { - return new LinearShardSpec( - 
specOfPreviousMaxPartitionId == null ? 0 : specOfPreviousMaxPartitionId.getPartitionNum() + 1 - ); - } - - @Override - public ShardSpec complete(ObjectMapper objectMapper, int partitionId) + public ShardSpec complete(ObjectMapper objectMapper, int partitionId, int numCorePartitions) { + // numCorePartitions is ignored return new LinearShardSpec(partitionId); } diff --git a/core/src/main/java/org/apache/druid/timeline/partition/LinearShardSpec.java b/core/src/main/java/org/apache/druid/timeline/partition/LinearShardSpec.java index 95b0bd832b9b..0f16a427abcb 100644 --- a/core/src/main/java/org/apache/druid/timeline/partition/LinearShardSpec.java +++ b/core/src/main/java/org/apache/druid/timeline/partition/LinearShardSpec.java @@ -73,12 +73,6 @@ public boolean possibleInDomain(Map> domain) return true; } - @Override - public boolean isCompatible(Class other) - { - return other == LinearShardSpec.class; - } - @Override public PartitionChunk createChunk(T obj) { diff --git a/core/src/main/java/org/apache/druid/timeline/partition/NoneShardSpec.java b/core/src/main/java/org/apache/druid/timeline/partition/NoneShardSpec.java index d53011819413..1f8498beb4a3 100644 --- a/core/src/main/java/org/apache/druid/timeline/partition/NoneShardSpec.java +++ b/core/src/main/java/org/apache/druid/timeline/partition/NoneShardSpec.java @@ -89,12 +89,6 @@ public boolean possibleInDomain(Map> domain) return true; } - @Override - public boolean isCompatible(Class other) - { - return other == NoneShardSpec.class; - } - @Override public boolean equals(Object obj) { diff --git a/core/src/main/java/org/apache/druid/timeline/partition/NumberedOverwritePartialShardSpec.java b/core/src/main/java/org/apache/druid/timeline/partition/NumberedOverwritePartialShardSpec.java index b61ea0ddfe2e..fd8dd5d44692 100644 --- a/core/src/main/java/org/apache/druid/timeline/partition/NumberedOverwritePartialShardSpec.java +++ b/core/src/main/java/org/apache/druid/timeline/partition/NumberedOverwritePartialShardSpec.java @@ -22,8 +22,7 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.databind.ObjectMapper; - -import javax.annotation.Nullable; +import com.google.common.annotations.VisibleForTesting; public class NumberedOverwritePartialShardSpec implements PartialShardSpec { @@ -43,6 +42,12 @@ public NumberedOverwritePartialShardSpec( this.minorVersion = minorVersion; } + @VisibleForTesting + public NumberedOverwritePartialShardSpec(int startRootPartitionId, int endRootPartitionId, int minorVersion) + { + this(startRootPartitionId, endRootPartitionId, (short) minorVersion); + } + @JsonProperty public int getStartRootPartitionId() { @@ -62,14 +67,10 @@ public short getMinorVersion() } @Override - public ShardSpec complete(ObjectMapper objectMapper, @Nullable ShardSpec specOfPreviousMaxPartitionId) + public ShardSpec complete(ObjectMapper objectMapper, int partitionId, int numCorePartitions) { - // specOfPreviousMaxPartitionId is the max partitionId of the same shardSpec - // and could be null if all existing segments are first-generation segments. return new NumberedOverwriteShardSpec( - specOfPreviousMaxPartitionId == null - ? 
PartitionIds.NON_ROOT_GEN_START_PARTITION_ID - : specOfPreviousMaxPartitionId.getPartitionNum() + 1, + partitionId, startRootPartitionId, endRootPartitionId, minorVersion @@ -77,14 +78,14 @@ public ShardSpec complete(ObjectMapper objectMapper, @Nullable ShardSpec specOfP } @Override - public ShardSpec complete(ObjectMapper objectMapper, int partitionId) + public Class getShardSpecClass() { - return new NumberedOverwriteShardSpec(partitionId, startRootPartitionId, endRootPartitionId, minorVersion); + return NumberedOverwriteShardSpec.class; } @Override - public Class getShardSpecClass() + public boolean useNonRootGenerationPartitionSpace() { - return NumberedOverwriteShardSpec.class; + return true; } } diff --git a/core/src/main/java/org/apache/druid/timeline/partition/NumberedOverwriteShardSpec.java b/core/src/main/java/org/apache/druid/timeline/partition/NumberedOverwriteShardSpec.java index adb0d288d6a6..55ba3ce200ba 100644 --- a/core/src/main/java/org/apache/druid/timeline/partition/NumberedOverwriteShardSpec.java +++ b/core/src/main/java/org/apache/druid/timeline/partition/NumberedOverwriteShardSpec.java @@ -205,12 +205,6 @@ public boolean possibleInDomain(Map> domain) return true; } - @Override - public boolean isCompatible(Class other) - { - return other == NumberedOverwriteShardSpec.class || other == NumberedShardSpec.class; - } - @Override public boolean equals(Object o) { diff --git a/core/src/main/java/org/apache/druid/timeline/partition/NumberedPartialShardSpec.java b/core/src/main/java/org/apache/druid/timeline/partition/NumberedPartialShardSpec.java index 730502845f30..f7220ad2156a 100644 --- a/core/src/main/java/org/apache/druid/timeline/partition/NumberedPartialShardSpec.java +++ b/core/src/main/java/org/apache/druid/timeline/partition/NumberedPartialShardSpec.java @@ -21,8 +21,6 @@ import com.fasterxml.jackson.databind.ObjectMapper; -import javax.annotation.Nullable; - public class NumberedPartialShardSpec implements PartialShardSpec { private static final NumberedPartialShardSpec INSTANCE = new NumberedPartialShardSpec(); @@ -37,27 +35,9 @@ private NumberedPartialShardSpec() } @Override - public ShardSpec complete(ObjectMapper objectMapper, @Nullable ShardSpec specOfPreviousMaxPartitionId) - { - if (specOfPreviousMaxPartitionId == null) { - // The shardSpec is created by the Overlord. - // - For streaming ingestion tasks, the core partition set is always 0. - // - For batch tasks, this code is executed only with segment locking (forceTimeChunkLock = false). - // In this mode, you can have 2 or more tasks concurrently ingesting into the same time chunk of - // the same datasource. Since there is no restriction for those tasks in segment allocation, the - // allocated IDs for each task can interleave. As a result, the core partition set cannot be - // represented as a range. We always set 0 for the core partition set size. 
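For context on the partitionId handed to complete() above: first-generation and overwriting segments draw their ids from disjoint ranges defined in PartitionIds, which is what lets an overwrite coexist in one time chunk with the segments it overshadows. A hedged sketch of that invariant (the helper is illustrative; the real code also bounds each range from above):

```java
import org.apache.druid.timeline.partition.PartitionIds;

public class PartitionSpaceSketch
{
  // Root-generation segments start at PartitionIds.ROOT_GEN_START_PARTITION_ID,
  // overwriting segments at PartitionIds.NON_ROOT_GEN_START_PARTITION_ID.
  // Simplified check; the actual code also enforces an upper end per generation.
  static boolean isNonRootGeneration(int partitionId)
  {
    return partitionId >= PartitionIds.NON_ROOT_GEN_START_PARTITION_ID;
  }
}
```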
- return new NumberedShardSpec(0, 0); - } else { - final NumberedShardSpec prevSpec = (NumberedShardSpec) specOfPreviousMaxPartitionId; - return new NumberedShardSpec(prevSpec.getPartitionNum() + 1, prevSpec.getNumCorePartitions()); - } - } - - @Override - public ShardSpec complete(ObjectMapper objectMapper, int partitionId) + public ShardSpec complete(ObjectMapper objectMapper, int partitionId, int numCorePartitions) { - return new NumberedShardSpec(partitionId, 0); + return new NumberedShardSpec(partitionId, numCorePartitions); } @Override diff --git a/core/src/main/java/org/apache/druid/timeline/partition/NumberedShardSpec.java b/core/src/main/java/org/apache/druid/timeline/partition/NumberedShardSpec.java index 5db6bf6000ac..acf091d6dd41 100644 --- a/core/src/main/java/org/apache/druid/timeline/partition/NumberedShardSpec.java +++ b/core/src/main/java/org/apache/druid/timeline/partition/NumberedShardSpec.java @@ -93,12 +93,6 @@ public boolean possibleInDomain(Map> domain) return true; } - @Override - public boolean isCompatible(Class other) - { - return other == NumberedShardSpec.class || other == NumberedOverwriteShardSpec.class; - } - @Override @JsonProperty("partitions") public int getNumCorePartitions() diff --git a/core/src/main/java/org/apache/druid/timeline/partition/OverwriteShardSpec.java b/core/src/main/java/org/apache/druid/timeline/partition/OverwriteShardSpec.java index 6a77ea55286b..f51e688c24c3 100644 --- a/core/src/main/java/org/apache/druid/timeline/partition/OverwriteShardSpec.java +++ b/core/src/main/java/org/apache/druid/timeline/partition/OverwriteShardSpec.java @@ -45,4 +45,16 @@ default OverwriteShardSpec withAtomicUpdateGroupSize(int atomicUpdateGroupSize) } OverwriteShardSpec withAtomicUpdateGroupSize(short atomicUpdateGroupSize); + + /** + * Returns true if this shardSpec and the given {@link PartialShardSpec} share the same partition space. + * This shardSpec uses non-root-generation partition space and thus does not share the space with other shardSpecs. + * + * @see PartitionIds + */ + @Override + default boolean sharePartitionSpace(PartialShardSpec partialShardSpec) + { + return partialShardSpec.useNonRootGenerationPartitionSpace(); + } } diff --git a/core/src/main/java/org/apache/druid/timeline/partition/PartialShardSpec.java b/core/src/main/java/org/apache/druid/timeline/partition/PartialShardSpec.java index 6afaa939471e..bb7ec444928f 100644 --- a/core/src/main/java/org/apache/druid/timeline/partition/PartialShardSpec.java +++ b/core/src/main/java/org/apache/druid/timeline/partition/PartialShardSpec.java @@ -25,8 +25,6 @@ import com.fasterxml.jackson.annotation.JsonTypeInfo; import com.fasterxml.jackson.databind.ObjectMapper; -import javax.annotation.Nullable; - /** * This interface is used in the segment allocation protocol when it is coordinated by the Overlord; when appending * segments to an existing datasource (either streaming ingestion or batch append) or any case when segment @@ -45,21 +43,31 @@ public interface PartialShardSpec { /** - * Creates a new ShardSpec based on {@code specOfPreviousMaxPartitionId}. If it's null, it assumes that this is the - * first call for the time chunk where the new segment is created. - * Note that {@code specOfPreviousMaxPartitionId} can also be null for {@link OverwriteShardSpec} if all segments - * in the timeChunk are first-generation segments. - */ - ShardSpec complete(ObjectMapper objectMapper, @Nullable ShardSpec specOfPreviousMaxPartitionId); - - /** - * Creates a new shardSpec having the given partitionId. 
+ * Creates a new ShardSpec with the given partitionId and numCorePartitions. + * + * @param objectMapper jsonMapper used only for {@link HashBasedNumberedShardSpec} + * @param partitionId partitionId of the shardSpec. Must be carefully chosen to be unique in a time chunk + * @param numCorePartitions the core partition set size. Should be set properly to determine whether this segment belongs + * to the core partitions. */ - ShardSpec complete(ObjectMapper objectMapper, int partitionId); + ShardSpec complete(ObjectMapper objectMapper, int partitionId, int numCorePartitions); /** * Returns the class of the shardSpec created by this factory. */ @JsonIgnore Class<? extends ShardSpec> getShardSpecClass(); + + /** + * Returns true if this partialShardSpec needs a partitionId of a non-root generation. + * Any partialShardSpec used to overwrite a subset of segments in a time chunk, such as + * {@link NumberedOverwritePartialShardSpec}, should return true. + * + * @see PartitionIds + */ + default boolean useNonRootGenerationPartitionSpace() + { + return false; + } } diff --git a/core/src/main/java/org/apache/druid/timeline/partition/ShardSpec.java b/core/src/main/java/org/apache/druid/timeline/partition/ShardSpec.java index 8f59d3959e72..f954f4dc430a 100644 --- a/core/src/main/java/org/apache/druid/timeline/partition/ShardSpec.java +++ b/core/src/main/java/org/apache/druid/timeline/partition/ShardSpec.java @@ -125,8 +125,14 @@ default short getAtomicUpdateGroupSize() boolean possibleInDomain(Map<String, RangeSet<String>> domain); /** - * Returns true if two segments of this and other shardSpecs can exist in the same time chunk. + * Returns true if this shardSpec and the given {@link PartialShardSpec} share the same partition space. + * All shardSpecs except {@link OverwriteShardSpec} use the root-generation partition space and thus share the same + * space. + * + * @see PartitionIds */ - @JsonIgnore - boolean isCompatible(Class<? extends ShardSpec> other); + default boolean sharePartitionSpace(PartialShardSpec partialShardSpec) + { + return !partialShardSpec.useNonRootGenerationPartitionSpace(); + } } diff --git a/core/src/main/java/org/apache/druid/timeline/partition/SingleDimensionPartialShardSpec.java b/core/src/main/java/org/apache/druid/timeline/partition/SingleDimensionPartialShardSpec.java index e2ebd295c356..9638c7161b02 100644 --- a/core/src/main/java/org/apache/druid/timeline/partition/SingleDimensionPartialShardSpec.java +++ b/core/src/main/java/org/apache/druid/timeline/partition/SingleDimensionPartialShardSpec.java @@ -85,34 +85,9 @@ public int getNumBuckets() } @Override - public ShardSpec complete(ObjectMapper objectMapper, @Nullable ShardSpec specOfPreviousMaxPartitionId) + public ShardSpec complete(ObjectMapper objectMapper, int partitionId, int numCorePartitions) { - // The shardSpec is created by the Overlord. - // For batch tasks, this code can be executed only with segment locking (forceTimeChunkLock = false). - // In this mode, you can have 2 or more tasks concurrently ingesting into the same time chunk of - // the same datasource. Since there is no restriction for those tasks in segment allocation, the - // allocated IDs for each task can interleave. As a result, the core partition set cannot be - // represented as a range. We always set 0 for the core partition set size if this is an initial segment. - return new SingleDimensionShardSpec( - partitionDimension, - start, - end, - specOfPreviousMaxPartitionId == null ? 0 : specOfPreviousMaxPartitionId.getPartitionNum() + 1, - specOfPreviousMaxPartitionId == null ?
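To see how the new pieces compose, here is a hedged sketch of an allocator-side caller (allocateNext and its arguments are hypothetical; the real logic lives in the Overlord's segment-allocation path): filter existing shardSpecs down to the shared partition space, pick the next id within that space, and complete the partial spec with an explicit core partition set size.

```java
import com.fasterxml.jackson.databind.ObjectMapper;
import java.util.List;
import org.apache.druid.timeline.partition.PartialShardSpec;
import org.apache.druid.timeline.partition.PartitionIds;
import org.apache.druid.timeline.partition.ShardSpec;

public class AllocationSketch
{
  // Hypothetical: materialize the next shardSpec in the partition space that
  // 'partial' allocates from, ignoring specs from the other space.
  static ShardSpec allocateNext(
      ObjectMapper mapper,
      PartialShardSpec partial,
      List<ShardSpec> existing,
      int numCorePartitions
  )
  {
    final int firstId = partial.useNonRootGenerationPartitionSpace()
                        ? PartitionIds.NON_ROOT_GEN_START_PARTITION_ID
                        : PartitionIds.ROOT_GEN_START_PARTITION_ID;
    final int nextId = existing.stream()
                               .filter(spec -> spec.sharePartitionSpace(partial))
                               .mapToInt(ShardSpec::getPartitionNum)
                               .max()
                               .orElse(firstId - 1) + 1;
    return partial.complete(mapper, nextId, numCorePartitions);
  }
}
```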
0 : specOfPreviousMaxPartitionId.getNumCorePartitions() - ); - } - - @Override - public ShardSpec complete(ObjectMapper objectMapper, int partitionId) - { - // TODO: bucketId and numBuckets should be added to SingleDimensionShardSpec in a follow-up PR. - return new SingleDimensionShardSpec( - partitionDimension, - start, - end, - partitionId, - 0 - ); + return new SingleDimensionShardSpec(partitionDimension, start, end, partitionId, numCorePartitions); } @Override diff --git a/core/src/main/java/org/apache/druid/timeline/partition/SingleDimensionShardSpec.java b/core/src/main/java/org/apache/druid/timeline/partition/SingleDimensionShardSpec.java index 1a00534d6ce8..8deef57da3ef 100644 --- a/core/src/main/java/org/apache/druid/timeline/partition/SingleDimensionShardSpec.java +++ b/core/src/main/java/org/apache/druid/timeline/partition/SingleDimensionShardSpec.java @@ -71,17 +71,6 @@ public SingleDimensionShardSpec( this.numCorePartitions = numCorePartitions == null ? UNKNOWN_NUM_CORE_PARTITIONS : numCorePartitions; } - public SingleDimensionShardSpec withNumCorePartitions(int numCorePartitions) - { - return new SingleDimensionShardSpec( - dimension, - start, - end, - partitionNum, - numCorePartitions - ); - } - @JsonProperty("dimension") public String getDimension() { @@ -165,12 +154,6 @@ public boolean possibleInDomain(Map> domain) return !rangeSet.subRangeSet(getRange()).isEmpty(); } - @Override - public boolean isCompatible(Class other) - { - return other == SingleDimensionShardSpec.class; - } - @Override public PartitionChunk createChunk(T obj) { diff --git a/core/src/test/java/org/apache/druid/segment/SegmentUtilsTest.java b/core/src/test/java/org/apache/druid/segment/SegmentUtilsTest.java index b758342252da..945d4f97039e 100644 --- a/core/src/test/java/org/apache/druid/segment/SegmentUtilsTest.java +++ b/core/src/test/java/org/apache/druid/segment/SegmentUtilsTest.java @@ -19,8 +19,14 @@ package org.apache.druid.segment; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; import com.google.common.primitives.Ints; import org.apache.commons.io.FileUtils; +import org.apache.druid.java.util.common.Intervals; +import org.apache.druid.timeline.DataSegment; +import org.apache.druid.timeline.partition.NumberedShardSpec; +import org.joda.time.Interval; import org.junit.Assert; import org.junit.Rule; import org.junit.Test; @@ -28,6 +34,7 @@ import java.io.File; import java.io.IOException; +import java.util.List; /** */ @@ -57,4 +64,51 @@ public void testException() throws Exception { SegmentUtils.getVersionFromDir(tempFolder.newFolder()); } + + @Test + public void testGroupSegmentsByInterval() + { + final List segments = ImmutableList.of( + newSegment(Intervals.of("2020-01-01/P1D"), 0), + newSegment(Intervals.of("2020-01-02/P1D"), 0), + newSegment(Intervals.of("2020-01-01/P1D"), 1), + newSegment(Intervals.of("2020-01-03/P1D"), 0), + newSegment(Intervals.of("2020-01-02/P1D"), 1), + newSegment(Intervals.of("2020-01-02/P1D"), 2) + ); + Assert.assertEquals( + ImmutableMap.of( + Intervals.of("2020-01-01/P1D"), + ImmutableList.of( + newSegment(Intervals.of("2020-01-01/P1D"), 0), + newSegment(Intervals.of("2020-01-01/P1D"), 1) + ), + Intervals.of("2020-01-02/P1D"), + ImmutableList.of( + newSegment(Intervals.of("2020-01-02/P1D"), 0), + newSegment(Intervals.of("2020-01-02/P1D"), 1), + newSegment(Intervals.of("2020-01-02/P1D"), 2) + ), + Intervals.of("2020-01-03/P1D"), + ImmutableList.of(newSegment(Intervals.of("2020-01-03/P1D"), 0)) + ), + 
SegmentUtils.groupSegmentsByInterval(segments) + ); + } + + private static DataSegment newSegment(Interval interval, int partitionId) + { + return new DataSegment( + "datasource", + interval, + "version", + null, + ImmutableList.of("dim"), + ImmutableList.of("met"), + new NumberedShardSpec(partitionId, 0), + null, + 9, + 10L + ); + } } diff --git a/core/src/test/java/org/apache/druid/timeline/DataSegmentTest.java b/core/src/test/java/org/apache/druid/timeline/DataSegmentTest.java index c2b0b76e8881..2c790f55f9ca 100644 --- a/core/src/test/java/org/apache/druid/timeline/DataSegmentTest.java +++ b/core/src/test/java/org/apache/druid/timeline/DataSegmentTest.java @@ -97,12 +97,6 @@ public boolean possibleInDomain(Map> domain) { return true; } - - @Override - public boolean isCompatible(Class other) - { - return false; - } }; } diff --git a/core/src/test/java/org/apache/druid/timeline/partition/BuildingHashBasedNumberedShardSpecTest.java b/core/src/test/java/org/apache/druid/timeline/partition/BuildingHashBasedNumberedShardSpecTest.java index 2c052d51c228..287e0dcb0046 100644 --- a/core/src/test/java/org/apache/druid/timeline/partition/BuildingHashBasedNumberedShardSpecTest.java +++ b/core/src/test/java/org/apache/druid/timeline/partition/BuildingHashBasedNumberedShardSpecTest.java @@ -31,7 +31,7 @@ public class BuildingHashBasedNumberedShardSpecTest { private final ObjectMapper mapper = ShardSpecTestUtils.initObjectMapper(); - + @Test public void testConvert() { diff --git a/core/src/test/java/org/apache/druid/timeline/partition/HashBasedNumberedPartialShardSpecTest.java b/core/src/test/java/org/apache/druid/timeline/partition/HashBasedNumberedPartialShardSpecTest.java index 551992bdb2d6..ba2cb7e50365 100644 --- a/core/src/test/java/org/apache/druid/timeline/partition/HashBasedNumberedPartialShardSpecTest.java +++ b/core/src/test/java/org/apache/druid/timeline/partition/HashBasedNumberedPartialShardSpecTest.java @@ -30,7 +30,7 @@ public class HashBasedNumberedPartialShardSpecTest { - private static final ObjectMapper MAPPER = new ObjectMapper(); + private static final ObjectMapper MAPPER = ShardSpecTestUtils.initObjectMapper(); @Test public void testEquals() @@ -73,5 +73,21 @@ public void testJsonPropertyNames() throws IOException Assert.assertEquals(expected.getPartitionDimensions(), map.get("partitionDimensions")); Assert.assertEquals(expected.getBucketId(), map.get("bucketId")); Assert.assertEquals(expected.getNumBuckets(), map.get("numPartitions")); + Assert.assertEquals(expected.getBucketId(), map.get("bucketId")); + } + + @Test + public void testComplete() + { + final HashBasedNumberedPartialShardSpec partialShardSpec = new HashBasedNumberedPartialShardSpec( + ImmutableList.of("dim"), + 2, + 4 + ); + final ShardSpec shardSpec = partialShardSpec.complete(MAPPER, 1, 3); + Assert.assertEquals( + new HashBasedNumberedShardSpec(1, 3, 2, 4, ImmutableList.of("dim"), MAPPER), + shardSpec + ); } } diff --git a/core/src/test/java/org/apache/druid/timeline/partition/NumberedOverwritePartialShardSpecTest.java b/core/src/test/java/org/apache/druid/timeline/partition/NumberedOverwritePartialShardSpecTest.java new file mode 100644 index 000000000000..c212f12a37fb --- /dev/null +++ b/core/src/test/java/org/apache/druid/timeline/partition/NumberedOverwritePartialShardSpecTest.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.timeline.partition; + +import org.junit.Assert; +import org.junit.Test; + +public class NumberedOverwritePartialShardSpecTest +{ + @Test + public void testUseNonRootGenerationPartitionSpace() + { + final NumberedOverwritePartialShardSpec partialShardSpec = new NumberedOverwritePartialShardSpec(0, 1, (short) 1); + Assert.assertTrue(partialShardSpec.useNonRootGenerationPartitionSpace()); + } + + @Test + public void testGetShardSpecClass() + { + final NumberedOverwritePartialShardSpec partialShardSpec = new NumberedOverwritePartialShardSpec(0, 1, (short) 1); + Assert.assertSame(NumberedOverwriteShardSpec.class, partialShardSpec.getShardSpecClass()); + } +} diff --git a/core/src/test/java/org/apache/druid/timeline/partition/NumberedOverwriteShardSpecTest.java b/core/src/test/java/org/apache/druid/timeline/partition/NumberedOverwriteShardSpecTest.java index c6d7935b5699..5efe592fabf4 100644 --- a/core/src/test/java/org/apache/druid/timeline/partition/NumberedOverwriteShardSpecTest.java +++ b/core/src/test/java/org/apache/druid/timeline/partition/NumberedOverwriteShardSpecTest.java @@ -50,4 +50,20 @@ public void testSerde() throws JsonProcessingException final NumberedOverwriteShardSpec fromJson = (NumberedOverwriteShardSpec) mapper.readValue(json, ShardSpec.class); Assert.assertEquals(original, fromJson); } + + @Test + public void testSharePartitionSpace() + { + final NumberedOverwriteShardSpec shardSpec = new NumberedOverwriteShardSpec( + PartitionIds.NON_ROOT_GEN_START_PARTITION_ID, + 0, + 3, + (short) 1, + (short) 1 + ); + Assert.assertFalse(shardSpec.sharePartitionSpace(NumberedPartialShardSpec.instance())); + Assert.assertFalse(shardSpec.sharePartitionSpace(new HashBasedNumberedPartialShardSpec(null, 0, 1))); + Assert.assertFalse(shardSpec.sharePartitionSpace(new SingleDimensionPartialShardSpec("dim", 0, null, null, 1))); + Assert.assertTrue(shardSpec.sharePartitionSpace(new NumberedOverwritePartialShardSpec(0, 2, 1))); + } } diff --git a/core/src/test/java/org/apache/druid/timeline/partition/NumberedPartialShardSpecTest.java b/core/src/test/java/org/apache/druid/timeline/partition/NumberedPartialShardSpecTest.java new file mode 100644 index 000000000000..8d6007868b9d --- /dev/null +++ b/core/src/test/java/org/apache/druid/timeline/partition/NumberedPartialShardSpecTest.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.timeline.partition; + +import com.fasterxml.jackson.databind.ObjectMapper; +import org.junit.Assert; +import org.junit.Test; + +import java.io.IOException; + +public class NumberedPartialShardSpecTest +{ + @Test + public void testSerde() throws IOException + { + final NumberedPartialShardSpec expected = NumberedPartialShardSpec.instance(); + final ObjectMapper mapper = ShardSpecTestUtils.initObjectMapper(); + final byte[] json = mapper.writeValueAsBytes(expected); + final PartialShardSpec fromJson = mapper.readValue(json, PartialShardSpec.class); + Assert.assertSame(NumberedPartialShardSpec.class, fromJson.getClass()); + } + + @Test + public void testComplete() + { + final NumberedPartialShardSpec partialShardSpec = NumberedPartialShardSpec.instance(); + final ShardSpec shardSpec = partialShardSpec.complete(new ObjectMapper(), 1, 3); + Assert.assertEquals(new NumberedShardSpec(1, 3), shardSpec); + } } diff --git a/core/src/test/java/org/apache/druid/timeline/partition/PartitionHolderCompletenessTest.java b/core/src/test/java/org/apache/druid/timeline/partition/PartitionHolderCompletenessTest.java index 38b9a47854f2..bf34aaa6aaa8 100644 --- a/core/src/test/java/org/apache/druid/timeline/partition/PartitionHolderCompletenessTest.java +++ b/core/src/test/java/org/apache/druid/timeline/partition/PartitionHolderCompletenessTest.java @@ -38,8 +38,8 @@ public static Iterable<Object[]> constructorFeeder() return ImmutableList.of( new Object[]{ ImmutableList.of( - new NumberedShardSpec(0, 3), new NumberedShardSpec(1, 3), + new NumberedShardSpec(0, 3), new NumberedShardSpec(2, 3) ), NumberedShardSpec.class.getSimpleName() }, new Object[]{ // Simulate empty hash buckets ImmutableList.of( + new HashBasedNumberedShardSpec(2, 3, 3, 5, null, new ObjectMapper()), new HashBasedNumberedShardSpec(0, 3, 0, 5, null, new ObjectMapper()), - new HashBasedNumberedShardSpec(1, 3, 2, 5, null, new ObjectMapper()), - new HashBasedNumberedShardSpec(2, 3, 3, 5, null, new ObjectMapper()) + new HashBasedNumberedShardSpec(1, 3, 2, 5, null, new ObjectMapper()) ), HashBasedNumberedShardSpec.class.getSimpleName() }, new Object[]{ // Simulate empty range buckets ImmutableList.of( new SingleDimensionShardSpec("dim", null, "aaa", 0, 3), - new SingleDimensionShardSpec("dim", "bbb", "fff", 1, 3), - new SingleDimensionShardSpec("dim", "ttt", "zzz", 2, 3) + new SingleDimensionShardSpec("dim", "ttt", "zzz", 2, 3), + new SingleDimensionShardSpec("dim", "bbb", "fff", 1, 3) ), StringUtils.format( "%s with empty buckets", @@ -68,9 +68,9 @@ new Object[]{ // Simulate old format segments with missing numCorePartitions ImmutableList.of( - new SingleDimensionShardSpec("dim", null, "bbb", 0, null), new SingleDimensionShardSpec("dim", "bbb", "fff", 1, null), - new SingleDimensionShardSpec("dim", "fff", null, 2, null) + new SingleDimensionShardSpec("dim", "fff", null, 2, null), + new SingleDimensionShardSpec("dim", null, "bbb", 0, null) ), StringUtils.format( "%s with missing numCorePartitions", diff --git a/core/src/test/java/org/apache/druid/timeline/partition/SingleDimensionPartialShardSpecTest.java b/core/src/test/java/org/apache/druid/timeline/partition/SingleDimensionPartialShardSpecTest.java index 4b2e52aa6d6d..df1471b6a0f3 100644 --- a/core/src/test/java/org/apache/druid/timeline/partition/SingleDimensionPartialShardSpecTest.java +++ b/core/src/test/java/org/apache/druid/timeline/partition/SingleDimensionPartialShardSpecTest.java @@ -47,7 +47,7 @@ public void testSerde() throws IOException "end", 10 ); - final ObjectMapper mapper = new ObjectMapper(); + final ObjectMapper mapper = ShardSpecTestUtils.initObjectMapper(); final byte[] json = mapper.writeValueAsBytes(expected); final SingleDimensionPartialShardSpec fromJson = (SingleDimensionPartialShardSpec) mapper.readValue( json, @@ -55,4 +55,18 @@ public void testSerde() throws IOException ); Assert.assertEquals(expected, fromJson); } + + @Test + public void testComplete() + { + final SingleDimensionPartialShardSpec partialShardSpec = new SingleDimensionPartialShardSpec( + "dim", + 2, + "end2", + null, + 3 + ); + final ShardSpec shardSpec = partialShardSpec.complete(new ObjectMapper(), 1, 2); + Assert.assertEquals(new SingleDimensionShardSpec("dim", "end2", null, 1, 2), shardSpec); + } } diff --git a/docs/ingestion/native-batch.md b/docs/ingestion/native-batch.md index 799b32e0fbad..c366be0dff69 100644 --- a/docs/ingestion/native-batch.md +++ b/docs/ingestion/native-batch.md @@ -189,7 +189,7 @@ that range if there's some stray data with unexpected timestamps. |--------|-----------|-------|---------| |type|The task type, this should always be `index_parallel`.|none|yes| |inputFormat|[`inputFormat`](./data-formats.md#input-format) to specify how to parse input data.|none|yes| -|appendToExisting|Creates segments as additional shards of the latest version, effectively appending to the segment set instead of replacing it. This will only work if the existing segment set has extendable-type shardSpecs.|false|no| +|appendToExisting|Creates segments as additional shards of the latest version, effectively appending to the segment set instead of replacing it. The current limitation is that you can append to any datasource regardless of its original partitioning scheme, but the appended segments must be partitioned using the `dynamic` partitionsSpec.|false|no| ### `tuningConfig` @@ -693,7 +693,7 @@ that range if there's some stray data with unexpected timestamps. |--------|-----------|-------|---------| |type|The task type, this should always be "index".|none|yes| |inputFormat|[`inputFormat`](./data-formats.md#input-format) to specify how to parse input data.|none|yes| -|appendToExisting|Creates segments as additional shards of the latest version, effectively appending to the segment set instead of replacing it. This will only work if the existing segment set has extendable-type shardSpecs.|false|no| +|appendToExisting|Creates segments as additional shards of the latest version, effectively appending to the segment set instead of replacing it.
The current limitation is that you can append to any datasource regardless of its original partitioning scheme, but the appended segments must be partitioned using the `dynamic` partitionsSpec.|false|no| ### `tuningConfig` diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractBatchIndexTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractBatchIndexTask.java index a77fe4092088..64f19f61752c 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractBatchIndexTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractBatchIndexTask.java @@ -59,6 +59,7 @@ import java.util.List; import java.util.Map; import java.util.Set; +import java.util.function.BiConsumer; import java.util.function.Consumer; import java.util.stream.Collectors; @@ -226,8 +227,11 @@ boolean determineLockGranularityandTryLock(TaskActionClient client, List segments) - throws IOException + boolean determineLockGranularityandTryLockWithSegments( + TaskActionClient client, + List<DataSegment> segments, + BiConsumer<LockGranularity, List<DataSegment>> segmentCheckFunction + ) throws IOException { final boolean forceTimeChunkLock = getContextValue( Tasks.FORCE_TIME_CHUNK_LOCK_KEY, @@ -236,6 +240,7 @@ boolean determineLockGranularityandTryLockWithSegments(TaskActionClient client, if (forceTimeChunkLock) { log.info("[%s] is set to true in task context. Use timeChunk lock", Tasks.FORCE_TIME_CHUNK_LOCK_KEY); taskLockHelper = new TaskLockHelper(false); + segmentCheckFunction.accept(LockGranularity.TIME_CHUNK, segments); return tryTimeChunkLock( client, new ArrayList<>(segments.stream().map(DataSegment::getInterval).collect(Collectors.toSet())) ); } else { final LockGranularityDetermineResult result = determineSegmentGranularity(segments); taskLockHelper = new TaskLockHelper(result.lockGranularity == LockGranularity.SEGMENT); + segmentCheckFunction.accept(result.lockGranularity, segments); return tryLockWithDetermineResult(client, result); } } @@ -363,7 +369,7 @@ private LockGranularityDetermineResult determineSegmentGranularity(List latestSegments); + boolean validateSegments(LockGranularity lockGranularityInUse, List<DataSegment> latestSegments); } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionIntervalSpec.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionIntervalSpec.java index 66304424edda..40a4d775e031 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionIntervalSpec.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionIntervalSpec.java @@ -21,6 +21,7 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; +import org.apache.druid.indexing.common.LockGranularity; import org.apache.druid.java.util.common.IAE; import org.apache.druid.java.util.common.JodaUtils; import org.apache.druid.segment.SegmentUtils; @@ -76,7 +77,7 @@ public Interval findInterval(String dataSource) } @Override - public boolean validateSegments(List<DataSegment> latestSegments) + public boolean validateSegments(LockGranularity lockGranularityInUse, List<DataSegment> latestSegments) { final Interval segmentsInterval = JodaUtils.umbrellaInterval( latestSegments.stream().map(DataSegment::getInterval).collect(Collectors.toList()) ) diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java index 59c9019e6397..3ce3f31eb8e7 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java @@ -47,6 +47,7 @@ import org.apache.druid.indexer.TaskStatus; import org.apache.druid.indexer.partitions.DynamicPartitionsSpec; import org.apache.druid.indexer.partitions.PartitionsSpec; +import org.apache.druid.indexing.common.LockGranularity; import org.apache.druid.indexing.common.RetryPolicyFactory; import org.apache.druid.indexing.common.SegmentLoaderFactory; import org.apache.druid.indexing.common.TaskToolbox; @@ -342,8 +343,8 @@ public int getPriority() @Override public boolean isReady(TaskActionClient taskActionClient) throws Exception { - final List segments = segmentProvider.checkAndGetSegments(taskActionClient); - return determineLockGranularityandTryLockWithSegments(taskActionClient, segments); + final List segments = segmentProvider.findSegments(taskActionClient); + return determineLockGranularityandTryLockWithSegments(taskActionClient, segments, segmentProvider::checkSegments); } @Override @@ -372,6 +373,7 @@ public TaskStatus runTask(TaskToolbox toolbox) throws Exception { final List ingestionSpecs = createIngestionSchema( toolbox, + getTaskLockHelper().getLockGranularityToUse(), segmentProvider, partitionConfigurationManager, dimensionsSpec, @@ -480,6 +482,7 @@ private String createIndexTaskSpecId(int i) @VisibleForTesting static List createIngestionSchema( final TaskToolbox toolbox, + final LockGranularity lockGranularityInUse, final SegmentProvider segmentProvider, final PartitionConfigurationManager partitionConfigurationManager, @Nullable final DimensionsSpec dimensionsSpec, @@ -493,7 +496,8 @@ static List createIngestionSchema( { Pair, List>> pair = prepareSegments( toolbox, - segmentProvider + segmentProvider, + lockGranularityInUse ); final Map segmentFileMap = pair.lhs; final List> timelineSegments = pair.rhs; @@ -631,10 +635,12 @@ private static ParallelIndexIOConfig createIoConfig( private static Pair, List>> prepareSegments( TaskToolbox toolbox, - SegmentProvider segmentProvider + SegmentProvider segmentProvider, + LockGranularity lockGranularityInUse ) throws IOException, SegmentLoadingException { - final List usedSegments = segmentProvider.checkAndGetSegments(toolbox.getTaskActionClient()); + final List usedSegments = segmentProvider.findSegments(toolbox.getTaskActionClient()); + segmentProvider.checkSegments(lockGranularityInUse, usedSegments); final Map segmentFileMap = toolbox.fetchSegments(usedSegments); final List> timelineSegments = VersionedIntervalTimeline .forSegments(usedSegments) @@ -852,19 +858,21 @@ static class SegmentProvider this.interval = inputSpec.findInterval(dataSource); } - List checkAndGetSegments(TaskActionClient actionClient) throws IOException + List findSegments(TaskActionClient actionClient) throws IOException { - final List latestSegments = new ArrayList<>( + return new ArrayList<>( actionClient.submit(new RetrieveUsedSegmentsAction(dataSource, interval, null, Segments.ONLY_VISIBLE)) ); + } - if (!inputSpec.validateSegments(latestSegments)) { + void checkSegments(LockGranularity lockGranularityInUse, List latestSegments) + { + if (!inputSpec.validateSegments(lockGranularityInUse, latestSegments)) { throw new ISE( "Specified 
segments in the spec are different from the current used segments. " + "Possibly new segments would have been added or some segments have been unpublished." ); } - return latestSegments; } } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java index 89d69604be26..70255203b855 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java @@ -1196,7 +1196,7 @@ public static class IndexTuningConfig implements TuningConfig, AppenderatorConfi private final SegmentWriteOutMediumFactory segmentWriteOutMediumFactory; @Nullable - private static PartitionsSpec getDefaultPartitionsSpec( + private static PartitionsSpec getPartitionsSpec( boolean forceGuaranteedRollup, @Nullable PartitionsSpec partitionsSpec, @Nullable Integer maxRowsPerSegment, @@ -1224,11 +1224,11 @@ private static PartitionsSpec getDefaultPartitionsSpec( } else { if (forceGuaranteedRollup) { if (!partitionsSpec.isForceGuaranteedRollupCompatibleType()) { - throw new ISE(partitionsSpec.getClass().getSimpleName() + " cannot be used for perfect rollup"); + throw new IAE(partitionsSpec.getClass().getSimpleName() + " cannot be used for perfect rollup"); } } else { if (!(partitionsSpec instanceof DynamicPartitionsSpec)) { - throw new ISE("DynamicPartitionsSpec must be used for best-effort rollup"); + throw new IAE("DynamicPartitionsSpec must be used for best-effort rollup"); } } return partitionsSpec; @@ -1263,7 +1263,7 @@ public IndexTuningConfig( this( maxRowsInMemory != null ? maxRowsInMemory : rowFlushBoundary_forBackCompatibility, maxBytesInMemory != null ? maxBytesInMemory : 0, - getDefaultPartitionsSpec( + getPartitionsSpec( forceGuaranteedRollup == null ? DEFAULT_GUARANTEE_ROLLUP : forceGuaranteedRollup, partitionsSpec, maxRowsPerSegment == null ? 
targetPartitionSize : maxRowsPerSegment, diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/SpecificSegmentsSpec.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/SpecificSegmentsSpec.java index 71da54bc8241..7b26c22a4a54 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/SpecificSegmentsSpec.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/SpecificSegmentsSpec.java @@ -22,6 +22,7 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.base.Preconditions; +import org.apache.druid.indexing.common.LockGranularity; import org.apache.druid.java.util.common.JodaUtils; import org.apache.druid.timeline.DataSegment; import org.apache.druid.timeline.SegmentId; @@ -73,14 +74,18 @@ public Interval findInterval(String dataSource) } @Override - public boolean validateSegments(List latestSegments) + public boolean validateSegments(LockGranularity lockGranularityInUse, List latestSegments) { final List thoseSegments = latestSegments .stream() .map(segment -> segment.getId().toString()) .sorted() .collect(Collectors.toList()); - return this.segments.equals(thoseSegments); + if (lockGranularityInUse == LockGranularity.TIME_CHUNK) { + return this.segments.equals(thoseSegments); + } else { + return thoseSegments.containsAll(segments); + } } @Override diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/TaskLockHelper.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/TaskLockHelper.java index 92c097500423..af80d8f9dce5 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/TaskLockHelper.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/TaskLockHelper.java @@ -182,7 +182,7 @@ private void verifySegmentGranularity(List segments) private boolean tryLockSegments(TaskActionClient actionClient, List segments) throws IOException { - final Map> intervalToSegments = groupSegmentsByInterval(segments); + final Map> intervalToSegments = SegmentUtils.groupSegmentsByInterval(segments); final Closer lockCloserOnError = Closer.create(); for (Entry> entry : intervalToSegments.entrySet()) { final Interval interval = entry.getKey(); @@ -304,13 +304,4 @@ public static void verifyRootPartitionIsAdjacentAndAtomicUpdateGroupIsFull(List< throw new ISE("All atomicUpdateGroup must be compacted together"); } } - - private static Map> groupSegmentsByInterval(List segments) - { - final Map> map = new HashMap<>(); - for (DataSegment segment : segments) { - map.computeIfAbsent(segment.getInterval(), k -> new ArrayList<>()).add(segment); - } - return map; - } } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java index bed85dea6926..28667acbfa4e 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java @@ -195,7 +195,7 @@ public ParallelIndexSupervisorTask( this.ingestionSchema = ingestionSchema; - if (ingestionSchema.getTuningConfig().isForceGuaranteedRollup()) { + if (isGuaranteedRollup(ingestionSchema.getIOConfig(), 
ingestionSchema.getTuningConfig())) { checkPartitionsSpecForForceGuaranteedRollup(ingestionSchema.getTuningConfig().getGivenOrDefaultPartitionsSpec()); if (ingestionSchema.getDataSchema().getGranularitySpec().inputIntervals().isEmpty()) { @@ -429,7 +429,7 @@ public TaskStatus runTask(TaskToolbox toolbox) throws Exception if (isParallelMode()) { this.toolbox = toolbox; - if (getIngestionSchema().getTuningConfig().isForceGuaranteedRollup()) { + if (isGuaranteedRollup(ingestionSchema.getIOConfig(), ingestionSchema.getTuningConfig())) { return runMultiPhaseParallel(toolbox); } else { return runSinglePhaseParallel(toolbox); diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/SegmentAllocateActionTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/SegmentAllocateActionTest.java index ce1e1fc1fccd..369908706335 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/SegmentAllocateActionTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/SegmentAllocateActionTest.java @@ -47,7 +47,6 @@ import org.apache.druid.timeline.partition.NumberedShardSpec; import org.apache.druid.timeline.partition.PartialShardSpec; import org.apache.druid.timeline.partition.ShardSpec; -import org.apache.druid.timeline.partition.SingleDimensionShardSpec; import org.easymock.EasyMock; import org.joda.time.DateTime; import org.joda.time.Period; @@ -869,37 +868,6 @@ public void testCannotDoAnythingWithSillyQueryGranularity() Assert.assertNull(id1); } - @Test - public void testCannotAddToExistingSingleDimensionShardSpecs() throws Exception - { - final Task task = NoopTask.create(); - - taskActionTestKit.getMetadataStorageCoordinator().announceHistoricalSegments( - ImmutableSet.of( - DataSegment.builder() - .dataSource(DATA_SOURCE) - .interval(Granularities.HOUR.bucket(PARTY_TIME)) - .version(PARTY_TIME.toString()) - .shardSpec(new SingleDimensionShardSpec("foo", null, "bar", 0, 2)) - .size(0) - .build(), - DataSegment.builder() - .dataSource(DATA_SOURCE) - .interval(Granularities.HOUR.bucket(PARTY_TIME)) - .version(PARTY_TIME.toString()) - .shardSpec(new SingleDimensionShardSpec("foo", "bar", null, 1, 2)) - .size(0) - .build() - ) - ); - - taskActionTestKit.getTaskLockbox().add(task); - - final SegmentIdWithShardSpec id1 = allocate(task, PARTY_TIME, Granularities.NONE, Granularities.HOUR, "s1", null); - - Assert.assertNull(id1); - } - @Test public void testWithPartialShardSpecAndOvershadowingSegments() throws IOException { diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionInputSpecTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionInputSpecTest.java index 543428fe6acf..8c0db006bbfb 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionInputSpecTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionInputSpecTest.java @@ -20,6 +20,7 @@ package org.apache.druid.indexing.common.task; import com.google.common.collect.ImmutableList; +import org.apache.druid.indexing.common.LockGranularity; import org.apache.druid.java.util.common.Intervals; import org.apache.druid.java.util.common.JodaUtils; import org.apache.druid.segment.SegmentUtils; @@ -101,7 +102,9 @@ public void testFindInterval() @Test public void testValidateSegments() { - Assert.assertTrue(inputSpec.validateSegments(SEGMENTS)); + 
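// Editorial note on the assertions below (sketch of the intended semantics): an
// exact segment set passes under both lock granularities; segment locking merely
// relaxes the check to tolerate extra segments in the time chunk, so a spec'd
// segment that has gone missing must still fail, as the subList case shows.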
Assert.assertTrue(inputSpec.validateSegments(LockGranularity.TIME_CHUNK, SEGMENTS)); + Assert.assertTrue(inputSpec.validateSegments(LockGranularity.SEGMENT, SEGMENTS)); + Assert.assertFalse(inputSpec.validateSegments(LockGranularity.SEGMENT, SEGMENTS.subList(0, SEGMENTS.size() - 1))); } @Test @@ -109,10 +112,10 @@ public void testValidateWrongSegments() { final List someSegmentIsMissing = new ArrayList<>(SEGMENTS); someSegmentIsMissing.remove(0); - Assert.assertFalse(inputSpec.validateSegments(someSegmentIsMissing)); + Assert.assertFalse(inputSpec.validateSegments(LockGranularity.TIME_CHUNK, someSegmentIsMissing)); final List someSegmentIsUnknown = new ArrayList<>(SEGMENTS); someSegmentIsUnknown.add(newSegment(Intervals.of("2018-01-01/2018-01-02"))); - Assert.assertFalse(inputSpec.validateSegments(someSegmentIsUnknown)); + Assert.assertFalse(inputSpec.validateSegments(LockGranularity.TIME_CHUNK, someSegmentIsUnknown)); } } diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskParallelRunTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskParallelRunTest.java index 8c3c5ffe3b26..8e078e4aee2d 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskParallelRunTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskParallelRunTest.java @@ -20,27 +20,47 @@ package org.apache.druid.indexing.common.task; import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableSet; import com.google.common.collect.Lists; import com.google.common.io.Files; import org.apache.druid.data.input.InputSplit; import org.apache.druid.data.input.SegmentsSplitHintSpec; +import org.apache.druid.data.input.impl.CsvInputFormat; +import org.apache.druid.data.input.impl.DimensionsSpec; +import org.apache.druid.data.input.impl.LocalInputSource; +import org.apache.druid.data.input.impl.TimestampSpec; import org.apache.druid.indexer.TaskState; +import org.apache.druid.indexer.partitions.HashedPartitionsSpec; +import org.apache.druid.indexer.partitions.PartitionsSpec; +import org.apache.druid.indexer.partitions.SingleDimensionPartitionsSpec; import org.apache.druid.indexing.common.LockGranularity; import org.apache.druid.indexing.common.RetryPolicyConfig; import org.apache.druid.indexing.common.RetryPolicyFactory; import org.apache.druid.indexing.common.TestUtils; import org.apache.druid.indexing.common.stats.RowIngestionMetersFactory; +import org.apache.druid.indexing.common.task.CompactionTask.Builder; import org.apache.druid.indexing.common.task.batch.parallel.AbstractParallelIndexSupervisorTaskTest; +import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexIOConfig; +import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexIngestionSpec; +import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexSupervisorTask; import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexTuningConfig; import org.apache.druid.indexing.firehose.WindowedSegmentId; import org.apache.druid.indexing.input.DruidInputSource; import org.apache.druid.java.util.common.Intervals; import org.apache.druid.java.util.common.granularity.Granularities; +import org.apache.druid.query.aggregation.AggregatorFactory; +import org.apache.druid.query.aggregation.LongSumAggregatorFactory; +import org.apache.druid.segment.SegmentUtils; +import org.apache.druid.segment.indexing.DataSchema; import 
org.apache.druid.segment.indexing.granularity.UniformGranularitySpec; import org.apache.druid.segment.realtime.appenderator.AppenderatorsManager; import org.apache.druid.segment.realtime.firehose.NoopChatHandlerProvider; import org.apache.druid.server.security.AuthTestUtils; import org.apache.druid.timeline.DataSegment; +import org.apache.druid.timeline.partition.NumberedOverwriteShardSpec; +import org.apache.druid.timeline.partition.NumberedShardSpec; +import org.apache.druid.timeline.partition.PartitionIds; +import org.apache.druid.timeline.partition.ShardSpec; import org.joda.time.Interval; import org.junit.Assert; import org.junit.Before; @@ -48,12 +68,17 @@ import org.junit.runner.RunWith; import org.junit.runners.Parameterized; +import javax.annotation.Nullable; import java.io.BufferedWriter; import java.io.File; +import java.io.IOException; import java.nio.charset.StandardCharsets; import java.util.ArrayList; +import java.util.Arrays; import java.util.HashSet; import java.util.List; +import java.util.Map; +import java.util.Map.Entry; import java.util.Set; @RunWith(Parameterized.class) @@ -76,6 +101,8 @@ public static Iterable constructorFeeder() private final LockGranularity lockGranularity; private final RowIngestionMetersFactory rowIngestionMetersFactory; + private File inputDir; + public CompactionTaskParallelRunTest(LockGranularity lockGranularity) { this.lockGranularity = lockGranularity; @@ -83,29 +110,110 @@ public CompactionTaskParallelRunTest(LockGranularity lockGranularity) } @Before - public void setup() + public void setup() throws IOException { getObjectMapper().registerSubtypes(ParallelIndexTuningConfig.class, DruidInputSource.class); + + inputDir = temporaryFolder.newFolder(); + final File tmpFile = File.createTempFile("druid", "index", inputDir); + + try (BufferedWriter writer = Files.newWriter(tmpFile, StandardCharsets.UTF_8)) { + writer.write("2014-01-01T00:00:10Z,a,1\n"); + writer.write("2014-01-01T00:00:10Z,b,2\n"); + writer.write("2014-01-01T00:00:10Z,c,3\n"); + writer.write("2014-01-01T01:00:20Z,a,1\n"); + writer.write("2014-01-01T01:00:20Z,b,2\n"); + writer.write("2014-01-01T01:00:20Z,c,3\n"); + writer.write("2014-01-01T02:00:30Z,a,1\n"); + writer.write("2014-01-01T02:00:30Z,b,2\n"); + writer.write("2014-01-01T02:00:30Z,c,3\n"); + } } @Test - public void testRunParallel() throws Exception + public void testRunParallel() { - runIndexTask(); + runIndexTask(null, true); - final CompactionTask compactionTask = new CompactionTask( - null, - null, + final Builder builder = new Builder( DATA_SOURCE, + getObjectMapper(), + AuthTestUtils.TEST_AUTHORIZER_MAPPER, + new NoopChatHandlerProvider(), + rowIngestionMetersFactory, null, null, - new CompactionIOConfig(new CompactionIntervalSpec(INTERVAL_TO_INDEX, null)), - null, - null, - null, + getSegmentLoaderFactory(), + RETRY_POLICY_FACTORY, + appenderatorsManager + ); + final CompactionTask compactionTask = builder + .inputSpec(new CompactionIntervalSpec(INTERVAL_TO_INDEX, null)) + .tuningConfig(AbstractParallelIndexSupervisorTaskTest.DEFAULT_TUNING_CONFIG_FOR_PARALLEL_INDEXING) + .build(); + + runTask(compactionTask); + } + + @Test + public void testCompactHashAndDynamicPartitionedSegments() + { + runIndexTask(new HashedPartitionsSpec(null, 2, null), false); + runIndexTask(null, true); + final Builder builder = new Builder( + DATA_SOURCE, + getObjectMapper(), + AuthTestUtils.TEST_AUTHORIZER_MAPPER, + new NoopChatHandlerProvider(), + rowIngestionMetersFactory, null, - newTuningConfig(), null, + getSegmentLoaderFactory(), + 
RETRY_POLICY_FACTORY, + appenderatorsManager + ); + final CompactionTask compactionTask = builder + .inputSpec(new CompactionIntervalSpec(INTERVAL_TO_INDEX, null)) + .tuningConfig(AbstractParallelIndexSupervisorTaskTest.DEFAULT_TUNING_CONFIG_FOR_PARALLEL_INDEXING) + .build(); + + final Map> intervalToSegments = SegmentUtils.groupSegmentsByInterval( + runTask(compactionTask) + ); + Assert.assertEquals(3, intervalToSegments.size()); + Assert.assertEquals( + ImmutableSet.of( + Intervals.of("2014-01-01T00/PT1H"), + Intervals.of("2014-01-01T01/PT1H"), + Intervals.of("2014-01-01T02/PT1H") + ), + intervalToSegments.keySet() + ); + for (Entry> entry : intervalToSegments.entrySet()) { + final List segmentsInInterval = entry.getValue(); + Assert.assertEquals(1, segmentsInInterval.size()); + final ShardSpec shardSpec = segmentsInInterval.get(0).getShardSpec(); + if (lockGranularity == LockGranularity.TIME_CHUNK) { + Assert.assertSame(NumberedShardSpec.class, shardSpec.getClass()); + final NumberedShardSpec numberedShardSpec = (NumberedShardSpec) shardSpec; + Assert.assertEquals(0, numberedShardSpec.getPartitionNum()); + Assert.assertEquals(1, numberedShardSpec.getNumCorePartitions()); + } else { + Assert.assertSame(NumberedOverwriteShardSpec.class, shardSpec.getClass()); + final NumberedOverwriteShardSpec numberedShardSpec = (NumberedOverwriteShardSpec) shardSpec; + Assert.assertEquals(PartitionIds.NON_ROOT_GEN_START_PARTITION_ID, numberedShardSpec.getPartitionNum()); + Assert.assertEquals(1, numberedShardSpec.getAtomicUpdateGroupSize()); + } + } + } + + @Test + public void testCompactRangeAndDynamicPartitionedSegments() + { + runIndexTask(new SingleDimensionPartitionsSpec(2, null, "dim", false), false); + runIndexTask(null, true); + final Builder builder = new Builder( + DATA_SOURCE, getObjectMapper(), AuthTestUtils.TEST_AUTHORIZER_MAPPER, new NoopChatHandlerProvider(), @@ -116,14 +224,45 @@ public void testRunParallel() throws Exception RETRY_POLICY_FACTORY, appenderatorsManager ); + final CompactionTask compactionTask = builder + .inputSpec(new CompactionIntervalSpec(INTERVAL_TO_INDEX, null)) + .tuningConfig(AbstractParallelIndexSupervisorTaskTest.DEFAULT_TUNING_CONFIG_FOR_PARALLEL_INDEXING) + .build(); - runTask(compactionTask); + final Map> intervalToSegments = SegmentUtils.groupSegmentsByInterval( + runTask(compactionTask) + ); + Assert.assertEquals(3, intervalToSegments.size()); + Assert.assertEquals( + ImmutableSet.of( + Intervals.of("2014-01-01T00/PT1H"), + Intervals.of("2014-01-01T01/PT1H"), + Intervals.of("2014-01-01T02/PT1H") + ), + intervalToSegments.keySet() + ); + for (Entry> entry : intervalToSegments.entrySet()) { + final List segmentsInInterval = entry.getValue(); + Assert.assertEquals(1, segmentsInInterval.size()); + final ShardSpec shardSpec = segmentsInInterval.get(0).getShardSpec(); + if (lockGranularity == LockGranularity.TIME_CHUNK) { + Assert.assertSame(NumberedShardSpec.class, shardSpec.getClass()); + final NumberedShardSpec numberedShardSpec = (NumberedShardSpec) shardSpec; + Assert.assertEquals(0, numberedShardSpec.getPartitionNum()); + Assert.assertEquals(1, numberedShardSpec.getNumCorePartitions()); + } else { + Assert.assertSame(NumberedOverwriteShardSpec.class, shardSpec.getClass()); + final NumberedOverwriteShardSpec numberedShardSpec = (NumberedOverwriteShardSpec) shardSpec; + Assert.assertEquals(PartitionIds.NON_ROOT_GEN_START_PARTITION_ID, numberedShardSpec.getPartitionNum()); + Assert.assertEquals(1, numberedShardSpec.getAtomicUpdateGroupSize()); + } + } } @Test - 
public void testDruidInputSourceCreateSplitsWithIndividualSplits() throws Exception + public void testDruidInputSourceCreateSplitsWithIndividualSplits() { - runIndexTask(); + runIndexTask(null, true); List>> splits = Lists.newArrayList( DruidInputSource.createSplits( @@ -152,42 +291,45 @@ public void testDruidInputSourceCreateSplitsWithIndividualSplits() throws Except Assert.assertEquals(segmentIdsFromCoordinator, segmentIdsFromSplits); } - private void runIndexTask() throws Exception + private void runIndexTask(@Nullable PartitionsSpec partitionsSpec, boolean appendToExisting) { - File tmpDir = temporaryFolder.newFolder(); - File tmpFile = File.createTempFile("druid", "index", tmpDir); - - try (BufferedWriter writer = Files.newWriter(tmpFile, StandardCharsets.UTF_8)) { - writer.write("2014-01-01T00:00:10Z,a,1\n"); - writer.write("2014-01-01T00:00:10Z,b,2\n"); - writer.write("2014-01-01T00:00:10Z,c,3\n"); - writer.write("2014-01-01T01:00:20Z,a,1\n"); - writer.write("2014-01-01T01:00:20Z,b,2\n"); - writer.write("2014-01-01T01:00:20Z,c,3\n"); - writer.write("2014-01-01T02:00:30Z,a,1\n"); - writer.write("2014-01-01T02:00:30Z,b,2\n"); - writer.write("2014-01-01T02:00:30Z,c,3\n"); - } - - IndexTask indexTask = new IndexTask( - null, + ParallelIndexIOConfig ioConfig = new ParallelIndexIOConfig( null, - IndexTaskTest.createIngestionSpec( - getObjectMapper(), - tmpDir, - CompactionTaskRunTest.DEFAULT_PARSE_SPEC, + new LocalInputSource(inputDir, "druid*"), + new CsvInputFormat( + Arrays.asList("ts", "dim", "val"), + "|", null, - new UniformGranularitySpec( - Granularities.HOUR, - Granularities.MINUTE, + false, + 0 + ), + appendToExisting + ); + ParallelIndexTuningConfig tuningConfig = newTuningConfig(partitionsSpec, 2, !appendToExisting); + ParallelIndexSupervisorTask indexTask = new ParallelIndexSupervisorTask( + null, + null, + null, + new ParallelIndexIngestionSpec( + new DataSchema( + DATA_SOURCE, + new TimestampSpec("ts", "auto", null), + new DimensionsSpec(DimensionsSpec.getDefaultSchemas(Arrays.asList("ts", "dim"))), + new AggregatorFactory[]{new LongSumAggregatorFactory("val", "val")}, + new UniformGranularitySpec( + Granularities.HOUR, + Granularities.MINUTE, + ImmutableList.of(INTERVAL_TO_INDEX) + ), null ), - IndexTaskTest.createTuningConfig(2, 2, null, 2L, null, false, true), - false + ioConfig, + tuningConfig ), null, - AuthTestUtils.TEST_AUTHORIZER_MAPPER, + null, new NoopChatHandlerProvider(), + AuthTestUtils.TEST_AUTHORIZER_MAPPER, rowIngestionMetersFactory, appenderatorsManager ); @@ -195,41 +337,10 @@ private void runIndexTask() throws Exception runTask(indexTask); } - private void runTask(Task task) + private Set runTask(Task task) { task.addToContext(Tasks.FORCE_TIME_CHUNK_LOCK_KEY, lockGranularity == LockGranularity.TIME_CHUNK); Assert.assertEquals(TaskState.SUCCESS, getIndexingServiceClient().runAndWait(task).getStatusCode()); - } - - private static ParallelIndexTuningConfig newTuningConfig() - { - return new ParallelIndexTuningConfig( - null, - null, - null, - null, - null, - null, - null, - null, - null, - null, - null, - null, - null, - null, - null, - null, - 2, - null, - null, - null, - null, - null, - null, - null, - null, - null - ); + return getIndexingServiceClient().getPublishedSegments(task); } } diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java index 66ee8595d25d..dbcfd4ad251a 100644 --- 
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java @@ -52,6 +52,7 @@ import org.apache.druid.guice.IndexingServiceTuningConfigModule; import org.apache.druid.indexer.TaskStatus; import org.apache.druid.indexer.partitions.HashedPartitionsSpec; +import org.apache.druid.indexing.common.LockGranularity; import org.apache.druid.indexing.common.RetryPolicyConfig; import org.apache.druid.indexing.common.RetryPolicyFactory; import org.apache.druid.indexing.common.SegmentLoaderFactory; @@ -579,6 +580,7 @@ public void testCreateIngestionSchema() throws IOException, SegmentLoadingExcept { final List ingestionSpecs = CompactionTask.createIngestionSchema( toolbox, + LockGranularity.TIME_CHUNK, new SegmentProvider(DATA_SOURCE, new CompactionIntervalSpec(COMPACTION_INTERVAL, null)), new PartitionConfigurationManager(TUNING_CONFIG), null, @@ -645,6 +647,7 @@ public void testCreateIngestionSchemaWithTargetPartitionSize() throws IOExceptio ); final List ingestionSpecs = CompactionTask.createIngestionSchema( toolbox, + LockGranularity.TIME_CHUNK, new SegmentProvider(DATA_SOURCE, new CompactionIntervalSpec(COMPACTION_INTERVAL, null)), new PartitionConfigurationManager(tuningConfig), null, @@ -712,6 +715,7 @@ public void testCreateIngestionSchemaWithMaxTotalRows() throws IOException, Segm ); final List ingestionSpecs = CompactionTask.createIngestionSchema( toolbox, + LockGranularity.TIME_CHUNK, new SegmentProvider(DATA_SOURCE, new CompactionIntervalSpec(COMPACTION_INTERVAL, null)), new PartitionConfigurationManager(tuningConfig), null, @@ -779,6 +783,7 @@ public void testCreateIngestionSchemaWithNumShards() throws IOException, Segment ); final List ingestionSpecs = CompactionTask.createIngestionSchema( toolbox, + LockGranularity.TIME_CHUNK, new SegmentProvider(DATA_SOURCE, new CompactionIntervalSpec(COMPACTION_INTERVAL, null)), new PartitionConfigurationManager(tuningConfig), null, @@ -840,6 +845,7 @@ public void testCreateIngestionSchemaWithCustomDimensionsSpec() throws IOExcepti final List ingestionSpecs = CompactionTask.createIngestionSchema( toolbox, + LockGranularity.TIME_CHUNK, new SegmentProvider(DATA_SOURCE, new CompactionIntervalSpec(COMPACTION_INTERVAL, null)), new PartitionConfigurationManager(TUNING_CONFIG), customSpec, @@ -881,6 +887,7 @@ public void testCreateIngestionSchemaWithCustomMetricsSpec() throws IOException, final List ingestionSpecs = CompactionTask.createIngestionSchema( toolbox, + LockGranularity.TIME_CHUNK, new SegmentProvider(DATA_SOURCE, new CompactionIntervalSpec(COMPACTION_INTERVAL, null)), new PartitionConfigurationManager(TUNING_CONFIG), null, @@ -915,6 +922,7 @@ public void testCreateIngestionSchemaWithCustomSegments() throws IOException, Se { final List ingestionSpecs = CompactionTask.createIngestionSchema( toolbox, + LockGranularity.TIME_CHUNK, new SegmentProvider(DATA_SOURCE, SpecificSegmentsSpec.fromSegments(SEGMENTS)), new PartitionConfigurationManager(TUNING_CONFIG), null, @@ -955,6 +963,7 @@ public void testCreateIngestionSchemaWithDifferentSegmentSet() throws IOExceptio segments.remove(segments.size() / 2); CompactionTask.createIngestionSchema( toolbox, + LockGranularity.TIME_CHUNK, new SegmentProvider(DATA_SOURCE, SpecificSegmentsSpec.fromSegments(segments)), new PartitionConfigurationManager(TUNING_CONFIG), null, @@ -978,6 +987,7 @@ public void testMissingMetadata() throws IOException, SegmentLoadingException final List 
segments = new ArrayList<>(SEGMENTS); CompactionTask.createIngestionSchema( toolbox, + LockGranularity.TIME_CHUNK, new SegmentProvider(DATA_SOURCE, SpecificSegmentsSpec.fromSegments(segments)), new PartitionConfigurationManager(TUNING_CONFIG), null, @@ -1019,6 +1029,7 @@ public void testSegmentGranularity() throws IOException, SegmentLoadingException { final List ingestionSpecs = CompactionTask.createIngestionSchema( toolbox, + LockGranularity.TIME_CHUNK, new SegmentProvider(DATA_SOURCE, new CompactionIntervalSpec(COMPACTION_INTERVAL, null)), new PartitionConfigurationManager(TUNING_CONFIG), null, @@ -1054,6 +1065,7 @@ public void testNullSegmentGranularityAnd() throws IOException, SegmentLoadingEx { final List ingestionSpecs = CompactionTask.createIngestionSchema( toolbox, + LockGranularity.TIME_CHUNK, new SegmentProvider(DATA_SOURCE, new CompactionIntervalSpec(COMPACTION_INTERVAL, null)), new PartitionConfigurationManager(TUNING_CONFIG), null, diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IndexTaskSerdeTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IndexTaskSerdeTest.java index 1a87fb01f76c..23f5748a39a9 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IndexTaskSerdeTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IndexTaskSerdeTest.java @@ -188,7 +188,7 @@ public void testSerdeTuningConfigWithDeprecatedHashedPartitionsSpec() throws IOE @Test public void testForceGuaranteedRollupWithDynamicPartitionsSpec() { - expectedException.expect(IllegalStateException.class); + expectedException.expect(IllegalArgumentException.class); expectedException.expectMessage("DynamicPartitionsSpec cannot be used for perfect rollup"); final IndexTuningConfig tuningConfig = new IndexTuningConfig( null, @@ -222,7 +222,7 @@ public void testForceGuaranteedRollupWithDynamicPartitionsSpec() @Test public void testBestEffortRollupWithHashedPartitionsSpec() { - expectedException.expect(IllegalStateException.class); + expectedException.expect(IllegalArgumentException.class); expectedException.expectMessage("DynamicPartitionsSpec must be used for best-effort rollup"); final IndexTuningConfig tuningConfig = new IndexTuningConfig( null, diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IndexTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IndexTaskTest.java index 513c13c56e44..38bb83d51cf5 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IndexTaskTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IndexTaskTest.java @@ -239,13 +239,13 @@ public void testDeterminePartitions() throws Exception Assert.assertEquals(Intervals.of("2014/P1D"), segments.get(0).getInterval()); Assert.assertEquals(HashBasedNumberedShardSpec.class, segments.get(0).getShardSpec().getClass()); Assert.assertEquals(0, segments.get(0).getShardSpec().getPartitionNum()); - Assert.assertEquals(2, ((NumberedShardSpec) segments.get(0).getShardSpec()).getNumCorePartitions()); + Assert.assertEquals(2, segments.get(0).getShardSpec().getNumCorePartitions()); Assert.assertEquals("test", segments.get(1).getDataSource()); Assert.assertEquals(Intervals.of("2014/P1D"), segments.get(1).getInterval()); Assert.assertEquals(HashBasedNumberedShardSpec.class, segments.get(1).getShardSpec().getClass()); Assert.assertEquals(1, segments.get(1).getShardSpec().getPartitionNum()); - Assert.assertEquals(2, 
((NumberedShardSpec) segments.get(1).getShardSpec()).getNumCorePartitions()); + Assert.assertEquals(2, segments.get(1).getShardSpec().getNumCorePartitions()); } @Test diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractMultiPhaseParallelIndexingTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractMultiPhaseParallelIndexingTest.java index 64949deefb34..966a4c827cb2 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractMultiPhaseParallelIndexingTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractMultiPhaseParallelIndexingTest.java @@ -21,7 +21,6 @@ import com.google.common.base.Preconditions; import org.apache.druid.data.input.InputFormat; -import org.apache.druid.data.input.MaxSizeSplitHintSpec; import org.apache.druid.data.input.impl.DimensionsSpec; import org.apache.druid.data.input.impl.LocalInputSource; import org.apache.druid.data.input.impl.ParseSpec; @@ -29,10 +28,12 @@ import org.apache.druid.data.input.impl.TimestampSpec; import org.apache.druid.indexer.TaskState; import org.apache.druid.indexer.TaskStatus; -import org.apache.druid.indexer.partitions.DimensionBasedPartitionsSpec; +import org.apache.druid.indexer.partitions.PartitionsSpec; import org.apache.druid.indexing.common.LockGranularity; import org.apache.druid.indexing.common.SegmentLoaderFactory; +import org.apache.druid.indexing.common.task.Task; import org.apache.druid.indexing.common.task.Tasks; +import org.apache.druid.indexing.input.DruidInputSource; import org.apache.druid.java.util.common.granularity.Granularities; import org.apache.druid.query.DefaultGenericQueryMetricsFactory; import org.apache.druid.query.QueryPlus; @@ -69,6 +70,8 @@ @SuppressWarnings("SameParameterValue") abstract class AbstractMultiPhaseParallelIndexingTest extends AbstractParallelIndexSupervisorTaskTest { + protected static final String DATASOURCE = "dataSource"; + private static final ScanQueryRunnerFactory SCAN_QUERY_RUNNER_FACTORY = new ScanQueryRunnerFactory( new ScanQueryQueryToolChest( new ScanQueryConfig().setLegacy(false), @@ -85,6 +88,7 @@ abstract class AbstractMultiPhaseParallelIndexingTest extends AbstractParallelIn { this.lockGranularity = lockGranularity; this.useInputFormatApi = useInputFormatApi; + getObjectMapper().registerSubtypes(ParallelIndexTuningConfig.class, DruidInputSource.class); } boolean isUseInputFormatApi() @@ -100,10 +104,39 @@ Set runTestTask( Interval interval, File inputDir, String filter, - DimensionBasedPartitionsSpec partitionsSpec, + PartitionsSpec partitionsSpec, int maxNumConcurrentSubTasks, TaskState expectedTaskStatus ) + { + return runTestTask( + timestampSpec, + dimensionsSpec, + inputFormat, + parseSpec, + interval, + inputDir, + filter, + partitionsSpec, + maxNumConcurrentSubTasks, + expectedTaskStatus, + false + ); + } + + Set runTestTask( + @Nullable TimestampSpec timestampSpec, + @Nullable DimensionsSpec dimensionsSpec, + @Nullable InputFormat inputFormat, + @Nullable ParseSpec parseSpec, + Interval interval, + File inputDir, + String filter, + PartitionsSpec partitionsSpec, + int maxNumConcurrentSubTasks, + TaskState expectedTaskStatus, + boolean appendToExisting + ) { final ParallelIndexSupervisorTask task = newTask( timestampSpec, @@ -114,13 +147,14 @@ Set runTestTask( inputDir, filter, partitionsSpec, - maxNumConcurrentSubTasks + maxNumConcurrentSubTasks, + appendToExisting ); return 
runTask(task, expectedTaskStatus); } - Set runTask(ParallelIndexSupervisorTask task, TaskState expectedTaskStatus) + Set runTask(Task task, TaskState expectedTaskStatus) { task.addToContext(Tasks.FORCE_TIME_CHUNK_LOCK_KEY, lockGranularity == LockGranularity.TIME_CHUNK); TaskStatus taskStatus = getIndexingServiceClient().runAndWait(task); @@ -128,41 +162,6 @@ Set runTask(ParallelIndexSupervisorTask task, TaskState expectedTas return getIndexingServiceClient().getPublishedSegments(task); } - ParallelIndexTuningConfig newTuningConfig( - DimensionBasedPartitionsSpec partitionsSpec, - int maxNumConcurrentSubTasks - ) - { - return new ParallelIndexTuningConfig( - null, - null, - null, - null, - null, - null, - new MaxSizeSplitHintSpec(1L), // set maxSplitSize to 1 so that each split has only one file. - partitionsSpec, - null, - null, - null, - true, - null, - null, - null, - null, - maxNumConcurrentSubTasks, - null, - null, - null, - null, - null, - null, - null, - null, - null - ); - } - private ParallelIndexSupervisorTask newTask( @Nullable TimestampSpec timestampSpec, @Nullable DimensionsSpec dimensionsSpec, @@ -171,8 +170,9 @@ private ParallelIndexSupervisorTask newTask( Interval interval, File inputDir, String filter, - DimensionBasedPartitionsSpec partitionsSpec, - int maxNumConcurrentSubTasks + PartitionsSpec partitionsSpec, + int maxNumConcurrentSubTasks, + boolean appendToExisting ) { GranularitySpec granularitySpec = new UniformGranularitySpec( @@ -181,7 +181,11 @@ private ParallelIndexSupervisorTask newTask( interval == null ? null : Collections.singletonList(interval) ); - ParallelIndexTuningConfig tuningConfig = newTuningConfig(partitionsSpec, maxNumConcurrentSubTasks); + ParallelIndexTuningConfig tuningConfig = newTuningConfig( + partitionsSpec, + maxNumConcurrentSubTasks, + !appendToExisting + ); final ParallelIndexIngestionSpec ingestionSpec; @@ -191,11 +195,11 @@ private ParallelIndexSupervisorTask newTask( null, new LocalInputSource(inputDir, filter), inputFormat, - false + appendToExisting ); ingestionSpec = new ParallelIndexIngestionSpec( new DataSchema( - "dataSource", + DATASOURCE, timestampSpec, dimensionsSpec, new AggregatorFactory[]{new LongSumAggregatorFactory("val", "val")}, @@ -209,7 +213,7 @@ private ParallelIndexSupervisorTask newTask( Preconditions.checkArgument(inputFormat == null); ParallelIndexIOConfig ioConfig = new ParallelIndexIOConfig( new LocalFirehoseFactory(inputDir, filter, null), - false + appendToExisting ); //noinspection unchecked ingestionSpec = new ParallelIndexIngestionSpec( diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractParallelIndexSupervisorTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractParallelIndexSupervisorTaskTest.java index 470a00372b48..a1d26feaa9b1 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractParallelIndexSupervisorTaskTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractParallelIndexSupervisorTaskTest.java @@ -37,6 +37,7 @@ import org.apache.druid.client.indexing.NoopIndexingServiceClient; import org.apache.druid.client.indexing.TaskStatusResponse; import org.apache.druid.data.input.InputFormat; +import org.apache.druid.data.input.MaxSizeSplitHintSpec; import org.apache.druid.data.input.impl.CSVParseSpec; import org.apache.druid.data.input.impl.CsvInputFormat; import 
org.apache.druid.data.input.impl.DimensionsSpec; @@ -46,6 +47,7 @@ import org.apache.druid.indexer.TaskLocation; import org.apache.druid.indexer.TaskStatus; import org.apache.druid.indexer.TaskStatusPlus; +import org.apache.druid.indexer.partitions.PartitionsSpec; import org.apache.druid.indexing.common.RetryPolicyConfig; import org.apache.druid.indexing.common.RetryPolicyFactory; import org.apache.druid.indexing.common.SegmentLoaderFactory; @@ -139,34 +141,35 @@ public class AbstractParallelIndexSupervisorTaskTest extends IngestionTestBase false, 0 ); - static final ParallelIndexTuningConfig DEFAULT_TUNING_CONFIG_FOR_PARALLEL_INDEXING = new ParallelIndexTuningConfig( - null, - null, - null, - null, - null, - null, - null, - null, - null, - null, - null, - null, - null, - null, - null, - null, - 2, - null, - null, - null, - null, - null, - null, - null, - null, - null - ); + public static final ParallelIndexTuningConfig DEFAULT_TUNING_CONFIG_FOR_PARALLEL_INDEXING = + new ParallelIndexTuningConfig( + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + 2, + null, + null, + null, + null, + null, + null, + null, + null, + null + ); private static final Logger LOG = new Logger(AbstractParallelIndexSupervisorTaskTest.class); @@ -223,6 +226,42 @@ public void tearDownAbstractParallelIndexSupervisorTaskTest() temporaryFolder.delete(); } + protected ParallelIndexTuningConfig newTuningConfig( + PartitionsSpec partitionsSpec, + int maxNumConcurrentSubTasks, + boolean forceGuaranteedRollup + ) + { + return new ParallelIndexTuningConfig( + null, + null, + null, + null, + null, + null, + new MaxSizeSplitHintSpec(1L), // set maxSplitSize to 1 so that each split has only one file. + partitionsSpec, + null, + null, + null, + forceGuaranteedRollup, + null, + null, + null, + null, + maxNumConcurrentSubTasks, + null, + null, + null, + null, + null, + null, + null, + null, + null + ); + } + protected LocalIndexingServiceClient getIndexingServiceClient() { return indexingServiceClient; diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/HashPartitionMultiPhaseParallelIndexingTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/HashPartitionMultiPhaseParallelIndexingTest.java index 7fcabd0911ba..45f40bda2501 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/HashPartitionMultiPhaseParallelIndexingTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/HashPartitionMultiPhaseParallelIndexingTest.java @@ -27,13 +27,16 @@ import org.apache.druid.data.input.impl.ParseSpec; import org.apache.druid.data.input.impl.TimestampSpec; import org.apache.druid.indexer.TaskState; +import org.apache.druid.indexer.partitions.DynamicPartitionsSpec; import org.apache.druid.indexer.partitions.HashedPartitionsSpec; +import org.apache.druid.indexer.partitions.PartitionsSpec; import org.apache.druid.indexing.common.LockGranularity; import org.apache.druid.java.util.common.Intervals; import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.query.scan.ScanResultValue; import org.apache.druid.timeline.DataSegment; import org.apache.druid.timeline.partition.HashBasedNumberedShardSpec; +import org.apache.druid.timeline.partition.NumberedShardSpec; import org.joda.time.Interval; import org.junit.Assert; import org.junit.Before; @@ -49,9 +52,12 @@ import 
java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Map.Entry; import java.util.Set; +import java.util.stream.Collectors; @RunWith(Parameterized.class) public class HashPartitionMultiPhaseParallelIndexingTest extends AbstractMultiPhaseParallelIndexingTest @@ -128,9 +134,78 @@ public void setup() throws IOException @Test public void testRun() throws Exception { - final Set<DataSegment> publishedSegments; + final Set<DataSegment> publishedSegments = runTestTask( + new HashedPartitionsSpec(null, 2, ImmutableList.of("dim1", "dim2")), + TaskState.SUCCESS, + false + ); + assertHashedPartition(publishedSegments); + } + + @Test + public void testAppendLinearlyPartitionedSegmentsToHashPartitionedDatasourceSuccessfullyAppend() + { + final Set<DataSegment> publishedSegments = new HashSet<>(); + publishedSegments.addAll( + runTestTask( + new HashedPartitionsSpec(null, 2, ImmutableList.of("dim1", "dim2")), + TaskState.SUCCESS, + false + ) + ); + // Append + publishedSegments.addAll( + runTestTask( + new DynamicPartitionsSpec(5, null), + TaskState.SUCCESS, + true + ) + ); + // And append again + publishedSegments.addAll( + runTestTask( + new DynamicPartitionsSpec(10, null), + TaskState.SUCCESS, + true + ) + ); + + final Map<Interval, List<DataSegment>> intervalToSegments = new HashMap<>(); + publishedSegments.forEach( + segment -> intervalToSegments.computeIfAbsent(segment.getInterval(), k -> new ArrayList<>()).add(segment) + ); + for (Entry<Interval, List<DataSegment>> entry : intervalToSegments.entrySet()) { + final List<DataSegment> segments = entry.getValue(); + final List<DataSegment> hashedSegments = segments + .stream() + .filter(segment -> segment.getShardSpec().getClass() == HashBasedNumberedShardSpec.class) + .collect(Collectors.toList()); + final List<DataSegment> linearSegments = segments + .stream() + .filter(segment -> segment.getShardSpec().getClass() == NumberedShardSpec.class) + .collect(Collectors.toList()); + + for (DataSegment hashedSegment : hashedSegments) { + final HashBasedNumberedShardSpec hashShardSpec = (HashBasedNumberedShardSpec) hashedSegment.getShardSpec(); + for (DataSegment linearSegment : linearSegments) { + Assert.assertEquals(hashedSegment.getInterval(), linearSegment.getInterval()); + Assert.assertEquals(hashedSegment.getVersion(), linearSegment.getVersion()); + final NumberedShardSpec numberedShardSpec = (NumberedShardSpec) linearSegment.getShardSpec(); + Assert.assertEquals(hashShardSpec.getNumCorePartitions(), numberedShardSpec.getNumCorePartitions()); + Assert.assertTrue(hashShardSpec.getPartitionNum() < numberedShardSpec.getPartitionNum()); + } + } + } + } + + private Set<DataSegment> runTestTask( + PartitionsSpec partitionsSpec, + TaskState expectedTaskState, + boolean appendToExisting + ) + { if (isUseInputFormatApi()) { - publishedSegments = runTestTask( + return runTestTask( TIMESTAMP_SPEC, DIMENSIONS_SPEC, INPUT_FORMAT, @@ -138,12 +213,13 @@ public void testRun() throws Exception INTERVAL_TO_INDEX, inputDir, "test_*", - new HashedPartitionsSpec(null, 2, ImmutableList.of("dim1", "dim2")), + partitionsSpec, maxNumConcurrentSubTasks, - TaskState.SUCCESS + expectedTaskState, + appendToExisting ); } else { - publishedSegments = runTestTask( + return runTestTask( null, null, null, @@ -151,12 +227,12 @@ public void testRun() throws Exception INTERVAL_TO_INDEX, inputDir, "test_*", - new HashedPartitionsSpec(null, 2, ImmutableList.of("dim1", "dim2")), + partitionsSpec, maxNumConcurrentSubTasks, - TaskState.SUCCESS + expectedTaskState, + appendToExisting ); } - assertHashedPartition(publishedSegments); }
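[Note, not part of the patch: the invariant verified here, and again for range partitioning in RangePartitionMultiPhaseParallelIndexingTest below, can be stated as one standalone check: segments appended with a DynamicPartitionsSpec must extend the core partition set of each interval they land in, never replace it. A minimal sketch, assuming nothing beyond the DataSegment and ShardSpec accessors these tests already use; the helper name is hypothetical.

// Illustrative sketch only: appended segments must join the atomic update group
// established by the first-generation (hash- or range-partitioned) core segments.
private static void assertAppendExtendsCoreSet(List<DataSegment> firstGeneration, List<DataSegment> appended)
{
  for (DataSegment core : firstGeneration) {
    final int numCorePartitions = core.getShardSpec().getNumCorePartitions();
    for (DataSegment added : appended) {
      if (core.getInterval().equals(added.getInterval())) {
        // Same version: the appended segment joins the existing atomic update group.
        Assert.assertEquals(core.getVersion(), added.getVersion());
        // The core partition count is fixed by the first-generation segments.
        Assert.assertEquals(numCorePartitions, added.getShardSpec().getNumCorePartitions());
        // Core ids occupy [0, numCorePartitions); appended segments take ids beyond them,
        // which is the strict inequality the tests assert on getPartitionNum().
        Assert.assertTrue(added.getShardSpec().getPartitionNum() >= numCorePartitions);
      }
    }
  }
}
]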
private void assertHashedPartition(Set publishedSegments) throws IOException diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskTest.java index 3ae79a3b6966..25937101b215 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskTest.java @@ -21,14 +21,28 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.collect.Ordering; +import org.apache.druid.data.input.impl.DimensionsSpec; +import org.apache.druid.data.input.impl.InlineInputSource; +import org.apache.druid.data.input.impl.JsonInputFormat; +import org.apache.druid.data.input.impl.TimestampSpec; +import org.apache.druid.indexer.partitions.HashedPartitionsSpec; import org.apache.druid.java.util.common.Intervals; import org.apache.druid.java.util.common.Pair; +import org.apache.druid.segment.IndexSpec; +import org.apache.druid.segment.data.CompressionFactory.LongEncodingStrategy; +import org.apache.druid.segment.data.CompressionStrategy; +import org.apache.druid.segment.data.RoaringBitmapSerdeFactory; +import org.apache.druid.segment.indexing.DataSchema; +import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory; import org.apache.druid.timeline.partition.BuildingHashBasedNumberedShardSpec; import org.hamcrest.Matchers; +import org.joda.time.Duration; import org.joda.time.Interval; import org.junit.Assert; +import org.junit.Rule; import org.junit.Test; import org.junit.experimental.runners.Enclosed; +import org.junit.rules.ExpectedException; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; @@ -146,4 +160,82 @@ private static void assertNoMissingPartitions( Assert.assertEquals(expectedIds, actualIds); } } + + public static class ConstructorTest + { + @Rule + public ExpectedException expectedException = ExpectedException.none(); + + @Test + public void testFailToConstructWhenBothAppendToExistingAndForceGuaranteedRollupAreSet() + { + final boolean appendToExisting = true; + final boolean forceGuaranteedRollup = true; + final ParallelIndexIOConfig ioConfig = new ParallelIndexIOConfig( + null, + new InlineInputSource("test"), + new JsonInputFormat(null, null, null), + appendToExisting + ); + final ParallelIndexTuningConfig tuningConfig = new ParallelIndexTuningConfig( + null, + null, + 10, + 1000L, + null, + null, + null, + new HashedPartitionsSpec(null, 10, null), + new IndexSpec( + new RoaringBitmapSerdeFactory(true), + CompressionStrategy.UNCOMPRESSED, + CompressionStrategy.LZF, + LongEncodingStrategy.LONGS + ), + new IndexSpec(), + 1, + forceGuaranteedRollup, + true, + 10000L, + OffHeapMemorySegmentWriteOutMediumFactory.instance(), + null, + 10, + 100, + 20L, + new Duration(3600), + 128, + null, + null, + false, + null, + null + ); + final ParallelIndexIngestionSpec indexIngestionSpec = new ParallelIndexIngestionSpec( + new DataSchema( + "datasource", + new TimestampSpec(null, null, null), + DimensionsSpec.EMPTY, + null, + null, + null + ), + ioConfig, + tuningConfig + ); + expectedException.expect(IllegalArgumentException.class); + expectedException.expectMessage("Perfect rollup cannot be guaranteed when appending to existing dataSources"); + new ParallelIndexSupervisorTask( + 
null, + null, + null, + indexIngestionSpec, + null, + null, + null, + null, + null, + null + ); + } + } } diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexTuningConfigTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexTuningConfigTest.java index f9577df97b6b..101c97473485 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexTuningConfigTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexTuningConfigTest.java @@ -22,6 +22,8 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.jsontype.NamedType; import org.apache.druid.indexer.partitions.DynamicPartitionsSpec; +import org.apache.druid.indexer.partitions.HashedPartitionsSpec; +import org.apache.druid.indexer.partitions.SingleDimensionPartitionsSpec; import org.apache.druid.jackson.DefaultObjectMapper; import org.apache.druid.segment.IndexSpec; import org.apache.druid.segment.data.CompressionFactory.LongEncodingStrategy; @@ -226,4 +228,127 @@ public void testSerdeWithMaxNumSubTasksAndMaxNumConcurrentSubTasks() null ); } + + @Test + public void testConstructorWithHashedPartitionsSpecAndNonForceGuaranteedRollupFailToCreate() + { + expectedException.expect(IllegalArgumentException.class); + expectedException.expectMessage("DynamicPartitionsSpec must be used for best-effort rollup"); + final boolean forceGuaranteedRollup = false; + new ParallelIndexTuningConfig( + null, + null, + 10, + 1000L, + null, + null, + null, + new HashedPartitionsSpec(null, 10, null), + new IndexSpec( + new RoaringBitmapSerdeFactory(true), + CompressionStrategy.UNCOMPRESSED, + CompressionStrategy.LZF, + LongEncodingStrategy.LONGS + ), + new IndexSpec(), + 1, + forceGuaranteedRollup, + true, + 10000L, + OffHeapMemorySegmentWriteOutMediumFactory.instance(), + null, + 10, + 100, + 20L, + new Duration(3600), + 128, + null, + null, + false, + null, + null + ); + } + + @Test + public void testConstructorWithSingleDimensionPartitionsSpecAndNonForceGuaranteedRollupFailToCreate() + { + expectedException.expect(IllegalArgumentException.class); + expectedException.expectMessage("DynamicPartitionsSpec must be used for best-effort rollup"); + final boolean forceGuaranteedRollup = false; + new ParallelIndexTuningConfig( + null, + null, + 10, + 1000L, + null, + null, + null, + new SingleDimensionPartitionsSpec(100, null, "dim", false), + new IndexSpec( + new RoaringBitmapSerdeFactory(true), + CompressionStrategy.UNCOMPRESSED, + CompressionStrategy.LZF, + LongEncodingStrategy.LONGS + ), + new IndexSpec(), + 1, + forceGuaranteedRollup, + true, + 10000L, + OffHeapMemorySegmentWriteOutMediumFactory.instance(), + null, + 10, + 100, + 20L, + new Duration(3600), + 128, + null, + null, + false, + null, + null + ); + } + + @Test + public void testConstructorWithDynamicPartitionsSpecAndForceGuaranteedRollupFailToCreate() + { + expectedException.expect(IllegalArgumentException.class); + expectedException.expectMessage("cannot be used for perfect rollup"); + final boolean forceGuaranteedRollup = true; + new ParallelIndexTuningConfig( + null, + null, + 10, + 1000L, + null, + null, + null, + new DynamicPartitionsSpec(100, null), + new IndexSpec( + new RoaringBitmapSerdeFactory(true), + CompressionStrategy.UNCOMPRESSED, + CompressionStrategy.LZF, + LongEncodingStrategy.LONGS + ), + new IndexSpec(), + 1, + forceGuaranteedRollup, + 
true, + 10000L, + OffHeapMemorySegmentWriteOutMediumFactory.instance(), + null, + 10, + 100, + 20L, + new Duration(3600), + 128, + null, + null, + false, + null, + null + ); + } } diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/PartialCompactionTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/PartialCompactionTest.java new file mode 100644 index 000000000000..c65c5fdd1d8a --- /dev/null +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/PartialCompactionTest.java @@ -0,0 +1,245 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.indexing.common.task.batch.parallel; + +import org.apache.druid.data.input.InputFormat; +import org.apache.druid.data.input.impl.CsvInputFormat; +import org.apache.druid.data.input.impl.DimensionsSpec; +import org.apache.druid.data.input.impl.TimestampSpec; +import org.apache.druid.indexer.TaskState; +import org.apache.druid.indexer.partitions.DynamicPartitionsSpec; +import org.apache.druid.indexer.partitions.HashedPartitionsSpec; +import org.apache.druid.indexer.partitions.PartitionsSpec; +import org.apache.druid.indexer.partitions.SingleDimensionPartitionsSpec; +import org.apache.druid.indexing.common.LockGranularity; +import org.apache.druid.indexing.common.RetryPolicyConfig; +import org.apache.druid.indexing.common.RetryPolicyFactory; +import org.apache.druid.indexing.common.stats.DropwizardRowIngestionMetersFactory; +import org.apache.druid.indexing.common.task.CompactionTask; +import org.apache.druid.indexing.common.task.CompactionTask.Builder; +import org.apache.druid.indexing.common.task.SpecificSegmentsSpec; +import org.apache.druid.indexing.common.task.TestAppenderatorsManager; +import org.apache.druid.java.util.common.Intervals; +import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.segment.SegmentUtils; +import org.apache.druid.segment.realtime.firehose.NoopChatHandlerProvider; +import org.apache.druid.server.security.AuthTestUtils; +import org.apache.druid.timeline.DataSegment; +import org.joda.time.Interval; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.io.File; +import java.io.IOException; +import java.io.Writer; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Comparator; +import java.util.List; +import java.util.Map; +import java.util.Set; + +public class PartialCompactionTest extends AbstractMultiPhaseParallelIndexingTest +{ + private static final TimestampSpec TIMESTAMP_SPEC = new TimestampSpec("ts", "auto", null); + private static final DimensionsSpec DIMENSIONS_SPEC 
= new DimensionsSpec( + DimensionsSpec.getDefaultSchemas(Arrays.asList("ts", "dim1", "dim2")) + ); + private static final InputFormat INPUT_FORMAT = new CsvInputFormat( + Arrays.asList("ts", "dim1", "dim2", "val"), + null, + false, + false, + 0 + ); + private static final Interval INTERVAL_TO_INDEX = Intervals.of("2017-12/P1M"); + private static final RetryPolicyFactory RETRY_POLICY_FACTORY = new RetryPolicyFactory(new RetryPolicyConfig()); + + private File inputDir; + + public PartialCompactionTest() + { + super(LockGranularity.SEGMENT, true); + } + + @Before + public void setup() throws IOException + { + inputDir = temporaryFolder.newFolder("data"); + // set up data + for (int i = 0; i < 10; i++) { + try (final Writer writer = + Files.newBufferedWriter(new File(inputDir, "test_" + i).toPath(), StandardCharsets.UTF_8)) { + for (int j = 0; j < 10; j++) { + writer.write(StringUtils.format("2017-12-%d,%d,%d th test file\n", j + 1, i + 10, i)); + writer.write(StringUtils.format("2017-12-%d,%d,%d th test file\n", j + 2, i + 11, i)); + } + } + } + } + + @Test + public void testPartialCompactHashAndDynamicPartitionedSegments() + { + final Map<Interval, List<DataSegment>> hashPartitionedSegments = SegmentUtils.groupSegmentsByInterval( + runTestTask( + new HashedPartitionsSpec(null, 3, null), + TaskState.SUCCESS, + false + ) + ); + final Map<Interval, List<DataSegment>> linearlyPartitionedSegments = SegmentUtils.groupSegmentsByInterval( + runTestTask( + new DynamicPartitionsSpec(10, null), + TaskState.SUCCESS, + true + ) + ); + // Pick half of each partition list to compact together + hashPartitionedSegments.values().forEach( + segmentsInInterval -> segmentsInInterval.sort( + Comparator.comparing(segment -> segment.getShardSpec().getPartitionNum()) + ) + ); + linearlyPartitionedSegments.values().forEach( + segmentsInInterval -> segmentsInInterval.sort( + Comparator.comparing(segment -> segment.getShardSpec().getPartitionNum()) + ) + ); + final List<DataSegment> segmentsToCompact = new ArrayList<>(); + for (List<DataSegment> segmentsInInterval : hashPartitionedSegments.values()) { + segmentsToCompact.addAll( + segmentsInInterval.subList(segmentsInInterval.size() / 2, segmentsInInterval.size()) + ); + } + for (List<DataSegment> segmentsInInterval : linearlyPartitionedSegments.values()) { + segmentsToCompact.addAll( + segmentsInInterval.subList(0, segmentsInInterval.size() / 2) + ); + } + final CompactionTask compactionTask = newCompactionTaskBuilder() + .inputSpec(SpecificSegmentsSpec.fromSegments(segmentsToCompact)) + .tuningConfig(newTuningConfig(new DynamicPartitionsSpec(20, null), 2, false)) + .build(); + final Map<Interval, List<DataSegment>> compactedSegments = SegmentUtils.groupSegmentsByInterval( + runTask(compactionTask, TaskState.SUCCESS) + ); + for (List<DataSegment> segmentsInInterval : compactedSegments.values()) { + final int expectedAtomicUpdateGroupSize = segmentsInInterval.size(); + for (DataSegment segment : segmentsInInterval) { + Assert.assertEquals(expectedAtomicUpdateGroupSize, segment.getShardSpec().getAtomicUpdateGroupSize()); + } + } + } + + @Test + public void testPartialCompactRangeAndDynamicPartitionedSegments() + { + final Map<Interval, List<DataSegment>> rangePartitionedSegments = SegmentUtils.groupSegmentsByInterval( + runTestTask( + new SingleDimensionPartitionsSpec(10, null, "dim1", false), + TaskState.SUCCESS, + false + ) + ); + final Map<Interval, List<DataSegment>> linearlyPartitionedSegments = SegmentUtils.groupSegmentsByInterval( + runTestTask( + new DynamicPartitionsSpec(10, null), + TaskState.SUCCESS, + true + ) + ); + // Pick half of each partition list to compact together + rangePartitionedSegments.values().forEach( + segmentsInInterval ->
segmentsInInterval.sort( + Comparator.comparing(segment -> segment.getShardSpec().getPartitionNum()) + ) + ); + linearlyPartitionedSegments.values().forEach( + segmentsInInterval -> segmentsInInterval.sort( + Comparator.comparing(segment -> segment.getShardSpec().getPartitionNum()) + ) + ); + final List<DataSegment> segmentsToCompact = new ArrayList<>(); + for (List<DataSegment> segmentsInInterval : rangePartitionedSegments.values()) { + segmentsToCompact.addAll( + segmentsInInterval.subList(segmentsInInterval.size() / 2, segmentsInInterval.size()) + ); + } + for (List<DataSegment> segmentsInInterval : linearlyPartitionedSegments.values()) { + segmentsToCompact.addAll( + segmentsInInterval.subList(0, segmentsInInterval.size() / 2) + ); + } + final CompactionTask compactionTask = newCompactionTaskBuilder() + .inputSpec(SpecificSegmentsSpec.fromSegments(segmentsToCompact)) + .tuningConfig(newTuningConfig(new DynamicPartitionsSpec(20, null), 2, false)) + .build(); + final Map<Interval, List<DataSegment>> compactedSegments = SegmentUtils.groupSegmentsByInterval( + runTask(compactionTask, TaskState.SUCCESS) + ); + for (List<DataSegment> segmentsInInterval : compactedSegments.values()) { + final int expectedAtomicUpdateGroupSize = segmentsInInterval.size(); + for (DataSegment segment : segmentsInInterval) { + Assert.assertEquals(expectedAtomicUpdateGroupSize, segment.getShardSpec().getAtomicUpdateGroupSize()); + } + } + } + + private Set<DataSegment> runTestTask( + PartitionsSpec partitionsSpec, + TaskState expectedTaskState, + boolean appendToExisting + ) + { + return runTestTask( + TIMESTAMP_SPEC, + DIMENSIONS_SPEC, + INPUT_FORMAT, + null, + INTERVAL_TO_INDEX, + inputDir, + "test_*", + partitionsSpec, + 2, + expectedTaskState, + appendToExisting + ); + } + + private Builder newCompactionTaskBuilder() + { + return new Builder( + DATASOURCE, + getObjectMapper(), + AuthTestUtils.TEST_AUTHORIZER_MAPPER, + new NoopChatHandlerProvider(), + new DropwizardRowIngestionMetersFactory(), + getIndexingServiceClient(), + getCoordinatorClient(), + getSegmentLoaderFactory(), + RETRY_POLICY_FACTORY, + new TestAppenderatorsManager() + ); + } +} diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/RangePartitionMultiPhaseParallelIndexingTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/RangePartitionMultiPhaseParallelIndexingTest.java index f62799b4963f..03ce926c2606 100--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/RangePartitionMultiPhaseParallelIndexingTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/RangePartitionMultiPhaseParallelIndexingTest.java @@ -32,12 +32,15 @@ import org.apache.druid.data.input.impl.ParseSpec; import org.apache.druid.data.input.impl.TimestampSpec; import org.apache.druid.indexer.TaskState; +import org.apache.druid.indexer.partitions.DynamicPartitionsSpec; +import org.apache.druid.indexer.partitions.PartitionsSpec; import org.apache.druid.indexer.partitions.SingleDimensionPartitionsSpec; import org.apache.druid.indexing.common.LockGranularity; import org.apache.druid.java.util.common.Intervals; import org.apache.druid.java.util.common.guava.Comparators; import org.apache.druid.query.scan.ScanResultValue; import org.apache.druid.timeline.DataSegment; +import org.apache.druid.timeline.partition.NumberedShardSpec; import org.apache.druid.timeline.partition.SingleDimensionShardSpec; import org.hamcrest.Matchers; import org.joda.time.Interval; @@ -59,8 +62,12 @@ import java.util.ArrayList; import
java.util.Arrays; import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; import java.util.LinkedHashMap; import java.util.List; +import java.util.Map; +import java.util.Map.Entry; import java.util.Set; import java.util.stream.Collectors; @@ -201,9 +208,95 @@ private static void writeRow( public void createsCorrectRangePartitions() throws Exception { int targetRowsPerSegment = NUM_ROW / DIM_FILE_CARDINALITY / NUM_PARTITION; - final Set<DataSegment> publishedSegments; + final Set<DataSegment> publishedSegments = runTestTask( + new SingleDimensionPartitionsSpec( + targetRowsPerSegment, + null, + DIM1, + false + ), + useMultivalueDim ? TaskState.FAILED : TaskState.SUCCESS, + false + ); + + if (!useMultivalueDim) { + assertRangePartitions(publishedSegments); + } + } + + @Test + public void testAppendLinearlyPartitionedSegmentsToRangePartitionedDatasourceSuccessfullyAppend() + { + if (useMultivalueDim) { + return; + } + final int targetRowsPerSegment = NUM_ROW / DIM_FILE_CARDINALITY / NUM_PARTITION; + final Set<DataSegment> publishedSegments = new HashSet<>(); + publishedSegments.addAll( + runTestTask( + new SingleDimensionPartitionsSpec( + targetRowsPerSegment, + null, + DIM1, + false + ), + TaskState.SUCCESS, + false + ) + ); + // Append + publishedSegments.addAll( + runTestTask( + new DynamicPartitionsSpec(5, null), + TaskState.SUCCESS, + true + ) + ); + // And append again + publishedSegments.addAll( + runTestTask( + new DynamicPartitionsSpec(10, null), + TaskState.SUCCESS, + true + ) + ); + + final Map<Interval, List<DataSegment>> intervalToSegments = new HashMap<>(); + publishedSegments.forEach( + segment -> intervalToSegments.computeIfAbsent(segment.getInterval(), k -> new ArrayList<>()).add(segment) + ); + for (Entry<Interval, List<DataSegment>> entry : intervalToSegments.entrySet()) { + final List<DataSegment> segments = entry.getValue(); + final List<DataSegment> rangedSegments = segments + .stream() + .filter(segment -> segment.getShardSpec().getClass() == SingleDimensionShardSpec.class) + .collect(Collectors.toList()); + final List<DataSegment> linearSegments = segments + .stream() + .filter(segment -> segment.getShardSpec().getClass() == NumberedShardSpec.class) + .collect(Collectors.toList()); + + for (DataSegment rangedSegment : rangedSegments) { + final SingleDimensionShardSpec rangeShardSpec = (SingleDimensionShardSpec) rangedSegment.getShardSpec(); + for (DataSegment linearSegment : linearSegments) { + Assert.assertEquals(rangedSegment.getInterval(), linearSegment.getInterval()); + Assert.assertEquals(rangedSegment.getVersion(), linearSegment.getVersion()); + final NumberedShardSpec numberedShardSpec = (NumberedShardSpec) linearSegment.getShardSpec(); + Assert.assertEquals(rangeShardSpec.getNumCorePartitions(), numberedShardSpec.getNumCorePartitions()); + Assert.assertTrue(rangeShardSpec.getPartitionNum() < numberedShardSpec.getPartitionNum()); + } + } + } + } + + private Set<DataSegment> runTestTask( + PartitionsSpec partitionsSpec, + TaskState expectedTaskState, + boolean appendToExisting + ) + { if (isUseInputFormatApi()) { - publishedSegments = runTestTask( + return runTestTask( TIMESTAMP_SPEC, DIMENSIONS_SPEC, INPUT_FORMAT, @@ -211,17 +304,13 @@ public void createsCorrectRangePartitions() throws Exception INTERVAL_TO_INDEX, inputDir, TEST_FILE_NAME_PREFIX + "*", - new SingleDimensionPartitionsSpec( - targetRowsPerSegment, - null, - DIM1, - false - ), + partitionsSpec, maxNumConcurrentSubTasks, - useMultivalueDim ?
TaskState.FAILED : TaskState.SUCCESS + expectedTaskState, + appendToExisting ); } else { - publishedSegments = runTestTask( + return runTestTask( null, null, null, @@ -229,20 +318,12 @@ public void createsCorrectRangePartitions() throws Exception INTERVAL_TO_INDEX, inputDir, TEST_FILE_NAME_PREFIX + "*", - new SingleDimensionPartitionsSpec( - targetRowsPerSegment, - null, - DIM1, - false - ), + partitionsSpec, maxNumConcurrentSubTasks, - useMultivalueDim ? TaskState.FAILED : TaskState.SUCCESS + expectedTaskState, + appendToExisting ); } - - if (!useMultivalueDim) { - assertRangePartitions(publishedSegments); - } } private void assertRangePartitions(Set<DataSegment> publishedSegments) throws IOException diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseParallelIndexingTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseParallelIndexingTest.java index 4abb539f333c..0ae3eef5f492 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseParallelIndexingTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseParallelIndexingTest.java @@ -37,6 +37,7 @@ import org.apache.druid.java.util.common.granularity.Granularity; import org.apache.druid.query.aggregation.AggregatorFactory; import org.apache.druid.query.aggregation.LongSumAggregatorFactory; +import org.apache.druid.segment.SegmentUtils; import org.apache.druid.segment.indexing.DataSchema; import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec; import org.apache.druid.segment.realtime.firehose.LocalFirehoseFactory; @@ -59,10 +60,8 @@ import java.io.Writer; import java.nio.charset.StandardCharsets; import java.nio.file.Files; -import java.util.ArrayList; import java.util.Collection; import java.util.Collections; -import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; import java.util.List; @@ -155,12 +154,23 @@ public void testIsReady() throws Exception } } - private void runTestTask(@Nullable Interval interval, Granularity segmentGranularity, boolean appendToExisting) + private void runTestTask( + @Nullable Interval interval, + Granularity segmentGranularity, + boolean appendToExisting, + Collection<DataSegment> originalSegmentsIfAppend + ) { + // Even an initial write can run differently depending on whether appendToExisting is set final ParallelIndexSupervisorTask task = newTask(interval, segmentGranularity, appendToExisting, true); task.addToContext(Tasks.FORCE_TIME_CHUNK_LOCK_KEY, lockGranularity == LockGranularity.TIME_CHUNK); Assert.assertEquals(TaskState.SUCCESS, getIndexingServiceClient().runAndWait(task).getStatusCode()); - assertShardSpec(interval, appendToExisting); + assertShardSpec( + task, + interval == null ?
LockGranularity.TIME_CHUNK : lockGranularity, + appendToExisting, + originalSegmentsIfAppend + ); } private void runOverwriteTask( @@ -172,18 +182,13 @@ private void runOverwriteTask( final ParallelIndexSupervisorTask task = newTask(interval, segmentGranularity, false, true); task.addToContext(Tasks.FORCE_TIME_CHUNK_LOCK_KEY, lockGranularity == LockGranularity.TIME_CHUNK); Assert.assertEquals(TaskState.SUCCESS, getIndexingServiceClient().runAndWait(task).getStatusCode()); - assertShardSpecAfterOverwrite(interval, actualLockGranularity); - } - - private void runTestTask(@Nullable Interval interval, Granularity segmentGranularity) - { - runTestTask(interval, segmentGranularity, false); + assertShardSpecAfterOverwrite(task, actualLockGranularity); } private void testRunAndOverwrite(@Nullable Interval inputInterval, Granularity secondSegmentGranularity) { // Ingest all data. - runTestTask(inputInterval, Granularities.DAY); + runTestTask(inputInterval, Granularities.DAY, false, Collections.emptyList()); final Interval interval = inputInterval == null ? Intervals.ETERNITY : inputInterval; final Collection<DataSegment> allSegments = new HashSet<>( @@ -191,11 +196,15 @@ private void testRunAndOverwrite(@Nullable Interval inputInterval, Granularity s ); // Reingest the same data. Each segment should get replaced by a segment with a newer version. - runOverwriteTask( - inputInterval, - secondSegmentGranularity, - secondSegmentGranularity.equals(Granularities.DAY) ? lockGranularity : LockGranularity.TIME_CHUNK - ); + final LockGranularity actualLockGranularity; + if (inputInterval == null) { + actualLockGranularity = LockGranularity.TIME_CHUNK; + } else { + actualLockGranularity = secondSegmentGranularity.equals(Granularities.DAY) + ? lockGranularity + : LockGranularity.TIME_CHUNK; + } + runOverwriteTask(inputInterval, secondSegmentGranularity, actualLockGranularity); // Verify that the segment has been replaced. final Collection<DataSegment> newSegments = @@ -206,17 +215,17 @@ private void testRunAndOverwrite(@Nullable Interval inputInterval, Granularity s Assert.assertEquals(new HashSet<>(newSegments), visibles); } - private void assertShardSpec(@Nullable Interval interval, boolean appendToExisting) + private void assertShardSpec( + ParallelIndexSupervisorTask task, + LockGranularity actualLockGranularity, + boolean appendToExisting, + Collection<DataSegment> originalSegmentsIfAppend + ) { - final Interval nonNullInterval = interval == null ?
Intervals.ETERNITY : interval; - final Collection<DataSegment> segments = - getStorageCoordinator().retrieveUsedSegmentsForInterval("dataSource", nonNullInterval, Segments.ONLY_VISIBLE); - if (!appendToExisting && lockGranularity != LockGranularity.SEGMENT) { - // Check the core partition set in the shardSpec - final Map<Interval, List<DataSegment>> intervalToSegments = new HashMap<>(); - segments.forEach( - segment -> intervalToSegments.computeIfAbsent(segment.getInterval(), k -> new ArrayList<>()).add(segment) - ); + final Collection<DataSegment> segments = getIndexingServiceClient().getPublishedSegments(task); + if (!appendToExisting && actualLockGranularity == LockGranularity.TIME_CHUNK) { + // Initial write + final Map<Interval, List<DataSegment>> intervalToSegments = SegmentUtils.groupSegmentsByInterval(segments); for (List<DataSegment> segmentsPerInterval : intervalToSegments.values()) { for (DataSegment segment : segmentsPerInterval) { Assert.assertSame(NumberedShardSpec.class, segment.getShardSpec().getClass()); @@ -225,23 +234,27 @@ private void assertShardSpec(@Nullable Interval interval, boolean appendToExisti } } } else { + // Append or initial write with segment lock + final Map<Interval, List<DataSegment>> intervalToOriginalSegments = SegmentUtils.groupSegmentsByInterval( + originalSegmentsIfAppend + ); for (DataSegment segment : segments) { Assert.assertSame(NumberedShardSpec.class, segment.getShardSpec().getClass()); final NumberedShardSpec shardSpec = (NumberedShardSpec) segment.getShardSpec(); - Assert.assertEquals(0, shardSpec.getNumCorePartitions()); + final List<DataSegment> originalSegmentsInInterval = intervalToOriginalSegments.get(segment.getInterval()); + final int expectedNumCorePartitions = + originalSegmentsInInterval == null || originalSegmentsInInterval.isEmpty() + ? 0 + : originalSegmentsInInterval.get(0).getShardSpec().getNumCorePartitions(); + Assert.assertEquals(expectedNumCorePartitions, shardSpec.getNumCorePartitions()); } } } - private void assertShardSpecAfterOverwrite(@Nullable Interval interval, LockGranularity actualLockGranularity) + private void assertShardSpecAfterOverwrite(ParallelIndexSupervisorTask task, LockGranularity actualLockGranularity) { - final Interval nonNullInterval = interval == null ?
Intervals.ETERNITY : interval; - final Collection<DataSegment> segments = - getStorageCoordinator().retrieveUsedSegmentsForInterval("dataSource", nonNullInterval, Segments.ONLY_VISIBLE); - final Map<Interval, List<DataSegment>> intervalToSegments = new HashMap<>(); - segments.forEach( - segment -> intervalToSegments.computeIfAbsent(segment.getInterval(), k -> new ArrayList<>()).add(segment) - ); + final Collection<DataSegment> segments = getIndexingServiceClient().getPublishedSegments(task); + final Map<Interval, List<DataSegment>> intervalToSegments = SegmentUtils.groupSegmentsByInterval(segments); if (actualLockGranularity != LockGranularity.SEGMENT) { // Check the core partition set in the shardSpec for (List<DataSegment> segmentsPerInterval : intervalToSegments.values()) { @@ -296,7 +309,7 @@ public void testRunInSequential() final ParallelIndexSupervisorTask task = newTask(interval, appendToExisting, false); task.addToContext(Tasks.FORCE_TIME_CHUNK_LOCK_KEY, lockGranularity == LockGranularity.TIME_CHUNK); Assert.assertEquals(TaskState.SUCCESS, getIndexingServiceClient().runAndWait(task).getStatusCode()); - assertShardSpec(interval, appendToExisting); + assertShardSpec(task, lockGranularity, appendToExisting, Collections.emptyList()); } @Test @@ -349,18 +362,18 @@ public void testWith1MaxNumConcurrentSubTasks() task.addToContext(Tasks.FORCE_TIME_CHUNK_LOCK_KEY, lockGranularity == LockGranularity.TIME_CHUNK); Assert.assertEquals(TaskState.SUCCESS, getIndexingServiceClient().runAndWait(task).getStatusCode()); Assert.assertNull("Runner must be null if the task ran in sequential mode", task.getCurrentRunner()); - assertShardSpec(interval, appendToExisting); + assertShardSpec(task, lockGranularity, appendToExisting, Collections.emptyList()); } @Test public void testAppendToExisting() { final Interval interval = Intervals.of("2017-12/P1M"); - runTestTask(interval, Granularities.DAY, true); + runTestTask(interval, Granularities.DAY, true, Collections.emptyList()); final Collection<DataSegment> oldSegments = getStorageCoordinator().retrieveUsedSegmentsForInterval("dataSource", interval, Segments.ONLY_VISIBLE); - runTestTask(interval, Granularities.DAY, true); + runTestTask(interval, Granularities.DAY, true, oldSegments); final Collection<DataSegment> newSegments = getStorageCoordinator().retrieveUsedSegmentsForInterval("dataSource", interval, Segments.ONLY_VISIBLE); Assert.assertTrue(newSegments.containsAll(oldSegments)); @@ -369,6 +382,29 @@ public void testAppendToExisting() Assert.assertEquals(new HashSet<>(newSegments), visibles); } + @Test + public void testOverwriteAndAppend() + { + final Interval interval = Intervals.of("2017-12/P1M"); + testRunAndOverwrite(interval, Granularities.DAY); + final Collection<DataSegment> beforeAppendSegments = + getStorageCoordinator().retrieveUsedSegmentsForInterval("dataSource", interval, Segments.ONLY_VISIBLE); + + runTestTask( + interval, + Granularities.DAY, + true, + beforeAppendSegments + ); + final Collection<DataSegment> afterAppendSegments = + getStorageCoordinator().retrieveUsedSegmentsForInterval("dataSource", interval, Segments.ONLY_VISIBLE); + Assert.assertTrue(afterAppendSegments.containsAll(beforeAppendSegments)); + final VersionedIntervalTimeline<String, DataSegment> timeline = VersionedIntervalTimeline + .forSegments(afterAppendSegments); + final Set<DataSegment> visibles = timeline.findNonOvershadowedObjectsInInterval(interval, Partitions.ONLY_COMPLETE); + Assert.assertEquals(new HashSet<>(afterAppendSegments), visibles); + } + private ParallelIndexSupervisorTask newTask( @Nullable Interval interval, boolean appendToExisting, diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/test/TestIndexerMetadataStorageCoordinator.java b/indexing-service/src/test/java/org/apache/druid/indexing/test/TestIndexerMetadataStorageCoordinator.java index da02d9268952..bca3f7881e51 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/test/TestIndexerMetadataStorageCoordinator.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/test/TestIndexerMetadataStorageCoordinator.java @@ -153,7 +153,7 @@ public SegmentIdWithShardSpec allocatePendingSegment( dataSource, interval, maxVersion, - partialShardSpec.complete(objectMapper, 0) + partialShardSpec.complete(objectMapper, 0, 0) ); } diff --git a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java index 5c48598880ba..cc4d03351b28 100644 --- a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java +++ b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java @@ -51,7 +51,8 @@ import org.apache.druid.timeline.partition.NoneShardSpec; import org.apache.druid.timeline.partition.PartialShardSpec; import org.apache.druid.timeline.partition.PartitionChunk; -import org.apache.druid.timeline.partition.ShardSpec; +import org.apache.druid.timeline.partition.PartitionIds; +import org.apache.druid.timeline.partition.SingleDimensionShardSpec; import org.joda.time.Interval; import org.joda.time.chrono.ISOChronology; import org.skife.jdbi.v2.Folder3; @@ -810,16 +811,7 @@ private SegmentIdWithShardSpec createNewSegment( return null; } else { - //noinspection ConstantConditions - if (FluentIterable - .from(existingChunks) - .transformAndConcat(TimelineObjectHolder::getObject) - .anyMatch(chunk -> !chunk.getObject().getShardSpec().isCompatible(partialShardSpec.getShardSpecClass()))) { - // All existing segments should have a compatible shardSpec with partialShardSpec. - return null; - } - - // max partitionId of the SAME shardSpec + // max partitionId of the shardSpecs that share the same partition space. SegmentIdWithShardSpec maxId = null; if (!existingChunks.isEmpty()) { @@ -829,10 +821,10 @@ private SegmentIdWithShardSpec createNewSegment( for (DataSegment segment : FluentIterable .from(existingHolder.getObject()) .transform(PartitionChunk::getObject) - // Here we check only the segments of the same shardSpec to find out the max partitionId. - // Note that OverwriteShardSpec has the higher range for partitionId than others. + // Here we check only the segments whose shardSpec shares the same partition space as the given + // partialShardSpec. Note that OverwriteShardSpec doesn't share its partition space with others. // See PartitionIds. - .filter(segment -> segment.getShardSpec().getClass() == partialShardSpec.getShardSpecClass())) { + .filter(segment -> segment.getShardSpec().sharePartitionSpace(partialShardSpec))) { // Don't use the stream API for performance.
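// Keep the segment with the highest partitionId seen so far; a newly allocated segment gets maxId's partitionId + 1.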
if (maxId == null || maxId.getShardSpec().getPartitionNum() < segment.getShardSpec().getPartitionNum()) { maxId = SegmentIdWithShardSpec.fromDataSegment(segment); @@ -851,7 +843,7 @@ private SegmentIdWithShardSpec createNewSegment( } maxId = pendings.stream() - .filter(id -> id.getShardSpec().getClass() == partialShardSpec.getShardSpecClass()) + .filter(id -> id.getShardSpec().sharePartitionSpace(partialShardSpec)) .max((id1, id2) -> { final int versionCompare = id1.getVersion().compareTo(id2.getVersion()); if (versionCompare != 0) { @@ -873,9 +865,21 @@ private SegmentIdWithShardSpec createNewSegment( } if (maxId == null) { - final ShardSpec shardSpec = partialShardSpec.complete(jsonMapper, null); + // This code runs when the Overlord coordinates segment allocation, i.e., when segments are appended or when + // segment lock is used. When appending segments, a null maxId means that we are allocating the very first + // segment for this time chunk. Since the core partitions set is not determined for appended segments, we set + // it to 0. Segment lock does not use the core partitions set either; we simply set it to 0 so that the + // OvershadowableManager handles the atomic segment update. + final int newPartitionId = partialShardSpec.useNonRootGenerationPartitionSpace() + ? PartitionIds.NON_ROOT_GEN_START_PARTITION_ID + : PartitionIds.ROOT_GEN_START_PARTITION_ID; String version = versionOfExistingChunks == null ? maxVersion : versionOfExistingChunks; - return new SegmentIdWithShardSpec(dataSource, interval, version, shardSpec); + return new SegmentIdWithShardSpec( + dataSource, + interval, + version, + partialShardSpec.complete(jsonMapper, newPartitionId, 0) + ); } else if (!maxId.getInterval().equals(interval) || maxId.getVersion().compareTo(maxVersion) > 0) { log.warn( "Cannot allocate new segment for dataSource[%s], interval[%s], maxVersion[%s]: conflicting segment[%s].", @@ -885,13 +889,23 @@ private SegmentIdWithShardSpec createNewSegment( maxId ); return null; + } else if (maxId.getShardSpec().getNumCorePartitions() == SingleDimensionShardSpec.UNKNOWN_NUM_CORE_PARTITIONS) { + log.warn( + "Cannot allocate new segment because of unknown core partition size of segment[%s], shardSpec[%s]", + maxId, + maxId.getShardSpec() + ); + return null; } else { - final ShardSpec newShardSpec = partialShardSpec.complete(jsonMapper, maxId.getShardSpec()); return new SegmentIdWithShardSpec( dataSource, maxId.getInterval(), Preconditions.checkNotNull(versionOfExistingChunks, "versionOfExistingChunks"), - newShardSpec + partialShardSpec.complete( + jsonMapper, + maxId.getShardSpec().getPartitionNum() + 1, + maxId.getShardSpec().getNumCorePartitions() + ) ); } } diff --git a/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java b/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java index bd8e5ef3a8b0..8e7219614cd7 100644 --- a/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java +++ b/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java @@ -44,6 +44,7 @@ import org.apache.druid.timeline.partition.NumberedShardSpec; import org.apache.druid.timeline.partition.PartialShardSpec; import org.apache.druid.timeline.partition.PartitionIds; +import org.apache.druid.timeline.partition.SingleDimensionShardSpec; import org.assertj.core.api.Assertions; import org.joda.time.DateTime; import org.joda.time.Interval; @@ -60,6 +61,7 @@ import
java.nio.charset.StandardCharsets; import java.util.Collection; import java.util.Collections; +import java.util.HashSet; import java.util.List; import java.util.Set; import java.util.concurrent.atomic.AtomicLong; @@ -1113,4 +1115,49 @@ public void testAllocatePendingSegmentsForHashBasedNumberedShardSpec() throws IO Assert.assertEquals(0, shardSpec.getNumCorePartitions()); Assert.assertEquals(3, shardSpec.getNumBuckets()); } + + @Test + public void testAddNumberedShardSpecAfterSingleDimensionsShardSpecWithUnknownCorePartitionSize() throws IOException + { + final String datasource = "datasource"; + final Interval interval = Intervals.of("2020-01-01/P1D"); + final String version = "version"; + final List<String> dimensions = ImmutableList.of("dim"); + final List<String> metrics = ImmutableList.of("met"); + final Set<DataSegment> originalSegments = new HashSet<>(); + for (int i = 0; i < 6; i++) { + final String start = i == 0 ? null : String.valueOf(i - 1); + final String end = i == 5 ? null : String.valueOf(i); + originalSegments.add( + new DataSegment( + datasource, + interval, + version, + ImmutableMap.of(), + dimensions, + metrics, + new SingleDimensionShardSpec( + "dim", + start, + end, + i, + null // emulate shardSpecs created in older versions of Druid + ), + 9, + 10L + ) + ); + } + coordinator.announceHistoricalSegments(originalSegments); + final SegmentIdWithShardSpec id = coordinator.allocatePendingSegment( + datasource, + "seq", + null, + interval, + NumberedPartialShardSpec.instance(), + version, + false + ); + Assert.assertNull(id); + } } diff --git a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/SegmentIdWithShardSpecTest.java b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/SegmentIdWithShardSpecTest.java index 558736980a85..8e6c1b9df81d 100644 --- a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/SegmentIdWithShardSpecTest.java +++ b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/SegmentIdWithShardSpecTest.java @@ -53,7 +53,7 @@ public void testSerde() throws Exception Assert.assertEquals(INTERVAL, id2.getInterval()); Assert.assertEquals(VERSION, id2.getVersion()); Assert.assertEquals(SHARD_SPEC_1.getPartitionNum(), id2.getShardSpec().getPartitionNum()); - Assert.assertEquals(SHARD_SPEC_1.getNumCorePartitions(), ((NumberedShardSpec) id2.getShardSpec()).getNumCorePartitions()); + Assert.assertEquals(SHARD_SPEC_1.getNumCorePartitions(), id2.getShardSpec().getNumCorePartitions()); } @Test diff --git a/server/src/test/java/org/apache/druid/server/shard/NumberedShardSpecTest.java b/server/src/test/java/org/apache/druid/server/shard/NumberedShardSpecTest.java index e94b8068424e..04c0c553bcf9 100644 --- a/server/src/test/java/org/apache/druid/server/shard/NumberedShardSpecTest.java +++ b/server/src/test/java/org/apache/druid/server/shard/NumberedShardSpecTest.java @@ -30,10 +30,13 @@ import org.apache.druid.timeline.Overshadowable; import org.apache.druid.timeline.TimelineObjectHolder; import org.apache.druid.timeline.VersionedIntervalTimeline; -import org.apache.druid.timeline.partition.NumberedOverwriteShardSpec; +import org.apache.druid.timeline.partition.HashBasedNumberedPartialShardSpec; +import org.apache.druid.timeline.partition.NumberedOverwritePartialShardSpec; +import org.apache.druid.timeline.partition.NumberedPartialShardSpec; import org.apache.druid.timeline.partition.NumberedShardSpec; import org.apache.druid.timeline.partition.PartitionChunk; import org.apache.druid.timeline.partition.ShardSpec; +import
org.apache.druid.timeline.partition.SingleDimensionPartialShardSpec; import org.joda.time.Interval; import org.junit.Assert; import org.junit.Test; @@ -60,7 +63,7 @@ public void testSerdeRoundTrip() throws Exception ShardSpec.class ); Assert.assertEquals(1, spec.getPartitionNum()); - Assert.assertEquals(2, ((NumberedShardSpec) spec).getNumCorePartitions()); + Assert.assertEquals(2, spec.getNumCorePartitions()); } @Test @@ -71,7 +74,7 @@ public void testSerdeBackwardsCompat() throws Exception ShardSpec.class ); Assert.assertEquals(1, spec.getPartitionNum()); - Assert.assertEquals(2, ((NumberedShardSpec) spec).getNumCorePartitions()); + Assert.assertEquals(2, spec.getNumCorePartitions()); } @Test @@ -195,6 +198,16 @@ public void testVersionedIntervalTimelineBehaviorForNumberedShardSpec() ); } + @Test + public void testSharePartitionSpace() + { + final NumberedShardSpec shardSpec = new NumberedShardSpec(0, 1); + Assert.assertTrue(shardSpec.sharePartitionSpace(NumberedPartialShardSpec.instance())); + Assert.assertTrue(shardSpec.sharePartitionSpace(new HashBasedNumberedPartialShardSpec(null, 0, 1))); + Assert.assertTrue(shardSpec.sharePartitionSpace(new SingleDimensionPartialShardSpec("dim", 0, null, null, 1))); + Assert.assertFalse(shardSpec.sharePartitionSpace(new NumberedOverwritePartialShardSpec(0, 2, 1))); + } + private void testVersionedIntervalTimelineBehaviorForNumberedShardSpec( List<PartitionChunk<OvershadowableString>> chunks, Set<OvershadowableString> expectedObjects @@ -217,14 +230,6 @@ private void testVersionedIntervalTimelineBehaviorForNumberedShardSpec( Assert.assertEquals(expectedObjects, actualObjects); } - @Test - public void testCompatible() - { - final NumberedShardSpec spec = new NumberedShardSpec(0, 0); - Assert.assertTrue(spec.isCompatible(NumberedShardSpec.class)); - Assert.assertTrue(spec.isCompatible(NumberedOverwriteShardSpec.class)); - } - private static final class OvershadowableString implements Overshadowable<OvershadowableString> { private final int partitionId; diff --git a/server/src/test/java/org/apache/druid/server/shard/SingleDimensionShardSpecTest.java b/server/src/test/java/org/apache/druid/server/shard/SingleDimensionShardSpecTest.java index 0787cf2c119c..6928dfa735e3 100644 --- a/server/src/test/java/org/apache/druid/server/shard/SingleDimensionShardSpecTest.java +++ b/server/src/test/java/org/apache/druid/server/shard/SingleDimensionShardSpecTest.java @@ -30,7 +30,11 @@ import org.apache.druid.data.input.MapBasedInputRow; import org.apache.druid.java.util.common.Pair; import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.timeline.partition.HashBasedNumberedPartialShardSpec; +import org.apache.druid.timeline.partition.NumberedOverwritePartialShardSpec; +import org.apache.druid.timeline.partition.NumberedPartialShardSpec; import org.apache.druid.timeline.partition.ShardSpec; +import org.apache.druid.timeline.partition.SingleDimensionPartialShardSpec; import org.apache.druid.timeline.partition.SingleDimensionShardSpec; import org.junit.Assert; import org.junit.Test; @@ -142,6 +146,16 @@ public void testPossibleInDomain() Assert.assertTrue(shard7.possibleInDomain(domain2)); } + @Test + public void testSharePartitionSpace() + { + final SingleDimensionShardSpec shardSpec = makeSpec("start", "end"); + Assert.assertTrue(shardSpec.sharePartitionSpace(NumberedPartialShardSpec.instance())); + Assert.assertTrue(shardSpec.sharePartitionSpace(new HashBasedNumberedPartialShardSpec(null, 0, 1))); + Assert.assertTrue(shardSpec.sharePartitionSpace(new SingleDimensionPartialShardSpec("dim", 0, null, null, 1))); +
Assert.assertFalse(shardSpec.sharePartitionSpace(new NumberedOverwritePartialShardSpec(0, 2, 1))); + } + private static RangeSet<String> rangeSet(List<Range<String>> ranges) { ImmutableRangeSet.Builder<String> builder = ImmutableRangeSet.builder(); diff --git a/server/src/test/java/org/apache/druid/timeline/partition/HashBasedNumberedShardSpecTest.java b/server/src/test/java/org/apache/druid/timeline/partition/HashBasedNumberedShardSpecTest.java index ecb110237498..be24fe5c25a8 100644 --- a/server/src/test/java/org/apache/druid/timeline/partition/HashBasedNumberedShardSpecTest.java +++ b/server/src/test/java/org/apache/druid/timeline/partition/HashBasedNumberedShardSpecTest.java @@ -85,14 +85,14 @@ public void testSerdeBackwardsCompat() throws Exception ShardSpec.class ); Assert.assertEquals(1, spec.getPartitionNum()); - Assert.assertEquals(2, ((HashBasedNumberedShardSpec) spec).getNumCorePartitions()); + Assert.assertEquals(2, spec.getNumCorePartitions()); final ShardSpec specWithPartitionDimensions = ServerTestHelper.MAPPER.readValue( "{\"type\": \"hashed\", \"partitions\": 2, \"partitionNum\": 1, \"partitionDimensions\":[\"visitor_id\"]}", ShardSpec.class ); Assert.assertEquals(1, specWithPartitionDimensions.getPartitionNum()); - Assert.assertEquals(2, ((HashBasedNumberedShardSpec) specWithPartitionDimensions).getNumCorePartitions()); + Assert.assertEquals(2, specWithPartitionDimensions.getNumCorePartitions()); Assert.assertEquals(2, ((HashBasedNumberedShardSpec) specWithPartitionDimensions).getNumBuckets()); Assert.assertEquals( ImmutableList.of("visitor_id"), @@ -199,6 +199,23 @@ public void testGetGroupKey() ); } + @Test + public void testSharePartitionSpace() + { + final HashBasedNumberedShardSpec shardSpec = new HashBasedNumberedShardSpec( + 1, + 2, + 1, + 3, + ImmutableList.of("visitor_id"), + ServerTestHelper.MAPPER + ); + Assert.assertTrue(shardSpec.sharePartitionSpace(NumberedPartialShardSpec.instance())); + Assert.assertTrue(shardSpec.sharePartitionSpace(new HashBasedNumberedPartialShardSpec(null, 0, 1))); + Assert.assertTrue(shardSpec.sharePartitionSpace(new SingleDimensionPartialShardSpec("dim", 0, null, null, 1))); + Assert.assertFalse(shardSpec.sharePartitionSpace(new NumberedOverwritePartialShardSpec(0, 2, 1))); + } + public boolean assertExistsInOneSpec(List<ShardSpec> specs, InputRow row) { for (ShardSpec spec : specs) {
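// Scan the specs for one that accepts the row.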