From d2e556a37c635c4a78a6971f122fb85362e4f4d8 Mon Sep 17 00:00:00 2001 From: Chi Cao Minh Date: Tue, 10 Sep 2019 18:01:12 -0700 Subject: [PATCH 1/4] Rename partition spec fields Rename partition spec fields to be consistent across the various types (hashed, single_dim, dynamic). Specifically, use targetNumRowsPerSegment and maxRowsPerSegment in favor of targetPartitionSize and maxSegmentSize. Consistent and clearer names are easier for users to understand and use. Also fix various IntelliJ inspection warnings and doc spelling mistakes. --- .../DimensionBasedPartitionsSpec.java | 6 + .../partitions/DynamicPartitionsSpec.java | 3 +- .../partitions/HashedPartitionsSpec.java | 12 +- .../indexer/partitions/PartitionsSpec.java | 9 +- .../SingleDimensionPartitionsSpec.java | 179 ++++++++--- .../SingleDimensionPartitionsSpecTest.java | 279 ++++++++++++++++++ docs/ingestion/hadoop.md | 23 +- docs/ingestion/native-batch.md | 4 +- .../druid/indexer/DeterminePartitionsJob.java | 37 ++- .../indexer/HadoopIngestionSpecTest.java | 50 ++-- .../partitions/HashedPartitionsSpecTest.java | 62 ++-- 11 files changed, 515 insertions(+), 149 deletions(-) create mode 100644 core/src/test/java/org/apache/druid/indexer/partitions/SingleDimensionPartitionsSpecTest.java diff --git a/core/src/main/java/org/apache/druid/indexer/partitions/DimensionBasedPartitionsSpec.java b/core/src/main/java/org/apache/druid/indexer/partitions/DimensionBasedPartitionsSpec.java index b1f4e322a9ce..58c0189ee729 100644 --- a/core/src/main/java/org/apache/druid/indexer/partitions/DimensionBasedPartitionsSpec.java +++ b/core/src/main/java/org/apache/druid/indexer/partitions/DimensionBasedPartitionsSpec.java @@ -26,5 +26,11 @@ */ public interface DimensionBasedPartitionsSpec extends PartitionsSpec { + String TARGET_ROWS_PER_SEGMENT = "targetRowsPerSegment"; + + // Deprecated properties preserved for backward compatibility: + @Deprecated + String TARGET_PARTITION_SIZE = "targetPartitionSize"; + List 
getPartitionDimensions(); } diff --git a/core/src/main/java/org/apache/druid/indexer/partitions/DynamicPartitionsSpec.java b/core/src/main/java/org/apache/druid/indexer/partitions/DynamicPartitionsSpec.java index fb326633015a..242c0918c675 100644 --- a/core/src/main/java/org/apache/druid/indexer/partitions/DynamicPartitionsSpec.java +++ b/core/src/main/java/org/apache/druid/indexer/partitions/DynamicPartitionsSpec.java @@ -31,13 +31,14 @@ public class DynamicPartitionsSpec implements PartitionsSpec { public static final long DEFAULT_MAX_TOTAL_ROWS = 20_000_000; + static final String NAME = "dynamic"; private final int maxRowsPerSegment; private final long maxTotalRows; @JsonCreator public DynamicPartitionsSpec( - @JsonProperty("maxRowsPerSegment") @Nullable Integer maxRowsPerSegment, + @JsonProperty(PartitionsSpec.MAX_ROWS_PER_SEGMENT) @Nullable Integer maxRowsPerSegment, @JsonProperty("maxTotalRows") @Nullable Long maxTotalRows ) { diff --git a/core/src/main/java/org/apache/druid/indexer/partitions/HashedPartitionsSpec.java b/core/src/main/java/org/apache/druid/indexer/partitions/HashedPartitionsSpec.java index 8002fb5af7d9..f51aed718aaf 100644 --- a/core/src/main/java/org/apache/druid/indexer/partitions/HashedPartitionsSpec.java +++ b/core/src/main/java/org/apache/druid/indexer/partitions/HashedPartitionsSpec.java @@ -31,6 +31,7 @@ public class HashedPartitionsSpec implements DimensionBasedPartitionsSpec { + static final String NAME = "hashed"; private static final Logger LOG = new Logger(HashedPartitionsSpec.class); @Nullable @@ -50,15 +51,18 @@ public HashedPartitionsSpec( @Nullable List partitionDimensions ) { - this(null, maxRowsPerSegment, numShards, partitionDimensions); + this(maxRowsPerSegment, numShards, partitionDimensions, null); } @JsonCreator public HashedPartitionsSpec( - @JsonProperty("targetPartitionSize") @Deprecated @Nullable Integer targetPartitionSize, - @JsonProperty("maxRowsPerSegment") @Nullable Integer maxRowsPerSegment, + 
@JsonProperty(PartitionsSpec.MAX_ROWS_PER_SEGMENT) @Nullable Integer maxRowsPerSegment, @JsonProperty("numShards") @Nullable Integer numShards, - @JsonProperty("partitionDimensions") @Nullable List partitionDimensions + @JsonProperty("partitionDimensions") @Nullable List partitionDimensions, + + // Deprecated properties preserved for backward compatibility: + @Deprecated @JsonProperty(DimensionBasedPartitionsSpec.TARGET_PARTITION_SIZE) @Nullable + Integer targetPartitionSize ) { Preconditions.checkArgument( diff --git a/core/src/main/java/org/apache/druid/indexer/partitions/PartitionsSpec.java b/core/src/main/java/org/apache/druid/indexer/partitions/PartitionsSpec.java index 2f7396d168e0..c4961e9c0c69 100644 --- a/core/src/main/java/org/apache/druid/indexer/partitions/PartitionsSpec.java +++ b/core/src/main/java/org/apache/druid/indexer/partitions/PartitionsSpec.java @@ -29,14 +29,15 @@ */ @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type", defaultImpl = HashedPartitionsSpec.class) @JsonSubTypes(value = { - @JsonSubTypes.Type(name = "single_dim", value = SingleDimensionPartitionsSpec.class), - @JsonSubTypes.Type(name = "dimension", value = SingleDimensionPartitionsSpec.class), // for backward compatibility - @JsonSubTypes.Type(name = "hashed", value = HashedPartitionsSpec.class), - @JsonSubTypes.Type(name = "dynamic", value = DynamicPartitionsSpec.class) + @JsonSubTypes.Type(name = SingleDimensionPartitionsSpec.NAME, value = SingleDimensionPartitionsSpec.class), + @JsonSubTypes.Type(name = SingleDimensionPartitionsSpec.OLD_NAME, value = SingleDimensionPartitionsSpec.class), // for backward compatibility + @JsonSubTypes.Type(name = HashedPartitionsSpec.NAME, value = HashedPartitionsSpec.class), + @JsonSubTypes.Type(name = DynamicPartitionsSpec.NAME, value = DynamicPartitionsSpec.class) }) public interface PartitionsSpec { int DEFAULT_MAX_ROWS_PER_SEGMENT = 5_000_000; + String MAX_ROWS_PER_SEGMENT = "maxRowsPerSegment"; /** * Returns the max number of rows 
per segment. diff --git a/core/src/main/java/org/apache/druid/indexer/partitions/SingleDimensionPartitionsSpec.java b/core/src/main/java/org/apache/druid/indexer/partitions/SingleDimensionPartitionsSpec.java index aa6ef87f3804..8b5b108592ef 100644 --- a/core/src/main/java/org/apache/druid/indexer/partitions/SingleDimensionPartitionsSpec.java +++ b/core/src/main/java/org/apache/druid/indexer/partitions/SingleDimensionPartitionsSpec.java @@ -21,76 +21,158 @@ import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import javax.annotation.Nullable; +import javax.validation.constraints.NotNull; import java.util.Collections; import java.util.List; import java.util.Objects; +/** + * Partition a segment by a single dimension. + */ public class SingleDimensionPartitionsSpec implements DimensionBasedPartitionsSpec { - private final int maxRowsPerSegment; - private final int maxPartitionSize; - @Nullable + static final String NAME = "single_dim"; + static final String OLD_NAME = "dimension"; // for backward compatibility + + private static final String MAX_PARTITION_SIZE = "maxPartitionSize"; + + private final Integer targetRowsPerSegment; + private final Integer maxRowsPerSegment; private final String partitionDimension; private final boolean assumeGrouped; - public SingleDimensionPartitionsSpec( - int maxRowsPerSegment, - @Nullable Integer maxPartitionSize, - @Nullable String partitionDimension, - boolean assumeGrouped - ) - { - this(null, maxRowsPerSegment, maxPartitionSize, partitionDimension, assumeGrouped); - } + // Values for these fields are derived from the one above: + private final int resolvedMaxRowPerSegment; @JsonCreator public SingleDimensionPartitionsSpec( - @JsonProperty("targetPartitionSize") @Nullable Integer targetPartitionSize, - 
@JsonProperty("maxRowsPerSegment") @Nullable Integer maxRowsPerSegment, - @JsonProperty("maxPartitionSize") @Nullable Integer maxPartitionSize, + @JsonProperty(DimensionBasedPartitionsSpec.TARGET_ROWS_PER_SEGMENT) @Nullable Integer targetRowsPerSegment, + @JsonProperty(PartitionsSpec.MAX_ROWS_PER_SEGMENT) @Nullable Integer maxRowsPerSegment, @JsonProperty("partitionDimension") @Nullable String partitionDimension, - @JsonProperty("assumeGrouped") boolean assumeGrouped // false by default + @JsonProperty("assumeGrouped") boolean assumeGrouped, // false by default + + // Deprecated properties preserved for backward compatibility: + @Deprecated @JsonProperty(DimensionBasedPartitionsSpec.TARGET_PARTITION_SIZE) @Nullable + Integer targetPartitionSize, // prefer targetRowsPerSegment + @Deprecated @JsonProperty(MAX_PARTITION_SIZE) @Nullable + Integer maxPartitionSize // prefer maxRowsPerSegment ) { - Preconditions.checkArgument( - PartitionsSpec.isEffectivelyNull(targetPartitionSize) || PartitionsSpec.isEffectivelyNull(maxRowsPerSegment), - "Can't set both targetPartitionSize and maxRowsPerSegment" + Property target = checkAtMostOneNotNull( + TARGET_ROWS_PER_SEGMENT, + targetRowsPerSegment, + TARGET_PARTITION_SIZE, + targetPartitionSize + ); + + Property max = checkAtMostOneNotNull( + MAX_ROWS_PER_SEGMENT, + maxRowsPerSegment, + MAX_PARTITION_SIZE, + maxPartitionSize ); + Preconditions.checkArgument( - !PartitionsSpec.isEffectivelyNull(targetPartitionSize) || !PartitionsSpec.isEffectivelyNull(maxRowsPerSegment), - "Either targetPartitionSize or maxRowsPerSegment must be specified" + (target.value == null) != (max.value == null), + "Exactly one of " + target.name + " or " + max.name + " must be present" ); - final int realMaxRowsPerSegment = targetPartitionSize == null ? 
maxRowsPerSegment : targetPartitionSize; - Preconditions.checkArgument(realMaxRowsPerSegment > 0, "maxRowsPerSegment must be specified"); - this.maxRowsPerSegment = realMaxRowsPerSegment; - this.maxPartitionSize = PartitionsSpec.isEffectivelyNull(maxPartitionSize) - ? Math.addExact(realMaxRowsPerSegment, (int) (realMaxRowsPerSegment * 0.5)) - : maxPartitionSize; + this.partitionDimension = partitionDimension; this.assumeGrouped = assumeGrouped; + this.targetRowsPerSegment = target.value; + this.maxRowsPerSegment = max.value; + + this.resolvedMaxRowPerSegment = resolveMaxRowsPerSegment(target, max); + } + + @VisibleForTesting + public SingleDimensionPartitionsSpec( + @Nullable Integer targetRowsPerSegment, + @Nullable Integer maxRowsPerSegment, + @Nullable String partitionDimension, + boolean assumeGrouped + ) + { + this(targetRowsPerSegment, maxRowsPerSegment, partitionDimension, assumeGrouped, null, null); + } + + /** + * @return Non-null value, or first one if both are null + */ + @SuppressWarnings("VariableNotUsedInsideIf") // false positive: checked for 'null' not used inside 'if + private static Property checkAtMostOneNotNull(String name1, Integer value1, String name2, Integer value2) + { + final Property property; + + if (value1 == null && value2 == null) { + property = new Property<>(name1, value1); + } else if (value1 == null) { + property = new Property<>(name2, value2); + } else if (value2 == null) { + property = new Property<>(name1, value1); + } else { + throw new IllegalArgumentException("At most one of " + name1 + " or " + name2 + " must be present"); + } + + return property; + } + + private static int resolveMaxRowsPerSegment(Property target, Property max) + { + final int resolvedValue; + + if (target.value != null) { + Preconditions.checkArgument(target.value > 0, target.name + " must be greater than 0"); + try { + resolvedValue = Math.addExact(target.value, (target.value / 2)); + } + catch (ArithmeticException e) { + throw new 
IllegalArgumentException(target.name + " is too large"); + } + } else { + Preconditions.checkArgument(max.value > 0, max.name + " must be greater than 0"); + resolvedValue = max.value; + } + return resolvedValue; + } + + private static class Property + { + private final String name; + private final T value; + + Property(String name, T value) + { + this.name = name; + this.value = value; + } } - @Override @JsonProperty - public Integer getMaxRowsPerSegment() + @Nullable + public Integer getTargetRowsPerSegment() { - return maxRowsPerSegment; + return targetRowsPerSegment; } + @JsonIgnore @Override - public boolean needsDeterminePartitions(boolean useForHadoopTask) + @NotNull + public Integer getMaxRowsPerSegment() { - return true; + return resolvedMaxRowPerSegment; // NOTE: This returns the *resolved* value } - @JsonProperty - public int getMaxPartitionSize() + @JsonProperty(MAX_ROWS_PER_SEGMENT) + private Integer getMaxRowsPerSegmentForJson() { - return maxPartitionSize; + return maxRowsPerSegment; } @JsonProperty @@ -106,12 +188,19 @@ public boolean isAssumeGrouped() return assumeGrouped; } + @JsonIgnore @Override public List getPartitionDimensions() { return partitionDimension == null ? 
Collections.emptyList() : Collections.singletonList(partitionDimension); } + @Override + public boolean needsDeterminePartitions(boolean useForHadoopTask) + { + return true; + } + @Override public boolean equals(Object o) { @@ -122,26 +211,34 @@ public boolean equals(Object o) return false; } SingleDimensionPartitionsSpec that = (SingleDimensionPartitionsSpec) o; - return maxRowsPerSegment == that.maxRowsPerSegment && - maxPartitionSize == that.maxPartitionSize && - assumeGrouped == that.assumeGrouped && + return assumeGrouped == that.assumeGrouped && + resolvedMaxRowPerSegment == that.resolvedMaxRowPerSegment && + Objects.equals(targetRowsPerSegment, that.targetRowsPerSegment) && + Objects.equals(maxRowsPerSegment, that.maxRowsPerSegment) && Objects.equals(partitionDimension, that.partitionDimension); } @Override public int hashCode() { - return Objects.hash(maxRowsPerSegment, maxPartitionSize, partitionDimension, assumeGrouped); + return Objects.hash( + targetRowsPerSegment, + maxRowsPerSegment, + partitionDimension, + assumeGrouped, + resolvedMaxRowPerSegment + ); } @Override public String toString() { return "SingleDimensionPartitionsSpec{" + - "maxRowsPerSegment=" + maxRowsPerSegment + - ", maxPartitionSize=" + maxPartitionSize + + "targetRowsPerSegment=" + targetRowsPerSegment + + ", maxRowsPerSegment=" + maxRowsPerSegment + ", partitionDimension='" + partitionDimension + '\'' + ", assumeGrouped=" + assumeGrouped + + ", resolvedMaxRowPerSegment=" + resolvedMaxRowPerSegment + '}'; } } diff --git a/core/src/test/java/org/apache/druid/indexer/partitions/SingleDimensionPartitionsSpecTest.java b/core/src/test/java/org/apache/druid/indexer/partitions/SingleDimensionPartitionsSpecTest.java new file mode 100644 index 000000000000..12c4c84569b3 --- /dev/null +++ b/core/src/test/java/org/apache/druid/indexer/partitions/SingleDimensionPartitionsSpecTest.java @@ -0,0 +1,279 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor 
license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.indexer.partitions; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.junit.Assert; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; + +import java.util.Collections; + +public class SingleDimensionPartitionsSpecTest +{ + private static final Integer TARGET_ROWS_PER_SEGMENT = 1; + private static final Integer MAX_ROWS_PER_SEGMENT = null; + private static final String PARTITION_DIMENSION = "a"; + private static final boolean ASSUME_GROUPED = false; + private static final SingleDimensionPartitionsSpec SPEC = new SingleDimensionPartitionsSpec( + TARGET_ROWS_PER_SEGMENT, + MAX_ROWS_PER_SEGMENT, + PARTITION_DIMENSION, + ASSUME_GROUPED + ); + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + + @Rule + public ExpectedException exception = ExpectedException.none(); + + @Test + public void serde() + { + String json = serialize(SPEC); + SingleDimensionPartitionsSpec spec = deserialize(json); + Assert.assertEquals(SPEC, spec); + } + + @Test + public void deserializeWithBackwardCompatibility() + { + String serialized = "{" + + "\"type\":\"" + SingleDimensionPartitionsSpec.NAME + "\"" + + 
",\"targetPartitionSize\":" + TARGET_ROWS_PER_SEGMENT // test backward-compatible for this + + ",\"maxPartitionSize\":" + MAX_ROWS_PER_SEGMENT // test backward-compatible for this + + ",\"partitionDimension\":\"" + PARTITION_DIMENSION + "\"" + + ",\"assumeGrouped\":" + ASSUME_GROUPED + + "}"; + SingleDimensionPartitionsSpec spec = deserialize(serialized); + Assert.assertEquals(SPEC, spec); + } + + @Test + public void havingBothTargetForbidden() + { + new Tester() + .targetRowsPerSegment(1) + .targetPartitionSize(1) + .testIllegalArgumentException("At most one of targetRowsPerSegment or targetPartitionSize must be present"); + } + + @Test + public void havingBothMaxForbidden() + { + new Tester() + .maxRowsPerSegment(1) + .maxPartitionSize(1) + .testIllegalArgumentException("At most one of maxRowsPerSegment or maxPartitionSize must be present"); + } + + @Test + public void havingNeitherTargetNorMaxForbidden() + { + new Tester() + .testIllegalArgumentException("Exactly one of targetRowsPerSegment or maxRowsPerSegment must be present"); + + } + + @Test + public void targetRowsPerSegmentMustBePositive() + { + new Tester() + .targetRowsPerSegment(0) + .testIllegalArgumentException("targetRowsPerSegment must be greater than 0"); + } + + @Test + public void targetPartitionSizeMustBePositive() + { + new Tester() + .targetPartitionSize(0) + .testIllegalArgumentException("targetPartitionSize must be greater than 0"); + } + + @Test + public void targetMaxRowsPerSegmentOverflows() + { + new Tester() + .targetRowsPerSegment(Integer.MAX_VALUE) + .testIllegalArgumentException("targetRowsPerSegment is too large"); + } + + @Test + public void targetPartitionSizeOverflows() + { + new Tester() + .targetPartitionSize(Integer.MAX_VALUE) + .testIllegalArgumentException("targetPartitionSize is too large"); + } + + @Test + public void maxRowsPerSegmentMustBePositive() + { + new Tester() + .maxRowsPerSegment(0) + .testIllegalArgumentException("maxRowsPerSegment must be greater than 0"); + } 
+ + @Test + public void maxPartitionSizeMustBePositive() + { + new Tester() + .maxPartitionSize(0) + .testIllegalArgumentException("maxPartitionSize must be greater than 0"); + } + + @Test + public void resolvesMaxFromTargetRowsPerSegment() + { + SingleDimensionPartitionsSpec spec = new Tester() + .targetRowsPerSegment(123) + .build(); + Assert.assertEquals(184, spec.getMaxRowsPerSegment().intValue()); + } + + @Test + public void resolvesMaxFromTargetPartitionSize() + { + SingleDimensionPartitionsSpec spec = new Tester() + .targetPartitionSize(123) + .build(); + Assert.assertEquals(Integer.valueOf(184), spec.getMaxRowsPerSegment()); + } + + @Test + public void resolvesMaxFromMaxRowsPerSegment() + { + SingleDimensionPartitionsSpec spec = new Tester() + .maxRowsPerSegment(123) + .build(); + Assert.assertEquals(123, spec.getMaxRowsPerSegment().intValue()); + } + + @Test + public void resolvesMaxFromMaxPartitionSize() + { + SingleDimensionPartitionsSpec spec = new Tester() + .maxPartitionSize(123) + .build(); + Assert.assertEquals(123, spec.getMaxRowsPerSegment().intValue()); + } + + @Test + public void getPartitionDimensionFromNull() + { + SingleDimensionPartitionsSpec spec = new Tester() + .targetPartitionSize(1) + .partitionDimension(null) + .build(); + Assert.assertEquals(Collections.emptyList(), spec.getPartitionDimensions()); + } + + @Test + public void getPartitionDimensionFromNonNull() + { + String partitionDimension = "a"; + SingleDimensionPartitionsSpec spec = new Tester() + .targetPartitionSize(1) + .partitionDimension(partitionDimension) + .build(); + Assert.assertEquals(Collections.singletonList(partitionDimension), spec.getPartitionDimensions()); + + } + + private static String serialize(SingleDimensionPartitionsSpec spec) + { + try { + return OBJECT_MAPPER.writeValueAsString(spec); + } + catch (JsonProcessingException e) { + throw new RuntimeException(e); + } + } + + private static SingleDimensionPartitionsSpec deserialize(String serialized) + { + try { 
+ return OBJECT_MAPPER.readValue(serialized, SingleDimensionPartitionsSpec.class); + } + catch (Exception e) { + throw new RuntimeException(e); + } + } + + private class Tester + { + private Integer targetRowsPerSegment; + private Integer maxRowsPerSegment; + private String partitionDimension; + private Integer targetPartitionSize; + private Integer maxPartitionSize; + + Tester targetRowsPerSegment(Integer targetRowsPerSegment) + { + this.targetRowsPerSegment = targetRowsPerSegment; + return this; + } + + Tester maxRowsPerSegment(Integer maxRowsPerSegment) + { + this.maxRowsPerSegment = maxRowsPerSegment; + return this; + } + + Tester partitionDimension(String partitionDimension) + { + this.partitionDimension = partitionDimension; + return this; + } + + Tester targetPartitionSize(Integer targetPartitionSize) + { + this.targetPartitionSize = targetPartitionSize; + return this; + } + + Tester maxPartitionSize(Integer maxPartitionSize) + { + this.maxPartitionSize = maxPartitionSize; + return this; + } + + void testIllegalArgumentException(String exceptionExpectedMessage) + { + exception.expect(IllegalArgumentException.class); + exception.expectMessage(exceptionExpectedMessage); + build(); + } + + SingleDimensionPartitionsSpec build() + { + return new SingleDimensionPartitionsSpec( + targetRowsPerSegment, + maxRowsPerSegment, + partitionDimension, + SingleDimensionPartitionsSpecTest.ASSUME_GROUPED, + targetPartitionSize, + maxPartitionSize + ); + } + } +} diff --git a/docs/ingestion/hadoop.md b/docs/ingestion/hadoop.md index 383a10301c8c..ec6cc011c179 100644 --- a/docs/ingestion/hadoop.md +++ b/docs/ingestion/hadoop.md @@ -186,7 +186,7 @@ Here is what goes inside `ingestionSpec`: |-----|----|-----------|--------| |dataSource|String|Druid dataSource name from which you are loading the data.|yes| |intervals|List|A list of strings representing ISO-8601 Intervals.|yes| -|segments|List|List of segments from which to read data from, by default it is obtained automatically. 
You can obtain list of segments to put here by making a POST query to Coordinator at url /druid/coordinator/v1/metadata/datasources/segments?full with list of intervals specified in the request paylod e.g. ["2012-01-01T00:00:00.000/2012-01-03T00:00:00.000", "2012-01-05T00:00:00.000/2012-01-07T00:00:00.000"]. You may want to provide this list manually in order to ensure that segments read are exactly same as they were at the time of task submission, task would fail if the list provided by the user does not match with state of database when the task actually runs.|no| +|segments|List|List of segments from which to read data from, by default it is obtained automatically. You can obtain list of segments to put here by making a POST query to Coordinator at url /druid/coordinator/v1/metadata/datasources/segments?full with list of intervals specified in the request payload e.g. ["2012-01-01T00:00:00.000/2012-01-03T00:00:00.000", "2012-01-05T00:00:00.000/2012-01-07T00:00:00.000"]. You may want to provide this list manually in order to ensure that segments read are exactly same as they were at the time of task submission, task would fail if the list provided by the user does not match with state of database when the task actually runs.|no| |filter|JSON|See [Filters](../querying/filters.md)|no| |dimensions|Array of String|Name of dimension columns to load. By default, the list will be constructed from parseSpec. If parseSpec does not have an explicit list of dimensions then all the dimension columns present in stored data will be read.|no| |metrics|Array of String|Name of metric columns to load. By default, the list will be constructed from the "name" of all the configured aggregators.|no| @@ -285,7 +285,7 @@ The tuningConfig is optional and default parameters will be used if no tuningCon |jobProperties|Object|A map of properties to add to the Hadoop job configuration, see below for details.|no (default == null)| |indexSpec|Object|Tune how data is indexed. 
See [`indexSpec`](index.md#indexspec) on the main ingestion page for more information.|no| |indexSpecForIntermediatePersists|defines segment storage format options to be used at indexing time for intermediate persisted temporary segments. this can be used to disable dimension/metric compression on intermediate segments to reduce memory required for final merging. however, disabling compression on intermediate segments might increase page cache use while they are used before getting merged into final segment published, see [`indexSpec`](index.md#indexspec) for possible values.|no (default = same as indexSpec)| -|numBackgroundPersistThreads|Integer|The number of new background threads to use for incremental persists. Using this feature causes a notable increase in memory pressure and cpu usage but will make the job finish more quickly. If changing from the default of 0 (use current thread for persists), we recommend setting it to 1.|no (default == 0)| +|numBackgroundPersistThreads|Integer|The number of new background threads to use for incremental persists. Using this feature causes a notable increase in memory pressure and CPU usage but will make the job finish more quickly. If changing from the default of 0 (use current thread for persists), we recommend setting it to 1.|no (default == 0)| |forceExtendableShardSpecs|Boolean|Forces use of extendable shardSpecs. Hash-based partitioning always uses an extendable shardSpec. For single-dimension partitioning, this option should be set to true to use an extendable shardSpec. For partitioning, please check [Partitioning specification](#partitionsspec). 
This option can be useful when you need to append more data to existing dataSource.|no (default = false)| |useExplicitVersion|Boolean|Forces HadoopIndexTask to use version.|no (default = false)| |logParseExceptions|Boolean|If true, log an error message when a parsing exception occurs, containing information about the row where the error occurred.|false|no| @@ -324,7 +324,7 @@ sized data segments relative to single-dimension partitioning. ```json "partitionsSpec": { "type": "hashed", - "targetPartitionSize": 5000000 + "maxRowsPerSegment": 5000000 } ``` @@ -337,16 +337,17 @@ The configuration options are: |Field|Description|Required| |--------|-----------|---------| |type|Type of partitionSpec to be used.|"hashed"| -|targetPartitionSize|Target number of rows to include in a partition, should be a number that targets segments of 500MB\~1GB.|either this or numShards| -|numShards|Specify the number of partitions directly, instead of a target partition size. Ingestion will run faster, since it can skip the step necessary to select a number of partitions automatically.|either this or targetPartitionSize| -|partitionDimensions|The dimensions to partition on. Leave blank to select all dimensions. Only used with numShards, will be ignored when targetPartitionSize is set|no| +|maxRowsPerSegment|Used in sharding. Determines how many rows are in each segment. Defaults to 5000000|no| +|targetPartitionSize|Deprecated. Use `maxRowsPerSegment` instead. Target number of rows to include in a partition, should be a number that targets segments of 500MB\~1GB.|either this or numShards| +|numShards|Specify the number of partitions directly, instead of a target partition size. Ingestion will run faster, since it can skip the step necessary to select a number of partitions automatically.|either this or `maxRowsPerSegment`| +|partitionDimensions|The dimensions to partition on. Leave blank to select all dimensions. 
Only used with `numShards`, will be ignored when `maxRowsPerSegment` is set.|no| ### Single-dimension range partitioning ```json "partitionsSpec": { "type": "single_dim", - "targetPartitionSize": 5000000 + "targetRowsPerSegment": 5000000 } ``` @@ -361,8 +362,10 @@ The configuration options are: |Field|Description|Required| |--------|-----------|---------| |type|Type of partitionSpec to be used.|"single_dim"| -|targetPartitionSize|Target number of rows to include in a partition, should be a number that targets segments of 500MB\~1GB.|yes| -|maxPartitionSize|Maximum number of rows to include in a partition. Defaults to 50% larger than the targetPartitionSize.|no| +|targetRowsPerSegment|Target number of rows to include in a partition, should be a number that targets segments of 500MB\~1GB.|yes| +|targetPartitionSize|Deprecated. Use `targetRowsPerSegment` instead. Target number of rows to include in a partition, should be a number that targets segments of 500MB\~1GB.|no| +|maxRowsPerSegment|Maximum number of rows to include in a partition. Defaults to 50% larger than the `targetPartitionSize`.|no| +|maxPartitionSize|Deprecated. Use `maxRowsPerSegment` instead. Maximum number of rows to include in a partition. Defaults to 50% larger than the `targetPartitionSize`.|no| |partitionDimension|The dimension to partition on. Leave blank to select a dimension automatically.|no| |assumeGrouped|Assume that input data has already been grouped on time and dimensions. Ingestion will run faster, but may choose sub-optimal partitions if this assumption is violated.|no| @@ -391,7 +394,7 @@ on your EMR master. ## Kerberized Hadoop clusters -By default druid can use the exisiting TGT kerberos ticket available in local kerberos key cache. +By default druid can use the existing TGT kerberos ticket available in local kerberos key cache. Although TGT ticket has a limited life cycle, therefore you need to call `kinit` command periodically to ensure validity of TGT ticket. 
To avoid this extra external cron job script calling `kinit` periodically, diff --git a/docs/ingestion/native-batch.md b/docs/ingestion/native-batch.md index 6f8490862edb..93695ef155b0 100644 --- a/docs/ingestion/native-batch.md +++ b/docs/ingestion/native-batch.md @@ -61,7 +61,7 @@ partitioned data from middleManagers or indexers and merges them to create the f As in the single phase execution, the created segments are reported to the supervisor task to publish at once. To use this task, the `firehose` in `ioConfig` should be _splittable_ and `maxNumConcurrentSubTasks` should be set something larger than 1 in `tuningConfig`. -Otherwise, this task runs sequentially. Here is the list of currently splittable fireshoses. +Otherwise, this task runs sequentially. Here is the list of currently splittable firehoses. - [`LocalFirehose`](#local-firehose) - [`IngestSegmentFirehose`](#segment-firehose) @@ -629,7 +629,7 @@ For perfect rollup, you should use `hashed`. |--------|-----------|-------|---------| |type|This should always be `hashed`|none|yes| |maxRowsPerSegment|Used in sharding. Determines how many rows are in each segment.|5000000|no| -|numShards|Directly specify the number of shards to create. If this is specified and 'intervals' is specified in the granularitySpec, the index task can skip the determine intervals/partitions pass through the data. numShards cannot be specified if maxRowsPerSegment is set.|null|no| +|numShards|Directly specify the number of shards to create. If this is specified and `intervals` is specified in the `granularitySpec`, the index task can skip the determine intervals/partitions pass through the data. `numShards` cannot be specified if `maxRowsPerSegment` is set.|null|no| |partitionDimensions|The dimensions to partition on. Leave blank to select all dimensions.|null|no| For best-effort rollup, you should use `dynamic`. 
diff --git a/indexing-hadoop/src/main/java/org/apache/druid/indexer/DeterminePartitionsJob.java b/indexing-hadoop/src/main/java/org/apache/druid/indexer/DeterminePartitionsJob.java index f9c4fe8f9806..59245a1f5a23 100644 --- a/indexing-hadoop/src/main/java/org/apache/druid/indexer/DeterminePartitionsJob.java +++ b/indexing-hadoop/src/main/java/org/apache/druid/indexer/DeterminePartitionsJob.java @@ -103,7 +103,7 @@ public class DeterminePartitionsJob implements Jobby private String failureCause; - public DeterminePartitionsJob( + DeterminePartitionsJob( HadoopDruidIndexerConfig config ) { @@ -168,7 +168,7 @@ public boolean run() try { if (!groupByJob.waitForCompletion(true)) { log.error("Job failed: %s", groupByJob.getJobID()); - failureCause = Utils.getFailureMessage(groupByJob, config.JSON_MAPPER); + failureCause = Utils.getFailureMessage(groupByJob, HadoopDruidIndexerConfig.JSON_MAPPER); return false; } } @@ -238,7 +238,7 @@ public boolean run() try { if (!dimSelectionJob.waitForCompletion(true)) { log.error("Job failed: %s", dimSelectionJob.getJobID().toString()); - failureCause = Utils.getFailureMessage(dimSelectionJob, config.JSON_MAPPER); + failureCause = Utils.getFailureMessage(dimSelectionJob, HadoopDruidIndexerConfig.JSON_MAPPER); return false; } } @@ -262,7 +262,7 @@ public boolean run() fileSystem = partitionInfoPath.getFileSystem(dimSelectionJob.getConfiguration()); } if (Utils.exists(dimSelectionJob, fileSystem, partitionInfoPath)) { - List specs = config.JSON_MAPPER.readValue( + List specs = HadoopDruidIndexerConfig.JSON_MAPPER.readValue( Utils.openInputStream(dimSelectionJob, partitionInfoPath), new TypeReference>() { } @@ -298,14 +298,13 @@ public Map getStats() try { Counters jobCounters = groupByJob.getCounters(); - Map metrics = TaskMetricsUtils.makeIngestionRowMetrics( + return TaskMetricsUtils.makeIngestionRowMetrics( jobCounters.findCounter(HadoopDruidIndexerConfig.IndexJobCounters.ROWS_PROCESSED_COUNTER).getValue(), - 
jobCounters.findCounter(HadoopDruidIndexerConfig.IndexJobCounters.ROWS_PROCESSED_WITH_ERRORS_COUNTER).getValue(), + jobCounters.findCounter(HadoopDruidIndexerConfig.IndexJobCounters.ROWS_PROCESSED_WITH_ERRORS_COUNTER) + .getValue(), jobCounters.findCounter(HadoopDruidIndexerConfig.IndexJobCounters.ROWS_UNPARSEABLE_COUNTER).getValue(), jobCounters.findCounter(HadoopDruidIndexerConfig.IndexJobCounters.ROWS_THROWN_AWAY_COUNTER).getValue() ); - - return metrics; } catch (IllegalStateException ise) { log.debug("Couldn't get counters due to job state"); @@ -433,13 +432,13 @@ protected void innerMap( * Since we have two slightly different DimSelectionMappers, this class encapsulates the shared logic for * emitting dimension value counts. */ - public static class DeterminePartitionsDimSelectionMapperHelper + static class DeterminePartitionsDimSelectionMapperHelper { private final HadoopDruidIndexerConfig config; private final String partitionDimension; private final Map intervalIndexes; - public DeterminePartitionsDimSelectionMapperHelper(HadoopDruidIndexerConfig config, String partitionDimension) + DeterminePartitionsDimSelectionMapperHelper(HadoopDruidIndexerConfig config, String partitionDimension) { this.config = config; this.partitionDimension = partitionDimension; @@ -454,7 +453,7 @@ public DeterminePartitionsDimSelectionMapperHelper(HadoopDruidIndexerConfig conf this.intervalIndexes = timeIndexBuilder.build(); } - public void emitDimValueCounts( + void emitDimValueCounts( TaskInputOutputContext context, DateTime timestamp, Map> dims @@ -568,7 +567,7 @@ protected abstract void innerReduce( Iterable combinedIterable ) throws IOException, InterruptedException; - private Iterable combineRows(Iterable input) + private static Iterable combineRows(Iterable input) { return new CombiningIterable<>( Iterables.transform( @@ -771,7 +770,7 @@ protected void innerReduce(Context context, SortableBytes keyBytes, Iterable partitionsSpec.getMaxPartitionSize()) { + if (partition.rows 
> partitionsSpec.getMaxRowsPerSegment()) { log.info("Dimension[%s] has an oversized shard: %s", dimPartitions.dim, partition.shardSpec); oversized = true; } @@ -857,7 +856,7 @@ public void close(TaskAttemptContext context) @Override public void checkOutputSpecs(JobContext job) throws IOException { - Path outDir = getOutputPath(job); + Path outDir = FileOutputFormat.getOutputPath(job); if (outDir == null) { throw new InvalidJobConfException("Output directory not set."); } @@ -874,7 +873,7 @@ private DimPartitions(String dim) this.dim = dim; } - public int getCardinality() + int getCardinality() { int sum = 0; for (final DimPartition dimPartition : partitions) { @@ -883,7 +882,7 @@ public int getCardinality() return sum; } - public long getDistanceSquaredFromTarget(long target) + long getDistanceSquaredFromTarget(long target) { long distance = 0; for (final DimPartition dimPartition : partitions) { @@ -907,7 +906,7 @@ public long getRows() private static class DimPartition { public ShardSpec shardSpec = null; - public int cardinality = 0; + int cardinality = 0; public long rows = 0; } @@ -924,12 +923,12 @@ private DimValueCount(String dim, String value, long numRows) this.numRows = numRows; } - public Text toText() + Text toText() { return new Text(TAB_JOINER.join(dim, String.valueOf(numRows), value)); } - public static DimValueCount fromText(Text text) + static DimValueCount fromText(Text text) { final Iterator splits = TAB_SPLITTER.limit(3).split(text.toString()).iterator(); final String dim = splits.next(); diff --git a/indexing-hadoop/src/test/java/org/apache/druid/indexer/HadoopIngestionSpecTest.java b/indexing-hadoop/src/test/java/org/apache/druid/indexer/HadoopIngestionSpecTest.java index fbd13c92f560..8bafae5d15a8 100644 --- a/indexing-hadoop/src/test/java/org/apache/druid/indexer/HadoopIngestionSpecTest.java +++ b/indexing-hadoop/src/test/java/org/apache/druid/indexer/HadoopIngestionSpecTest.java @@ -150,8 +150,8 @@ public void testPartitionsSpecAutoHashed() 
Assert.assertEquals( "getTargetPartitionSize", - partitionsSpec.getMaxRowsPerSegment().intValue(), - 100 + 100, + partitionsSpec.getMaxRowsPerSegment().intValue() ); Assert.assertTrue( @@ -167,14 +167,13 @@ public void testPartitionsSpecMaxPartitionSize() try { schema = jsonReadWriteRead( - "{\n" + " \"tuningConfig\": {\n" + " \"type\": \"hadoop\",\n" + " \"partitionsSpec\": {\n" + " \"type\": \"dimension\",\n" + " \"targetPartitionSize\": 100,\n" - + " \"maxPartitionSize\" : 200,\n" + + " \"maxPartitionSize\" : null,\n" + " \"partitionDimension\" : \"foo\"\n" + " }\n" + " }\n" @@ -186,32 +185,29 @@ public void testPartitionsSpecMaxPartitionSize() throw new RuntimeException(e); } - final SingleDimensionPartitionsSpec partitionsSpec = - (SingleDimensionPartitionsSpec) schema.getTuningConfig().getPartitionsSpec(); + PartitionsSpec partitionsSpec = schema.getTuningConfig().getPartitionsSpec(); + Assert.assertTrue("partitionsSpec", partitionsSpec instanceof SingleDimensionPartitionsSpec); - Assert.assertEquals( - "isDeterminingPartitions", - partitionsSpec.needsDeterminePartitions(true), - true - ); + SingleDimensionPartitionsSpec singleDimensionPartitionsSpec = (SingleDimensionPartitionsSpec) partitionsSpec; + + Assert.assertTrue("isDeterminingPartitions", singleDimensionPartitionsSpec.needsDeterminePartitions(true)); Assert.assertEquals( "getTargetPartitionSize", - partitionsSpec.getMaxRowsPerSegment().intValue(), - 100 + 100, + singleDimensionPartitionsSpec.getTargetRowsPerSegment().intValue() ); Assert.assertEquals( "getMaxPartitionSize", - partitionsSpec.getMaxPartitionSize(), - 200 + 150, + singleDimensionPartitionsSpec.getMaxRowsPerSegment().intValue() ); - Assert.assertTrue("partitionsSpec", partitionsSpec instanceof SingleDimensionPartitionsSpec); Assert.assertEquals( "getPartitionDimension", - partitionsSpec.getPartitionDimension(), - "foo" + "foo", + singleDimensionPartitionsSpec.getPartitionDimension() ); } @@ -262,15 +258,11 @@ public void 
testDefaultSettings() Assert.assertEquals( "cleanupOnFailure", - schema.getTuningConfig().isCleanupOnFailure(), - true + true, + schema.getTuningConfig().isCleanupOnFailure() ); - Assert.assertEquals( - "overwriteFiles", - schema.getTuningConfig().isOverwriteFiles(), - false - ); + Assert.assertFalse("overwriteFiles", schema.getTuningConfig().isOverwriteFiles()); Assert.assertFalse( "isDeterminingPartitions", @@ -324,14 +316,10 @@ public void testNoCleanupOnFailure() throw new RuntimeException(e); } - Assert.assertEquals( - "cleanupOnFailure", - schema.getTuningConfig().isCleanupOnFailure(), - false - ); + Assert.assertFalse("cleanupOnFailure", schema.getTuningConfig().isCleanupOnFailure()); } - private T jsonReadWriteRead(String s, Class klass) + private static T jsonReadWriteRead(String s, Class klass) { try { return JSON_MAPPER.readValue(JSON_MAPPER.writeValueAsBytes(JSON_MAPPER.readValue(s, klass)), klass); diff --git a/indexing-hadoop/src/test/java/org/apache/druid/indexer/partitions/HashedPartitionsSpecTest.java b/indexing-hadoop/src/test/java/org/apache/druid/indexer/partitions/HashedPartitionsSpecTest.java index 3e13fa2239f7..fad5de2f76bf 100644 --- a/indexing-hadoop/src/test/java/org/apache/druid/indexer/partitions/HashedPartitionsSpecTest.java +++ b/indexing-hadoop/src/test/java/org/apache/druid/indexer/partitions/HashedPartitionsSpecTest.java @@ -25,8 +25,6 @@ import org.junit.Assert; import org.junit.Test; -/** - */ public class HashedPartitionsSpecTest { private static final ObjectMapper JSON_MAPPER = new DefaultObjectMapper(); @@ -34,35 +32,29 @@ public class HashedPartitionsSpecTest @Test public void testHashedPartitionsSpec() { - { - final PartitionsSpec partitionsSpec = jsonReadWriteRead( - "{" - + " \"targetPartitionSize\":100," - + " \"type\":\"hashed\"" - + "}", - PartitionsSpec.class - ); - Assert.assertTrue("partitionsSpec", partitionsSpec instanceof HashedPartitionsSpec); - final HashedPartitionsSpec hadoopHashedPartitionsSpec = 
(HashedPartitionsSpec) partitionsSpec; + final PartitionsSpec partitionsSpec = jsonReadWriteRead( + "{" + + " \"targetPartitionSize\":100," + + " \"type\":\"hashed\"" + + "}", + PartitionsSpec.class + ); + Assert.assertTrue("partitionsSpec", partitionsSpec instanceof HashedPartitionsSpec); + final HashedPartitionsSpec hadoopHashedPartitionsSpec = (HashedPartitionsSpec) partitionsSpec; - Assert.assertEquals( - "isDeterminingPartitions", - hadoopHashedPartitionsSpec.needsDeterminePartitions(true), - true - ); + Assert.assertTrue("isDeterminingPartitions", hadoopHashedPartitionsSpec.needsDeterminePartitions(true)); - Assert.assertEquals( - "getTargetPartitionSize", - hadoopHashedPartitionsSpec.getMaxRowsPerSegment().intValue(), - 100 - ); + Assert.assertEquals( + "getTargetPartitionSize", + 100, + hadoopHashedPartitionsSpec.getMaxRowsPerSegment().intValue() + ); - Assert.assertEquals( - "getPartitionDimensions", - hadoopHashedPartitionsSpec.getPartitionDimensions(), - ImmutableList.of() - ); - } + Assert.assertEquals( + "getPartitionDimensions", + ImmutableList.of(), + hadoopHashedPartitionsSpec.getPartitionDimensions() + ); } @Test @@ -78,11 +70,7 @@ public void testHashedPartitionsSpecShardCount() Assert.assertTrue("partitionsSpec", partitionsSpec instanceof HashedPartitionsSpec); final HashedPartitionsSpec hadoopHashedPartitionsSpec = (HashedPartitionsSpec) partitionsSpec; - Assert.assertEquals( - "isDeterminingPartitions", - hadoopHashedPartitionsSpec.needsDeterminePartitions(true), - false - ); + Assert.assertFalse("isDeterminingPartitions", hadoopHashedPartitionsSpec.needsDeterminePartitions(true)); Assert.assertNull( "getTargetPartitionSize", @@ -91,14 +79,14 @@ public void testHashedPartitionsSpecShardCount() Assert.assertEquals( "shardCount", - hadoopHashedPartitionsSpec.getNumShards().intValue(), - 2 + 2, + hadoopHashedPartitionsSpec.getNumShards().intValue() ); Assert.assertEquals( "getPartitionDimensions", - 
hadoopHashedPartitionsSpec.getPartitionDimensions(), - ImmutableList.of() + ImmutableList.of(), + hadoopHashedPartitionsSpec.getPartitionDimensions() ); } From 6190d1651d2fd9f5d6f8a9c5ec893168e83da96f Mon Sep 17 00:00:00 2001 From: Chi Cao Minh Date: Wed, 11 Sep 2019 15:00:50 -0700 Subject: [PATCH 2/4] Fix test --- .../indexer/DeterminePartitionsJobTest.java | 33 ++++++++++--------- 1 file changed, 18 insertions(+), 15 deletions(-) diff --git a/indexing-hadoop/src/test/java/org/apache/druid/indexer/DeterminePartitionsJobTest.java b/indexing-hadoop/src/test/java/org/apache/druid/indexer/DeterminePartitionsJobTest.java index 35174f56dfd7..89b7fde43d61 100644 --- a/indexing-hadoop/src/test/java/org/apache/druid/indexer/DeterminePartitionsJobTest.java +++ b/indexing-hadoop/src/test/java/org/apache/druid/indexer/DeterminePartitionsJobTest.java @@ -51,16 +51,15 @@ @RunWith(Parameterized.class) public class DeterminePartitionsJobTest { - - private HadoopDruidIndexerConfig config; - private int expectedNumOfSegments; - private int[] expectedNumOfShardsForEachSegment; - private String[][][] expectedStartEndForEachShard; - private File dataFile; - private File tmpDir; + private final HadoopDruidIndexerConfig config; + private final int expectedNumOfSegments; + private final int[] expectedNumOfShardsForEachSegment; + private final String[][][] expectedStartEndForEachShard; + private final File dataFile; + private final File tmpDir; @Parameterized.Parameters(name = "assumeGrouped={0}, " - + "targetPartitionSize={1}, " + + "maxRowsPerSegment={1}, " + "interval={2}" + "expectedNumOfSegments={3}, " + "expectedNumOfShardsForEachSegment={4}, " @@ -82,7 +81,7 @@ public static Collection constructFeed() {"c.example.com", "e.example.com"}, {"e.example.com", "g.example.com"}, {"g.example.com", "i.example.com"}, - {"i.example.com", null } + {"i.example.com", null} } }, ImmutableList.of( @@ -222,7 +221,7 @@ public static Collection constructFeed() public DeterminePartitionsJobTest( 
boolean assumeGrouped, - Integer targetPartitionSize, + Integer maxRowsPerSegment, String interval, int expectedNumOfSegments, int[] expectedNumOfShardsForEachSegment, @@ -249,7 +248,11 @@ public DeterminePartitionsJobTest( new StringInputRowParser( new CSVParseSpec( new TimestampSpec("timestamp", "yyyyMMddHH", null), - new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("host", "country")), null, null), + new DimensionsSpec( + DimensionsSpec.getDefaultSchemas(ImmutableList.of("host", "country")), + null, + null + ), null, ImmutableList.of("timestamp", "host", "country", "visited_num"), false, @@ -281,7 +284,7 @@ public DeterminePartitionsJobTest( new HadoopTuningConfig( tmpDir.getCanonicalPath(), null, - new SingleDimensionPartitionsSpec(targetPartitionSize, null, null, assumeGrouped), + new SingleDimensionPartitionsSpec(null, maxRowsPerSegment, null, assumeGrouped), null, null, null, @@ -319,9 +322,9 @@ public void testPartitionJob() Assert.assertEquals(expectedNumOfSegments, config.getSchema().getTuningConfig().getShardSpecs().size()); for (Map.Entry> entry : config.getSchema() - .getTuningConfig() - .getShardSpecs() - .entrySet()) { + .getTuningConfig() + .getShardSpecs() + .entrySet()) { int partitionNum = 0; List specs = entry.getValue(); Assert.assertEquals(expectedNumOfShardsForEachSegment[segmentNum], specs.size()); From e9c5b0cabed020392b87d329d20085f15e5623a5 Mon Sep 17 00:00:00 2001 From: Chi Cao Minh Date: Wed, 18 Sep 2019 16:13:55 -0700 Subject: [PATCH 3/4] Improve docs --- docs/ingestion/hadoop.md | 4 ++-- website/.spelling | 1 + 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/docs/ingestion/hadoop.md b/docs/ingestion/hadoop.md index 3cdfbdbec59a..352d2a2f4fcc 100644 --- a/docs/ingestion/hadoop.md +++ b/docs/ingestion/hadoop.md @@ -338,7 +338,7 @@ The configuration options are: |--------|-----------|---------| |type|Type of partitionSpec to be used.|"hashed"| |maxRowsPerSegment|Used in sharding. 
Determines how many rows are in each segment. Defaults to 5000000|no| -|targetPartitionSize|Deprecated. Use `maxRowsPerSegment` instead. Target number of rows to include in a partition, should be a number that targets segments of 500MB\~1GB.|either this or numShards| +|targetPartitionSize|Deprecated. Renamed to `maxRowsPerSegment`. Target number of rows to include in a partition, should be a number that targets segments of 500MB\~1GB.|either this or numShards| |numShards|Specify the number of partitions directly, instead of a target partition size. Ingestion will run faster, since it can skip the step necessary to select a number of partitions automatically.|either this or `maxRowsPerSegment`| |partitionDimensions|The dimensions to partition on. Leave blank to select all dimensions. Only used with `numShards`, will be ignored when `maxRowsPerSegment` is set.|no| @@ -363,7 +363,7 @@ The configuration options are: |--------|-----------|---------| |type|Type of partitionSpec to be used.|"single_dim"| |targetRowsPerSegment|Target number of rows to include in a partition, should be a number that targets segments of 500MB\~1GB.|yes| -|targetPartitionSize|Deprecated. Use `targetRowsPerSegment` instead. Target number of rows to include in a partition, should be a number that targets segments of 500MB\~1GB.|no| +|targetPartitionSize|Deprecated. Renamed to `targetRowsPerSegment`. Target number of rows to include in a partition, should be a number that targets segments of 500MB\~1GB.|no| |maxRowsPerSegment|Maximum number of rows to include in a partition. Defaults to 50% larger than the `targetPartitionSize`.|no| |maxPartitionSize|Deprecated. Use `maxRowsPerSegment` instead. Maximum number of rows to include in a partition. Defaults to 50% larger than the `targetPartitionSize`.|no| |partitionDimension|The dimension to partition on. 
Leave blank to select a dimension automatically.|no| diff --git a/website/.spelling b/website/.spelling index 5e4553a04d87..0b875ed486ea 100644 --- a/website/.spelling +++ b/website/.spelling @@ -864,6 +864,7 @@ segmentTable shardSpec single_dim targetPartitionSize +targetRowsPerSegment useCombiner useExplicitVersion useNewAggs From b9c530ed2feaff06a03975a0b580338e06a8aed2 Mon Sep 17 00:00:00 2001 From: Chi Cao Minh Date: Thu, 19 Sep 2019 11:55:07 -0700 Subject: [PATCH 4/4] Add targetRowsPerSegment to HashedPartitionsSpec --- .../druid/indexer/partitions/Checks.java | 47 +++++++++++ .../partitions/HashedPartitionsSpec.java | 42 ++++++---- .../druid/indexer/partitions/Property.java | 67 +++++++++++++++ .../SingleDimensionPartitionsSpec.java | 65 ++++----------- .../druid/indexer/partitions/ChecksTest.java | 71 ++++++++++++++++ docs/ingestion/hadoop.md | 10 ++- docs/ingestion/native-batch.md | 5 +- .../partitions/HashedPartitionsSpecTest.java | 83 +++++++++++++++---- 8 files changed, 303 insertions(+), 87 deletions(-) create mode 100644 core/src/main/java/org/apache/druid/indexer/partitions/Checks.java create mode 100644 core/src/main/java/org/apache/druid/indexer/partitions/Property.java create mode 100644 core/src/test/java/org/apache/druid/indexer/partitions/ChecksTest.java diff --git a/core/src/main/java/org/apache/druid/indexer/partitions/Checks.java b/core/src/main/java/org/apache/druid/indexer/partitions/Checks.java new file mode 100644 index 000000000000..a0e47f8efb88 --- /dev/null +++ b/core/src/main/java/org/apache/druid/indexer/partitions/Checks.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.indexer.partitions; + +/** + * Various helper methods useful for checking the validity of arguments to spec constructors. + */ +class Checks +{ + /** + * @return Non-null value, or first one if both are null + */ + @SuppressWarnings("VariableNotUsedInsideIf") // false positive: checked for 'null' not used inside 'if' + static Property checkAtMostOneNotNull(String name1, Integer value1, String name2, Integer value2) + { + final Property property; + + if (value1 == null && value2 == null) { + property = new Property<>(name1, value1); + } else if (value1 == null) { + property = new Property<>(name2, value2); + } else if (value2 == null) { + property = new Property<>(name1, value1); + } else { + throw new IllegalArgumentException("At most one of " + name1 + " or " + name2 + " must be present"); + } + + return property; + } +} diff --git a/core/src/main/java/org/apache/druid/indexer/partitions/HashedPartitionsSpec.java b/core/src/main/java/org/apache/druid/indexer/partitions/HashedPartitionsSpec.java index f51aed718aaf..950015f7c2c3 100644 --- a/core/src/main/java/org/apache/druid/indexer/partitions/HashedPartitionsSpec.java +++ b/core/src/main/java/org/apache/druid/indexer/partitions/HashedPartitionsSpec.java @@ -42,37 +42,37 @@ public class HashedPartitionsSpec implements DimensionBasedPartitionsSpec public static HashedPartitionsSpec defaultSpec() { - return new HashedPartitionsSpec(null, null, null, null); - } - - public HashedPartitionsSpec( - @Nullable Integer maxRowsPerSegment, - @Nullable Integer numShards, - 
@Nullable List partitionDimensions - ) - { - this(maxRowsPerSegment, numShards, partitionDimensions, null); + return new HashedPartitionsSpec(null, null, null, null, null); } @JsonCreator public HashedPartitionsSpec( - @JsonProperty(PartitionsSpec.MAX_ROWS_PER_SEGMENT) @Nullable Integer maxRowsPerSegment, + @JsonProperty(DimensionBasedPartitionsSpec.TARGET_ROWS_PER_SEGMENT) @Nullable Integer targetRowsPerSegment, @JsonProperty("numShards") @Nullable Integer numShards, @JsonProperty("partitionDimensions") @Nullable List partitionDimensions, // Deprecated properties preserved for backward compatibility: @Deprecated @JsonProperty(DimensionBasedPartitionsSpec.TARGET_PARTITION_SIZE) @Nullable - Integer targetPartitionSize + Integer targetPartitionSize, + @Deprecated @JsonProperty(PartitionsSpec.MAX_ROWS_PER_SEGMENT) @Nullable + Integer maxRowsPerSegment ) { + Property target = Checks.checkAtMostOneNotNull( + DimensionBasedPartitionsSpec.TARGET_ROWS_PER_SEGMENT, + targetRowsPerSegment, + DimensionBasedPartitionsSpec.TARGET_PARTITION_SIZE, + targetPartitionSize + ); + Preconditions.checkArgument( - PartitionsSpec.isEffectivelyNull(targetPartitionSize) || PartitionsSpec.isEffectivelyNull(maxRowsPerSegment), - "Can't set both targetPartitionSize and maxRowsPerSegment" + PartitionsSpec.isEffectivelyNull(target.getValue()) || PartitionsSpec.isEffectivelyNull(maxRowsPerSegment), + "Can't set both " + target.getName() + " and maxRowsPerSegment" ); - final Integer realMaxRowsPerSegment = targetPartitionSize == null ? maxRowsPerSegment : targetPartitionSize; + final Integer realMaxRowsPerSegment = target.getValue() == null ? 
maxRowsPerSegment : target.getValue(); Preconditions.checkArgument( PartitionsSpec.isEffectivelyNull(realMaxRowsPerSegment) || PartitionsSpec.isEffectivelyNull(numShards), - "Can't use maxRowsPerSegment or targetPartitionSize and numShards together" + "Can't use maxRowsPerSegment or " + target.getName() + " and numShards together" ); // Needs to determine partitions if the _given_ numShards is null this.maxRowsPerSegment = getValidMaxRowsPerSegment(realMaxRowsPerSegment, numShards); @@ -104,6 +104,16 @@ public HashedPartitionsSpec( } } + public HashedPartitionsSpec( + @Nullable Integer maxRowsPerSegment, + @Nullable Integer numShards, + @Nullable List partitionDimensions + ) + { + this(null, numShards, partitionDimensions, null, maxRowsPerSegment); + } + + private static boolean needsDeterminePartitions(@Nullable Integer numShards) { return PartitionsSpec.isEffectivelyNull(numShards); diff --git a/core/src/main/java/org/apache/druid/indexer/partitions/Property.java b/core/src/main/java/org/apache/druid/indexer/partitions/Property.java new file mode 100644 index 000000000000..6d4715edd1b5 --- /dev/null +++ b/core/src/main/java/org/apache/druid/indexer/partitions/Property.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.indexer.partitions; + +import java.util.Objects; + +/** + * Convenience class for holding a pair of string key and templated value. + */ +class Property +{ + private final String name; + private final T value; + + Property(String name, T value) + { + this.name = name; + this.value = value; + } + + public String getName() + { + return name; + } + + public T getValue() + { + return value; + } + + @Override + public boolean equals(Object o) + { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + Property property = (Property) o; + return Objects.equals(name, property.name) && + Objects.equals(value, property.value); + } + + @Override + public int hashCode() + { + return Objects.hash(name, value); + } +} diff --git a/core/src/main/java/org/apache/druid/indexer/partitions/SingleDimensionPartitionsSpec.java b/core/src/main/java/org/apache/druid/indexer/partitions/SingleDimensionPartitionsSpec.java index 8b5b108592ef..9b985fb04058 100644 --- a/core/src/main/java/org/apache/druid/indexer/partitions/SingleDimensionPartitionsSpec.java +++ b/core/src/main/java/org/apache/druid/indexer/partitions/SingleDimensionPartitionsSpec.java @@ -64,29 +64,29 @@ public SingleDimensionPartitionsSpec( Integer maxPartitionSize // prefer maxRowsPerSegment ) { - Property target = checkAtMostOneNotNull( - TARGET_ROWS_PER_SEGMENT, + Property target = Checks.checkAtMostOneNotNull( + DimensionBasedPartitionsSpec.TARGET_ROWS_PER_SEGMENT, targetRowsPerSegment, - TARGET_PARTITION_SIZE, + DimensionBasedPartitionsSpec.TARGET_PARTITION_SIZE, targetPartitionSize ); - Property max = checkAtMostOneNotNull( - MAX_ROWS_PER_SEGMENT, + Property max = Checks.checkAtMostOneNotNull( + PartitionsSpec.MAX_ROWS_PER_SEGMENT, maxRowsPerSegment, MAX_PARTITION_SIZE, maxPartitionSize ); Preconditions.checkArgument( - 
(target.value == null) != (max.value == null), - "Exactly one of " + target.name + " or " + max.name + " must be present" + (target.getValue() == null) != (max.getValue() == null), + "Exactly one of " + target.getName() + " or " + max.getName() + " must be present" ); this.partitionDimension = partitionDimension; this.assumeGrouped = assumeGrouped; - this.targetRowsPerSegment = target.value; - this.maxRowsPerSegment = max.value; + this.targetRowsPerSegment = target.getValue(); + this.maxRowsPerSegment = max.getValue(); this.resolvedMaxRowPerSegment = resolveMaxRowsPerSegment(target, max); } @@ -102,58 +102,25 @@ public SingleDimensionPartitionsSpec( this(targetRowsPerSegment, maxRowsPerSegment, partitionDimension, assumeGrouped, null, null); } - /** - * @return Non-null value, or first one if both are null - */ - @SuppressWarnings("VariableNotUsedInsideIf") // false positive: checked for 'null' not used inside 'if - private static Property checkAtMostOneNotNull(String name1, Integer value1, String name2, Integer value2) - { - final Property property; - - if (value1 == null && value2 == null) { - property = new Property<>(name1, value1); - } else if (value1 == null) { - property = new Property<>(name2, value2); - } else if (value2 == null) { - property = new Property<>(name1, value1); - } else { - throw new IllegalArgumentException("At most one of " + name1 + " or " + name2 + " must be present"); - } - - return property; - } - private static int resolveMaxRowsPerSegment(Property target, Property max) { final int resolvedValue; - if (target.value != null) { - Preconditions.checkArgument(target.value > 0, target.name + " must be greater than 0"); + if (target.getValue() != null) { + Preconditions.checkArgument(target.getValue() > 0, target.getName() + " must be greater than 0"); try { - resolvedValue = Math.addExact(target.value, (target.value / 2)); + resolvedValue = Math.addExact(target.getValue(), (target.getValue() / 2)); } catch (ArithmeticException e) { - throw 
new IllegalArgumentException(target.name + " is too large"); + throw new IllegalArgumentException(target.getName() + " is too large"); } } else { - Preconditions.checkArgument(max.value > 0, max.name + " must be greater than 0"); - resolvedValue = max.value; + Preconditions.checkArgument(max.getValue() > 0, max.getName() + " must be greater than 0"); + resolvedValue = max.getValue(); } return resolvedValue; } - private static class Property - { - private final String name; - private final T value; - - Property(String name, T value) - { - this.name = name; - this.value = value; - } - } - @JsonProperty @Nullable public Integer getTargetRowsPerSegment() @@ -169,7 +136,7 @@ public Integer getMaxRowsPerSegment() return resolvedMaxRowPerSegment; // NOTE: This returns the *resolved* value } - @JsonProperty(MAX_ROWS_PER_SEGMENT) + @JsonProperty(PartitionsSpec.MAX_ROWS_PER_SEGMENT) private Integer getMaxRowsPerSegmentForJson() { return maxRowsPerSegment; diff --git a/core/src/test/java/org/apache/druid/indexer/partitions/ChecksTest.java b/core/src/test/java/org/apache/druid/indexer/partitions/ChecksTest.java new file mode 100644 index 000000000000..97d1a69602db --- /dev/null +++ b/core/src/test/java/org/apache/druid/indexer/partitions/ChecksTest.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.indexer.partitions; + +import org.junit.Assert; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; + +public class ChecksTest +{ + private static final String NAME1 = "name1"; + private static final Integer VALUE1 = 1; + private static final String NAME2 = "name2"; + private static final Integer VALUE2 = 2; + private static final Integer NULL = null; + + @Rule + public ExpectedException exception = ExpectedException.none(); + + @Test + public void checkAtMostOneNotNullFirstNull() + { + Property result = Checks.checkAtMostOneNotNull(NAME1, NULL, NAME2, VALUE2); + Assert.assertEquals(NAME2, result.getName()); + Assert.assertEquals(VALUE2, result.getValue()); + } + + @Test + public void checkAtMostOneNotNullSecondNull() + { + Property result = Checks.checkAtMostOneNotNull(NAME1, VALUE1, NAME2, NULL); + Assert.assertEquals(NAME1, result.getName()); + Assert.assertEquals(VALUE1, result.getValue()); + } + + @Test + public void checkAtMostOneNotNullBothNull() + { + Property result = Checks.checkAtMostOneNotNull(NAME1, NULL, NAME2, NULL); + Assert.assertEquals(NAME1, result.getName()); + Assert.assertEquals(NULL, result.getValue()); + } + + @Test + public void checkAtMostOneNotNullNeitherNull() + { + exception.expect(IllegalArgumentException.class); + exception.expectMessage("At most one of " + NAME1 + " or " + NAME2 + " must be present"); + + //noinspection ConstantConditions (expected to fail) + Checks.checkAtMostOneNotNull(NAME1, VALUE1, NAME2, VALUE2); + } +} diff --git a/docs/ingestion/hadoop.md b/docs/ingestion/hadoop.md index 352d2a2f4fcc..43931d3466f1 100644 --- a/docs/ingestion/hadoop.md +++ b/docs/ingestion/hadoop.md @@ -324,7 +324,7 @@ sized data segments relative to single-dimension partitioning. 
```json "partitionsSpec": { "type": "hashed", - "maxRowsPerSegment": 5000000 + "targetRowsPerSegment": 5000000 } ``` @@ -337,10 +337,12 @@ The configuration options are: |Field|Description|Required| |--------|-----------|---------| |type|Type of partitionSpec to be used.|"hashed"| -|maxRowsPerSegment|Used in sharding. Determines how many rows are in each segment. Defaults to 5000000|no| -|targetPartitionSize|Deprecated. Renamed to `maxRowsPerSegment`. Target number of rows to include in a partition, should be a number that targets segments of 500MB\~1GB.|either this or numShards| +|targetRowsPerSegment|Target number of rows to include in a partition, should be a number that targets segments of 500MB\~1GB.|either this or `numShards`| +|targetPartitionSize|Deprecated. Renamed to `targetRowsPerSegment`. Target number of rows to include in a partition, should be a number that targets segments of 500MB\~1GB.|either this or `numShards`| +|maxRowsPerSegment|Deprecated. Renamed to `targetRowsPerSegment`. Target number of rows to include in a partition, should be a number that targets segments of 500MB\~1GB.|either this or `numShards`| |numShards|Specify the number of partitions directly, instead of a target partition size. Ingestion will run faster, since it can skip the step necessary to select a number of partitions automatically.|either this or `maxRowsPerSegment`| -|partitionDimensions|The dimensions to partition on. Leave blank to select all dimensions. Only used with `numShards`, will be ignored when `maxRowsPerSegment` is set.|no| +|partitionDimensions|The dimensions to partition on. Leave blank to select all dimensions. 
Only used with `numShards`, will be ignored when `targetRowsPerSegment` is set.|no| + ### Single-dimension range partitioning diff --git a/docs/ingestion/native-batch.md b/docs/ingestion/native-batch.md index 8eff1c0fc716..8f1cdbce319e 100644 --- a/docs/ingestion/native-batch.md +++ b/docs/ingestion/native-batch.md @@ -229,8 +229,9 @@ For perfect rollup, you should use `hashed`. |property|description|default|required?| |--------|-----------|-------|---------| |type|This should always be `hashed`|none|yes| -|numShards|Directly specify the number of shards to create. If this is specified and `intervals` is specified in the `granularitySpec`, the index task can skip the determine intervals/partitions pass through the data. `numShards` cannot be specified if `maxRowsPerSegment` is set.|null|no| -|partitionDimensions|The dimensions to partition on. Leave blank to select all dimensions.|null|no| +|targetRowsPerSegment|Target number of rows to include in a partition, should be a number that targets segments of 500MB\~1GB.|null|either this or `numShards`| +|numShards|Directly specify the number of shards to create. If this is specified and `intervals` is specified in the `granularitySpec`, the index task can skip the determine intervals/partitions pass through the data. `numShards` cannot be specified if `targetRowsPerSegment` is set.|null|no| +|partitionDimensions|The dimensions to partition on. Leave blank to select all dimensions. Only used with `numShards`, will be ignored when `targetRowsPerSegment` is set.|null|no| For best-effort rollup, you should use `dynamic`. 
diff --git a/indexing-hadoop/src/test/java/org/apache/druid/indexer/partitions/HashedPartitionsSpecTest.java b/indexing-hadoop/src/test/java/org/apache/druid/indexer/partitions/HashedPartitionsSpecTest.java index fad5de2f76bf..82189f2861ba 100644 --- a/indexing-hadoop/src/test/java/org/apache/druid/indexer/partitions/HashedPartitionsSpecTest.java +++ b/indexing-hadoop/src/test/java/org/apache/druid/indexer/partitions/HashedPartitionsSpecTest.java @@ -23,29 +23,32 @@ import com.google.common.collect.ImmutableList; import org.apache.druid.jackson.DefaultObjectMapper; import org.junit.Assert; +import org.junit.Rule; import org.junit.Test; +import org.junit.rules.ExpectedException; public class HashedPartitionsSpecTest { private static final ObjectMapper JSON_MAPPER = new DefaultObjectMapper(); + @Rule + public ExpectedException exception = ExpectedException.none(); + @Test public void testHashedPartitionsSpec() { - final PartitionsSpec partitionsSpec = jsonReadWriteRead( + final HashedPartitionsSpec hadoopHashedPartitionsSpec = jsonReadWriteRead( "{" - + " \"targetPartitionSize\":100," + + " \"targetRowsPerSegment\":100," + " \"type\":\"hashed\"" - + "}", - PartitionsSpec.class + + "}" ); - Assert.assertTrue("partitionsSpec", partitionsSpec instanceof HashedPartitionsSpec); - final HashedPartitionsSpec hadoopHashedPartitionsSpec = (HashedPartitionsSpec) partitionsSpec; Assert.assertTrue("isDeterminingPartitions", hadoopHashedPartitionsSpec.needsDeterminePartitions(true)); + Assert.assertNotNull(hadoopHashedPartitionsSpec.getMaxRowsPerSegment()); Assert.assertEquals( - "getTargetPartitionSize", + "getMaxRowsPerSegment", 100, hadoopHashedPartitionsSpec.getMaxRowsPerSegment().intValue() ); @@ -60,23 +63,21 @@ public void testHashedPartitionsSpec() @Test public void testHashedPartitionsSpecShardCount() { - final PartitionsSpec partitionsSpec = jsonReadWriteRead( + final HashedPartitionsSpec hadoopHashedPartitionsSpec = jsonReadWriteRead( "{" + " \"type\":\"hashed\"," + " 
\"numShards\":2" - + "}", - PartitionsSpec.class + + "}" ); - Assert.assertTrue("partitionsSpec", partitionsSpec instanceof HashedPartitionsSpec); - final HashedPartitionsSpec hadoopHashedPartitionsSpec = (HashedPartitionsSpec) partitionsSpec; Assert.assertFalse("isDeterminingPartitions", hadoopHashedPartitionsSpec.needsDeterminePartitions(true)); Assert.assertNull( - "getTargetPartitionSize", + "getMaxRowsPerSegment", hadoopHashedPartitionsSpec.getMaxRowsPerSegment() ); + Assert.assertNotNull(hadoopHashedPartitionsSpec.getNumShards()); Assert.assertEquals( "shardCount", 2, @@ -88,13 +89,63 @@ public void testHashedPartitionsSpecShardCount() ImmutableList.of(), hadoopHashedPartitionsSpec.getPartitionDimensions() ); + } + + @Test + public void testHashedPartitionsSpecBothTargetForbidden() + { + exception.expect(RuntimeException.class); + exception.expectMessage("At most one of targetRowsPerSegment or targetPartitionSize must be present"); + + String json = "{" + + "\"type\":\"hashed\"" + + ",\"targetRowsPerSegment\":100" + + ",\"targetPartitionSize\":100" + + "}"; + jsonReadWriteRead(json); + } + + @Test + public void testHashedPartitionsSpecBackwardCompatibleTargetPartitionSize() + { + String json = "{" + + "\"type\":\"hashed\"" + + ",\"targetPartitionSize\":100" + + "}"; + HashedPartitionsSpec hadoopHashedPartitionsSpec = jsonReadWriteRead(json); + + Assert.assertNotNull(hadoopHashedPartitionsSpec.getMaxRowsPerSegment()); + Assert.assertEquals( + "getMaxRowsPerSegment", + 100, + hadoopHashedPartitionsSpec.getMaxRowsPerSegment().intValue() + ); + } + + @Test + public void testHashedPartitionsSpecBackwardCompatibleMaxRowsPerSegment() + { + String json = "{" + + "\"type\":\"hashed\"" + + ",\"maxRowsPerSegment\":100" + + "}"; + HashedPartitionsSpec hadoopHashedPartitionsSpec = jsonReadWriteRead(json); + Assert.assertNotNull(hadoopHashedPartitionsSpec.getMaxRowsPerSegment()); + Assert.assertEquals( + "getMaxRowsPerSegment", + 100, + 
hadoopHashedPartitionsSpec.getMaxRowsPerSegment().intValue() + ); } - - private T jsonReadWriteRead(String s, Class klass) + + private static HashedPartitionsSpec jsonReadWriteRead(String s) { try { - return JSON_MAPPER.readValue(JSON_MAPPER.writeValueAsBytes(JSON_MAPPER.readValue(s, klass)), klass); + byte[] jsonBytes = JSON_MAPPER.writeValueAsBytes(JSON_MAPPER.readValue(s, PartitionsSpec.class)); + PartitionsSpec partitionsSpec = JSON_MAPPER.readValue(jsonBytes, PartitionsSpec.class); + Assert.assertTrue("partitionsSpec", partitionsSpec instanceof HashedPartitionsSpec); + return (HashedPartitionsSpec) partitionsSpec; } catch (Exception e) { throw new RuntimeException(e);