diff --git a/core/src/main/java/org/apache/druid/indexer/partitions/DimensionBasedPartitionsSpec.java b/core/src/main/java/org/apache/druid/indexer/partitions/DimensionBasedPartitionsSpec.java new file mode 100644 index 000000000000..b1f4e322a9ce --- /dev/null +++ b/core/src/main/java/org/apache/druid/indexer/partitions/DimensionBasedPartitionsSpec.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.indexer.partitions; + +import java.util.List; + +/** + * PartitionsSpec based on dimension values. + */ +public interface DimensionBasedPartitionsSpec extends PartitionsSpec +{ + List getPartitionDimensions(); +} diff --git a/core/src/main/java/org/apache/druid/indexer/partitions/DynamicPartitionsSpec.java b/core/src/main/java/org/apache/druid/indexer/partitions/DynamicPartitionsSpec.java new file mode 100644 index 000000000000..fb326633015a --- /dev/null +++ b/core/src/main/java/org/apache/druid/indexer/partitions/DynamicPartitionsSpec.java @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.indexer.partitions; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; + +import javax.annotation.Nullable; +import java.util.Objects; + +/** + * Dynamically determine partitions in the middle of indexing. + */ +public class DynamicPartitionsSpec implements PartitionsSpec +{ + public static final long DEFAULT_MAX_TOTAL_ROWS = 20_000_000; + + private final int maxRowsPerSegment; + private final long maxTotalRows; + + @JsonCreator + public DynamicPartitionsSpec( + @JsonProperty("maxRowsPerSegment") @Nullable Integer maxRowsPerSegment, + @JsonProperty("maxTotalRows") @Nullable Long maxTotalRows + ) + { + this.maxRowsPerSegment = PartitionsSpec.isEffectivelyNull(maxRowsPerSegment) + ? DEFAULT_MAX_ROWS_PER_SEGMENT + : maxRowsPerSegment; + this.maxTotalRows = PartitionsSpec.isEffectivelyNull(maxTotalRows) ? DEFAULT_MAX_TOTAL_ROWS : maxTotalRows; + } + + @Override + @JsonProperty + public Integer getMaxRowsPerSegment() + { + return maxRowsPerSegment; + } + + @JsonProperty + public long getMaxTotalRows() + { + return maxTotalRows; + } + + @Override + public boolean needsDeterminePartitions(boolean useForHadoopTask) + { + return false; + } + + @Override + public boolean equals(Object o) + { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + DynamicPartitionsSpec that = (DynamicPartitionsSpec) o; + return maxRowsPerSegment == that.maxRowsPerSegment && + maxTotalRows == that.maxTotalRows; + } + + @Override + public int hashCode() + { + return Objects.hash(maxRowsPerSegment, maxTotalRows); + } + + @Override + public String toString() + { + return "DynamicPartitionsSpec{" + + "maxRowsPerSegment=" + maxRowsPerSegment + + ", maxTotalRows=" + maxTotalRows + + '}'; + } +} diff --git a/core/src/main/java/org/apache/druid/indexer/partitions/HashedPartitionsSpec.java b/core/src/main/java/org/apache/druid/indexer/partitions/HashedPartitionsSpec.java new file mode 100644 index 000000000000..8002fb5af7d9 --- /dev/null +++ b/core/src/main/java/org/apache/druid/indexer/partitions/HashedPartitionsSpec.java @@ -0,0 +1,179 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.indexer.partitions; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.base.Preconditions; +import org.apache.druid.java.util.common.logger.Logger; + +import javax.annotation.Nullable; +import java.util.Collections; +import java.util.List; +import java.util.Objects; + +public class HashedPartitionsSpec implements DimensionBasedPartitionsSpec +{ + private static final Logger LOG = new Logger(HashedPartitionsSpec.class); + + @Nullable + private final Integer maxRowsPerSegment; + @Nullable + private final Integer numShards; + private final List partitionDimensions; + + public static HashedPartitionsSpec defaultSpec() + { + return new HashedPartitionsSpec(null, null, null, null); + } + + public HashedPartitionsSpec( + @Nullable Integer maxRowsPerSegment, + @Nullable Integer numShards, + @Nullable List partitionDimensions + ) + { + this(null, maxRowsPerSegment, numShards, partitionDimensions); + } + + @JsonCreator + public HashedPartitionsSpec( + @JsonProperty("targetPartitionSize") @Deprecated @Nullable Integer targetPartitionSize, + @JsonProperty("maxRowsPerSegment") @Nullable Integer maxRowsPerSegment, + @JsonProperty("numShards") @Nullable Integer numShards, + @JsonProperty("partitionDimensions") @Nullable List partitionDimensions + ) + { + Preconditions.checkArgument( + PartitionsSpec.isEffectivelyNull(targetPartitionSize) || PartitionsSpec.isEffectivelyNull(maxRowsPerSegment), + "Can't set both targetPartitionSize and maxRowsPerSegment" + ); + final Integer realMaxRowsPerSegment = targetPartitionSize == null ? maxRowsPerSegment : targetPartitionSize; + Preconditions.checkArgument( + PartitionsSpec.isEffectivelyNull(realMaxRowsPerSegment) || PartitionsSpec.isEffectivelyNull(numShards), + "Can't use maxRowsPerSegment or targetPartitionSize and numShards together" + ); + // Needs to determine partitions if the _given_ numShards is null + this.maxRowsPerSegment = getValidMaxRowsPerSegment(realMaxRowsPerSegment, numShards); + this.numShards = PartitionsSpec.isEffectivelyNull(numShards) ? null : numShards; + this.partitionDimensions = partitionDimensions == null ? Collections.emptyList() : partitionDimensions; + + Preconditions.checkArgument( + this.maxRowsPerSegment == null || this.maxRowsPerSegment > 0, + "maxRowsPerSegment[%s] should be positive", + this.maxRowsPerSegment + ); + Preconditions.checkArgument( + this.numShards == null || this.numShards > 0, + "numShards[%s] should be positive", + this.numShards + ); + + final boolean needsPartitionDetermination = needsDeterminePartitions(numShards); + if (!needsPartitionDetermination) { + Preconditions.checkState( + this.maxRowsPerSegment == null, + "maxRowsPerSegment[%s] must be null if we don't need to determine partitions", + this.maxRowsPerSegment + ); + Preconditions.checkState( + this.numShards != null, + "numShards must not be null if we don't need to determine partitions" + ); + } + } + + private static boolean needsDeterminePartitions(@Nullable Integer numShards) + { + return PartitionsSpec.isEffectivelyNull(numShards); + } + + @Nullable + private static Integer getValidMaxRowsPerSegment(@Nullable Integer maxRowsPerSegment, @Nullable Integer numShards) + { + if (needsDeterminePartitions(numShards)) { + return PartitionsSpec.isEffectivelyNull(maxRowsPerSegment) ? null : maxRowsPerSegment; + } else { + if (!PartitionsSpec.isEffectivelyNull(maxRowsPerSegment)) { + LOG.warn("maxRowsPerSegment[%s] is ignored since numShards[%s] is specified", maxRowsPerSegment, numShards); + } + return null; + } + } + + @Nullable + @Override + @JsonProperty + public Integer getMaxRowsPerSegment() + { + return maxRowsPerSegment; + } + + @Override + public boolean needsDeterminePartitions(boolean useForHadoopTask) + { + return useForHadoopTask ? maxRowsPerSegment != null : numShards == null; + } + + @Nullable + @JsonProperty + public Integer getNumShards() + { + return numShards; + } + + @Override + @JsonProperty + public List getPartitionDimensions() + { + return partitionDimensions; + } + + @Override + public boolean equals(Object o) + { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + HashedPartitionsSpec that = (HashedPartitionsSpec) o; + return Objects.equals(maxRowsPerSegment, that.maxRowsPerSegment) && + Objects.equals(numShards, that.numShards) && + Objects.equals(partitionDimensions, that.partitionDimensions); + } + + @Override + public int hashCode() + { + return Objects.hash(maxRowsPerSegment, numShards, partitionDimensions); + } + + @Override + public String toString() + { + return "HashedPartitionsSpec{" + + "maxRowsPerSegment=" + maxRowsPerSegment + + ", numShards=" + numShards + + ", partitionDimensions=" + partitionDimensions + + '}'; + } +} diff --git a/core/src/main/java/org/apache/druid/indexer/partitions/PartitionsSpec.java b/core/src/main/java/org/apache/druid/indexer/partitions/PartitionsSpec.java new file mode 100644 index 000000000000..2f7396d168e0 --- /dev/null +++ b/core/src/main/java/org/apache/druid/indexer/partitions/PartitionsSpec.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.indexer.partitions; + +import com.fasterxml.jackson.annotation.JsonSubTypes; +import com.fasterxml.jackson.annotation.JsonTypeInfo; + +import javax.annotation.Nullable; + +/** + * PartitionsSpec describes the secondary partitioning method for data ingestion. + */ +@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type", defaultImpl = HashedPartitionsSpec.class) +@JsonSubTypes(value = { + @JsonSubTypes.Type(name = "single_dim", value = SingleDimensionPartitionsSpec.class), + @JsonSubTypes.Type(name = "dimension", value = SingleDimensionPartitionsSpec.class), // for backward compatibility + @JsonSubTypes.Type(name = "hashed", value = HashedPartitionsSpec.class), + @JsonSubTypes.Type(name = "dynamic", value = DynamicPartitionsSpec.class) +}) +public interface PartitionsSpec +{ + int DEFAULT_MAX_ROWS_PER_SEGMENT = 5_000_000; + + /** + * Returns the max number of rows per segment. + * Implementations can have different default values which it could be even null. + * Callers should use the right value depending on the context if this returns null. + */ + @Nullable + Integer getMaxRowsPerSegment(); + + /** + * Returns true if this partitionsSpec needs to determine the number of partitions to start data ingestion. + * It should usually return true if perfect rollup is enforced but number of partitions is not specified. + */ + boolean needsDeterminePartitions(boolean useForHadoopTask); + + /** + * '-1' regarded as null for some historical reason. + */ + static boolean isEffectivelyNull(@Nullable Integer val) + { + return val == null || val == -1; + } + + /** + * '-1' regarded as null for some historical reason. + */ + static boolean isEffectivelyNull(@Nullable Long val) + { + return val == null || val == -1; + } +} diff --git a/core/src/main/java/org/apache/druid/indexer/partitions/SingleDimensionPartitionsSpec.java b/core/src/main/java/org/apache/druid/indexer/partitions/SingleDimensionPartitionsSpec.java new file mode 100644 index 000000000000..aa6ef87f3804 --- /dev/null +++ b/core/src/main/java/org/apache/druid/indexer/partitions/SingleDimensionPartitionsSpec.java @@ -0,0 +1,147 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.indexer.partitions; + + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.base.Preconditions; + +import javax.annotation.Nullable; +import java.util.Collections; +import java.util.List; +import java.util.Objects; + +public class SingleDimensionPartitionsSpec implements DimensionBasedPartitionsSpec +{ + private final int maxRowsPerSegment; + private final int maxPartitionSize; + @Nullable + private final String partitionDimension; + private final boolean assumeGrouped; + + public SingleDimensionPartitionsSpec( + int maxRowsPerSegment, + @Nullable Integer maxPartitionSize, + @Nullable String partitionDimension, + boolean assumeGrouped + ) + { + this(null, maxRowsPerSegment, maxPartitionSize, partitionDimension, assumeGrouped); + } + + @JsonCreator + public SingleDimensionPartitionsSpec( + @JsonProperty("targetPartitionSize") @Nullable Integer targetPartitionSize, + @JsonProperty("maxRowsPerSegment") @Nullable Integer maxRowsPerSegment, + @JsonProperty("maxPartitionSize") @Nullable Integer maxPartitionSize, + @JsonProperty("partitionDimension") @Nullable String partitionDimension, + @JsonProperty("assumeGrouped") boolean assumeGrouped // false by default + ) + { + Preconditions.checkArgument( + PartitionsSpec.isEffectivelyNull(targetPartitionSize) || PartitionsSpec.isEffectivelyNull(maxRowsPerSegment), + "Can't set both targetPartitionSize and maxRowsPerSegment" + ); + Preconditions.checkArgument( + !PartitionsSpec.isEffectivelyNull(targetPartitionSize) || !PartitionsSpec.isEffectivelyNull(maxRowsPerSegment), + "Either targetPartitionSize or maxRowsPerSegment must be specified" + ); + final int realMaxRowsPerSegment = targetPartitionSize == null ? maxRowsPerSegment : targetPartitionSize; + Preconditions.checkArgument(realMaxRowsPerSegment > 0, "maxRowsPerSegment must be specified"); + this.maxRowsPerSegment = realMaxRowsPerSegment; + this.maxPartitionSize = PartitionsSpec.isEffectivelyNull(maxPartitionSize) + ? Math.addExact(realMaxRowsPerSegment, (int) (realMaxRowsPerSegment * 0.5)) + : maxPartitionSize; + this.partitionDimension = partitionDimension; + this.assumeGrouped = assumeGrouped; + } + + @Override + @JsonProperty + public Integer getMaxRowsPerSegment() + { + return maxRowsPerSegment; + } + + @Override + public boolean needsDeterminePartitions(boolean useForHadoopTask) + { + return true; + } + + @JsonProperty + public int getMaxPartitionSize() + { + return maxPartitionSize; + } + + @JsonProperty + @Nullable + public String getPartitionDimension() + { + return partitionDimension; + } + + @JsonProperty + public boolean isAssumeGrouped() + { + return assumeGrouped; + } + + @Override + public List getPartitionDimensions() + { + return partitionDimension == null ? Collections.emptyList() : Collections.singletonList(partitionDimension); + } + + @Override + public boolean equals(Object o) + { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + SingleDimensionPartitionsSpec that = (SingleDimensionPartitionsSpec) o; + return maxRowsPerSegment == that.maxRowsPerSegment && + maxPartitionSize == that.maxPartitionSize && + assumeGrouped == that.assumeGrouped && + Objects.equals(partitionDimension, that.partitionDimension); + } + + @Override + public int hashCode() + { + return Objects.hash(maxRowsPerSegment, maxPartitionSize, partitionDimension, assumeGrouped); + } + + @Override + public String toString() + { + return "SingleDimensionPartitionsSpec{" + + "maxRowsPerSegment=" + maxRowsPerSegment + + ", maxPartitionSize=" + maxPartitionSize + + ", partitionDimension='" + partitionDimension + '\'' + + ", assumeGrouped=" + assumeGrouped + + '}'; + } +} diff --git a/docs/content/ingestion/hadoop.md b/docs/content/ingestion/hadoop.md index 1c0bfbc58c62..100526043db7 100644 --- a/docs/content/ingestion/hadoop.md +++ b/docs/content/ingestion/hadoop.md @@ -182,7 +182,7 @@ The tuningConfig is optional and default parameters will be used if no tuningCon |-----|----|-----------|--------| |workingPath|String|The working path to use for intermediate results (results between Hadoop jobs).|Only used by the [CLI Hadoop Indexer](../ingestion/command-line-hadoop-indexer.html). The default is '/tmp/druid-indexing'. This field must be null otherwise.| |version|String|The version of created segments. Ignored for HadoopIndexTask unless useExplicitVersion is set to true|no (default == datetime that indexing starts at)| -|partitionsSpec|Object|A specification of how to partition each time bucket into segments. Absence of this property means no partitioning will occur. See 'Partitioning specification' below.|no (default == 'hashed')| +|partitionsSpec|Object|A specification of how to partition each time bucket into segments. Absence of this property means no partitioning will occur. See 'Partitioning specification' below.|no (default == 'hadoop_hashed_partitions')| |maxRowsInMemory|Integer|The number of rows to aggregate before persisting. Note that this is the number of post-aggregation rows which may not be equal to the number of input events due to roll-up. This is used to manage the required JVM heap size. Normally user does not need to set this, but depending on the nature of data, if rows are short in terms of bytes, user may not want to store a million rows in memory and this value should be set.|no (default == 1000000)| |maxBytesInMemory|Long|The number of bytes to aggregate in heap memory before persisting. Normally this is computed internally and user does not need to set it. This is based on a rough estimate of memory usage and not actual usage. The maximum heap memory usage for indexing is maxBytesInMemory * (2 + maxPendingPersists).|no (default == One-sixth of max JVM memory)| |leaveIntermediate|Boolean|Leave behind intermediate files (for debugging) in the workingPath when a job completes, whether it passes or fails.|no (default == false)| @@ -246,8 +246,8 @@ For Roaring bitmaps: ## Partitioning specification Segments are always partitioned based on timestamp (according to the granularitySpec) and may be further partitioned in -some other way depending on partition type. Druid supports two types of partitioning strategies: "hashed" (based on the -hash of all dimensions in each row), and "dimension" (based on ranges of a single dimension). +some other way depending on partition type. Druid supports two types of partitioning strategies: `hadoop_hashed_partitions` (based on the +hash of all dimensions in each row), and `hadoop_single_dim_partitions` (based on ranges of a single dimension). Hashed partitioning is recommended in most cases, as it will improve indexing performance and create more uniformly sized data segments relative to single-dimension partitioning. @@ -256,7 +256,7 @@ sized data segments relative to single-dimension partitioning. ```json "partitionsSpec": { - "type": "hashed", + "type": "hadoop_hashed_partitions", "targetPartitionSize": 5000000 } ``` @@ -269,7 +269,7 @@ The configuration options are: |Field|Description|Required| |--------|-----------|---------| -|type|Type of partitionSpec to be used.|"hashed"| +|type|Type of partitionSpec to be used.|"hadoop_hashed_partitions"| |targetPartitionSize|Target number of rows to include in a partition, should be a number that targets segments of 500MB\~1GB.|either this or numShards| |numShards|Specify the number of partitions directly, instead of a target partition size. Ingestion will run faster, since it can skip the step necessary to select a number of partitions automatically.|either this or targetPartitionSize| |partitionDimensions|The dimensions to partition on. Leave blank to select all dimensions. Only used with numShards, will be ignored when targetPartitionSize is set|no| @@ -278,7 +278,7 @@ The configuration options are: ```json "partitionsSpec": { - "type": "dimension", + "type": "hadoop_single_dim_partitions", "targetPartitionSize": 5000000 } ``` @@ -293,7 +293,7 @@ The configuration options are: |Field|Description|Required| |--------|-----------|---------| -|type|Type of partitionSpec to be used.|"dimension"| +|type|Type of partitionSpec to be used.|"hadoop_single_dim_partitions"| |targetPartitionSize|Target number of rows to include in a partition, should be a number that targets segments of 500MB\~1GB.|yes| |maxPartitionSize|Maximum number of rows to include in a partition. Defaults to 50% larger than the targetPartitionSize.|no| |partitionDimension|The dimension to partition on. Leave blank to select a dimension automatically.|no| diff --git a/docs/content/ingestion/native_tasks.md b/docs/content/ingestion/native_tasks.md index 1cf5e0169b5b..76dfd2065f03 100644 --- a/docs/content/ingestion/native_tasks.md +++ b/docs/content/ingestion/native_tasks.md @@ -180,13 +180,11 @@ The tuningConfig is optional and default parameters will be used if no tuningCon |property|description|default|required?| |--------|-----------|-------|---------| |type|The task type, this should always be `index_parallel`.|none|yes| -|maxRowsPerSegment|Used in sharding. Determines how many rows are in each segment.|5000000|no| |maxRowsInMemory|Used in determining when intermediate persists to disk should occur. Normally user does not need to set this, but depending on the nature of data, if rows are short in terms of bytes, user may not want to store a million rows in memory and this value should be set.|1000000|no| |maxBytesInMemory|Used in determining when intermediate persists to disk should occur. Normally this is computed internally and user does not need to set it. This value represents number of bytes to aggregate in heap memory before persisting. This is based on a rough estimate of memory usage and not actual usage. The maximum heap memory usage for indexing is maxBytesInMemory * (2 + maxPendingPersists)|1/6 of max JVM memory|no| -|maxTotalRows|Total number of rows in segments waiting for being pushed. Used in determining when intermediate pushing should occur.|20000000|no| -|numShards|Directly specify the number of shards to create. If this is specified and 'intervals' is specified in the granularitySpec, the index task can skip the determine intervals/partitions pass through the data. numShards cannot be specified if maxRowsPerSegment is set.|null|no| -|indexSpec|defines segment storage format options to be used at indexing time, see [IndexSpec](#indexspec)|null|no| -|indexSpecForIntermediatePersists|defines segment storage format options to be used at indexing time for intermediate persisted temporary segments. this can be used to disable dimension/metric compression on intermediate segments to reduce memory required for final merging. however, disabling compression on intermediate segments might increase page cache use while they are used before getting merged into final segment published, see [IndexSpec](#indexspec) for possible values.|same as indexSpec|no| +|partitionsSpec|Defines how to partition the segments in timeChunk, see [PartitionsSpec](#partitionsspec)|`dynamic_partitions`|no| +|indexSpec|Defines segment storage format options to be used at indexing time, see [IndexSpec](#indexspec)|null|no| +|indexSpecForIntermediatePersists|Defines segment storage format options to be used at indexing time for intermediate persisted temporary segments. this can be used to disable dimension/metric compression on intermediate segments to reduce memory required for final merging. however, disabling compression on intermediate segments might increase page cache use while they are used before getting merged into final segment published, see [IndexSpec](#indexspec) for possible values.|same as indexSpec|no| |maxPendingPersists|Maximum number of persists that can be pending but not started. If this limit would be exceeded by a new intermediate persist, ingestion will block until the currently-running persist finishes. Maximum heap memory usage for indexing scales with maxRowsInMemory * (2 + maxPendingPersists).|0 (meaning one persist can be running concurrently with ingestion, and none can be queued up)|no| |reportParseExceptions|If true, exceptions encountered during parsing will be thrown and will halt ingestion; if false, unparseable rows and fields will be skipped.|false|no| |pushTimeout|Milliseconds to wait for pushing segments. It must be >= 0, where 0 means to wait forever.|0|no| @@ -197,6 +195,18 @@ The tuningConfig is optional and default parameters will be used if no tuningCon |chatHandlerTimeout|Timeout for reporting the pushed segments in worker tasks.|PT10S|no| |chatHandlerNumRetries|Retries for reporting the pushed segments in worker tasks.|5|no| +#### PartitionsSpec + +PartitionsSpec is to describe the secondary partitioning method. +Parallel Index Task supports only the best-effort rollup mode, +and thus `dynamic_partitions` is only available option currently. + +|property|description|default|required?| +|--------|-----------|-------|---------| +|type|This should always be `dynamic_partitions`|none|yes| +|maxRowsPerSegment|Used in sharding. Determines how many rows are in each segment.|5000000|no| +|maxTotalRows|Total number of rows in segments waiting for being pushed. Used in determining when intermediate segment push should occur.|20000000|no| + #### HTTP Endpoints The supervisor task provides some HTTP endpoints to get running status. @@ -558,14 +568,11 @@ The tuningConfig is optional and default parameters will be used if no tuningCon |property|description|default|required?| |--------|-----------|-------|---------| |type|The task type, this should always be "index".|none|yes| -|maxRowsPerSegment|Used in sharding. Determines how many rows are in each segment.|5000000|no| |maxRowsInMemory|Used in determining when intermediate persists to disk should occur. Normally user does not need to set this, but depending on the nature of data, if rows are short in terms of bytes, user may not want to store a million rows in memory and this value should be set.|1000000|no| |maxBytesInMemory|Used in determining when intermediate persists to disk should occur. Normally this is computed internally and user does not need to set it. This value represents number of bytes to aggregate in heap memory before persisting. This is based on a rough estimate of memory usage and not actual usage. The maximum heap memory usage for indexing is maxBytesInMemory * (2 + maxPendingPersists)|1/6 of max JVM memory|no| -|maxTotalRows|Total number of rows in segments waiting for being pushed. Used in determining when intermediate pushing should occur.|20000000|no| -|numShards|Directly specify the number of shards to create. If this is specified and 'intervals' is specified in the granularitySpec, the index task can skip the determine intervals/partitions pass through the data. numShards cannot be specified if maxRowsPerSegment is set.|null|no| -|partitionDimensions|The dimensions to partition on. Leave blank to select all dimensions. Only used with `forceGuaranteedRollup` = true, will be ignored otherwise.|null|no| -|indexSpec|defines segment storage format options to be used at indexing time, see [IndexSpec](#indexspec)|null|no| -|indexSpecForIntermediatePersists|defines segment storage format options to be used at indexing time for intermediate persisted temporary segments. this can be used to disable dimension/metric compression on intermediate segments to reduce memory required for final merging. however, disabling compression on intermediate segments might increase page cache use while they are used before getting merged into final segment published, see [IndexSpec](#indexspec) for possible values.|same as indexSpec|no| +|partitionsSpec|Defines how to partition the segments in timeChunk, see [PartitionsSpec](#partitionsspec)|`dynamic_partitions` if `forceGuaranteedRollup` = false, `hashed_partitions` if `forceGuaranteedRollup` = true|no| +|indexSpec|Defines segment storage format options to be used at indexing time, see [IndexSpec](#indexspec)|null|no| +|indexSpecForIntermediatePersists|Defines segment storage format options to be used at indexing time for intermediate persisted temporary segments. this can be used to disable dimension/metric compression on intermediate segments to reduce memory required for final merging. however, disabling compression on intermediate segments might increase page cache use while they are used before getting merged into final segment published, see [IndexSpec](#indexspec) for possible values.|same as indexSpec|no| |maxPendingPersists|Maximum number of persists that can be pending but not started. If this limit would be exceeded by a new intermediate persist, ingestion will block until the currently-running persist finishes. Maximum heap memory usage for indexing scales with maxRowsInMemory * (2 + maxPendingPersists).|0 (meaning one persist can be running concurrently with ingestion, and none can be queued up)|no| |forceGuaranteedRollup|Forces guaranteeing the [perfect rollup](../ingestion/index.html#roll-up-modes). The perfect rollup optimizes the total size of generated segments and querying time while indexing time will be increased. If this is set to true, the index task will read the entire input data twice: one for finding the optimal number of partitions per time chunk and one for generating segments. Note that the result segments would be hash-partitioned. You can set `forceExtendableShardSpecs` if you plan to append more data to the same time range in the future. This flag cannot be used with `appendToExisting` of IOConfig. For more details, see the below __Segment pushing modes__ section.|false|no| |reportParseExceptions|DEPRECATED. If true, exceptions encountered during parsing will be thrown and will halt ingestion; if false, unparseable rows and fields will be skipped. Setting `reportParseExceptions` to true will override existing configurations for `maxParseExceptions` and `maxSavedParseExceptions`, setting `maxParseExceptions` to 0 and limiting `maxSavedParseExceptions` to no more than 1.|false|no| @@ -575,6 +582,27 @@ The tuningConfig is optional and default parameters will be used if no tuningCon |maxParseExceptions|The maximum number of parse exceptions that can occur before the task halts ingestion and fails. Overridden if `reportParseExceptions` is set.|unlimited|no| |maxSavedParseExceptions|When a parse exception occurs, Druid can keep track of the most recent parse exceptions. "maxSavedParseExceptions" limits how many exception instances will be saved. These saved exceptions will be made available after the task finishes in the [task completion report](../ingestion/reports.html). Overridden if `reportParseExceptions` is set.|0|no| +#### PartitionsSpec + +PartitionsSpec is to describe the secondary partitioning method. +You should use different partitionsSpec depending on the [rollup mode](../ingestion/index.html#roll-up-modes) you want. +For perfect rollup, you should use `hashed_partitions`. + +|property|description|default|required?| +|--------|-----------|-------|---------| +|type|This should always be `hashed_partitions`|none|yes| +|maxRowsPerSegment|Used in sharding. Determines how many rows are in each segment.|5000000|no| +|numShards|Directly specify the number of shards to create. If this is specified and 'intervals' is specified in the granularitySpec, the index task can skip the determine intervals/partitions pass through the data. numShards cannot be specified if maxRowsPerSegment is set.|null|no| +|partitionDimensions|The dimensions to partition on. Leave blank to select all dimensions.|null|no| + +For best-effort rollup, you should use `dynamic_partitions`. + +|property|description|default|required?| +|--------|-----------|-------|---------| +|type|This should always be `dynamic_partitions`|none|yes| +|maxRowsPerSegment|Used in sharding. Determines how many rows are in each segment.|5000000|no| +|maxTotalRows|Total number of rows in segments waiting for being pushed. Used in determining when intermediate segment push should occur.|20000000|no| + #### IndexSpec The indexSpec defines segment storage format options to be used at indexing time, such as bitmap type and column diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTuningConfigTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTuningConfigTest.java index 57b4f372cf83..93884933dd34 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTuningConfigTest.java +++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTuningConfigTest.java @@ -21,6 +21,7 @@ import com.fasterxml.jackson.databind.Module; import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.druid.indexer.partitions.DynamicPartitionsSpec; import org.apache.druid.indexing.kafka.supervisor.KafkaSupervisorTuningConfig; import org.apache.druid.indexing.kafka.test.TestModifiedKafkaIndexTaskTuningConfig; import org.apache.druid.jackson.DefaultObjectMapper; @@ -62,7 +63,7 @@ public void testSerdeWithDefaults() throws Exception Assert.assertNotNull(config.getBasePersistDirectory()); Assert.assertEquals(1000000, config.getMaxRowsInMemory()); Assert.assertEquals(5_000_000, config.getMaxRowsPerSegment().intValue()); - Assert.assertEquals(null, config.getMaxTotalRows()); + Assert.assertEquals(DynamicPartitionsSpec.DEFAULT_MAX_TOTAL_ROWS, config.getMaxTotalRows().longValue()); Assert.assertEquals(new Period("PT10M"), config.getIntermediatePersistPeriod()); Assert.assertEquals(0, config.getMaxPendingPersists()); Assert.assertEquals(new IndexSpec(), config.getIndexSpec()); diff --git a/indexing-hadoop/pom.xml b/indexing-hadoop/pom.xml index 3a3f5ff9f408..ea6f6c3fb5d9 100644 --- a/indexing-hadoop/pom.xml +++ b/indexing-hadoop/pom.xml @@ -42,12 +42,12 @@ org.apache.druid druid-core ${project.parent.version} - - - org.slf4j - slf4j-api - - + + + org.slf4j + slf4j-api + + diff --git a/indexing-hadoop/src/main/java/org/apache/druid/indexer/DeterminePartitionsJob.java b/indexing-hadoop/src/main/java/org/apache/druid/indexer/DeterminePartitionsJob.java index 669366c58458..984d104a80c0 100644 --- a/indexing-hadoop/src/main/java/org/apache/druid/indexer/DeterminePartitionsJob.java +++ b/indexing-hadoop/src/main/java/org/apache/druid/indexer/DeterminePartitionsJob.java @@ -126,7 +126,10 @@ public boolean run() ); } - if (!config.getPartitionsSpec().isAssumeGrouped()) { + final SingleDimensionPartitionsSpec partitionsSpec = + (SingleDimensionPartitionsSpec) config.getPartitionsSpec(); + + if (!partitionsSpec.isAssumeGrouped()) { groupByJob = Job.getInstance( new Configuration(), StringUtils.format("%s-determine_partitions_groupby-%s", config.getDataSource(), config.getIntervals()) @@ -191,7 +194,7 @@ public boolean run() JobHelper.injectSystemProperties(dimSelectionJob); config.addJobProperties(dimSelectionJob); - if (!config.getPartitionsSpec().isAssumeGrouped()) { + if (!partitionsSpec.isAssumeGrouped()) { // Read grouped data from the groupByJob. dimSelectionJob.setMapperClass(DeterminePartitionsDimSelectionPostGroupByMapper.class); dimSelectionJob.setInputFormatClass(SequenceFileInputFormat.class); @@ -764,8 +767,10 @@ protected void innerReduce(Context context, SortableBytes keyBytes, Iterable config.getMaxPartitionSize()) { + if (partition.rows > partitionsSpec.getMaxPartitionSize()) { log.info("Dimension[%s] has an oversized shard: %s", dimPartitions.dim, partition.shardSpec); oversized = true; } diff --git a/indexing-hadoop/src/main/java/org/apache/druid/indexer/HadoopDruidDetermineConfigurationJob.java b/indexing-hadoop/src/main/java/org/apache/druid/indexer/HadoopDruidDetermineConfigurationJob.java index 534afcb8cb1c..a25f274aae31 100644 --- a/indexing-hadoop/src/main/java/org/apache/druid/indexer/HadoopDruidDetermineConfigurationJob.java +++ b/indexing-hadoop/src/main/java/org/apache/druid/indexer/HadoopDruidDetermineConfigurationJob.java @@ -21,6 +21,10 @@ import com.google.common.collect.Lists; import com.google.inject.Inject; +import org.apache.druid.indexer.partitions.HashedPartitionsSpec; +import org.apache.druid.indexer.partitions.PartitionsSpec; +import org.apache.druid.indexer.partitions.SingleDimensionPartitionsSpec; +import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.timeline.partition.HashBasedNumberedShardSpec; import org.joda.time.DateTime; @@ -40,9 +44,7 @@ public class HadoopDruidDetermineConfigurationJob implements Jobby private String hadoopJobIdFile; @Inject - public HadoopDruidDetermineConfigurationJob( - HadoopDruidIndexerConfig config - ) + public HadoopDruidDetermineConfigurationJob(HadoopDruidIndexerConfig config) { this.config = config; } @@ -53,24 +55,32 @@ public boolean run() JobHelper.ensurePaths(config); if (config.isDeterminingPartitions()) { - job = config.getPartitionsSpec().getPartitionJob(config); + job = createPartitionJob(config); config.setHadoopJobIdFileName(hadoopJobIdFile); return JobHelper.runSingleJob(job, config); } else { - int shardsPerInterval = config.getPartitionsSpec().getNumShards(); + final PartitionsSpec partitionsSpec = config.getPartitionsSpec(); + final int shardsPerInterval; + if (partitionsSpec instanceof HashedPartitionsSpec) { + final HashedPartitionsSpec hashedPartitionsSpec = (HashedPartitionsSpec) partitionsSpec; + shardsPerInterval = PartitionsSpec.isEffectivelyNull(hashedPartitionsSpec.getNumShards()) + ? 1 + : hashedPartitionsSpec.getNumShards(); + } else { + shardsPerInterval = 1; + } Map> shardSpecs = new TreeMap<>(); int shardCount = 0; for (Interval segmentGranularity : config.getSegmentGranularIntervals().get()) { DateTime bucket = segmentGranularity.getStart(); // negative shardsPerInterval means a single shard - final int realShardsPerInterval = shardsPerInterval < 0 ? 1 : shardsPerInterval; - List specs = Lists.newArrayListWithCapacity(realShardsPerInterval); - for (int i = 0; i < realShardsPerInterval; i++) { + List specs = Lists.newArrayListWithCapacity(shardsPerInterval); + for (int i = 0; i < shardsPerInterval; i++) { specs.add( new HadoopyShardSpec( new HashBasedNumberedShardSpec( i, - realShardsPerInterval, + shardsPerInterval, config.getPartitionsSpec().getPartitionDimensions(), HadoopDruidIndexerConfig.JSON_MAPPER ), @@ -86,6 +96,18 @@ public boolean run() } } + private static Jobby createPartitionJob(HadoopDruidIndexerConfig config) + { + final PartitionsSpec partitionsSpec = config.getPartitionsSpec(); + if (partitionsSpec instanceof HashedPartitionsSpec) { + return new DetermineHashedPartitionsJob(config); + } else if (partitionsSpec instanceof SingleDimensionPartitionsSpec) { + return new DeterminePartitionsJob(config); + } else { + throw new ISE("Unknown partitionsSpec[%s]", partitionsSpec); + } + } + @Override public Map getStats() { diff --git a/indexing-hadoop/src/main/java/org/apache/druid/indexer/HadoopDruidIndexerConfig.java b/indexing-hadoop/src/main/java/org/apache/druid/indexer/HadoopDruidIndexerConfig.java index 4aa8cd25d462..37e811781141 100644 --- a/indexing-hadoop/src/main/java/org/apache/druid/indexer/HadoopDruidIndexerConfig.java +++ b/indexing-hadoop/src/main/java/org/apache/druid/indexer/HadoopDruidIndexerConfig.java @@ -39,7 +39,7 @@ import org.apache.druid.guice.GuiceInjectors; import org.apache.druid.guice.JsonConfigProvider; import org.apache.druid.guice.annotations.Self; -import org.apache.druid.indexer.partitions.PartitionsSpec; +import org.apache.druid.indexer.partitions.DimensionBasedPartitionsSpec; import org.apache.druid.indexer.path.PathSpec; import org.apache.druid.initialization.Initialization; import org.apache.druid.java.util.common.DateTimes; @@ -289,7 +289,7 @@ public void setGranularitySpec(GranularitySpec granularitySpec) this.pathSpec = JSON_MAPPER.convertValue(schema.getIOConfig().getPathSpec(), PathSpec.class); } - public PartitionsSpec getPartitionsSpec() + public DimensionBasedPartitionsSpec getPartitionsSpec() { return schema.getTuningConfig().getPartitionsSpec(); } @@ -327,12 +327,13 @@ public Optional> getIntervals() public boolean isDeterminingPartitions() { - return schema.getTuningConfig().getPartitionsSpec().isDeterminingPartitions(); + return schema.getTuningConfig().getPartitionsSpec().needsDeterminePartitions(true); } - public Long getTargetPartitionSize() + public int getTargetPartitionSize() { - return schema.getTuningConfig().getPartitionsSpec().getTargetPartitionSize(); + final Integer targetPartitionSize = schema.getTuningConfig().getPartitionsSpec().getMaxRowsPerSegment(); + return targetPartitionSize == null ? -1 : targetPartitionSize; } public boolean isForceExtendableShardSpecs() @@ -340,11 +341,6 @@ public boolean isForceExtendableShardSpecs() return schema.getTuningConfig().isForceExtendableShardSpecs(); } - public long getMaxPartitionSize() - { - return schema.getTuningConfig().getPartitionsSpec().getMaxPartitionSize(); - } - public boolean isUpdaterJobSpecSet() { return (schema.getIOConfig().getMetadataUpdateSpec() != null); diff --git a/indexing-hadoop/src/main/java/org/apache/druid/indexer/HadoopTuningConfig.java b/indexing-hadoop/src/main/java/org/apache/druid/indexer/HadoopTuningConfig.java index e61f912e9e8e..704f4c794d53 100644 --- a/indexing-hadoop/src/main/java/org/apache/druid/indexer/HadoopTuningConfig.java +++ b/indexing-hadoop/src/main/java/org/apache/druid/indexer/HadoopTuningConfig.java @@ -25,8 +25,8 @@ import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; +import org.apache.druid.indexer.partitions.DimensionBasedPartitionsSpec; import org.apache.druid.indexer.partitions.HashedPartitionsSpec; -import org.apache.druid.indexer.partitions.PartitionsSpec; import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.segment.IndexSpec; import org.apache.druid.segment.indexing.TuningConfig; @@ -40,7 +40,7 @@ @JsonTypeName("hadoop") public class HadoopTuningConfig implements TuningConfig { - private static final PartitionsSpec DEFAULT_PARTITIONS_SPEC = HashedPartitionsSpec.makeDefaultHashedPartitionsSpec(); + private static final DimensionBasedPartitionsSpec DEFAULT_PARTITIONS_SPEC = HashedPartitionsSpec.defaultSpec(); private static final Map> DEFAULT_SHARD_SPECS = ImmutableMap.of(); private static final IndexSpec DEFAULT_INDEX_SPEC = new IndexSpec(); private static final int DEFAULT_ROW_FLUSH_BOUNDARY = TuningConfig.DEFAULT_MAX_ROWS_IN_MEMORY; @@ -79,7 +79,7 @@ public static HadoopTuningConfig makeDefaultTuningConfig() private final String workingPath; private final String version; - private final PartitionsSpec partitionsSpec; + private final DimensionBasedPartitionsSpec partitionsSpec; private final Map> shardSpecs; private final IndexSpec indexSpec; private final IndexSpec indexSpecForIntermediatePersists; @@ -104,7 +104,7 @@ public static HadoopTuningConfig makeDefaultTuningConfig() public HadoopTuningConfig( final @JsonProperty("workingPath") String workingPath, final @JsonProperty("version") String version, - final @JsonProperty("partitionsSpec") PartitionsSpec partitionsSpec, + final @JsonProperty("partitionsSpec") DimensionBasedPartitionsSpec partitionsSpec, final @JsonProperty("shardSpecs") Map> shardSpecs, final @JsonProperty("indexSpec") IndexSpec indexSpec, final @JsonProperty("indexSpecForIntermediatePersists") @Nullable IndexSpec indexSpecForIntermediatePersists, @@ -187,7 +187,7 @@ public String getVersion() } @JsonProperty - public PartitionsSpec getPartitionsSpec() + public DimensionBasedPartitionsSpec getPartitionsSpec() { return partitionsSpec; } diff --git a/indexing-hadoop/src/main/java/org/apache/druid/indexer/IndexingHadoopModule.java b/indexing-hadoop/src/main/java/org/apache/druid/indexer/IndexingHadoopModule.java index 9e0d29cc7136..1d7185e525c8 100644 --- a/indexing-hadoop/src/main/java/org/apache/druid/indexer/IndexingHadoopModule.java +++ b/indexing-hadoop/src/main/java/org/apache/druid/indexer/IndexingHadoopModule.java @@ -36,7 +36,7 @@ public class IndexingHadoopModule implements DruidModule public List getJacksonModules() { return Collections.singletonList( - new SimpleModule("IndexingHadoopModule") + new SimpleModule(getClass().getSimpleName()) .registerSubtypes( new NamedType(HadoopyStringInputRowParser.class, "hadoopyString") ) diff --git a/indexing-hadoop/src/main/java/org/apache/druid/indexer/partitions/AbstractPartitionsSpec.java b/indexing-hadoop/src/main/java/org/apache/druid/indexer/partitions/AbstractPartitionsSpec.java deleted file mode 100644 index 28812797dd6f..000000000000 --- a/indexing-hadoop/src/main/java/org/apache/druid/indexer/partitions/AbstractPartitionsSpec.java +++ /dev/null @@ -1,87 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.druid.indexer.partitions; - -import com.fasterxml.jackson.annotation.JsonProperty; -import com.google.common.base.Preconditions; - - -public abstract class AbstractPartitionsSpec implements PartitionsSpec -{ - private static final double DEFAULT_OVERSIZE_THRESHOLD = 1.5; - private static final long DEFAULT_TARGET_PARTITION_SIZE = -1; - - private final long targetPartitionSize; - private final long maxPartitionSize; - private final boolean assumeGrouped; - private final int numShards; - - public AbstractPartitionsSpec( - Long targetPartitionSize, - Long maxPartitionSize, - Boolean assumeGrouped, - Integer numShards - ) - { - this.targetPartitionSize = targetPartitionSize == null ? DEFAULT_TARGET_PARTITION_SIZE : targetPartitionSize; - this.maxPartitionSize = maxPartitionSize == null - ? (long) (this.targetPartitionSize * DEFAULT_OVERSIZE_THRESHOLD) - : maxPartitionSize; - this.assumeGrouped = assumeGrouped == null ? false : assumeGrouped; - this.numShards = numShards == null ? -1 : numShards; - Preconditions.checkArgument( - this.targetPartitionSize == -1 || this.numShards == -1, - "targetPartitionsSize and shardCount both cannot be set" - ); - } - - @Override - @JsonProperty - public long getTargetPartitionSize() - { - return targetPartitionSize; - } - - @Override - @JsonProperty - public long getMaxPartitionSize() - { - return maxPartitionSize; - } - - @Override - @JsonProperty - public boolean isAssumeGrouped() - { - return assumeGrouped; - } - - @Override - public boolean isDeterminingPartitions() - { - return targetPartitionSize > 0; - } - - @Override - public int getNumShards() - { - return numShards; - } -} diff --git a/indexing-hadoop/src/main/java/org/apache/druid/indexer/partitions/HashedPartitionsSpec.java b/indexing-hadoop/src/main/java/org/apache/druid/indexer/partitions/HashedPartitionsSpec.java deleted file mode 100644 index 31905e420e5f..000000000000 --- a/indexing-hadoop/src/main/java/org/apache/druid/indexer/partitions/HashedPartitionsSpec.java +++ /dev/null @@ -1,70 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.druid.indexer.partitions; - -import com.fasterxml.jackson.annotation.JsonCreator; -import com.fasterxml.jackson.annotation.JsonIgnore; -import com.fasterxml.jackson.annotation.JsonProperty; -import com.google.common.collect.ImmutableList; -import org.apache.druid.indexer.DetermineHashedPartitionsJob; -import org.apache.druid.indexer.HadoopDruidIndexerConfig; -import org.apache.druid.indexer.Jobby; - -import javax.annotation.Nullable; -import java.util.List; - -public class HashedPartitionsSpec extends AbstractPartitionsSpec -{ - private static final List DEFAULT_PARTITION_DIMENSIONS = ImmutableList.of(); - - public static HashedPartitionsSpec makeDefaultHashedPartitionsSpec() - { - return new HashedPartitionsSpec(null, null, null, null, null); - } - - @JsonIgnore - private final List partitionDimensions; - - @JsonCreator - public HashedPartitionsSpec( - @JsonProperty("targetPartitionSize") @Nullable Long targetPartitionSize, - @JsonProperty("maxPartitionSize") @Nullable Long maxPartitionSize, - @JsonProperty("assumeGrouped") @Nullable Boolean assumeGrouped, - @JsonProperty("numShards") @Nullable Integer numShards, - @JsonProperty("partitionDimensions") @Nullable List partitionDimensions - ) - { - super(targetPartitionSize, maxPartitionSize, assumeGrouped, numShards); - this.partitionDimensions = partitionDimensions == null ? DEFAULT_PARTITION_DIMENSIONS : partitionDimensions; - } - - @Override - public Jobby getPartitionJob(HadoopDruidIndexerConfig config) - { - return new DetermineHashedPartitionsJob(config); - } - - @Override - @JsonProperty - public List getPartitionDimensions() - { - return partitionDimensions; - } -} diff --git a/indexing-hadoop/src/main/java/org/apache/druid/indexer/partitions/PartitionsSpec.java b/indexing-hadoop/src/main/java/org/apache/druid/indexer/partitions/PartitionsSpec.java deleted file mode 100644 index 44268e2a4c2b..000000000000 --- a/indexing-hadoop/src/main/java/org/apache/druid/indexer/partitions/PartitionsSpec.java +++ /dev/null @@ -1,58 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.druid.indexer.partitions; - -import com.fasterxml.jackson.annotation.JsonIgnore; -import com.fasterxml.jackson.annotation.JsonProperty; -import com.fasterxml.jackson.annotation.JsonSubTypes; -import com.fasterxml.jackson.annotation.JsonTypeInfo; -import org.apache.druid.indexer.HadoopDruidIndexerConfig; -import org.apache.druid.indexer.Jobby; - -import java.util.List; - -@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type", defaultImpl = HashedPartitionsSpec.class) -@JsonSubTypes(value = { - @JsonSubTypes.Type(name = "dimension", value = SingleDimensionPartitionsSpec.class), - @JsonSubTypes.Type(name = "hashed", value = HashedPartitionsSpec.class) -}) -public interface PartitionsSpec -{ - @JsonIgnore - Jobby getPartitionJob(HadoopDruidIndexerConfig config); - - @JsonProperty - long getTargetPartitionSize(); - - @JsonProperty - long getMaxPartitionSize(); - - @JsonProperty - boolean isAssumeGrouped(); - - @JsonIgnore - boolean isDeterminingPartitions(); - - @JsonProperty - int getNumShards(); - - @JsonProperty - List getPartitionDimensions(); -} diff --git a/indexing-hadoop/src/main/java/org/apache/druid/indexer/partitions/SingleDimensionPartitionsSpec.java b/indexing-hadoop/src/main/java/org/apache/druid/indexer/partitions/SingleDimensionPartitionsSpec.java deleted file mode 100644 index 5013044b1de5..000000000000 --- a/indexing-hadoop/src/main/java/org/apache/druid/indexer/partitions/SingleDimensionPartitionsSpec.java +++ /dev/null @@ -1,69 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.druid.indexer.partitions; - - -import com.fasterxml.jackson.annotation.JsonCreator; -import com.fasterxml.jackson.annotation.JsonProperty; -import com.google.common.collect.ImmutableList; -import org.apache.druid.indexer.DeterminePartitionsJob; -import org.apache.druid.indexer.HadoopDruidIndexerConfig; -import org.apache.druid.indexer.Jobby; - -import javax.annotation.Nullable; -import java.util.List; - -public class SingleDimensionPartitionsSpec extends AbstractPartitionsSpec -{ - @Nullable - private final String partitionDimension; - - @JsonCreator - public SingleDimensionPartitionsSpec( - @JsonProperty("partitionDimension") @Nullable String partitionDimension, - @JsonProperty("targetPartitionSize") @Nullable Long targetPartitionSize, - @JsonProperty("maxPartitionSize") @Nullable Long maxPartitionSize, - @JsonProperty("assumeGrouped") @Nullable Boolean assumeGrouped - ) - { - super(targetPartitionSize, maxPartitionSize, assumeGrouped, null); - this.partitionDimension = partitionDimension; - } - - @JsonProperty - @Nullable - public String getPartitionDimension() - { - return partitionDimension; - } - - @Override - public Jobby getPartitionJob(HadoopDruidIndexerConfig config) - { - return new DeterminePartitionsJob(config); - } - - @Override - @JsonProperty - public List getPartitionDimensions() - { - return ImmutableList.of(); - } -} diff --git a/indexing-hadoop/src/test/java/org/apache/druid/indexer/DetermineHashedPartitionsJobTest.java b/indexing-hadoop/src/test/java/org/apache/druid/indexer/DetermineHashedPartitionsJobTest.java index 26498746c217..c513f0cf5874 100644 --- a/indexing-hadoop/src/test/java/org/apache/druid/indexer/DetermineHashedPartitionsJobTest.java +++ b/indexing-hadoop/src/test/java/org/apache/druid/indexer/DetermineHashedPartitionsJobTest.java @@ -73,7 +73,7 @@ public static Collection data() new Object[][]{ { DetermineHashedPartitionsJobTest.class.getResource("/druid.test.data.with.duplicate.rows.tsv").getPath(), - 1L, + 1, "2011-04-10T00:00:00.000Z/2011-04-11T00:00:00.000Z", 0, 1, @@ -82,7 +82,7 @@ public static Collection data() }, { DetermineHashedPartitionsJobTest.class.getResource("/druid.test.data.with.duplicate.rows.tsv").getPath(), - 100L, + 100, "2011-04-10T00:00:00.000Z/2011-04-16T00:00:00.000Z", 0, 6, @@ -91,7 +91,7 @@ public static Collection data() }, { DetermineHashedPartitionsJobTest.class.getResource("/druid.test.data.with.duplicate.rows.tsv").getPath(), - 1L, + 1, "2011-04-10T00:00:00.000Z/2011-04-16T00:00:00.000Z", 0, 6, @@ -100,7 +100,7 @@ public static Collection data() }, { DetermineHashedPartitionsJobTest.class.getResource("/druid.test.data.with.duplicate.rows.tsv").getPath(), - 1L, + 1, null, 0, 6, @@ -109,7 +109,7 @@ public static Collection data() }, { DetermineHashedPartitionsJobTest.class.getResource("/druid.test.data.with.rows.in.timezone.tsv").getPath(), - 1L, + 1, null, 0, 1, @@ -122,7 +122,7 @@ public static Collection data() public DetermineHashedPartitionsJobTest( String dataFilePath, - long targetPartitionSize, + int targetPartitionSize, String interval, int errorMargin, int expectedNumTimeBuckets, @@ -194,7 +194,7 @@ public DetermineHashedPartitionsJobTest( new HadoopTuningConfig( tmpDir.getAbsolutePath(), null, - new HashedPartitionsSpec(targetPartitionSize, null, true, null, null), + new HashedPartitionsSpec(targetPartitionSize, null, null), null, null, null, diff --git a/indexing-hadoop/src/test/java/org/apache/druid/indexer/DeterminePartitionsJobTest.java b/indexing-hadoop/src/test/java/org/apache/druid/indexer/DeterminePartitionsJobTest.java index 3d8b06bab777..2c35b75e2511 100644 --- a/indexing-hadoop/src/test/java/org/apache/druid/indexer/DeterminePartitionsJobTest.java +++ b/indexing-hadoop/src/test/java/org/apache/druid/indexer/DeterminePartitionsJobTest.java @@ -72,7 +72,7 @@ public static Collection constructFeed() new Object[][]{ { true, - 3L, + 3, "2014-10-22T00:00:00Z/P1D", 1, new int[]{5}, @@ -100,7 +100,7 @@ public static Collection constructFeed() }, { false, - 3L, + 3, "2014-10-20T00:00:00Z/P1D", 1, new int[]{5}, @@ -138,7 +138,7 @@ public static Collection constructFeed() }, { true, - 6L, + 6, "2014-10-20T00:00:00Z/P3D", 3, new int[]{2, 2, 2}, @@ -198,7 +198,7 @@ public static Collection constructFeed() public DeterminePartitionsJobTest( boolean assumeGrouped, - Long targetPartitionSize, + Integer targetPartitionSize, String interval, int expectedNumOfSegments, int[] expectedNumOfShardsForEachSegment, @@ -257,7 +257,7 @@ public DeterminePartitionsJobTest( new HadoopTuningConfig( tmpDir.getCanonicalPath(), null, - new SingleDimensionPartitionsSpec(null, targetPartitionSize, null, assumeGrouped), + new SingleDimensionPartitionsSpec(targetPartitionSize, null, null, assumeGrouped), null, null, null, diff --git a/indexing-hadoop/src/test/java/org/apache/druid/indexer/HadoopIngestionSpecTest.java b/indexing-hadoop/src/test/java/org/apache/druid/indexer/HadoopIngestionSpecTest.java index 3b17d411b5f8..fbd13c92f560 100644 --- a/indexing-hadoop/src/test/java/org/apache/druid/indexer/HadoopIngestionSpecTest.java +++ b/indexing-hadoop/src/test/java/org/apache/druid/indexer/HadoopIngestionSpecTest.java @@ -41,11 +41,11 @@ public class HadoopIngestionSpecTest { - private static final ObjectMapper jsonMapper; + private static final ObjectMapper JSON_MAPPER; static { - jsonMapper = new DefaultObjectMapper(); - jsonMapper.setInjectableValues(new InjectableValues.Std().addValue(ObjectMapper.class, jsonMapper)); + JSON_MAPPER = new DefaultObjectMapper(); + JSON_MAPPER.setInjectableValues(new InjectableValues.Std().addValue(ObjectMapper.class, JSON_MAPPER)); } @Test @@ -146,15 +146,11 @@ public void testPartitionsSpecAutoHashed() final PartitionsSpec partitionsSpec = schema.getTuningConfig().getPartitionsSpec(); - Assert.assertEquals( - "isDeterminingPartitions", - partitionsSpec.isDeterminingPartitions(), - true - ); + Assert.assertTrue("isDeterminingPartitions", partitionsSpec.needsDeterminePartitions(true)); Assert.assertEquals( "getTargetPartitionSize", - partitionsSpec.getTargetPartitionSize(), + partitionsSpec.getMaxRowsPerSegment().intValue(), 100 ); @@ -190,17 +186,18 @@ public void testPartitionsSpecMaxPartitionSize() throw new RuntimeException(e); } - final PartitionsSpec partitionsSpec = schema.getTuningConfig().getPartitionsSpec(); + final SingleDimensionPartitionsSpec partitionsSpec = + (SingleDimensionPartitionsSpec) schema.getTuningConfig().getPartitionsSpec(); Assert.assertEquals( "isDeterminingPartitions", - partitionsSpec.isDeterminingPartitions(), + partitionsSpec.needsDeterminePartitions(true), true ); Assert.assertEquals( "getTargetPartitionSize", - partitionsSpec.getTargetPartitionSize(), + partitionsSpec.getMaxRowsPerSegment().intValue(), 100 ); @@ -213,7 +210,7 @@ public void testPartitionsSpecMaxPartitionSize() Assert.assertTrue("partitionsSpec", partitionsSpec instanceof SingleDimensionPartitionsSpec); Assert.assertEquals( "getPartitionDimension", - ((SingleDimensionPartitionsSpec) partitionsSpec).getPartitionDimension(), + partitionsSpec.getPartitionDimension(), "foo" ); } @@ -275,10 +272,9 @@ public void testDefaultSettings() false ); - Assert.assertEquals( + Assert.assertFalse( "isDeterminingPartitions", - schema.getTuningConfig().getPartitionsSpec().isDeterminingPartitions(), - false + schema.getTuningConfig().getPartitionsSpec().needsDeterminePartitions(true) ); Assert.assertFalse(Strings.isNullOrEmpty(schema.getUniqueId())); @@ -338,7 +334,7 @@ public void testNoCleanupOnFailure() private T jsonReadWriteRead(String s, Class klass) { try { - return jsonMapper.readValue(jsonMapper.writeValueAsBytes(jsonMapper.readValue(s, klass)), klass); + return JSON_MAPPER.readValue(JSON_MAPPER.writeValueAsBytes(JSON_MAPPER.readValue(s, klass)), klass); } catch (Exception e) { throw new RuntimeException(e); diff --git a/indexing-hadoop/src/test/java/org/apache/druid/indexer/partitions/HashedPartitionsSpecTest.java b/indexing-hadoop/src/test/java/org/apache/druid/indexer/partitions/HashedPartitionsSpecTest.java index e8074eac8958..3e13fa2239f7 100644 --- a/indexing-hadoop/src/test/java/org/apache/druid/indexer/partitions/HashedPartitionsSpecTest.java +++ b/indexing-hadoop/src/test/java/org/apache/druid/indexer/partitions/HashedPartitionsSpecTest.java @@ -29,110 +29,84 @@ */ public class HashedPartitionsSpecTest { - private static final ObjectMapper jsonMapper = new DefaultObjectMapper(); + private static final ObjectMapper JSON_MAPPER = new DefaultObjectMapper(); @Test public void testHashedPartitionsSpec() { { - final PartitionsSpec partitionsSpec; - - try { - partitionsSpec = jsonReadWriteRead( - "{" - + " \"targetPartitionSize\":100," - + " \"type\":\"hashed\"" - + "}", - PartitionsSpec.class - ); - } - catch (Exception e) { - throw new RuntimeException(e); - } + final PartitionsSpec partitionsSpec = jsonReadWriteRead( + "{" + + " \"targetPartitionSize\":100," + + " \"type\":\"hashed\"" + + "}", + PartitionsSpec.class + ); + Assert.assertTrue("partitionsSpec", partitionsSpec instanceof HashedPartitionsSpec); + final HashedPartitionsSpec hadoopHashedPartitionsSpec = (HashedPartitionsSpec) partitionsSpec; Assert.assertEquals( "isDeterminingPartitions", - partitionsSpec.isDeterminingPartitions(), + hadoopHashedPartitionsSpec.needsDeterminePartitions(true), true ); Assert.assertEquals( "getTargetPartitionSize", - partitionsSpec.getTargetPartitionSize(), + hadoopHashedPartitionsSpec.getMaxRowsPerSegment().intValue(), 100 ); - Assert.assertEquals( - "getMaxPartitionSize", - partitionsSpec.getMaxPartitionSize(), - 150 - ); - Assert.assertEquals( "getPartitionDimensions", - partitionsSpec.getPartitionDimensions(), + hadoopHashedPartitionsSpec.getPartitionDimensions(), ImmutableList.of() ); - - Assert.assertTrue("partitionsSpec", partitionsSpec instanceof HashedPartitionsSpec); } } @Test public void testHashedPartitionsSpecShardCount() { - final PartitionsSpec partitionsSpec; - - try { - partitionsSpec = jsonReadWriteRead( - "{" - + " \"type\":\"hashed\"," - + " \"numShards\":2" - + "}", - PartitionsSpec.class - ); - } - catch (Exception e) { - throw new RuntimeException(e); - } + final PartitionsSpec partitionsSpec = jsonReadWriteRead( + "{" + + " \"type\":\"hashed\"," + + " \"numShards\":2" + + "}", + PartitionsSpec.class + ); + Assert.assertTrue("partitionsSpec", partitionsSpec instanceof HashedPartitionsSpec); + final HashedPartitionsSpec hadoopHashedPartitionsSpec = (HashedPartitionsSpec) partitionsSpec; Assert.assertEquals( "isDeterminingPartitions", - partitionsSpec.isDeterminingPartitions(), + hadoopHashedPartitionsSpec.needsDeterminePartitions(true), false ); - Assert.assertEquals( + Assert.assertNull( "getTargetPartitionSize", - partitionsSpec.getTargetPartitionSize(), - -1 - ); - - Assert.assertEquals( - "getMaxPartitionSize", - partitionsSpec.getMaxPartitionSize(), - -1 + hadoopHashedPartitionsSpec.getMaxRowsPerSegment() ); Assert.assertEquals( "shardCount", - partitionsSpec.getNumShards(), + hadoopHashedPartitionsSpec.getNumShards().intValue(), 2 ); Assert.assertEquals( "getPartitionDimensions", - partitionsSpec.getPartitionDimensions(), + hadoopHashedPartitionsSpec.getPartitionDimensions(), ImmutableList.of() ); - Assert.assertTrue("partitionsSpec", partitionsSpec instanceof HashedPartitionsSpec); } private T jsonReadWriteRead(String s, Class klass) { try { - return jsonMapper.readValue(jsonMapper.writeValueAsBytes(jsonMapper.readValue(s, klass)), klass); + return JSON_MAPPER.readValue(JSON_MAPPER.writeValueAsBytes(JSON_MAPPER.readValue(s, klass)), klass); } catch (Exception e) { throw new RuntimeException(e); diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/index/RealtimeAppenderatorTuningConfig.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/index/RealtimeAppenderatorTuningConfig.java index 1676a2fa5c97..3a96070f064d 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/index/RealtimeAppenderatorTuningConfig.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/index/RealtimeAppenderatorTuningConfig.java @@ -24,6 +24,7 @@ import com.fasterxml.jackson.annotation.JsonTypeName; import com.google.common.base.Preconditions; import com.google.common.io.Files; +import org.apache.druid.indexer.partitions.DynamicPartitionsSpec; import org.apache.druid.segment.IndexSpec; import org.apache.druid.segment.indexing.TuningConfig; import org.apache.druid.segment.realtime.appenderator.AppenderatorConfig; @@ -39,7 +40,6 @@ public class RealtimeAppenderatorTuningConfig implements TuningConfig, AppenderatorConfig { private static final int defaultMaxRowsInMemory = TuningConfig.DEFAULT_MAX_ROWS_IN_MEMORY; - private static final int defaultMaxRowsPerSegment = 5_000_000; private static final Period defaultIntermediatePersistPeriod = new Period("PT10M"); private static final int defaultMaxPendingPersists = 0; private static final ShardSpec defaultShardSpec = new NumberedShardSpec(0, 1); @@ -55,9 +55,7 @@ private static File createNewBasePersistDirectory() private final int maxRowsInMemory; private final long maxBytesInMemory; - private final int maxRowsPerSegment; - @Nullable - private final Long maxTotalRows; + private final DynamicPartitionsSpec partitionsSpec; private final Period intermediatePersistPeriod; private final File basePersistDirectory; private final int maxPendingPersists; @@ -96,11 +94,10 @@ public RealtimeAppenderatorTuningConfig( ) { this.maxRowsInMemory = maxRowsInMemory == null ? defaultMaxRowsInMemory : maxRowsInMemory; - this.maxRowsPerSegment = maxRowsPerSegment == null ? defaultMaxRowsPerSegment : maxRowsPerSegment; // initializing this to 0, it will be lazily intialized to a value // @see server.src.main.java.org.apache.druid.segment.indexing.TuningConfigs#getMaxBytesInMemoryOrDefault(long) this.maxBytesInMemory = maxBytesInMemory == null ? 0 : maxBytesInMemory; - this.maxTotalRows = maxTotalRows; + this.partitionsSpec = new DynamicPartitionsSpec(maxRowsPerSegment, maxTotalRows); this.intermediatePersistPeriod = intermediatePersistPeriod == null ? defaultIntermediatePersistPeriod : intermediatePersistPeriod; @@ -155,7 +152,7 @@ public long getMaxBytesInMemory() @JsonProperty public Integer getMaxRowsPerSegment() { - return maxRowsPerSegment; + return partitionsSpec.getMaxRowsPerSegment(); } @Override @@ -163,7 +160,12 @@ public Integer getMaxRowsPerSegment() @Nullable public Long getMaxTotalRows() { - return maxTotalRows; + return partitionsSpec.getMaxTotalRows(); + } + + public DynamicPartitionsSpec getPartitionsSpec() + { + return partitionsSpec; } @Override @@ -257,8 +259,8 @@ public RealtimeAppenderatorTuningConfig withBasePersistDirectory(File dir) return new RealtimeAppenderatorTuningConfig( maxRowsInMemory, maxBytesInMemory, - maxRowsPerSegment, - maxTotalRows, + partitionsSpec.getMaxRowsPerSegment(), + partitionsSpec.getMaxTotalRows(), intermediatePersistPeriod, dir, maxPendingPersists, diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java index 9ec84de46946..3c22d2043c4f 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java @@ -368,7 +368,11 @@ public TaskStatus run(final TaskToolbox toolbox) AppenderatorDriverAddResult addResult = driver.add(inputRow, sequenceName, committerSupplier); if (addResult.isOk()) { - if (addResult.isPushRequired(tuningConfig)) { + final boolean isPushRequired = addResult.isPushRequired( + tuningConfig.getPartitionsSpec().getMaxRowsPerSegment(), + tuningConfig.getPartitionsSpec().getMaxTotalRows() + ); + if (isPushRequired) { publishSegments(driver, publisher, committerSupplier, sequenceName); sequenceNumber++; sequenceName = makeSequenceName(getId(), sequenceNumber); diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java index 6dc1e0f3e3c8..1f16f01ca56a 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java @@ -41,6 +41,8 @@ import org.apache.druid.data.input.impl.StringDimensionSchema; import org.apache.druid.data.input.impl.TimeAndDimsParseSpec; import org.apache.druid.indexer.TaskStatus; +import org.apache.druid.indexer.partitions.DynamicPartitionsSpec; +import org.apache.druid.indexer.partitions.HashedPartitionsSpec; import org.apache.druid.indexing.common.RetryPolicyFactory; import org.apache.druid.indexing.common.SegmentLoaderFactory; import org.apache.druid.indexing.common.TaskToolbox; @@ -853,8 +855,14 @@ IndexTuningConfig computeTuningConfig(List> qu // Setting maxTotalRows to Long.MAX_VALUE to respect the computed maxRowsPerSegment. // If this is set to something too small, compactionTask can generate small segments // which need to be compacted again, which in turn making auto compaction stuck in the same interval. - return (tuningConfig == null ? IndexTuningConfig.createDefault() : tuningConfig) - .withMaxRowsPerSegment(maxRowsPerSegment).withMaxTotalRows(Long.MAX_VALUE); + final IndexTuningConfig newTuningConfig = tuningConfig == null + ? IndexTuningConfig.createDefault() + : tuningConfig; + if (newTuningConfig.isForceGuaranteedRollup()) { + return newTuningConfig.withPartitionsSpec(new HashedPartitionsSpec(maxRowsPerSegment, null, null)); + } else { + return newTuningConfig.withPartitionsSpec(new DynamicPartitionsSpec(maxRowsPerSegment, Long.MAX_VALUE)); + } } else { return tuningConfig; } @@ -862,8 +870,7 @@ IndexTuningConfig computeTuningConfig(List> qu /** * Check the validity of {@link #targetCompactionSizeBytes} and return a valid value. Note that - * targetCompactionSizeBytes cannot be used with {@link IndexTuningConfig#maxRowsPerSegment}, - * {@link IndexTuningConfig#maxTotalRows}, or {@link IndexTuningConfig#numShards} together. + * targetCompactionSizeBytes cannot be used with {@link IndexTuningConfig#getPartitionsSpec} together. * {@link #hasPartitionConfig} checks one of those configs is set. *

* This throws an {@link IllegalArgumentException} if targetCompactionSizeBytes is set and hasPartitionConfig @@ -880,12 +887,9 @@ private static Long getValidTargetCompactionSizeBytes( if (targetCompactionSizeBytes != null && tuningConfig != null) { Preconditions.checkArgument( !hasPartitionConfig(tuningConfig), - "targetCompactionSizeBytes[%s] cannot be used with maxRowsPerSegment[%s], maxTotalRows[%s]," - + " or numShards[%s] of tuningConfig", + "targetCompactionSizeBytes[%s] cannot be used with partitionsSpec[%s]", targetCompactionSizeBytes, - tuningConfig.getMaxRowsPerSegment(), - tuningConfig.getMaxTotalRows(), - tuningConfig.getNumShards() + tuningConfig.getPartitionsSpec() ); return targetCompactionSizeBytes; } else { @@ -898,9 +902,7 @@ private static Long getValidTargetCompactionSizeBytes( private static boolean hasPartitionConfig(@Nullable IndexTuningConfig tuningConfig) { if (tuningConfig != null) { - return tuningConfig.getMaxRowsPerSegment() != null - || tuningConfig.getMaxTotalRows() != null - || tuningConfig.getNumShards() != null; + return tuningConfig.getPartitionsSpec() != null; } else { return false; } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java index b89772c05f32..16254de351ad 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java @@ -39,6 +39,9 @@ import org.apache.druid.hll.HyperLogLogCollector; import org.apache.druid.indexer.IngestionState; import org.apache.druid.indexer.TaskStatus; +import org.apache.druid.indexer.partitions.DynamicPartitionsSpec; +import org.apache.druid.indexer.partitions.HashedPartitionsSpec; +import org.apache.druid.indexer.partitions.PartitionsSpec; import org.apache.druid.indexing.appenderator.ActionBasedUsedSegmentChecker; import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReport; import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReportData; @@ -460,19 +463,13 @@ public TaskStatus run(final TaskToolbox toolbox) // Initialize maxRowsPerSegment and maxTotalRows lazily final IndexTuningConfig tuningConfig = ingestionSchema.tuningConfig; - @Nullable - final Integer maxRowsPerSegment = getValidMaxRowsPerSegment(tuningConfig); - @Nullable - final Long maxTotalRows = getValidMaxTotalRows(tuningConfig); - // Spec for segment allocation. This is used only for perfect rollup mode. - // See createSegmentAllocator(). + final PartitionsSpec partitionsSpec = tuningConfig.getGivenOrDefaultPartitionsSpec(); final Map> allocateSpec = determineShardSpecs( toolbox, firehoseFactory, firehoseTempDir, - maxRowsPerSegment + partitionsSpec ); - final List allocateIntervals = new ArrayList<>(allocateSpec.keySet()); final DataSchema dataSchema; if (determineIntervals) { @@ -496,8 +493,7 @@ public TaskStatus run(final TaskToolbox toolbox) allocateSpec, firehoseFactory, firehoseTempDir, - maxRowsPerSegment, - maxTotalRows + partitionsSpec ); } catch (Exception e) { @@ -597,7 +593,8 @@ private static boolean isGuaranteedRollup(IndexIOConfig ioConfig, IndexTuningCon * specified in {@link IndexTuningConfig}. *

* If both intervals and shardSpecs don't have to be determined, this method simply returns {@link ShardSpecs} for the - * given intervals. Here, if {@link IndexTuningConfig#numShards} is not specified, {@link NumberedShardSpec} is used. + * given intervals. Here, if {@link HashedPartitionsSpec#numShards} is not specified, {@link NumberedShardSpec} is + * used. *

* If one of intervals or shardSpecs need to be determined, this method reads the entire input for determining one of * them. If the perfect rollup must be guaranteed, {@link HashBasedNumberedShardSpec} is used for hash partitioning @@ -609,7 +606,7 @@ private Map> determineShardSpecs( final TaskToolbox toolbox, final FirehoseFactory firehoseFactory, final File firehoseTempDir, - @Nullable final Integer maxRowsPerSegment + final PartitionsSpec nonNullPartitionsSpec ) throws IOException { final ObjectMapper jsonMapper = toolbox.getObjectMapper(); @@ -622,13 +619,17 @@ private Map> determineShardSpecs( final boolean determineIntervals = !granularitySpec.bucketIntervals().isPresent(); // Must determine partitions if rollup is guaranteed and the user didn't provide a specific value. - final boolean determineNumPartitions = tuningConfig.getNumShards() == null - && isGuaranteedRollup(ioConfig, tuningConfig); + final boolean determineNumPartitions = nonNullPartitionsSpec.needsDeterminePartitions(false); // if we were given number of shards per interval and the intervals, we don't need to scan the data if (!determineNumPartitions && !determineIntervals) { log.info("Skipping determine partition scan"); - return createShardSpecWithoutInputScan(granularitySpec, ioConfig, tuningConfig); + return createShardSpecWithoutInputScan( + granularitySpec, + ioConfig, + tuningConfig, + nonNullPartitionsSpec + ); } else { // determine intervals containing data and prime HLL collectors return createShardSpecsFromInput( @@ -637,10 +638,8 @@ private Map> determineShardSpecs( firehoseFactory, firehoseTempDir, granularitySpec, - tuningConfig, - determineIntervals, - determineNumPartitions, - maxRowsPerSegment + nonNullPartitionsSpec, + determineIntervals ); } } @@ -648,7 +647,8 @@ private Map> determineShardSpecs( private Map> createShardSpecWithoutInputScan( GranularitySpec granularitySpec, IndexIOConfig ioConfig, - IndexTuningConfig tuningConfig + IndexTuningConfig tuningConfig, + PartitionsSpec nonNullPartitionsSpec ) { final Map> allocateSpec = new HashMap<>(); @@ -656,11 +656,14 @@ private Map> createShardSpecWithoutInp if (isGuaranteedRollup(ioConfig, tuningConfig)) { // Overwrite mode, guaranteed rollup: shardSpecs must be known in advance. - final int numShards = tuningConfig.getNumShards() == null ? 1 : tuningConfig.getNumShards(); + assert nonNullPartitionsSpec instanceof HashedPartitionsSpec; + final HashedPartitionsSpec partitionsSpec = (HashedPartitionsSpec) nonNullPartitionsSpec; + final int numShards = partitionsSpec.getNumShards() == null ? 1 : partitionsSpec.getNumShards(); + for (Interval interval : intervals) { allocateSpec.put( interval, - createShardSpecFactoryForGuaranteedRollup(numShards, tuningConfig.partitionDimensions) + createShardSpecFactoryForGuaranteedRollup(numShards, partitionsSpec.getPartitionDimensions()) ); } } else { @@ -678,10 +681,8 @@ private Map> createShardSpecsFromInput FirehoseFactory firehoseFactory, File firehoseTempDir, GranularitySpec granularitySpec, - IndexTuningConfig tuningConfig, - boolean determineIntervals, - boolean determineNumPartitions, - @Nullable Integer maxRowsPerSegment + PartitionsSpec nonNullPartitionsSpec, + boolean determineIntervals ) throws IOException { log.info("Determining intervals and shardSpecs"); @@ -693,33 +694,42 @@ private Map> createShardSpecsFromInput firehoseFactory, firehoseTempDir, granularitySpec, - determineIntervals, - determineNumPartitions + nonNullPartitionsSpec, + determineIntervals ); final Map> allocateSpecs = new HashMap<>(); - final int defaultNumShards = tuningConfig.getNumShards() == null ? 1 : tuningConfig.getNumShards(); for (final Map.Entry> entry : hllCollectors.entrySet()) { final Interval interval = entry.getKey(); - final HyperLogLogCollector collector = entry.getValue().orNull(); - - final int numShards; - if (determineNumPartitions) { - final long numRows = Preconditions.checkNotNull(collector, "HLL collector").estimateCardinalityRound(); - numShards = (int) Math.ceil( - (double) numRows / Preconditions.checkNotNull(maxRowsPerSegment, "maxRowsPerSegment") - ); - log.info("Estimated [%,d] rows of data for interval [%s], creating [%,d] shards", numRows, interval, numShards); - } else { - numShards = defaultNumShards; - log.info("Creating [%,d] shards for interval [%s]", numShards, interval); - } if (isGuaranteedRollup(ingestionSchema.getIOConfig(), ingestionSchema.getTuningConfig())) { + assert nonNullPartitionsSpec instanceof HashedPartitionsSpec; + final HashedPartitionsSpec partitionsSpec = (HashedPartitionsSpec) nonNullPartitionsSpec; + + final HyperLogLogCollector collector = entry.getValue().orNull(); + + final int numShards; + if (partitionsSpec.needsDeterminePartitions(false)) { + final long numRows = Preconditions.checkNotNull(collector, "HLL collector").estimateCardinalityRound(); + final int nonNullMaxRowsPerSegment = partitionsSpec.getMaxRowsPerSegment() == null + ? PartitionsSpec.DEFAULT_MAX_ROWS_PER_SEGMENT + : partitionsSpec.getMaxRowsPerSegment(); + numShards = (int) Math.ceil((double) numRows / nonNullMaxRowsPerSegment); + log.info( + "Estimated [%,d] rows of data for interval [%s], creating [%,d] shards", + numRows, + interval, + numShards + ); + } else { + numShards = partitionsSpec.getNumShards() == null ? 1 : partitionsSpec.getNumShards(); + log.info("Creating [%,d] shards for interval [%s]", numShards, interval); + } + // Overwrite mode, guaranteed rollup: # of shards must be known in advance. allocateSpecs.put( interval, - createShardSpecFactoryForGuaranteedRollup(numShards, tuningConfig.partitionDimensions) + createShardSpecFactoryForGuaranteedRollup(numShards, partitionsSpec.getPartitionDimensions()) ); } else { allocateSpecs.put(interval, null); @@ -744,8 +754,8 @@ private Map> collectIntervalsAndShardSp FirehoseFactory firehoseFactory, File firehoseTempDir, GranularitySpec granularitySpec, - boolean determineIntervals, - boolean determineNumPartitions + PartitionsSpec nonNullPartitionsSpec, + boolean determineIntervals ) throws IOException { final Map> hllCollectors = new TreeMap<>( @@ -787,7 +797,7 @@ private Map> collectIntervalsAndShardSp interval = optInterval.get(); } - if (determineNumPartitions) { + if (nonNullPartitionsSpec.needsDeterminePartitions(false)) { hllCollectors.computeIfAbsent(interval, intv -> Optional.of(HyperLogLogCollector.makeLatestCollector())); List groupKey = Rows.toGroupKey( @@ -860,14 +870,14 @@ private IndexTaskSegmentAllocator createSegmentAllocator( /** * This method reads input data row by row and adds the read row to a proper segment using {@link BaseAppenderatorDriver}. * If there is no segment for the row, a new one is created. Segments can be published in the middle of reading inputs - * if one of below conditions are satisfied. + * if {@link DynamicPartitionsSpec} is used and one of below conditions are satisfied. * *
    *
  • - * If the number of rows in a segment exceeds {@link IndexTuningConfig#maxRowsPerSegment} + * If the number of rows in a segment exceeds {@link DynamicPartitionsSpec#maxRowsPerSegment} *
  • *
  • - * If the number of rows added to {@link BaseAppenderatorDriver} so far exceeds {@link IndexTuningConfig#maxTotalRows} + * If the number of rows added to {@link BaseAppenderatorDriver} so far exceeds {@link DynamicPartitionsSpec#maxTotalRows} *
  • *
*

@@ -881,10 +891,13 @@ private TaskStatus generateAndPublishSegments( final Map> allocateSpec, final FirehoseFactory firehoseFactory, final File firehoseTempDir, - @Nullable final Integer maxRowsPerSegment, - @Nullable final Long maxTotalRows + final PartitionsSpec partitionsSpec ) throws IOException, InterruptedException { + @Nullable + final DynamicPartitionsSpec dynamicPartitionsSpec = partitionsSpec instanceof DynamicPartitionsSpec + ? (DynamicPartitionsSpec) partitionsSpec + : null; final GranularitySpec granularitySpec = dataSchema.getGranularitySpec(); final FireDepartment fireDepartmentForMetrics = new FireDepartment(dataSchema, new RealtimeIOConfig(null, null), null); @@ -899,10 +912,8 @@ private TaskStatus generateAndPublishSegments( toolbox.getMonitorScheduler().addMonitor(metricsMonitor); } - final IndexIOConfig ioConfig = ingestionSchema.getIOConfig(); final IndexTuningConfig tuningConfig = ingestionSchema.getTuningConfig(); final long pushTimeout = tuningConfig.getPushTimeout(); - final boolean isGuaranteedRollup = isGuaranteedRollup(ioConfig, tuningConfig); final IndexTaskSegmentAllocator segmentAllocator = createSegmentAllocator( toolbox, @@ -956,13 +967,20 @@ private TaskStatus generateAndPublishSegments( final AppenderatorDriverAddResult addResult = driver.add(inputRow, sequenceName); if (addResult.isOk()) { + // incremental segment publishment is allowed only when rollup don't have to be perfect. - if (!isGuaranteedRollup && addResult.isPushRequired(maxRowsPerSegment, maxTotalRows)) { - // There can be some segments waiting for being published even though any rows won't be added to them. - // If those segments are not published here, the available space in appenderator will be kept to be small - // which makes the size of segments smaller. - final SegmentsAndMetadata pushed = driver.pushAllAndClear(pushTimeout); - log.info("Pushed segments[%s]", pushed.getSegments()); + if (dynamicPartitionsSpec != null) { + final boolean isPushRequired = addResult.isPushRequired( + dynamicPartitionsSpec.getMaxRowsPerSegment(), + dynamicPartitionsSpec.getMaxTotalRows() + ); + if (isPushRequired) { + // There can be some segments waiting for being published even though any rows won't be added to them. + // If those segments are not published here, the available space in appenderator will be kept to be + // small which makes the size of segments smaller. + final SegmentsAndMetadata pushed = driver.pushAllAndClear(pushTimeout); + log.info("Pushed segments[%s]", pushed.getSegments()); + } } } else { throw new ISE("Failed to add a row with timestamp[%s]", inputRow.getTimestamp()); @@ -1017,44 +1035,6 @@ private TaskStatus generateAndPublishSegments( } } - /** - * Return the valid target partition size. If {@link IndexTuningConfig#numShards} is valid, this returns null. - * Otherwise, this returns {@link IndexTuningConfig#DEFAULT_MAX_ROWS_PER_SEGMENT} or the given - * {@link IndexTuningConfig#maxRowsPerSegment}. - */ - public static Integer getValidMaxRowsPerSegment(IndexTuningConfig tuningConfig) - { - @Nullable - final Integer numShards = tuningConfig.numShards; - @Nullable - final Integer maxRowsPerSegment = tuningConfig.maxRowsPerSegment; - if (numShards == null || numShards == -1) { - return maxRowsPerSegment == null || maxRowsPerSegment.equals(-1) - ? IndexTuningConfig.DEFAULT_MAX_ROWS_PER_SEGMENT - : maxRowsPerSegment; - } else { - return null; - } - } - - /** - * Return the valid target partition size. If {@link IndexTuningConfig#numShards} is valid, this returns null. - * Otherwise, this returns {@link IndexTuningConfig#DEFAULT_MAX_TOTAL_ROWS} or the given - * {@link IndexTuningConfig#maxTotalRows}. - */ - public static Long getValidMaxTotalRows(IndexTuningConfig tuningConfig) - { - @Nullable - final Integer numShards = tuningConfig.numShards; - @Nullable - final Long maxTotalRows = tuningConfig.maxTotalRows; - if (numShards == null || numShards == -1) { - return maxTotalRows == null ? IndexTuningConfig.DEFAULT_MAX_TOTAL_ROWS : maxTotalRows; - } else { - return null; - } - } - private void handleParseException(ParseException e) { if (e.isFromPartiallyValidRow()) { @@ -1244,24 +1224,18 @@ public boolean isAppendToExisting() @JsonTypeName("index") public static class IndexTuningConfig implements TuningConfig, AppenderatorConfig { - static final int DEFAULT_MAX_ROWS_PER_SEGMENT = 5_000_000; - static final int DEFAULT_MAX_TOTAL_ROWS = 20_000_000; - private static final IndexSpec DEFAULT_INDEX_SPEC = new IndexSpec(); private static final int DEFAULT_MAX_PENDING_PERSISTS = 0; private static final boolean DEFAULT_GUARANTEE_ROLLUP = false; private static final boolean DEFAULT_REPORT_PARSE_EXCEPTIONS = false; private static final long DEFAULT_PUSH_TIMEOUT = 0; - @Nullable - private final Integer maxRowsPerSegment; private final int maxRowsInMemory; private final long maxBytesInMemory; + + // null if all partitionsSpec related params are null. see getDefaultPartitionsSpec() for details. @Nullable - private final Long maxTotalRows; - @Nullable - private final Integer numShards; - private final List partitionDimensions; + private final PartitionsSpec partitionsSpec; private final IndexSpec indexSpec; private final IndexSpec indexSpecForIntermediatePersists; private final File basePersistDirectory; @@ -1288,21 +1262,60 @@ public static IndexTuningConfig createDefault() return new IndexTuningConfig(); } + @Nullable + private static PartitionsSpec getDefaultPartitionsSpec( + boolean forceGuaranteedRollup, + @Nullable PartitionsSpec partitionsSpec, + @Nullable Integer maxRowsPerSegment, + @Nullable Long maxTotalRows, + @Nullable Integer numShards, + @Nullable List partitionDimensions + ) + { + if (partitionsSpec == null) { + if (forceGuaranteedRollup) { + if (maxRowsPerSegment != null + || numShards != null + || (partitionDimensions != null && !partitionDimensions.isEmpty())) { + return new HashedPartitionsSpec(maxRowsPerSegment, numShards, partitionDimensions); + } else { + return null; + } + } else { + if (maxRowsPerSegment != null || maxTotalRows != null) { + return new DynamicPartitionsSpec(maxRowsPerSegment, maxTotalRows); + } else { + return null; + } + } + } else { + if (forceGuaranteedRollup) { + if (!(partitionsSpec instanceof HashedPartitionsSpec)) { + throw new ISE("HashedPartitionsSpec must be used for perfect rollup"); + } + } else { + if (!(partitionsSpec instanceof DynamicPartitionsSpec)) { + throw new ISE("DynamicPartitionsSpec must be used for best-effort rollup"); + } + } + return partitionsSpec; + } + } + @JsonCreator public IndexTuningConfig( @JsonProperty("targetPartitionSize") @Deprecated @Nullable Integer targetPartitionSize, - @JsonProperty("maxRowsPerSegment") @Nullable Integer maxRowsPerSegment, + @JsonProperty("maxRowsPerSegment") @Deprecated @Nullable Integer maxRowsPerSegment, @JsonProperty("maxRowsInMemory") @Nullable Integer maxRowsInMemory, @JsonProperty("maxBytesInMemory") @Nullable Long maxBytesInMemory, - @JsonProperty("maxTotalRows") @Nullable Long maxTotalRows, + @JsonProperty("maxTotalRows") @Deprecated @Nullable Long maxTotalRows, @JsonProperty("rowFlushBoundary") @Deprecated @Nullable Integer rowFlushBoundary_forBackCompatibility, - @JsonProperty("numShards") @Nullable Integer numShards, - @JsonProperty("partitionDimensions") @Nullable List partitionDimensions, + @JsonProperty("numShards") @Deprecated @Nullable Integer numShards, + @JsonProperty("partitionDimensions") @Deprecated @Nullable List partitionDimensions, + @JsonProperty("partitionsSpec") @Nullable PartitionsSpec partitionsSpec, @JsonProperty("indexSpec") @Nullable IndexSpec indexSpec, @JsonProperty("indexSpecForIntermediatePersists") @Nullable IndexSpec indexSpecForIntermediatePersists, @JsonProperty("maxPendingPersists") @Nullable Integer maxPendingPersists, - // This parameter is left for compatibility when reading existing JSONs, to be removed in Druid 0.12. - @JsonProperty("buildV9Directly") @Nullable Boolean buildV9Directly, @JsonProperty("forceGuaranteedRollup") @Nullable Boolean forceGuaranteedRollup, @JsonProperty("reportParseExceptions") @Deprecated @Nullable Boolean reportParseExceptions, @JsonProperty("publishTimeout") @Deprecated @Nullable Long publishTimeout, @@ -1315,12 +1328,16 @@ public IndexTuningConfig( ) { this( - maxRowsPerSegment == null ? targetPartitionSize : maxRowsPerSegment, maxRowsInMemory != null ? maxRowsInMemory : rowFlushBoundary_forBackCompatibility, maxBytesInMemory != null ? maxBytesInMemory : 0, - maxTotalRows, - numShards, - partitionDimensions, + getDefaultPartitionsSpec( + forceGuaranteedRollup == null ? DEFAULT_GUARANTEE_ROLLUP : forceGuaranteedRollup, + partitionsSpec, + maxRowsPerSegment == null ? targetPartitionSize : maxRowsPerSegment, + maxTotalRows, + numShards, + partitionDimensions + ), indexSpec, indexSpecForIntermediatePersists, maxPendingPersists, @@ -1342,16 +1359,13 @@ public IndexTuningConfig( private IndexTuningConfig() { - this(null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null); + this(null, null, null, null, null, null, null, null, null, null, null, null, null, null); } private IndexTuningConfig( - @Nullable Integer maxRowsPerSegment, @Nullable Integer maxRowsInMemory, @Nullable Long maxBytesInMemory, - @Nullable Long maxTotalRows, - @Nullable Integer numShards, - @Nullable List partitionDimensions, + @Nullable PartitionsSpec partitionsSpec, @Nullable IndexSpec indexSpec, @Nullable IndexSpec indexSpecForIntermediatePersists, @Nullable Integer maxPendingPersists, @@ -1365,21 +1379,11 @@ private IndexTuningConfig( @Nullable Integer maxSavedParseExceptions ) { - Preconditions.checkArgument( - maxRowsPerSegment == null || maxRowsPerSegment.equals(-1) || numShards == null || numShards.equals(-1), - "maxRowsPerSegment and numShards cannot both be set" - ); - - this.maxRowsPerSegment = (maxRowsPerSegment != null && maxRowsPerSegment == -1) - ? null - : maxRowsPerSegment; this.maxRowsInMemory = maxRowsInMemory == null ? TuningConfig.DEFAULT_MAX_ROWS_IN_MEMORY : maxRowsInMemory; // initializing this to 0, it will be lazily initialized to a value // @see server.src.main.java.org.apache.druid.segment.indexing.TuningConfigs#getMaxBytesInMemoryOrDefault(long) this.maxBytesInMemory = maxBytesInMemory == null ? 0 : maxBytesInMemory; - this.maxTotalRows = maxTotalRows; - this.numShards = numShards == null || numShards.equals(-1) ? null : numShards; - this.partitionDimensions = partitionDimensions == null ? Collections.emptyList() : partitionDimensions; + this.partitionsSpec = partitionsSpec; this.indexSpec = indexSpec == null ? DEFAULT_INDEX_SPEC : indexSpec; this.indexSpecForIntermediatePersists = indexSpecForIntermediatePersists == null ? this.indexSpec : indexSpecForIntermediatePersists; @@ -1412,12 +1416,9 @@ private IndexTuningConfig( public IndexTuningConfig withBasePersistDirectory(File dir) { return new IndexTuningConfig( - maxRowsPerSegment, maxRowsInMemory, maxBytesInMemory, - maxTotalRows, - numShards, - partitionDimensions, + partitionsSpec, indexSpec, indexSpecForIntermediatePersists, maxPendingPersists, @@ -1432,38 +1433,12 @@ public IndexTuningConfig withBasePersistDirectory(File dir) ); } - public IndexTuningConfig withMaxTotalRows(Long maxTotalRows) + public IndexTuningConfig withPartitionsSpec(PartitionsSpec partitionsSpec) { return new IndexTuningConfig( - maxRowsPerSegment, maxRowsInMemory, maxBytesInMemory, - maxTotalRows, - numShards, - partitionDimensions, - indexSpec, - indexSpecForIntermediatePersists, - maxPendingPersists, - forceGuaranteedRollup, - reportParseExceptions, - pushTimeout, - basePersistDirectory, - segmentWriteOutMediumFactory, - logParseExceptions, - maxParseExceptions, - maxSavedParseExceptions - ); - } - - public IndexTuningConfig withMaxRowsPerSegment(int maxRowsPerSegment) - { - return new IndexTuningConfig( - maxRowsPerSegment, - maxRowsInMemory, - maxBytesInMemory, - maxTotalRows, - numShards, - partitionDimensions, + partitionsSpec, indexSpec, indexSpecForIntermediatePersists, maxPendingPersists, @@ -1480,14 +1455,14 @@ public IndexTuningConfig withMaxRowsPerSegment(int maxRowsPerSegment) /** * Return the max number of rows per segment. This returns null if it's not specified in tuningConfig. - * Please use {@link IndexTask#getValidMaxRowsPerSegment} instead to get the valid value. + * Deprecated in favor of {@link #getGivenOrDefaultPartitionsSpec()}. */ @Nullable - @JsonProperty @Override + @Deprecated public Integer getMaxRowsPerSegment() { - return maxRowsPerSegment; + return partitionsSpec == null ? null : partitionsSpec.getMaxRowsPerSegment(); } @JsonProperty @@ -1506,26 +1481,50 @@ public long getMaxBytesInMemory() /** * Return the max number of total rows in appenderator. This returns null if it's not specified in tuningConfig. - * Please use {@link IndexTask#getValidMaxTotalRows} instead to get the valid value. + * Deprecated in favor of {@link #getGivenOrDefaultPartitionsSpec()}. */ - @JsonProperty @Override @Nullable + @Deprecated public Long getMaxTotalRows() { - return maxTotalRows; + return partitionsSpec instanceof DynamicPartitionsSpec + ? ((DynamicPartitionsSpec) partitionsSpec).getMaxTotalRows() + : null; } - @JsonProperty + @Deprecated + @Nullable public Integer getNumShards() { - return numShards; + return partitionsSpec instanceof HashedPartitionsSpec + ? ((HashedPartitionsSpec) partitionsSpec).getNumShards() + : null; } - @JsonProperty + @Deprecated public List getPartitionDimensions() { - return partitionDimensions; + return partitionsSpec instanceof HashedPartitionsSpec + ? ((HashedPartitionsSpec) partitionsSpec).getPartitionDimensions() + : Collections.emptyList(); + } + + @JsonProperty + @Nullable + public PartitionsSpec getPartitionsSpec() + { + return partitionsSpec; + } + + public PartitionsSpec getGivenOrDefaultPartitionsSpec() + { + if (partitionsSpec != null) { + return partitionsSpec; + } + return forceGuaranteedRollup + ? new HashedPartitionsSpec(null, null, null) + : new DynamicPartitionsSpec(null, null); } @JsonProperty @@ -1635,10 +1634,7 @@ public boolean equals(Object o) logParseExceptions == that.logParseExceptions && maxParseExceptions == that.maxParseExceptions && maxSavedParseExceptions == that.maxSavedParseExceptions && - Objects.equals(maxRowsPerSegment, that.maxRowsPerSegment) && - Objects.equals(maxTotalRows, that.maxTotalRows) && - Objects.equals(numShards, that.numShards) && - Objects.equals(partitionDimensions, that.partitionDimensions) && + Objects.equals(partitionsSpec, that.partitionsSpec) && Objects.equals(indexSpec, that.indexSpec) && Objects.equals(indexSpecForIntermediatePersists, that.indexSpecForIntermediatePersists) && Objects.equals(basePersistDirectory, that.basePersistDirectory) && @@ -1649,12 +1645,9 @@ public boolean equals(Object o) public int hashCode() { return Objects.hash( - maxRowsPerSegment, maxRowsInMemory, maxBytesInMemory, - maxTotalRows, - numShards, - partitionDimensions, + partitionsSpec, indexSpec, indexSpecForIntermediatePersists, basePersistDirectory, @@ -1673,12 +1666,9 @@ public int hashCode() public String toString() { return "IndexTuningConfig{" + - "maxRowsPerSegment=" + maxRowsPerSegment + - ", maxRowsInMemory=" + maxRowsInMemory + + "maxRowsInMemory=" + maxRowsInMemory + ", maxBytesInMemory=" + maxBytesInMemory + - ", maxTotalRows=" + maxTotalRows + - ", numShards=" + numShards + - ", partitionDimensions=" + partitionDimensions + + ", partitionsSpec=" + partitionsSpec + ", indexSpec=" + indexSpec + ", indexSpecForIntermediatePersists=" + indexSpecForIntermediatePersists + ", basePersistDirectory=" + basePersistDirectory + diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSubTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSubTask.java index ae401539c95f..644b1c1d8fc8 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSubTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSubTask.java @@ -30,6 +30,7 @@ import org.apache.druid.data.input.FirehoseFactory; import org.apache.druid.data.input.InputRow; import org.apache.druid.indexer.TaskStatus; +import org.apache.druid.indexer.partitions.DynamicPartitionsSpec; import org.apache.druid.indexing.appenderator.ActionBasedSegmentAllocator; import org.apache.druid.indexing.appenderator.ActionBasedUsedSegmentChecker; import org.apache.druid.indexing.common.LockGranularity; @@ -49,7 +50,6 @@ import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.Intervals; import org.apache.druid.java.util.common.StringUtils; -import org.apache.druid.java.util.common.granularity.Granularities; import org.apache.druid.java.util.common.granularity.Granularity; import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.java.util.common.parsers.ParseException; @@ -58,7 +58,6 @@ import org.apache.druid.segment.indexing.RealtimeIOConfig; import org.apache.druid.segment.indexing.granularity.ArbitraryGranularitySpec; import org.apache.druid.segment.indexing.granularity.GranularitySpec; -import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec; import org.apache.druid.segment.realtime.FireDepartment; import org.apache.druid.segment.realtime.FireDepartmentMetrics; import org.apache.druid.segment.realtime.RealtimeMetricsMonitor; @@ -369,10 +368,10 @@ private SegmentAllocator createSegmentAllocator() * *

    *
  • - * If the number of rows in a segment exceeds {@link ParallelIndexTuningConfig#maxRowsPerSegment} + * If the number of rows in a segment exceeds {@link DynamicPartitionsSpec#maxRowsPerSegment} *
  • *
  • - * If the number of rows added to {@link BaseAppenderatorDriver} so far exceeds {@link ParallelIndexTuningConfig#maxTotalRows} + * If the number of rows added to {@link BaseAppenderatorDriver} so far exceeds {@link DynamicPartitionsSpec#maxTotalRows} *
  • *
* @@ -404,8 +403,7 @@ private Set generateAndPushSegments( // Initialize maxRowsPerSegment and maxTotalRows lazily final ParallelIndexTuningConfig tuningConfig = ingestionSchema.getTuningConfig(); - @Nullable final Integer maxRowsPerSegment = IndexTask.getValidMaxRowsPerSegment(tuningConfig); - @Nullable final Long maxTotalRows = IndexTask.getValidMaxTotalRows(tuningConfig); + final DynamicPartitionsSpec partitionsSpec = (DynamicPartitionsSpec) tuningConfig.getGivenOrDefaultPartitionsSpec(); final long pushTimeout = tuningConfig.getPushTimeout(); final boolean explicitIntervals = granularitySpec.bucketIntervals().isPresent(); final SegmentAllocator segmentAllocator = createSegmentAllocator(toolbox, taskClient); @@ -451,7 +449,11 @@ private Set generateAndPushSegments( final AppenderatorDriverAddResult addResult = driver.add(inputRow, sequenceName); if (addResult.isOk()) { - if (addResult.isPushRequired(maxRowsPerSegment, maxTotalRows)) { + final boolean isPushRequired = addResult.isPushRequired( + partitionsSpec.getMaxRowsPerSegment(), + partitionsSpec.getMaxTotalRows() + ); + if (isPushRequired) { // There can be some segments waiting for being published even though any rows won't be added to them. // If those segments are not published here, the available space in appenderator will be kept to be small // which makes the size of segments smaller. @@ -485,16 +487,6 @@ private Set generateAndPushSegments( } } - - private static Granularity findSegmentGranularity(GranularitySpec granularitySpec) - { - if (granularitySpec instanceof UniformGranularitySpec) { - return granularitySpec.getSegmentGranularity(); - } else { - return Granularities.ALL; - } - } - private Appenderator newAppenderator( FireDepartmentMetrics metrics, TaskToolbox toolbox, diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java index 1f1b571f4c00..535b0469b4f2 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java @@ -383,17 +383,17 @@ private static IndexTuningConfig convertToIndexTuningConfig(ParallelIndexTuningC { return new IndexTuningConfig( null, - tuningConfig.getMaxRowsPerSegment(), + null, tuningConfig.getMaxRowsInMemory(), tuningConfig.getMaxBytesInMemory(), - tuningConfig.getMaxTotalRows(), null, - tuningConfig.getNumShards(), null, + null, + null, + tuningConfig.getPartitionsSpec(), tuningConfig.getIndexSpec(), tuningConfig.getIndexSpecForIntermediatePersists(), tuningConfig.getMaxPendingPersists(), - true, false, tuningConfig.isReportParseExceptions(), null, diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexTuningConfig.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexTuningConfig.java index 9c480d26550e..86f1e3fdcff4 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexTuningConfig.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexTuningConfig.java @@ -23,6 +23,8 @@ import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.annotation.JsonTypeName; import com.google.common.base.Preconditions; +import org.apache.druid.indexer.partitions.DynamicPartitionsSpec; +import org.apache.druid.indexer.partitions.PartitionsSpec; import org.apache.druid.indexing.common.task.IndexTask.IndexTuningConfig; import org.apache.druid.segment.IndexSpec; import org.apache.druid.segment.writeout.SegmentWriteOutMediumFactory; @@ -72,6 +74,7 @@ public static ParallelIndexTuningConfig defaultConfig() null, null, null, + null, null ); } @@ -79,11 +82,12 @@ public static ParallelIndexTuningConfig defaultConfig() @JsonCreator public ParallelIndexTuningConfig( @JsonProperty("targetPartitionSize") @Deprecated @Nullable Integer targetPartitionSize, - @JsonProperty("maxRowsPerSegment") @Nullable Integer maxRowsPerSegment, + @JsonProperty("maxRowsPerSegment") @Deprecated @Nullable Integer maxRowsPerSegment, @JsonProperty("maxRowsInMemory") @Nullable Integer maxRowsInMemory, @JsonProperty("maxBytesInMemory") @Nullable Long maxBytesInMemory, - @JsonProperty("maxTotalRows") @Nullable Long maxTotalRows, + @JsonProperty("maxTotalRows") @Deprecated @Nullable Long maxTotalRows, @JsonProperty("numShards") @Nullable Integer numShards, + @JsonProperty("partitionsSpec") @Nullable PartitionsSpec partitionsSpec, @JsonProperty("indexSpec") @Nullable IndexSpec indexSpec, @JsonProperty("indexSpecForIntermediatePersists") @Nullable IndexSpec indexSpecForIntermediatePersists, @JsonProperty("maxPendingPersists") @Nullable Integer maxPendingPersists, @@ -110,10 +114,10 @@ public ParallelIndexTuningConfig( null, numShards, null, + getValidPartitionsSpec(maxRowsPerSegment, maxTotalRows, partitionsSpec), indexSpec, indexSpecForIntermediatePersists, maxPendingPersists, - null, forceGuaranteedRollup, reportParseExceptions, null, @@ -138,6 +142,22 @@ public ParallelIndexTuningConfig( Preconditions.checkArgument(this.maxNumSubTasks > 0, "maxNumSubTasks must be positive"); } + private static PartitionsSpec getValidPartitionsSpec( + @Nullable Integer maxRowsPerSegment, + @Nullable Long maxTotalRows, + @Nullable PartitionsSpec partitionsSpec + ) + { + if (partitionsSpec != null) { + if (!(partitionsSpec instanceof DynamicPartitionsSpec)) { + throw new UnsupportedOperationException("Parallel index task supports only dynamic partitionsSpec yet"); + } + return partitionsSpec; + } else { + return new DynamicPartitionsSpec(maxRowsPerSegment, maxTotalRows); + } + } + @JsonProperty public int getMaxNumSubTasks() { diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java index 356019274b45..ec7f8dc32bd7 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java @@ -636,7 +636,11 @@ public void run() if (addResult.isOk()) { // If the number of rows in the segment exceeds the threshold after adding a row, // move the segment out from the active segments of BaseAppenderatorDriver to make a new segment. - if (addResult.isPushRequired(tuningConfig) && !sequenceToUse.isCheckpointed()) { + final boolean isPushRequired = addResult.isPushRequired( + tuningConfig.getPartitionsSpec().getMaxRowsPerSegment(), + tuningConfig.getPartitionsSpec().getMaxTotalRows() + ); + if (isPushRequired && !sequenceToUse.isCheckpointed()) { sequenceToCheckpoint = sequenceToUse; } isPersistRequired |= addResult.isPersistRequired(); diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskTuningConfig.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskTuningConfig.java index 1d48ad9c8c80..bcd4e312f682 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskTuningConfig.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskTuningConfig.java @@ -20,6 +20,7 @@ package org.apache.druid.indexing.seekablestream; import com.fasterxml.jackson.annotation.JsonProperty; +import org.apache.druid.indexer.partitions.DynamicPartitionsSpec; import org.apache.druid.segment.IndexSpec; import org.apache.druid.segment.indexing.RealtimeTuningConfig; import org.apache.druid.segment.indexing.TuningConfig; @@ -33,15 +34,12 @@ public abstract class SeekableStreamIndexTaskTuningConfig implements TuningConfig, AppenderatorConfig { - private static final int DEFAULT_MAX_ROWS_PER_SEGMENT = 5_000_000; private static final boolean DEFAULT_RESET_OFFSET_AUTOMATICALLY = false; private static final boolean DEFAULT_SKIP_SEQUENCE_NUMBER_AVAILABILITY_CHECK = false; private final int maxRowsInMemory; private final long maxBytesInMemory; - private final int maxRowsPerSegment; - @Nullable - private final Long maxTotalRows; + private final DynamicPartitionsSpec partitionsSpec; private final Period intermediatePersistPeriod; private final File basePersistDirectory; @Deprecated @@ -87,8 +85,7 @@ public SeekableStreamIndexTaskTuningConfig( final RealtimeTuningConfig defaults = RealtimeTuningConfig.makeDefaultTuningConfig(basePersistDirectory); this.maxRowsInMemory = maxRowsInMemory == null ? defaults.getMaxRowsInMemory() : maxRowsInMemory; - this.maxRowsPerSegment = maxRowsPerSegment == null ? DEFAULT_MAX_ROWS_PER_SEGMENT : maxRowsPerSegment; - this.maxTotalRows = maxTotalRows; + this.partitionsSpec = new DynamicPartitionsSpec(maxRowsPerSegment, maxTotalRows); // initializing this to 0, it will be lazily initialized to a value // @see server.src.main.java.org.apache.druid.segment.indexing.TuningConfigs#getMaxBytesInMemoryOrDefault(long) this.maxBytesInMemory = maxBytesInMemory == null ? 0 : maxBytesInMemory; @@ -151,7 +148,7 @@ public long getMaxBytesInMemory() @JsonProperty public Integer getMaxRowsPerSegment() { - return maxRowsPerSegment; + return partitionsSpec.getMaxRowsPerSegment(); } @JsonProperty @@ -159,7 +156,12 @@ public Integer getMaxRowsPerSegment() @Nullable public Long getMaxTotalRows() { - return maxTotalRows; + return partitionsSpec.getMaxTotalRows(); + } + + public DynamicPartitionsSpec getPartitionsSpec() + { + return partitionsSpec; } @Override @@ -279,7 +281,6 @@ public boolean equals(Object o) SeekableStreamIndexTaskTuningConfig that = (SeekableStreamIndexTaskTuningConfig) o; return maxRowsInMemory == that.maxRowsInMemory && maxBytesInMemory == that.maxBytesInMemory && - maxRowsPerSegment == that.maxRowsPerSegment && maxPendingPersists == that.maxPendingPersists && reportParseExceptions == that.reportParseExceptions && handoffConditionTimeout == that.handoffConditionTimeout && @@ -288,7 +289,7 @@ public boolean equals(Object o) logParseExceptions == that.logParseExceptions && maxParseExceptions == that.maxParseExceptions && maxSavedParseExceptions == that.maxSavedParseExceptions && - Objects.equals(maxTotalRows, that.maxTotalRows) && + Objects.equals(partitionsSpec, that.partitionsSpec) && Objects.equals(intermediatePersistPeriod, that.intermediatePersistPeriod) && Objects.equals(basePersistDirectory, that.basePersistDirectory) && Objects.equals(indexSpec, that.indexSpec) && @@ -303,8 +304,7 @@ public int hashCode() return Objects.hash( maxRowsInMemory, maxBytesInMemory, - maxRowsPerSegment, - maxTotalRows, + partitionsSpec, intermediatePersistPeriod, basePersistDirectory, maxPendingPersists, diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java index 80eb89345aac..b76c99840500 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java @@ -42,6 +42,7 @@ import org.apache.druid.guice.GuiceAnnotationIntrospector; import org.apache.druid.guice.GuiceInjectableValues; import org.apache.druid.guice.GuiceInjectors; +import org.apache.druid.indexer.partitions.HashedPartitionsSpec; import org.apache.druid.indexing.common.RetryPolicyConfig; import org.apache.druid.indexing.common.RetryPolicyFactory; import org.apache.druid.indexing.common.SegmentLoaderFactory; @@ -285,6 +286,7 @@ private static IndexTuningConfig createTuningConfig() null, null, null, + null, new IndexSpec( new RoaringBitmapSerdeFactory(true), CompressionStrategy.LZ4, @@ -294,7 +296,6 @@ private static IndexTuningConfig createTuningConfig() null, 5000, true, - true, false, null, 100L, @@ -456,13 +457,14 @@ public void testCreateIngestionSchemaWithTargetPartitionSize() throws IOExceptio { final IndexTuningConfig tuningConfig = new IndexTuningConfig( null, - 6, + null, 500000, 1000000L, null, null, null, null, + new HashedPartitionsSpec(6, null, null), new IndexSpec( new RoaringBitmapSerdeFactory(true), CompressionStrategy.LZ4, @@ -472,7 +474,6 @@ public void testCreateIngestionSchemaWithTargetPartitionSize() throws IOExceptio null, 5000, true, - true, false, null, 100L, @@ -520,10 +521,11 @@ public void testCreateIngestionSchemaWithMaxTotalRows() throws IOException, Segm null, 500000, 1000000L, - 6L, null, null, null, + null, + new HashedPartitionsSpec(null, 6, null), new IndexSpec( new RoaringBitmapSerdeFactory(true), CompressionStrategy.LZ4, @@ -533,7 +535,6 @@ public void testCreateIngestionSchemaWithMaxTotalRows() throws IOException, Segm null, 5000, true, - true, false, null, 100L, @@ -583,8 +584,9 @@ public void testCreateIngestionSchemaWithNumShards() throws IOException, Segment 1000000L, null, null, - 3, null, + null, + new HashedPartitionsSpec(null, 3, null), new IndexSpec( new RoaringBitmapSerdeFactory(true), CompressionStrategy.LZ4, @@ -594,7 +596,6 @@ public void testCreateIngestionSchemaWithNumShards() throws IOException, Segment null, 5000, true, - true, false, null, 100L, @@ -844,13 +845,14 @@ public void testTargetPartitionSizeWithPartitionConfig() throws IOException, Seg { final IndexTuningConfig tuningConfig = new IndexTuningConfig( null, - 6, + null, 500000, 1000000L, null, null, null, null, + new HashedPartitionsSpec(6, null, null), new IndexSpec( new RoaringBitmapSerdeFactory(true), CompressionStrategy.LZ4, @@ -860,7 +862,6 @@ public void testTargetPartitionSizeWithPartitionConfig() throws IOException, Seg null, 5000, true, - true, false, null, 100L, @@ -1033,13 +1034,14 @@ private void assertIngestionSchema( expectedSegmentIntervals, new IndexTuningConfig( null, - 41943040, // automatically computed targetPartitionSize + null, 500000, 1000000L, Long.MAX_VALUE, null, null, null, + new HashedPartitionsSpec(41943040, null, null), // automatically computed targetPartitionSize new IndexSpec( new RoaringBitmapSerdeFactory(true), CompressionStrategy.LZ4, @@ -1049,7 +1051,6 @@ private void assertIngestionSchema( null, 5000, true, - true, false, null, 100L, diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IndexTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IndexTaskTest.java index d3150aeb5249..337da8c8e583 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IndexTaskTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IndexTaskTest.java @@ -37,6 +37,8 @@ import org.apache.druid.data.input.impl.TimestampSpec; import org.apache.druid.indexer.TaskState; import org.apache.druid.indexer.TaskStatus; +import org.apache.druid.indexer.partitions.DynamicPartitionsSpec; +import org.apache.druid.indexer.partitions.HashedPartitionsSpec; import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReportData; import org.apache.druid.indexing.common.LockGranularity; import org.apache.druid.indexing.common.TaskReport; @@ -987,18 +989,18 @@ public void testMultipleParseExceptionsSuccess() throws Exception final IndexTask.IndexTuningConfig tuningConfig = new IndexTask.IndexTuningConfig( null, - 2, null, null, null, null, null, null, + null, + new HashedPartitionsSpec(2, null, null), indexSpec, null, null, true, - true, false, null, null, @@ -1112,17 +1114,17 @@ public void testMultipleParseExceptionsFailure() throws Exception // Allow up to 3 parse exceptions, and save up to 2 parse exceptions final IndexTask.IndexTuningConfig tuningConfig = new IndexTask.IndexTuningConfig( null, - 2, null, null, null, null, null, null, + null, + new DynamicPartitionsSpec(2, null), indexSpec, null, null, - true, false, false, null, @@ -1230,18 +1232,18 @@ public void testMultipleParseExceptionsFailureAtDeterminePartitions() throws Exc // Allow up to 3 parse exceptions, and save up to 2 parse exceptions final IndexTask.IndexTuningConfig tuningConfig = new IndexTask.IndexTuningConfig( null, - 2, null, null, null, null, null, null, + null, + new HashedPartitionsSpec(2, null, null), indexSpec, null, null, true, - true, false, null, null, @@ -1666,10 +1668,10 @@ static IndexTuningConfig createTuningConfig( null, numShards, partitionDimensions, + null, indexSpec, null, null, - true, forceGuaranteedRollup, reportParseException, null, diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TaskSerdeTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TaskSerdeTest.java index 189b4df66a95..7a52bdc81e63 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TaskSerdeTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TaskSerdeTest.java @@ -27,6 +27,7 @@ import org.apache.druid.guice.FirehoseModule; import org.apache.druid.indexer.HadoopIOConfig; import org.apache.druid.indexer.HadoopIngestionSpec; +import org.apache.druid.indexer.partitions.DynamicPartitionsSpec; import org.apache.druid.indexing.common.TestUtils; import org.apache.druid.indexing.common.stats.RowIngestionMetersFactory; import org.apache.druid.indexing.common.task.IndexTask.IndexIOConfig; @@ -129,7 +130,7 @@ public void testIndexTaskTuningConfigTargetPartitionSizeOrNumShards() throws Exc Assert.assertNull(tuningConfig.getNumShards()); tuningConfig = jsonMapper.readValue( - "{\"type\":\"index\", \"numShards\":10}", + "{\"type\":\"index\", \"numShards\":10, \"forceGuaranteedRollup\": true}", IndexTask.IndexTuningConfig.class ); @@ -137,7 +138,7 @@ public void testIndexTaskTuningConfigTargetPartitionSizeOrNumShards() throws Exc Assert.assertEquals(10, (int) tuningConfig.getNumShards()); tuningConfig = jsonMapper.readValue( - "{\"type\":\"index\", \"targetPartitionSize\":-1, \"numShards\":10}", + "{\"type\":\"index\", \"targetPartitionSize\":-1, \"numShards\":10, \"forceGuaranteedRollup\": true}", IndexTask.IndexTuningConfig.class ); @@ -153,7 +154,7 @@ public void testIndexTaskTuningConfigTargetPartitionSizeOrNumShards() throws Exc Assert.assertEquals(10, (int) tuningConfig.getMaxRowsPerSegment()); tuningConfig = jsonMapper.readValue( - "{\"type\":\"index\", \"targetPartitionSize\":-1, \"numShards\":-1}", + "{\"type\":\"index\", \"targetPartitionSize\":-1, \"numShards\":-1, \"forceGuaranteedRollup\": true}", IndexTask.IndexTuningConfig.class ); @@ -167,7 +168,7 @@ public void testIndexTaskTuningConfigTargetPartitionSizeAndNumShards() throws Ex thrown.expectCause(CoreMatchers.isA(IllegalArgumentException.class)); jsonMapper.readValue( - "{\"type\":\"index\", \"targetPartitionSize\":10, \"numShards\":10}", + "{\"type\":\"index\", \"targetPartitionSize\":10, \"numShards\":10, \"forceGuaranteedRollup\": true}", IndexTask.IndexTuningConfig.class ); } @@ -194,17 +195,17 @@ public void testIndexTaskSerde() throws Exception new IndexIOConfig(new LocalFirehoseFactory(new File("lol"), "rofl", null), true), new IndexTuningConfig( null, - 10000, + null, 10, null, null, 9999, null, null, + new DynamicPartitionsSpec(10000, null), indexSpec, null, 3, - true, false, null, null, @@ -278,17 +279,17 @@ public void testIndexTaskwithResourceSerde() throws Exception new IndexIOConfig(new LocalFirehoseFactory(new File("lol"), "rofl", null), true), new IndexTuningConfig( null, - 10000, + null, 10, null, null, null, null, null, + new DynamicPartitionsSpec(10000, null), indexSpec, null, 3, - true, false, null, null, diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskKillTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskKillTest.java index 2d0678a348d9..7d90a062a4af 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskKillTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskKillTest.java @@ -190,6 +190,7 @@ private ParallelIndexSupervisorTask newTask( null, null, null, + null, numTotalSubTasks, null, null, diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskResourceTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskResourceTest.java index d62512e49b52..f2b151e2521a 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskResourceTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskResourceTest.java @@ -430,6 +430,7 @@ private TestSupervisorTask newTask( null, null, null, + null, NUM_SUB_TASKS, null, null, diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskSerdeTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskSerdeTest.java index ccd3bd03517b..2f1c24f5e424 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskSerdeTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskSerdeTest.java @@ -140,6 +140,7 @@ private ParallelIndexSupervisorTask newTask( null, null, null, + null, 2, null, null, diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskTest.java index 14ddf4b19df1..7ff4bacead9c 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskTest.java @@ -299,6 +299,7 @@ public void testWith1MaxNumSubTasks() throws Exception null, null, null, + null, 1, null, null, @@ -367,6 +368,7 @@ private ParallelIndexSupervisorTask newTask( null, null, null, + null, 2, null, null, diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexTuningConfigTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexTuningConfigTest.java index 2d393020b3a2..d96cf544e497 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexTuningConfigTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexTuningConfigTest.java @@ -21,6 +21,7 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.jsontype.NamedType; +import org.apache.druid.indexer.partitions.DynamicPartitionsSpec; import org.apache.druid.jackson.DefaultObjectMapper; import org.apache.druid.segment.IndexSpec; import org.apache.druid.segment.data.CompressionFactory.LongEncodingStrategy; @@ -60,11 +61,12 @@ public void testSerdeWithMaxRowsPerSegment() { final ParallelIndexTuningConfig tuningConfig = new ParallelIndexTuningConfig( null, - 100, + null, 10, 1000L, - 100L, null, + null, + new DynamicPartitionsSpec(100, 100L), new IndexSpec( new RoaringBitmapSerdeFactory(true), CompressionStrategy.UNCOMPRESSED, diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java index cb320e8bcfb6..8db01cbab5f7 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java @@ -697,10 +697,10 @@ public void testIndexTask() throws Exception null, null, null, + null, indexSpec, null, 3, - true, false, null, null, @@ -780,10 +780,10 @@ public void testIndexTaskFailure() throws Exception null, null, null, + null, indexSpec, null, 3, - true, false, null, null, @@ -1176,10 +1176,10 @@ public void testResumeTasks() throws Exception null, null, null, + null, indexSpec, null, null, - false, null, null, null, diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorDriverAddResult.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorDriverAddResult.java index 219e5530ff14..62ee3228b359 100644 --- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorDriverAddResult.java +++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorDriverAddResult.java @@ -100,11 +100,6 @@ public boolean isPersistRequired() return isPersistRequired; } - public boolean isPushRequired(AppenderatorConfig tuningConfig) - { - return isPushRequired(tuningConfig.getMaxRowsPerSegment(), tuningConfig.getMaxTotalRows()); - } - public boolean isPushRequired(@Nullable Integer maxRowsPerSegment, @Nullable Long maxTotalRows) { boolean overThreshold = false;