diff --git a/core/src/main/java/org/apache/druid/indexer/partitions/Checks.java b/core/src/main/java/org/apache/druid/indexer/partitions/Checks.java new file mode 100644 index 000000000000..a0e47f8efb88 --- /dev/null +++ b/core/src/main/java/org/apache/druid/indexer/partitions/Checks.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.indexer.partitions; + +/** + * Various helper methods useful for checking the validity of arguments to spec constructors. 
+ */ +class Checks +{ + /** + * @return Property holding the non-null value, or the first property if both values are null + */ + @SuppressWarnings("VariableNotUsedInsideIf") // false positive: checked for 'null' not used inside 'if' + static Property<Integer> checkAtMostOneNotNull(String name1, Integer value1, String name2, Integer value2) + { + final Property<Integer> property; + + if (value1 == null && value2 == null) { + property = new Property<>(name1, value1); + } else if (value1 == null) { + property = new Property<>(name2, value2); + } else if (value2 == null) { + property = new Property<>(name1, value1); + } else { + throw new IllegalArgumentException("At most one of " + name1 + " or " + name2 + " must be present"); + } + + return property; + } +} diff --git a/core/src/main/java/org/apache/druid/indexer/partitions/DimensionBasedPartitionsSpec.java b/core/src/main/java/org/apache/druid/indexer/partitions/DimensionBasedPartitionsSpec.java index b1f4e322a9ce..58c0189ee729 100644 --- a/core/src/main/java/org/apache/druid/indexer/partitions/DimensionBasedPartitionsSpec.java +++ b/core/src/main/java/org/apache/druid/indexer/partitions/DimensionBasedPartitionsSpec.java @@ -26,5 +26,11 @@ */ public interface DimensionBasedPartitionsSpec extends PartitionsSpec { + String TARGET_ROWS_PER_SEGMENT = "targetRowsPerSegment"; + + // Deprecated properties preserved for backward compatibility: + @Deprecated + String TARGET_PARTITION_SIZE = "targetPartitionSize"; + List<String> getPartitionDimensions(); } diff --git a/core/src/main/java/org/apache/druid/indexer/partitions/DynamicPartitionsSpec.java b/core/src/main/java/org/apache/druid/indexer/partitions/DynamicPartitionsSpec.java index fb326633015a..242c0918c675 100644 --- a/core/src/main/java/org/apache/druid/indexer/partitions/DynamicPartitionsSpec.java +++ b/core/src/main/java/org/apache/druid/indexer/partitions/DynamicPartitionsSpec.java @@ -31,13 +31,14 @@ public class DynamicPartitionsSpec implements PartitionsSpec { public static final long DEFAULT_MAX_TOTAL_ROWS = 20_000_000; + 
static final String NAME = "dynamic"; private final int maxRowsPerSegment; private final long maxTotalRows; @JsonCreator public DynamicPartitionsSpec( - @JsonProperty("maxRowsPerSegment") @Nullable Integer maxRowsPerSegment, + @JsonProperty(PartitionsSpec.MAX_ROWS_PER_SEGMENT) @Nullable Integer maxRowsPerSegment, @JsonProperty("maxTotalRows") @Nullable Long maxTotalRows ) { diff --git a/core/src/main/java/org/apache/druid/indexer/partitions/HashedPartitionsSpec.java b/core/src/main/java/org/apache/druid/indexer/partitions/HashedPartitionsSpec.java index 8002fb5af7d9..950015f7c2c3 100644 --- a/core/src/main/java/org/apache/druid/indexer/partitions/HashedPartitionsSpec.java +++ b/core/src/main/java/org/apache/druid/indexer/partitions/HashedPartitionsSpec.java @@ -31,6 +31,7 @@ public class HashedPartitionsSpec implements DimensionBasedPartitionsSpec { + static final String NAME = "hashed"; private static final Logger LOG = new Logger(HashedPartitionsSpec.class); @Nullable @@ -41,34 +42,37 @@ public class HashedPartitionsSpec implements DimensionBasedPartitionsSpec public static HashedPartitionsSpec defaultSpec() { - return new HashedPartitionsSpec(null, null, null, null); - } - - public HashedPartitionsSpec( - @Nullable Integer maxRowsPerSegment, - @Nullable Integer numShards, - @Nullable List partitionDimensions - ) - { - this(null, maxRowsPerSegment, numShards, partitionDimensions); + return new HashedPartitionsSpec(null, null, null, null, null); } @JsonCreator public HashedPartitionsSpec( - @JsonProperty("targetPartitionSize") @Deprecated @Nullable Integer targetPartitionSize, - @JsonProperty("maxRowsPerSegment") @Nullable Integer maxRowsPerSegment, + @JsonProperty(DimensionBasedPartitionsSpec.TARGET_ROWS_PER_SEGMENT) @Nullable Integer targetRowsPerSegment, @JsonProperty("numShards") @Nullable Integer numShards, - @JsonProperty("partitionDimensions") @Nullable List partitionDimensions + @JsonProperty("partitionDimensions") @Nullable List partitionDimensions, + + 
// Deprecated properties preserved for backward compatibility: + @Deprecated @JsonProperty(DimensionBasedPartitionsSpec.TARGET_PARTITION_SIZE) @Nullable + Integer targetPartitionSize, + @Deprecated @JsonProperty(PartitionsSpec.MAX_ROWS_PER_SEGMENT) @Nullable + Integer maxRowsPerSegment ) { + Property target = Checks.checkAtMostOneNotNull( + DimensionBasedPartitionsSpec.TARGET_ROWS_PER_SEGMENT, + targetRowsPerSegment, + DimensionBasedPartitionsSpec.TARGET_PARTITION_SIZE, + targetPartitionSize + ); + Preconditions.checkArgument( - PartitionsSpec.isEffectivelyNull(targetPartitionSize) || PartitionsSpec.isEffectivelyNull(maxRowsPerSegment), - "Can't set both targetPartitionSize and maxRowsPerSegment" + PartitionsSpec.isEffectivelyNull(target.getValue()) || PartitionsSpec.isEffectivelyNull(maxRowsPerSegment), + "Can't set both " + target.getName() + " and maxRowsPerSegment" ); - final Integer realMaxRowsPerSegment = targetPartitionSize == null ? maxRowsPerSegment : targetPartitionSize; + final Integer realMaxRowsPerSegment = target.getValue() == null ? 
maxRowsPerSegment : target.getValue(); Preconditions.checkArgument( PartitionsSpec.isEffectivelyNull(realMaxRowsPerSegment) || PartitionsSpec.isEffectivelyNull(numShards), - "Can't use maxRowsPerSegment or targetPartitionSize and numShards together" + "Can't use maxRowsPerSegment or " + target.getName() + " and numShards together" ); // Needs to determine partitions if the _given_ numShards is null this.maxRowsPerSegment = getValidMaxRowsPerSegment(realMaxRowsPerSegment, numShards); @@ -100,6 +104,16 @@ public HashedPartitionsSpec( } } + public HashedPartitionsSpec( + @Nullable Integer maxRowsPerSegment, + @Nullable Integer numShards, + @Nullable List partitionDimensions + ) + { + this(null, numShards, partitionDimensions, null, maxRowsPerSegment); + } + + private static boolean needsDeterminePartitions(@Nullable Integer numShards) { return PartitionsSpec.isEffectivelyNull(numShards); diff --git a/core/src/main/java/org/apache/druid/indexer/partitions/PartitionsSpec.java b/core/src/main/java/org/apache/druid/indexer/partitions/PartitionsSpec.java index 2f7396d168e0..c4961e9c0c69 100644 --- a/core/src/main/java/org/apache/druid/indexer/partitions/PartitionsSpec.java +++ b/core/src/main/java/org/apache/druid/indexer/partitions/PartitionsSpec.java @@ -29,14 +29,15 @@ */ @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type", defaultImpl = HashedPartitionsSpec.class) @JsonSubTypes(value = { - @JsonSubTypes.Type(name = "single_dim", value = SingleDimensionPartitionsSpec.class), - @JsonSubTypes.Type(name = "dimension", value = SingleDimensionPartitionsSpec.class), // for backward compatibility - @JsonSubTypes.Type(name = "hashed", value = HashedPartitionsSpec.class), - @JsonSubTypes.Type(name = "dynamic", value = DynamicPartitionsSpec.class) + @JsonSubTypes.Type(name = SingleDimensionPartitionsSpec.NAME, value = SingleDimensionPartitionsSpec.class), + @JsonSubTypes.Type(name = SingleDimensionPartitionsSpec.OLD_NAME, value = SingleDimensionPartitionsSpec.class), // 
for backward compatibility + @JsonSubTypes.Type(name = HashedPartitionsSpec.NAME, value = HashedPartitionsSpec.class), + @JsonSubTypes.Type(name = DynamicPartitionsSpec.NAME, value = DynamicPartitionsSpec.class) }) public interface PartitionsSpec { int DEFAULT_MAX_ROWS_PER_SEGMENT = 5_000_000; + String MAX_ROWS_PER_SEGMENT = "maxRowsPerSegment"; /** * Returns the max number of rows per segment. diff --git a/core/src/main/java/org/apache/druid/indexer/partitions/Property.java b/core/src/main/java/org/apache/druid/indexer/partitions/Property.java new file mode 100644 index 000000000000..6d4715edd1b5 --- /dev/null +++ b/core/src/main/java/org/apache/druid/indexer/partitions/Property.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.indexer.partitions; + +import java.util.Objects; + +/** + * Convenience class for holding a pair of string key and templated value. 
+ */ +class Property<T> +{ + private final String name; + private final T value; + + Property(String name, T value) + { + this.name = name; + this.value = value; + } + + public String getName() + { + return name; + } + + public T getValue() + { + return value; + } + + @Override + public boolean equals(Object o) + { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + Property<?> property = (Property<?>) o; + return Objects.equals(name, property.name) && + Objects.equals(value, property.value); + } + + @Override + public int hashCode() + { + return Objects.hash(name, value); + } +} diff --git a/core/src/main/java/org/apache/druid/indexer/partitions/SingleDimensionPartitionsSpec.java b/core/src/main/java/org/apache/druid/indexer/partitions/SingleDimensionPartitionsSpec.java index aa6ef87f3804..9b985fb04058 100644 --- a/core/src/main/java/org/apache/druid/indexer/partitions/SingleDimensionPartitionsSpec.java +++ b/core/src/main/java/org/apache/druid/indexer/partitions/SingleDimensionPartitionsSpec.java @@ -21,76 +21,125 @@ import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import javax.annotation.Nullable; +import javax.validation.constraints.NotNull; import java.util.Collections; import java.util.List; import java.util.Objects; +/** + * Partition a segment by a single dimension. 
+ */ public class SingleDimensionPartitionsSpec implements DimensionBasedPartitionsSpec { - private final int maxRowsPerSegment; - private final int maxPartitionSize; - @Nullable + static final String NAME = "single_dim"; + static final String OLD_NAME = "dimension"; // for backward compatibility + + private static final String MAX_PARTITION_SIZE = "maxPartitionSize"; + + private final Integer targetRowsPerSegment; + private final Integer maxRowsPerSegment; private final String partitionDimension; private final boolean assumeGrouped; - public SingleDimensionPartitionsSpec( - int maxRowsPerSegment, - @Nullable Integer maxPartitionSize, - @Nullable String partitionDimension, - boolean assumeGrouped - ) - { - this(null, maxRowsPerSegment, maxPartitionSize, partitionDimension, assumeGrouped); - } + // Values for these fields are derived from the one above: + private final int resolvedMaxRowPerSegment; @JsonCreator public SingleDimensionPartitionsSpec( - @JsonProperty("targetPartitionSize") @Nullable Integer targetPartitionSize, - @JsonProperty("maxRowsPerSegment") @Nullable Integer maxRowsPerSegment, - @JsonProperty("maxPartitionSize") @Nullable Integer maxPartitionSize, + @JsonProperty(DimensionBasedPartitionsSpec.TARGET_ROWS_PER_SEGMENT) @Nullable Integer targetRowsPerSegment, + @JsonProperty(PartitionsSpec.MAX_ROWS_PER_SEGMENT) @Nullable Integer maxRowsPerSegment, @JsonProperty("partitionDimension") @Nullable String partitionDimension, - @JsonProperty("assumeGrouped") boolean assumeGrouped // false by default + @JsonProperty("assumeGrouped") boolean assumeGrouped, // false by default + + // Deprecated properties preserved for backward compatibility: + @Deprecated @JsonProperty(DimensionBasedPartitionsSpec.TARGET_PARTITION_SIZE) @Nullable + Integer targetPartitionSize, // prefer targetRowsPerSegment + @Deprecated @JsonProperty(MAX_PARTITION_SIZE) @Nullable + Integer maxPartitionSize // prefer maxRowsPerSegment ) { - Preconditions.checkArgument( - 
PartitionsSpec.isEffectivelyNull(targetPartitionSize) || PartitionsSpec.isEffectivelyNull(maxRowsPerSegment), - "Can't set both targetPartitionSize and maxRowsPerSegment" + Property target = Checks.checkAtMostOneNotNull( + DimensionBasedPartitionsSpec.TARGET_ROWS_PER_SEGMENT, + targetRowsPerSegment, + DimensionBasedPartitionsSpec.TARGET_PARTITION_SIZE, + targetPartitionSize + ); + + Property max = Checks.checkAtMostOneNotNull( + PartitionsSpec.MAX_ROWS_PER_SEGMENT, + maxRowsPerSegment, + MAX_PARTITION_SIZE, + maxPartitionSize ); + Preconditions.checkArgument( - !PartitionsSpec.isEffectivelyNull(targetPartitionSize) || !PartitionsSpec.isEffectivelyNull(maxRowsPerSegment), - "Either targetPartitionSize or maxRowsPerSegment must be specified" + (target.getValue() == null) != (max.getValue() == null), + "Exactly one of " + target.getName() + " or " + max.getName() + " must be present" ); - final int realMaxRowsPerSegment = targetPartitionSize == null ? maxRowsPerSegment : targetPartitionSize; - Preconditions.checkArgument(realMaxRowsPerSegment > 0, "maxRowsPerSegment must be specified"); - this.maxRowsPerSegment = realMaxRowsPerSegment; - this.maxPartitionSize = PartitionsSpec.isEffectivelyNull(maxPartitionSize) - ? 
Math.addExact(realMaxRowsPerSegment, (int) (realMaxRowsPerSegment * 0.5)) - : maxPartitionSize; + this.partitionDimension = partitionDimension; this.assumeGrouped = assumeGrouped; + this.targetRowsPerSegment = target.getValue(); + this.maxRowsPerSegment = max.getValue(); + + this.resolvedMaxRowPerSegment = resolveMaxRowsPerSegment(target, max); + } + + @VisibleForTesting + public SingleDimensionPartitionsSpec( + @Nullable Integer targetRowsPerSegment, + @Nullable Integer maxRowsPerSegment, + @Nullable String partitionDimension, + boolean assumeGrouped + ) + { + this(targetRowsPerSegment, maxRowsPerSegment, partitionDimension, assumeGrouped, null, null); + } + + private static int resolveMaxRowsPerSegment(Property target, Property max) + { + final int resolvedValue; + + if (target.getValue() != null) { + Preconditions.checkArgument(target.getValue() > 0, target.getName() + " must be greater than 0"); + try { + resolvedValue = Math.addExact(target.getValue(), (target.getValue() / 2)); + } + catch (ArithmeticException e) { + throw new IllegalArgumentException(target.getName() + " is too large"); + } + } else { + Preconditions.checkArgument(max.getValue() > 0, max.getName() + " must be greater than 0"); + resolvedValue = max.getValue(); + } + return resolvedValue; } - @Override @JsonProperty - public Integer getMaxRowsPerSegment() + @Nullable + public Integer getTargetRowsPerSegment() { - return maxRowsPerSegment; + return targetRowsPerSegment; } + @JsonIgnore @Override - public boolean needsDeterminePartitions(boolean useForHadoopTask) + @NotNull + public Integer getMaxRowsPerSegment() { - return true; + return resolvedMaxRowPerSegment; // NOTE: This returns the *resolved* value } - @JsonProperty - public int getMaxPartitionSize() + @JsonProperty(PartitionsSpec.MAX_ROWS_PER_SEGMENT) + private Integer getMaxRowsPerSegmentForJson() { - return maxPartitionSize; + return maxRowsPerSegment; } @JsonProperty @@ -106,12 +155,19 @@ public boolean isAssumeGrouped() return 
assumeGrouped; } + @JsonIgnore @Override public List getPartitionDimensions() { return partitionDimension == null ? Collections.emptyList() : Collections.singletonList(partitionDimension); } + @Override + public boolean needsDeterminePartitions(boolean useForHadoopTask) + { + return true; + } + @Override public boolean equals(Object o) { @@ -122,26 +178,34 @@ public boolean equals(Object o) return false; } SingleDimensionPartitionsSpec that = (SingleDimensionPartitionsSpec) o; - return maxRowsPerSegment == that.maxRowsPerSegment && - maxPartitionSize == that.maxPartitionSize && - assumeGrouped == that.assumeGrouped && + return assumeGrouped == that.assumeGrouped && + resolvedMaxRowPerSegment == that.resolvedMaxRowPerSegment && + Objects.equals(targetRowsPerSegment, that.targetRowsPerSegment) && + Objects.equals(maxRowsPerSegment, that.maxRowsPerSegment) && Objects.equals(partitionDimension, that.partitionDimension); } @Override public int hashCode() { - return Objects.hash(maxRowsPerSegment, maxPartitionSize, partitionDimension, assumeGrouped); + return Objects.hash( + targetRowsPerSegment, + maxRowsPerSegment, + partitionDimension, + assumeGrouped, + resolvedMaxRowPerSegment + ); } @Override public String toString() { return "SingleDimensionPartitionsSpec{" + - "maxRowsPerSegment=" + maxRowsPerSegment + - ", maxPartitionSize=" + maxPartitionSize + + "targetRowsPerSegment=" + targetRowsPerSegment + + ", maxRowsPerSegment=" + maxRowsPerSegment + ", partitionDimension='" + partitionDimension + '\'' + ", assumeGrouped=" + assumeGrouped + + ", resolvedMaxRowPerSegment=" + resolvedMaxRowPerSegment + '}'; } } diff --git a/core/src/test/java/org/apache/druid/indexer/partitions/ChecksTest.java b/core/src/test/java/org/apache/druid/indexer/partitions/ChecksTest.java new file mode 100644 index 000000000000..97d1a69602db --- /dev/null +++ b/core/src/test/java/org/apache/druid/indexer/partitions/ChecksTest.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software 
Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.indexer.partitions; + +import org.junit.Assert; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; + +public class ChecksTest +{ + private static final String NAME1 = "name1"; + private static final Integer VALUE1 = 1; + private static final String NAME2 = "name2"; + private static final Integer VALUE2 = 2; + private static final Integer NULL = null; + + @Rule + public ExpectedException exception = ExpectedException.none(); + + @Test + public void checkAtMostOneNotNullFirstNull() + { + Property result = Checks.checkAtMostOneNotNull(NAME1, NULL, NAME2, VALUE2); + Assert.assertEquals(NAME2, result.getName()); + Assert.assertEquals(VALUE2, result.getValue()); + } + + @Test + public void checkAtMostOneNotNullSecondNull() + { + Property result = Checks.checkAtMostOneNotNull(NAME1, VALUE1, NAME2, NULL); + Assert.assertEquals(NAME1, result.getName()); + Assert.assertEquals(VALUE1, result.getValue()); + } + + @Test + public void checkAtMostOneNotNullBothNull() + { + Property result = Checks.checkAtMostOneNotNull(NAME1, NULL, NAME2, NULL); + Assert.assertEquals(NAME1, result.getName()); + Assert.assertEquals(NULL, result.getValue()); + } + + 
@Test + public void checkAtMostOneNotNullNeitherNull() + { + exception.expect(IllegalArgumentException.class); + exception.expectMessage("At most one of " + NAME1 + " or " + NAME2 + " must be present"); + + //noinspection ConstantConditions (expected to fail) + Checks.checkAtMostOneNotNull(NAME1, VALUE1, NAME2, VALUE2); + } +} diff --git a/core/src/test/java/org/apache/druid/indexer/partitions/SingleDimensionPartitionsSpecTest.java b/core/src/test/java/org/apache/druid/indexer/partitions/SingleDimensionPartitionsSpecTest.java new file mode 100644 index 000000000000..12c4c84569b3 --- /dev/null +++ b/core/src/test/java/org/apache/druid/indexer/partitions/SingleDimensionPartitionsSpecTest.java @@ -0,0 +1,279 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.indexer.partitions; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.junit.Assert; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; + +import java.util.Collections; + +public class SingleDimensionPartitionsSpecTest +{ + private static final Integer TARGET_ROWS_PER_SEGMENT = 1; + private static final Integer MAX_ROWS_PER_SEGMENT = null; + private static final String PARTITION_DIMENSION = "a"; + private static final boolean ASSUME_GROUPED = false; + private static final SingleDimensionPartitionsSpec SPEC = new SingleDimensionPartitionsSpec( + TARGET_ROWS_PER_SEGMENT, + MAX_ROWS_PER_SEGMENT, + PARTITION_DIMENSION, + ASSUME_GROUPED + ); + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + + @Rule + public ExpectedException exception = ExpectedException.none(); + + @Test + public void serde() + { + String json = serialize(SPEC); + SingleDimensionPartitionsSpec spec = deserialize(json); + Assert.assertEquals(SPEC, spec); + } + + @Test + public void deserializeWithBackwardCompatibility() + { + String serialized = "{" + + "\"type\":\"" + SingleDimensionPartitionsSpec.NAME + "\"" + + ",\"targetPartitionSize\":" + TARGET_ROWS_PER_SEGMENT // test backward-compatible for this + + ",\"maxPartitionSize\":" + MAX_ROWS_PER_SEGMENT // test backward-compatible for this + + ",\"partitionDimension\":\"" + PARTITION_DIMENSION + "\"" + + ",\"assumeGrouped\":" + ASSUME_GROUPED + + "}"; + SingleDimensionPartitionsSpec spec = deserialize(serialized); + Assert.assertEquals(SPEC, spec); + } + + @Test + public void havingBothTargetForbidden() + { + new Tester() + .targetRowsPerSegment(1) + .targetPartitionSize(1) + .testIllegalArgumentException("At most one of targetRowsPerSegment or targetPartitionSize must be present"); + } + + @Test + public void havingBothMaxForbidden() + { + new Tester() + .maxRowsPerSegment(1) + 
.maxPartitionSize(1) + .testIllegalArgumentException("At most one of maxRowsPerSegment or maxPartitionSize must be present"); + } + + @Test + public void havingNeitherTargetNorMaxForbidden() + { + new Tester() + .testIllegalArgumentException("Exactly one of targetRowsPerSegment or maxRowsPerSegment must be present"); + + } + + @Test + public void targetRowsPerSegmentMustBePositive() + { + new Tester() + .targetRowsPerSegment(0) + .testIllegalArgumentException("targetRowsPerSegment must be greater than 0"); + } + + @Test + public void targetPartitionSizeMustBePositive() + { + new Tester() + .targetPartitionSize(0) + .testIllegalArgumentException("targetPartitionSize must be greater than 0"); + } + + @Test + public void targetMaxRowsPerSegmentOverflows() + { + new Tester() + .targetRowsPerSegment(Integer.MAX_VALUE) + .testIllegalArgumentException("targetRowsPerSegment is too large"); + } + + @Test + public void targetPartitionSizeOverflows() + { + new Tester() + .targetPartitionSize(Integer.MAX_VALUE) + .testIllegalArgumentException("targetPartitionSize is too large"); + } + + @Test + public void maxRowsPerSegmentMustBePositive() + { + new Tester() + .maxRowsPerSegment(0) + .testIllegalArgumentException("maxRowsPerSegment must be greater than 0"); + } + + @Test + public void maxPartitionSizeMustBePositive() + { + new Tester() + .maxPartitionSize(0) + .testIllegalArgumentException("maxPartitionSize must be greater than 0"); + } + + @Test + public void resolvesMaxFromTargetRowsPerSegment() + { + SingleDimensionPartitionsSpec spec = new Tester() + .targetRowsPerSegment(123) + .build(); + Assert.assertEquals(184, spec.getMaxRowsPerSegment().intValue()); + } + + @Test + public void resolvesMaxFromTargetPartitionSize() + { + SingleDimensionPartitionsSpec spec = new Tester() + .targetPartitionSize(123) + .build(); + Assert.assertEquals(Integer.valueOf(184), spec.getMaxRowsPerSegment()); + } + + @Test + public void resolvesMaxFromMaxRowsPerSegment() + { + 
SingleDimensionPartitionsSpec spec = new Tester() + .maxRowsPerSegment(123) + .build(); + Assert.assertEquals(123, spec.getMaxRowsPerSegment().intValue()); + } + + @Test + public void resolvesMaxFromMaxPartitionSize() + { + SingleDimensionPartitionsSpec spec = new Tester() + .maxPartitionSize(123) + .build(); + Assert.assertEquals(123, spec.getMaxRowsPerSegment().intValue()); + } + + @Test + public void getPartitionDimensionFromNull() + { + SingleDimensionPartitionsSpec spec = new Tester() + .targetPartitionSize(1) + .partitionDimension(null) + .build(); + Assert.assertEquals(Collections.emptyList(), spec.getPartitionDimensions()); + } + + @Test + public void getPartitionDimensionFromNonNull() + { + String partitionDimension = "a"; + SingleDimensionPartitionsSpec spec = new Tester() + .targetPartitionSize(1) + .partitionDimension(partitionDimension) + .build(); + Assert.assertEquals(Collections.singletonList(partitionDimension), spec.getPartitionDimensions()); + + } + + private static String serialize(SingleDimensionPartitionsSpec spec) + { + try { + return OBJECT_MAPPER.writeValueAsString(spec); + } + catch (JsonProcessingException e) { + throw new RuntimeException(e); + } + } + + private static SingleDimensionPartitionsSpec deserialize(String serialized) + { + try { + return OBJECT_MAPPER.readValue(serialized, SingleDimensionPartitionsSpec.class); + } + catch (Exception e) { + throw new RuntimeException(e); + } + } + + private class Tester + { + private Integer targetRowsPerSegment; + private Integer maxRowsPerSegment; + private String partitionDimension; + private Integer targetPartitionSize; + private Integer maxPartitionSize; + + Tester targetRowsPerSegment(Integer targetRowsPerSegment) + { + this.targetRowsPerSegment = targetRowsPerSegment; + return this; + } + + Tester maxRowsPerSegment(Integer maxRowsPerSegment) + { + this.maxRowsPerSegment = maxRowsPerSegment; + return this; + } + + Tester partitionDimension(String partitionDimension) + { + 
this.partitionDimension = partitionDimension; + return this; + } + + Tester targetPartitionSize(Integer targetPartitionSize) + { + this.targetPartitionSize = targetPartitionSize; + return this; + } + + Tester maxPartitionSize(Integer maxPartitionSize) + { + this.maxPartitionSize = maxPartitionSize; + return this; + } + + void testIllegalArgumentException(String exceptionExpectedMessage) + { + exception.expect(IllegalArgumentException.class); + exception.expectMessage(exceptionExpectedMessage); + build(); + } + + SingleDimensionPartitionsSpec build() + { + return new SingleDimensionPartitionsSpec( + targetRowsPerSegment, + maxRowsPerSegment, + partitionDimension, + SingleDimensionPartitionsSpecTest.ASSUME_GROUPED, + targetPartitionSize, + maxPartitionSize + ); + } + } +} diff --git a/docs/ingestion/hadoop.md b/docs/ingestion/hadoop.md index 7ca812ac85a8..43931d3466f1 100644 --- a/docs/ingestion/hadoop.md +++ b/docs/ingestion/hadoop.md @@ -285,7 +285,7 @@ The tuningConfig is optional and default parameters will be used if no tuningCon |jobProperties|Object|A map of properties to add to the Hadoop job configuration, see below for details.|no (default == null)| |indexSpec|Object|Tune how data is indexed. See [`indexSpec`](index.md#indexspec) on the main ingestion page for more information.|no| |indexSpecForIntermediatePersists|defines segment storage format options to be used at indexing time for intermediate persisted temporary segments. this can be used to disable dimension/metric compression on intermediate segments to reduce memory required for final merging. however, disabling compression on intermediate segments might increase page cache use while they are used before getting merged into final segment published, see [`indexSpec`](index.md#indexspec) for possible values.|no (default = same as indexSpec)| -|numBackgroundPersistThreads|Integer|The number of new background threads to use for incremental persists. 
Using this feature causes a notable increase in memory pressure and cpu usage but will make the job finish more quickly. If changing from the default of 0 (use current thread for persists), we recommend setting it to 1.|no (default == 0)| +|numBackgroundPersistThreads|Integer|The number of new background threads to use for incremental persists. Using this feature causes a notable increase in memory pressure and CPU usage but will make the job finish more quickly. If changing from the default of 0 (use current thread for persists), we recommend setting it to 1.|no (default == 0)| |forceExtendableShardSpecs|Boolean|Forces use of extendable shardSpecs. Hash-based partitioning always uses an extendable shardSpec. For single-dimension partitioning, this option should be set to true to use an extendable shardSpec. For partitioning, please check [Partitioning specification](#partitionsspec). This option can be useful when you need to append more data to existing dataSource.|no (default = false)| |useExplicitVersion|Boolean|Forces HadoopIndexTask to use version.|no (default = false)| |logParseExceptions|Boolean|If true, log an error message when a parsing exception occurs, containing information about the row where the error occurred.|false|no| @@ -324,7 +324,7 @@ sized data segments relative to single-dimension partitioning. ```json "partitionsSpec": { "type": "hashed", - "targetPartitionSize": 5000000 + "targetRowsPerSegment": 5000000 } ``` @@ -337,16 +337,19 @@ The configuration options are: |Field|Description|Required| |--------|-----------|---------| |type|Type of partitionSpec to be used.|"hashed"| -|targetPartitionSize|Target number of rows to include in a partition, should be a number that targets segments of 500MB\~1GB.|either this or numShards| -|numShards|Specify the number of partitions directly, instead of a target partition size. 
Ingestion will run faster, since it can skip the step necessary to select a number of partitions automatically.|either this or targetPartitionSize| -|partitionDimensions|The dimensions to partition on. Leave blank to select all dimensions. Only used with numShards, will be ignored when targetPartitionSize is set|no| +|targetRowsPerSegment|Target number of rows to include in a partition, should be a number that targets segments of 500MB\~1GB.|either this or `numShards`| +|targetPartitionSize|Deprecated. Renamed to `targetRowsPerSegment`. Target number of rows to include in a partition, should be a number that targets segments of 500MB\~1GB.|either this or `numShards`| +|maxRowsPerSegment|Deprecated. Renamed to `targetRowsPerSegment`. Target number of rows to include in a partition, should be a number that targets segments of 500MB\~1GB.|either this or `numShards`| +|numShards|Specify the number of partitions directly, instead of a target partition size. Ingestion will run faster, since it can skip the step necessary to select a number of partitions automatically.|either this or `targetRowsPerSegment`| +|partitionDimensions|The dimensions to partition on. Leave blank to select all dimensions. Only used with `numShards`, will be ignored when `targetRowsPerSegment` is set.|no| + ### Single-dimension range partitioning ```json "partitionsSpec": { "type": "single_dim", - "targetPartitionSize": 5000000 + "targetRowsPerSegment": 5000000 } ``` @@ -361,8 +364,10 @@ The configuration options are: |Field|Description|Required| |--------|-----------|---------| |type|Type of partitionSpec to be used.|"single_dim"| -|targetPartitionSize|Target number of rows to include in a partition, should be a number that targets segments of 500MB\~1GB.|yes| -|maxPartitionSize|Maximum number of rows to include in a partition.
Defaults to 50% larger than the targetPartitionSize.|no| +|targetRowsPerSegment|Target number of rows to include in a partition, should be a number that targets segments of 500MB\~1GB.|yes| +|targetPartitionSize|Deprecated. Renamed to `targetRowsPerSegment`. Target number of rows to include in a partition, should be a number that targets segments of 500MB\~1GB.|no| +|maxRowsPerSegment|Maximum number of rows to include in a partition. Defaults to 50% larger than the `targetRowsPerSegment`.|no| +|maxPartitionSize|Deprecated. Use `maxRowsPerSegment` instead. Maximum number of rows to include in a partition. Defaults to 50% larger than the `targetPartitionSize`.|no| |partitionDimension|The dimension to partition on. Leave blank to select a dimension automatically.|no| |assumeGrouped|Assume that input data has already been grouped on time and dimensions. Ingestion will run faster, but may choose sub-optimal partitions if this assumption is violated.|no| diff --git a/docs/ingestion/native-batch.md b/docs/ingestion/native-batch.md index cdd92d78775d..8f1cdbce319e 100644 --- a/docs/ingestion/native-batch.md +++ b/docs/ingestion/native-batch.md @@ -229,8 +229,9 @@ For perfect rollup, you should use `hashed`. |property|description|default|required?| |--------|-----------|-------|---------| |type|This should always be `hashed`|none|yes| -|numShards|Directly specify the number of shards to create. If this is specified and `intervals` is specified in the `granularitySpec`, the index task can skip the determine intervals/partitions pass through the data. `numShards` cannot be specified if `maxRowsPerSegment` is set.|null|no| -|partitionDimensions|The dimensions to partition on. Leave blank to select all dimensions.|null|no| +|targetRowsPerSegment|Target number of rows to include in a partition, should be a number that targets segments of 500MB\~1GB.|null|either this or `numShards`| +|numShards|Directly specify the number of shards to create.
If this is specified and `intervals` is specified in the `granularitySpec`, the index task can skip the determine intervals/partitions pass through the data. `numShards` cannot be specified if `targetRowsPerSegment` is set.|null|no| +|partitionDimensions|The dimensions to partition on. Leave blank to select all dimensions. Only used with `numShards`, will be ignored when `targetRowsPerSegment` is set.|null|no| For best-effort rollup, you should use `dynamic`. @@ -629,7 +630,7 @@ For perfect rollup, you should use `hashed`. |--------|-----------|-------|---------| |type|This should always be `hashed`|none|yes| |maxRowsPerSegment|Used in sharding. Determines how many rows are in each segment.|5000000|no| -|numShards|Directly specify the number of shards to create. If this is specified and 'intervals' is specified in the granularitySpec, the index task can skip the determine intervals/partitions pass through the data. numShards cannot be specified if maxRowsPerSegment is set.|null|no| +|numShards|Directly specify the number of shards to create. If this is specified and `intervals` is specified in the `granularitySpec`, the index task can skip the determine intervals/partitions pass through the data. `numShards` cannot be specified if `maxRowsPerSegment` is set.|null|no| |partitionDimensions|The dimensions to partition on. Leave blank to select all dimensions.|null|no| For best-effort rollup, you should use `dynamic`. 
diff --git a/indexing-hadoop/src/main/java/org/apache/druid/indexer/DeterminePartitionsJob.java b/indexing-hadoop/src/main/java/org/apache/druid/indexer/DeterminePartitionsJob.java index f9c4fe8f9806..59245a1f5a23 100644 --- a/indexing-hadoop/src/main/java/org/apache/druid/indexer/DeterminePartitionsJob.java +++ b/indexing-hadoop/src/main/java/org/apache/druid/indexer/DeterminePartitionsJob.java @@ -103,7 +103,7 @@ public class DeterminePartitionsJob implements Jobby private String failureCause; - public DeterminePartitionsJob( + DeterminePartitionsJob( HadoopDruidIndexerConfig config ) { @@ -168,7 +168,7 @@ public boolean run() try { if (!groupByJob.waitForCompletion(true)) { log.error("Job failed: %s", groupByJob.getJobID()); - failureCause = Utils.getFailureMessage(groupByJob, config.JSON_MAPPER); + failureCause = Utils.getFailureMessage(groupByJob, HadoopDruidIndexerConfig.JSON_MAPPER); return false; } } @@ -238,7 +238,7 @@ public boolean run() try { if (!dimSelectionJob.waitForCompletion(true)) { log.error("Job failed: %s", dimSelectionJob.getJobID().toString()); - failureCause = Utils.getFailureMessage(dimSelectionJob, config.JSON_MAPPER); + failureCause = Utils.getFailureMessage(dimSelectionJob, HadoopDruidIndexerConfig.JSON_MAPPER); return false; } } @@ -262,7 +262,7 @@ public boolean run() fileSystem = partitionInfoPath.getFileSystem(dimSelectionJob.getConfiguration()); } if (Utils.exists(dimSelectionJob, fileSystem, partitionInfoPath)) { - List specs = config.JSON_MAPPER.readValue( + List specs = HadoopDruidIndexerConfig.JSON_MAPPER.readValue( Utils.openInputStream(dimSelectionJob, partitionInfoPath), new TypeReference>() { } @@ -298,14 +298,13 @@ public Map getStats() try { Counters jobCounters = groupByJob.getCounters(); - Map metrics = TaskMetricsUtils.makeIngestionRowMetrics( + return TaskMetricsUtils.makeIngestionRowMetrics( jobCounters.findCounter(HadoopDruidIndexerConfig.IndexJobCounters.ROWS_PROCESSED_COUNTER).getValue(), - 
jobCounters.findCounter(HadoopDruidIndexerConfig.IndexJobCounters.ROWS_PROCESSED_WITH_ERRORS_COUNTER).getValue(), + jobCounters.findCounter(HadoopDruidIndexerConfig.IndexJobCounters.ROWS_PROCESSED_WITH_ERRORS_COUNTER) + .getValue(), jobCounters.findCounter(HadoopDruidIndexerConfig.IndexJobCounters.ROWS_UNPARSEABLE_COUNTER).getValue(), jobCounters.findCounter(HadoopDruidIndexerConfig.IndexJobCounters.ROWS_THROWN_AWAY_COUNTER).getValue() ); - - return metrics; } catch (IllegalStateException ise) { log.debug("Couldn't get counters due to job state"); @@ -433,13 +432,13 @@ protected void innerMap( * Since we have two slightly different DimSelectionMappers, this class encapsulates the shared logic for * emitting dimension value counts. */ - public static class DeterminePartitionsDimSelectionMapperHelper + static class DeterminePartitionsDimSelectionMapperHelper { private final HadoopDruidIndexerConfig config; private final String partitionDimension; private final Map intervalIndexes; - public DeterminePartitionsDimSelectionMapperHelper(HadoopDruidIndexerConfig config, String partitionDimension) + DeterminePartitionsDimSelectionMapperHelper(HadoopDruidIndexerConfig config, String partitionDimension) { this.config = config; this.partitionDimension = partitionDimension; @@ -454,7 +453,7 @@ public DeterminePartitionsDimSelectionMapperHelper(HadoopDruidIndexerConfig conf this.intervalIndexes = timeIndexBuilder.build(); } - public void emitDimValueCounts( + void emitDimValueCounts( TaskInputOutputContext context, DateTime timestamp, Map> dims @@ -568,7 +567,7 @@ protected abstract void innerReduce( Iterable combinedIterable ) throws IOException, InterruptedException; - private Iterable combineRows(Iterable input) + private static Iterable combineRows(Iterable input) { return new CombiningIterable<>( Iterables.transform( @@ -771,7 +770,7 @@ protected void innerReduce(Context context, SortableBytes keyBytes, Iterable partitionsSpec.getMaxPartitionSize()) { + if (partition.rows 
> partitionsSpec.getMaxRowsPerSegment()) { log.info("Dimension[%s] has an oversized shard: %s", dimPartitions.dim, partition.shardSpec); oversized = true; } @@ -857,7 +856,7 @@ public void close(TaskAttemptContext context) @Override public void checkOutputSpecs(JobContext job) throws IOException { - Path outDir = getOutputPath(job); + Path outDir = FileOutputFormat.getOutputPath(job); if (outDir == null) { throw new InvalidJobConfException("Output directory not set."); } @@ -874,7 +873,7 @@ private DimPartitions(String dim) this.dim = dim; } - public int getCardinality() + int getCardinality() { int sum = 0; for (final DimPartition dimPartition : partitions) { @@ -883,7 +882,7 @@ public int getCardinality() return sum; } - public long getDistanceSquaredFromTarget(long target) + long getDistanceSquaredFromTarget(long target) { long distance = 0; for (final DimPartition dimPartition : partitions) { @@ -907,7 +906,7 @@ public long getRows() private static class DimPartition { public ShardSpec shardSpec = null; - public int cardinality = 0; + int cardinality = 0; public long rows = 0; } @@ -924,12 +923,12 @@ private DimValueCount(String dim, String value, long numRows) this.numRows = numRows; } - public Text toText() + Text toText() { return new Text(TAB_JOINER.join(dim, String.valueOf(numRows), value)); } - public static DimValueCount fromText(Text text) + static DimValueCount fromText(Text text) { final Iterator splits = TAB_SPLITTER.limit(3).split(text.toString()).iterator(); final String dim = splits.next(); diff --git a/indexing-hadoop/src/test/java/org/apache/druid/indexer/DeterminePartitionsJobTest.java b/indexing-hadoop/src/test/java/org/apache/druid/indexer/DeterminePartitionsJobTest.java index 35174f56dfd7..89b7fde43d61 100644 --- a/indexing-hadoop/src/test/java/org/apache/druid/indexer/DeterminePartitionsJobTest.java +++ b/indexing-hadoop/src/test/java/org/apache/druid/indexer/DeterminePartitionsJobTest.java @@ -51,16 +51,15 @@ @RunWith(Parameterized.class) 
public class DeterminePartitionsJobTest { - - private HadoopDruidIndexerConfig config; - private int expectedNumOfSegments; - private int[] expectedNumOfShardsForEachSegment; - private String[][][] expectedStartEndForEachShard; - private File dataFile; - private File tmpDir; + private final HadoopDruidIndexerConfig config; + private final int expectedNumOfSegments; + private final int[] expectedNumOfShardsForEachSegment; + private final String[][][] expectedStartEndForEachShard; + private final File dataFile; + private final File tmpDir; @Parameterized.Parameters(name = "assumeGrouped={0}, " - + "targetPartitionSize={1}, " + + "maxRowsPerSegment={1}, " + "interval={2}" + "expectedNumOfSegments={3}, " + "expectedNumOfShardsForEachSegment={4}, " @@ -82,7 +81,7 @@ public static Collection constructFeed() {"c.example.com", "e.example.com"}, {"e.example.com", "g.example.com"}, {"g.example.com", "i.example.com"}, - {"i.example.com", null } + {"i.example.com", null} } }, ImmutableList.of( @@ -222,7 +221,7 @@ public static Collection constructFeed() public DeterminePartitionsJobTest( boolean assumeGrouped, - Integer targetPartitionSize, + Integer maxRowsPerSegment, String interval, int expectedNumOfSegments, int[] expectedNumOfShardsForEachSegment, @@ -249,7 +248,11 @@ public DeterminePartitionsJobTest( new StringInputRowParser( new CSVParseSpec( new TimestampSpec("timestamp", "yyyyMMddHH", null), - new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("host", "country")), null, null), + new DimensionsSpec( + DimensionsSpec.getDefaultSchemas(ImmutableList.of("host", "country")), + null, + null + ), null, ImmutableList.of("timestamp", "host", "country", "visited_num"), false, @@ -281,7 +284,7 @@ public DeterminePartitionsJobTest( new HadoopTuningConfig( tmpDir.getCanonicalPath(), null, - new SingleDimensionPartitionsSpec(targetPartitionSize, null, null, assumeGrouped), + new SingleDimensionPartitionsSpec(null, maxRowsPerSegment, null, assumeGrouped), null, 
null, null, @@ -319,9 +322,9 @@ public void testPartitionJob() Assert.assertEquals(expectedNumOfSegments, config.getSchema().getTuningConfig().getShardSpecs().size()); for (Map.Entry> entry : config.getSchema() - .getTuningConfig() - .getShardSpecs() - .entrySet()) { + .getTuningConfig() + .getShardSpecs() + .entrySet()) { int partitionNum = 0; List specs = entry.getValue(); Assert.assertEquals(expectedNumOfShardsForEachSegment[segmentNum], specs.size()); diff --git a/indexing-hadoop/src/test/java/org/apache/druid/indexer/HadoopIngestionSpecTest.java b/indexing-hadoop/src/test/java/org/apache/druid/indexer/HadoopIngestionSpecTest.java index fbd13c92f560..8bafae5d15a8 100644 --- a/indexing-hadoop/src/test/java/org/apache/druid/indexer/HadoopIngestionSpecTest.java +++ b/indexing-hadoop/src/test/java/org/apache/druid/indexer/HadoopIngestionSpecTest.java @@ -150,8 +150,8 @@ public void testPartitionsSpecAutoHashed() Assert.assertEquals( "getTargetPartitionSize", - partitionsSpec.getMaxRowsPerSegment().intValue(), - 100 + 100, + partitionsSpec.getMaxRowsPerSegment().intValue() ); Assert.assertTrue( @@ -167,14 +167,13 @@ public void testPartitionsSpecMaxPartitionSize() try { schema = jsonReadWriteRead( - "{\n" + " \"tuningConfig\": {\n" + " \"type\": \"hadoop\",\n" + " \"partitionsSpec\": {\n" + " \"type\": \"dimension\",\n" + " \"targetPartitionSize\": 100,\n" - + " \"maxPartitionSize\" : 200,\n" + + " \"maxPartitionSize\" : null,\n" + " \"partitionDimension\" : \"foo\"\n" + " }\n" + " }\n" @@ -186,32 +185,29 @@ public void testPartitionsSpecMaxPartitionSize() throw new RuntimeException(e); } - final SingleDimensionPartitionsSpec partitionsSpec = - (SingleDimensionPartitionsSpec) schema.getTuningConfig().getPartitionsSpec(); + PartitionsSpec partitionsSpec = schema.getTuningConfig().getPartitionsSpec(); + Assert.assertTrue("partitionsSpec", partitionsSpec instanceof SingleDimensionPartitionsSpec); - Assert.assertEquals( - "isDeterminingPartitions", - 
partitionsSpec.needsDeterminePartitions(true), - true - ); + SingleDimensionPartitionsSpec singleDimensionPartitionsSpec = (SingleDimensionPartitionsSpec) partitionsSpec; + + Assert.assertTrue("isDeterminingPartitions", singleDimensionPartitionsSpec.needsDeterminePartitions(true)); Assert.assertEquals( "getTargetPartitionSize", - partitionsSpec.getMaxRowsPerSegment().intValue(), - 100 + 100, + singleDimensionPartitionsSpec.getTargetRowsPerSegment().intValue() ); Assert.assertEquals( "getMaxPartitionSize", - partitionsSpec.getMaxPartitionSize(), - 200 + 150, + singleDimensionPartitionsSpec.getMaxRowsPerSegment().intValue() ); - Assert.assertTrue("partitionsSpec", partitionsSpec instanceof SingleDimensionPartitionsSpec); Assert.assertEquals( "getPartitionDimension", - partitionsSpec.getPartitionDimension(), - "foo" + "foo", + singleDimensionPartitionsSpec.getPartitionDimension() ); } @@ -262,15 +258,11 @@ public void testDefaultSettings() Assert.assertEquals( "cleanupOnFailure", - schema.getTuningConfig().isCleanupOnFailure(), - true + true, + schema.getTuningConfig().isCleanupOnFailure() ); - Assert.assertEquals( - "overwriteFiles", - schema.getTuningConfig().isOverwriteFiles(), - false - ); + Assert.assertFalse("overwriteFiles", schema.getTuningConfig().isOverwriteFiles()); Assert.assertFalse( "isDeterminingPartitions", @@ -324,14 +316,10 @@ public void testNoCleanupOnFailure() throw new RuntimeException(e); } - Assert.assertEquals( - "cleanupOnFailure", - schema.getTuningConfig().isCleanupOnFailure(), - false - ); + Assert.assertFalse("cleanupOnFailure", schema.getTuningConfig().isCleanupOnFailure()); } - private T jsonReadWriteRead(String s, Class klass) + private static T jsonReadWriteRead(String s, Class klass) { try { return JSON_MAPPER.readValue(JSON_MAPPER.writeValueAsBytes(JSON_MAPPER.readValue(s, klass)), klass); diff --git a/indexing-hadoop/src/test/java/org/apache/druid/indexer/partitions/HashedPartitionsSpecTest.java 
b/indexing-hadoop/src/test/java/org/apache/druid/indexer/partitions/HashedPartitionsSpecTest.java index 3e13fa2239f7..82189f2861ba 100644 --- a/indexing-hadoop/src/test/java/org/apache/druid/indexer/partitions/HashedPartitionsSpecTest.java +++ b/indexing-hadoop/src/test/java/org/apache/druid/indexer/partitions/HashedPartitionsSpecTest.java @@ -23,90 +23,129 @@ import com.google.common.collect.ImmutableList; import org.apache.druid.jackson.DefaultObjectMapper; import org.junit.Assert; +import org.junit.Rule; import org.junit.Test; +import org.junit.rules.ExpectedException; -/** - */ public class HashedPartitionsSpecTest { private static final ObjectMapper JSON_MAPPER = new DefaultObjectMapper(); + @Rule + public ExpectedException exception = ExpectedException.none(); + @Test public void testHashedPartitionsSpec() { - { - final PartitionsSpec partitionsSpec = jsonReadWriteRead( - "{" - + " \"targetPartitionSize\":100," - + " \"type\":\"hashed\"" - + "}", - PartitionsSpec.class - ); - Assert.assertTrue("partitionsSpec", partitionsSpec instanceof HashedPartitionsSpec); - final HashedPartitionsSpec hadoopHashedPartitionsSpec = (HashedPartitionsSpec) partitionsSpec; - - Assert.assertEquals( - "isDeterminingPartitions", - hadoopHashedPartitionsSpec.needsDeterminePartitions(true), - true - ); - - Assert.assertEquals( - "getTargetPartitionSize", - hadoopHashedPartitionsSpec.getMaxRowsPerSegment().intValue(), - 100 - ); - - Assert.assertEquals( - "getPartitionDimensions", - hadoopHashedPartitionsSpec.getPartitionDimensions(), - ImmutableList.of() - ); - } + final HashedPartitionsSpec hadoopHashedPartitionsSpec = jsonReadWriteRead( + "{" + + " \"targetRowsPerSegment\":100," + + " \"type\":\"hashed\"" + + "}" + ); + + Assert.assertTrue("isDeterminingPartitions", hadoopHashedPartitionsSpec.needsDeterminePartitions(true)); + + Assert.assertNotNull(hadoopHashedPartitionsSpec.getMaxRowsPerSegment()); + Assert.assertEquals( + "getMaxRowsPerSegment", + 100, + 
hadoopHashedPartitionsSpec.getMaxRowsPerSegment().intValue() + ); + + Assert.assertEquals( + "getPartitionDimensions", + ImmutableList.of(), + hadoopHashedPartitionsSpec.getPartitionDimensions() + ); } @Test public void testHashedPartitionsSpecShardCount() { - final PartitionsSpec partitionsSpec = jsonReadWriteRead( + final HashedPartitionsSpec hadoopHashedPartitionsSpec = jsonReadWriteRead( "{" + " \"type\":\"hashed\"," + " \"numShards\":2" - + "}", - PartitionsSpec.class + + "}" ); - Assert.assertTrue("partitionsSpec", partitionsSpec instanceof HashedPartitionsSpec); - final HashedPartitionsSpec hadoopHashedPartitionsSpec = (HashedPartitionsSpec) partitionsSpec; - Assert.assertEquals( - "isDeterminingPartitions", - hadoopHashedPartitionsSpec.needsDeterminePartitions(true), - false - ); + Assert.assertFalse("isDeterminingPartitions", hadoopHashedPartitionsSpec.needsDeterminePartitions(true)); Assert.assertNull( - "getTargetPartitionSize", + "getMaxRowsPerSegment", hadoopHashedPartitionsSpec.getMaxRowsPerSegment() ); + Assert.assertNotNull(hadoopHashedPartitionsSpec.getNumShards()); Assert.assertEquals( "shardCount", - hadoopHashedPartitionsSpec.getNumShards().intValue(), - 2 + 2, + hadoopHashedPartitionsSpec.getNumShards().intValue() ); Assert.assertEquals( "getPartitionDimensions", - hadoopHashedPartitionsSpec.getPartitionDimensions(), - ImmutableList.of() + ImmutableList.of(), + hadoopHashedPartitionsSpec.getPartitionDimensions() + ); + } + + @Test + public void testHashedPartitionsSpecBothTargetForbidden() + { + exception.expect(RuntimeException.class); + exception.expectMessage("At most one of targetRowsPerSegment or targetPartitionSize must be present"); + + String json = "{" + + "\"type\":\"hashed\"" + + ",\"targetRowsPerSegment\":100" + + ",\"targetPartitionSize\":100" + + "}"; + jsonReadWriteRead(json); + } + + @Test + public void testHashedPartitionsSpecBackwardCompatibleTargetPartitionSize() + { + String json = "{" + + "\"type\":\"hashed\"" + + 
",\"targetPartitionSize\":100" + + "}"; + HashedPartitionsSpec hadoopHashedPartitionsSpec = jsonReadWriteRead(json); + + Assert.assertNotNull(hadoopHashedPartitionsSpec.getMaxRowsPerSegment()); + Assert.assertEquals( + "getMaxRowsPerSegment", + 100, + hadoopHashedPartitionsSpec.getMaxRowsPerSegment().intValue() ); + } + + @Test + public void testHashedPartitionsSpecBackwardCompatibleMaxRowsPerSegment() + { + String json = "{" + + "\"type\":\"hashed\"" + + ",\"maxRowsPerSegment\":100" + + "}"; + HashedPartitionsSpec hadoopHashedPartitionsSpec = jsonReadWriteRead(json); + Assert.assertNotNull(hadoopHashedPartitionsSpec.getMaxRowsPerSegment()); + Assert.assertEquals( + "getMaxRowsPerSegment", + 100, + hadoopHashedPartitionsSpec.getMaxRowsPerSegment().intValue() + ); } - - private T jsonReadWriteRead(String s, Class klass) + + private static HashedPartitionsSpec jsonReadWriteRead(String s) { try { - return JSON_MAPPER.readValue(JSON_MAPPER.writeValueAsBytes(JSON_MAPPER.readValue(s, klass)), klass); + byte[] jsonBytes = JSON_MAPPER.writeValueAsBytes(JSON_MAPPER.readValue(s, PartitionsSpec.class)); + PartitionsSpec partitionsSpec = JSON_MAPPER.readValue(jsonBytes, PartitionsSpec.class); + Assert.assertTrue("partitionsSpec", partitionsSpec instanceof HashedPartitionsSpec); + return (HashedPartitionsSpec) partitionsSpec; } catch (Exception e) { throw new RuntimeException(e); diff --git a/website/.spelling b/website/.spelling index 5e4553a04d87..0b875ed486ea 100644 --- a/website/.spelling +++ b/website/.spelling @@ -864,6 +864,7 @@ segmentTable shardSpec single_dim targetPartitionSize +targetRowsPerSegment useCombiner useExplicitVersion useNewAggs