diff --git a/core/src/main/java/org/apache/druid/data/input/FiniteFirehoseFactory.java b/core/src/main/java/org/apache/druid/data/input/FiniteFirehoseFactory.java index 42a16837b97a..09f943bf3674 100644 --- a/core/src/main/java/org/apache/druid/data/input/FiniteFirehoseFactory.java +++ b/core/src/main/java/org/apache/druid/data/input/FiniteFirehoseFactory.java @@ -22,6 +22,7 @@ import com.fasterxml.jackson.annotation.JsonIgnore; import org.apache.druid.data.input.impl.InputRowParser; +import javax.annotation.Nullable; import java.io.IOException; import java.util.stream.Stream; @@ -52,13 +53,13 @@ default boolean isSplittable() * lazily so that the listing overhead could be amortized. */ @JsonIgnore - Stream<InputSplit<S>> getSplits() throws IOException; + Stream<InputSplit<S>> getSplits(@Nullable SplitHintSpec splitHintSpec) throws IOException; /** - * Returns number of splits returned by {@link #getSplits()}. + * Returns the number of splits returned by {@link #getSplits}. */ @JsonIgnore - int getNumSplits() throws IOException; + int getNumSplits(@Nullable SplitHintSpec splitHintSpec) throws IOException; /** * Returns the same {@link FiniteFirehoseFactory} but with the given {@link InputSplit}. The returned diff --git a/core/src/main/java/org/apache/druid/data/input/SegmentsSplitHintSpec.java b/core/src/main/java/org/apache/druid/data/input/SegmentsSplitHintSpec.java new file mode 100644 index 000000000000..6cca8ab71915 --- /dev/null +++ b/core/src/main/java/org/apache/druid/data/input/SegmentsSplitHintSpec.java @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.data.input; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; + +import javax.annotation.Nullable; +import java.util.Objects; + +/** + * {@link SplitHintSpec} for IngestSegmentFirehoseFactory. + */ +public class SegmentsSplitHintSpec implements SplitHintSpec +{ + public static final String TYPE = "segments"; + + private static final long DEFAULT_MAX_INPUT_SEGMENT_BYTES_PER_TASK = 500 * 1024 * 1024; + + /** + * Maximum number of bytes of input segments to process in a single task. + * If a single segment is larger than this number, it will be processed by itself in a single task. + */ + private final long maxInputSegmentBytesPerTask; + + @JsonCreator + public SegmentsSplitHintSpec( + @JsonProperty("maxInputSegmentBytesPerTask") @Nullable Long maxInputSegmentBytesPerTask + ) + { + this.maxInputSegmentBytesPerTask = maxInputSegmentBytesPerTask == null + ?
DEFAULT_MAX_INPUT_SEGMENT_BYTES_PER_TASK + : maxInputSegmentBytesPerTask; + } + + @JsonProperty + public long getMaxInputSegmentBytesPerTask() + { + return maxInputSegmentBytesPerTask; + } + + @Override + public boolean equals(Object o) + { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + SegmentsSplitHintSpec that = (SegmentsSplitHintSpec) o; + return maxInputSegmentBytesPerTask == that.maxInputSegmentBytesPerTask; + } + + @Override + public int hashCode() + { + return Objects.hash(maxInputSegmentBytesPerTask); + } + + @Override + public String toString() + { + return "SegmentsSplitHintSpec{" + + "maxInputSegmentBytesPerTask=" + maxInputSegmentBytesPerTask + + '}'; + } +} diff --git a/core/src/main/java/org/apache/druid/data/input/SplitHintSpec.java b/core/src/main/java/org/apache/druid/data/input/SplitHintSpec.java new file mode 100644 index 000000000000..69042a74d92f --- /dev/null +++ b/core/src/main/java/org/apache/druid/data/input/SplitHintSpec.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.data.input; + +import com.fasterxml.jackson.annotation.JsonSubTypes; +import com.fasterxml.jackson.annotation.JsonSubTypes.Type; +import com.fasterxml.jackson.annotation.JsonTypeInfo; + +/** + * In native parallel indexing, the supervisor task partitions input data into splits and assigns each of them + * to a single sub task. How splits are created mainly depends on the input file format, but Druid users sometimes + * want to give a hint to control the amount of data each sub task will read. SplitHintSpec can be used for this + * purpose. Implementations can ignore the given hint.
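A null hint means the implementation falls back to its default splitting behavior.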
+ * + * @see FiniteFirehoseFactory#getSplits(SplitHintSpec) + * @see FiniteFirehoseFactory#getNumSplits(SplitHintSpec) + */ +@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type") +@JsonSubTypes(value = { + @Type(name = SegmentsSplitHintSpec.TYPE, value = SegmentsSplitHintSpec.class) +}) +public interface SplitHintSpec +{ +} diff --git a/core/src/main/java/org/apache/druid/data/input/impl/AbstractTextFilesFirehoseFactory.java b/core/src/main/java/org/apache/druid/data/input/impl/AbstractTextFilesFirehoseFactory.java index ea6ac62c00bb..4fe2e2884e59 100644 --- a/core/src/main/java/org/apache/druid/data/input/impl/AbstractTextFilesFirehoseFactory.java +++ b/core/src/main/java/org/apache/druid/data/input/impl/AbstractTextFilesFirehoseFactory.java @@ -26,8 +26,10 @@ import org.apache.druid.data.input.FiniteFirehoseFactory; import org.apache.druid.data.input.Firehose; import org.apache.druid.data.input.InputSplit; +import org.apache.druid.data.input.SplitHintSpec; import org.apache.druid.java.util.common.logger.Logger; +import javax.annotation.Nullable; import java.io.File; import java.io.IOException; import java.io.InputStream; @@ -102,14 +104,14 @@ public List<T> getObjects() } @Override - public Stream<InputSplit<T>> getSplits() throws IOException + public Stream<InputSplit<T>> getSplits(@Nullable SplitHintSpec splitHintSpec) throws IOException { initializeObjectsIfNeeded(); return getObjects().stream().map(InputSplit::new); } @Override - public int getNumSplits() throws IOException + public int getNumSplits(@Nullable SplitHintSpec splitHintSpec) throws IOException { initializeObjectsIfNeeded(); return getObjects().size(); diff --git a/docs/configuration/index.md b/docs/configuration/index.md index fefc9108c487..c94eee086762 100644 --- a/docs/configuration/index.md +++ b/docs/configuration/index.md @@ -813,9 +813,11 @@ If you see this problem, it's recommended to set `skipOffsetFromLatest` to some |`maxRowsInMemory`|See [tuningConfig for indexTask](../ingestion/native-batch.md#tuningconfig)|no (default = 1000000)| |`maxBytesInMemory`|See [tuningConfig for indexTask](../ingestion/native-batch.md#tuningconfig)|no (1/6 of max JVM memory)| |`maxTotalRows`|See [tuningConfig for indexTask](../ingestion/native-batch.md#tuningconfig)|no (default = 20000000)| +|`splitHintSpec`|See [tuningConfig for indexTask](../ingestion/native-batch.md#tuningconfig)|no (default = null)| |`indexSpec`|See [IndexSpec](../ingestion/index.md#indexspec)|no| |`maxPendingPersists`|See [tuningConfig for indexTask](../ingestion/native-batch.md#tuningconfig)|no (default = 0 (meaning one persist can be running concurrently with ingestion, and none can be queued up))| |`pushTimeout`|See [tuningConfig for indexTask](../ingestion/native-batch.md#tuningconfig)|no (default = 0)| +|`maxNumConcurrentSubTasks`|See [tuningConfig for indexTask](../ingestion/native-batch.md#tuningconfig)|no (default = 1)| ### Overlord diff --git a/docs/ingestion/data-management.md b/docs/ingestion/data-management.md index 893c186a0a28..054e9f00ae5f 100644 --- a/docs/ingestion/data-management.md +++ b/docs/ingestion/data-management.md @@ -100,9 +100,10 @@ Compaction tasks merge all segments of the given interval. The syntax is: "id": <task_id>, "dataSource": <task_datasource>, "ioConfig": <IO config>, - "dimensions" <custom dimensionsSpec>, + "dimensionsSpec" <custom dimensionsSpec>, + "metricsSpec" <custom metricsSpec>, "segmentGranularity": <segment granularity after compaction>, - "tuningConfig" <index task tuningConfig>, + "tuningConfig" <parallel indexing task tuningConfig>, "context": <task context> } ``` @@ -116,7 +117,7 @@ Compaction tasks merge all segments of the given interval. The syntax is: |`dimensionsSpec`|Custom dimensionsSpec.
The compaction task will use this dimensionsSpec, if specified, instead of generating one. See below for more details.|No| |`metricsSpec`|Custom metricsSpec. The compaction task will use this metricsSpec, if specified, rather than generating one.|No| |`segmentGranularity`|If this is set, compactionTask will change the segment granularity for the given interval. See `segmentGranularity` of [`granularitySpec`](index.md#granularityspec) for more details. See the below table for the behavior.|No| -|`tuningConfig`|[Index task tuningConfig](../ingestion/native-batch.md#tuningconfig)|No| +|`tuningConfig`|[Parallel indexing task tuningConfig](../ingestion/native-batch.md#tuningconfig)|No| |`context`|[Task context](../ingestion/tasks.md#context)|No| diff --git a/docs/ingestion/native-batch.md b/docs/ingestion/native-batch.md index 9da957708fc8..90cc4c75d2d0 100644 --- a/docs/ingestion/native-batch.md +++ b/docs/ingestion/native-batch.md @@ -204,6 +204,7 @@ The tuningConfig is optional and default parameters will be used if no tuningCon |maxBytesInMemory|Used in determining when intermediate persists to disk should occur. Normally this is computed internally and users do not need to set it. This value represents the number of bytes to aggregate in heap memory before persisting. This is based on a rough estimate of memory usage and not actual usage. The maximum heap memory usage for indexing is maxBytesInMemory * (2 + maxPendingPersists)|1/6 of max JVM memory|no| |maxTotalRows|Deprecated. Use `partitionsSpec` instead. Total number of rows in segments waiting to be pushed. Used in determining when intermediate pushing should occur.|20000000|no| |numShards|Deprecated. Use `partitionsSpec` instead. Directly specify the number of shards to create. If this is specified and `intervals` is specified in the `granularitySpec`, the index task can skip the determine intervals/partitions pass through the data. `numShards` cannot be specified if `maxRowsPerSegment` is set.|null|no| +|splitHintSpec|Used to give a hint to control the amount of data that each first-phase task reads. This hint can be ignored depending on the firehose implementation. See [SplitHintSpec](#splithintspec) for more details.|null|no| |partitionsSpec|Defines how to partition data in each timeChunk, see [PartitionsSpec](#partitionsspec)|`dynamic` if `forceGuaranteedRollup` = false, `hashed` if `forceGuaranteedRollup` = true|no| |indexSpec|Defines segment storage format options to be used at indexing time, see [IndexSpec](index.md#indexspec)|null|no| |indexSpecForIntermediatePersists|Defines segment storage format options to be used at indexing time for intermediate persisted temporary segments. This can be used to disable dimension/metric compression on intermediate segments to reduce memory required for final merging. However, disabling compression on intermediate segments might increase page cache use while they are used before getting merged into the final segment published. See [IndexSpec](index.md#indexspec) for possible values.|same as indexSpec|no| @@ -212,7 +213,7 @@ The tuningConfig is optional and default parameters will be used if no tuningCon |reportParseExceptions|If true, exceptions encountered during parsing will be thrown and will halt ingestion; if false, unparseable rows and fields will be skipped.|false|no| |pushTimeout|Milliseconds to wait for pushing segments. It must be >= 0, where 0 means to wait forever.|0|no| |segmentWriteOutMediumFactory|Segment write-out medium to use when creating segments. See [SegmentWriteOutMediumFactory](#segmentwriteoutmediumfactory).|Not specified, the value from `druid.peon.defaultSegmentWriteOutMediumFactory.type` is used|no| -|maxNumConcurrentSubTasks|Maximum number of tasks which can be run in parallel at the same time. The supervisor task would spawn worker tasks up to `maxNumConcurrentSubTasks` regardless of the current available task slots. If this value is set to 1, the supervisor task processes data ingestion on its own instead of spawning worker tasks. If this value is set to too large, too many worker tasks can be created which might block other ingestion. Check [Capacity Planning](#capacity-planning) for more details.|1|no| +|maxNumConcurrentSubTasks|Maximum number of sub tasks which can be run in parallel at the same time. The supervisor task spawns worker tasks up to `maxNumConcurrentSubTasks` regardless of the currently available task slots. If this value is set to 1, the supervisor task processes data ingestion on its own instead of spawning worker tasks. If this value is set too large, too many worker tasks may be created, which might block other ingestion. Check [Capacity Planning](#capacity-planning) for more details.|1|no| |maxRetry|Maximum number of retries on task failures.|3|no| |maxNumSegmentsToMerge|Max limit for the number of segments that a single task can merge at the same time in the second phase. Used only when `forceGuaranteedRollup` is set.|100|no| |totalNumMergeTasks|Total number of tasks to merge segments in the second phase when `forceGuaranteedRollup` is set.|10|no| @@ -220,6 +221,22 @@ |chatHandlerTimeout|Timeout for reporting the pushed segments in worker tasks.|PT10S|no| |chatHandlerNumRetries|Retries for reporting the pushed segments in worker tasks.|5|no| +### `splitHintSpec` + +`SplitHintSpec` is used to give a hint when the supervisor task creates input splits. +Note that each sub task processes a single input split, so this lets you control the amount of data each sub task reads during the first phase. + +Currently, only one splitHintSpec, `segments`, is available. + +#### `SegmentsSplitHintSpec` + +`SegmentsSplitHintSpec` is used only for `IngestSegmentFirehose`. + +|property|description|default|required?| +|--------|-----------|-------|---------| +|type|This should always be `segments`.|none|yes| +|maxInputSegmentBytesPerTask|Maximum number of bytes of input segments to process in a single task. If a single segment is larger than this number, it will be processed by itself in a single task (input segments are never split across tasks).|500MB|no| + ### `partitionsSpec` PartitionsSpec describes the secondary partitioning method. @@ -785,7 +802,7 @@ This firehose will accept any type of parser, but will only utilize the list of |dimensions|The list of dimensions to select. If left empty, no dimensions are returned. If left null or not defined, all dimensions are returned. |no| |metrics|The list of metrics to select. If left empty, no metrics are returned. If left null or not defined, all metrics are selected.|no| |filter| See [Filters](../querying/filters.md)|no| -|maxInputSegmentBytesPerTask|When used with the native parallel index task, the maximum number of bytes of input segments to process in a single task. If a single segment is larger than this number, it will be processed by itself in a single task (input segments are never split across tasks). Defaults to 150MB.|no| +|maxInputSegmentBytesPerTask|Deprecated. Use [SegmentsSplitHintSpec](#segmentssplithintspec) instead. When used with the native parallel index task, the maximum number of bytes of input segments to process in a single task. If a single segment is larger than this number, it will be processed by itself in a single task (input segments are never split across tasks). If not specified, the value of `maxInputSegmentBytesPerTask` in [SegmentsSplitHintSpec](#segmentssplithintspec) is used (500MB by default).|no| diff --git a/extensions-core/s3-extensions/src/test/java/org/apache/druid/firehose/s3/StaticS3FirehoseFactoryTest.java b/extensions-core/s3-extensions/src/test/java/org/apache/druid/firehose/s3/StaticS3FirehoseFactoryTest.java index 3c33bb7b48d9..5f89860fbee3 100644 --- a/extensions-core/s3-extensions/src/test/java/org/apache/druid/firehose/s3/StaticS3FirehoseFactoryTest.java +++ b/extensions-core/s3-extensions/src/test/java/org/apache/druid/firehose/s3/StaticS3FirehoseFactoryTest.java @@ -115,7 +115,7 @@ public void testWithSplit() throws IOException 5 ); final List<FiniteFirehoseFactory<StringInputRowParser, URI>> subFactories = factory - .getSplits() + .getSplits(null) .map(factory::withSplit) .sorted(Comparator.comparing(eachFactory -> { final StaticS3FirehoseFactory staticS3FirehoseFactory = (StaticS3FirehoseFactory) eachFactory; diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java index 492c7125cd1d..115ff8103b08 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java @@ -31,6 +31,7 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.Lists; import org.apache.druid.client.coordinator.CoordinatorClient; +import org.apache.druid.client.indexing.IndexingServiceClient; import org.apache.druid.data.input.impl.DimensionSchema; import org.apache.druid.data.input.impl.DimensionSchema.MultiValueHandling; import org.apache.druid.data.input.impl.DimensionsSpec; @@ -52,9 +53,10 @@ import org.apache.druid.indexing.common.actions.SegmentListUsedAction; import org.apache.druid.indexing.common.actions.TaskActionClient; import org.apache.druid.indexing.common.stats.RowIngestionMetersFactory; -import org.apache.druid.indexing.common.task.IndexTask.IndexIOConfig; -import org.apache.druid.indexing.common.task.IndexTask.IndexIngestionSpec; -import org.apache.druid.indexing.common.task.IndexTask.IndexTuningConfig; +import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexIOConfig; +import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexIngestionSpec; +import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexSupervisorTask; +import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexTuningConfig; import org.apache.druid.indexing.firehose.IngestSegmentFirehoseFactory; import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.JodaUtils; @@ -129,7 +131,7 @@ public class CompactionTask extends AbstractBatchIndexTask @Nullable private final Granularity segmentGranularity; @Nullable - private final IndexTuningConfig tuningConfig; + private final ParallelIndexTuningConfig tuningConfig; private final ObjectMapper jsonMapper; @JsonIgnore private final SegmentProvider segmentProvider; @@ -148,6 +150,8 @@ public class CompactionTask extends AbstractBatchIndexTask @JsonIgnore private final CoordinatorClient coordinatorClient; + private final IndexingServiceClient indexingServiceClient; + @JsonIgnore private final
SegmentLoaderFactory segmentLoaderFactory; @@ -160,14 +164,11 @@ public class CompactionTask extends AbstractBatchIndexTask @JsonIgnore private final CurrentSubTaskHolder currentSubTaskHolder = new CurrentSubTaskHolder( (taskObject, config) -> { - final IndexTask indexTask = (IndexTask) taskObject; + final ParallelIndexSupervisorTask indexTask = (ParallelIndexSupervisorTask) taskObject; indexTask.stopGracefully(config); } ); - @JsonIgnore - private List indexTaskSpecs; - @JsonCreator public CompactionTask( @JsonProperty("id") final String id, @@ -180,13 +181,14 @@ public CompactionTask( @JsonProperty("dimensionsSpec") @Nullable final DimensionsSpec dimensionsSpec, @JsonProperty("metricsSpec") @Nullable final AggregatorFactory[] metricsSpec, @JsonProperty("segmentGranularity") @Nullable final Granularity segmentGranularity, - @JsonProperty("tuningConfig") @Nullable final IndexTuningConfig tuningConfig, + @JsonProperty("tuningConfig") @Nullable final ParallelIndexTuningConfig tuningConfig, @JsonProperty("context") @Nullable final Map context, @JacksonInject ObjectMapper jsonMapper, @JacksonInject AuthorizerMapper authorizerMapper, @JacksonInject ChatHandlerProvider chatHandlerProvider, @JacksonInject RowIngestionMetersFactory rowIngestionMetersFactory, @JacksonInject CoordinatorClient coordinatorClient, + @JacksonInject @Nullable IndexingServiceClient indexingServiceClient, @JacksonInject SegmentLoaderFactory segmentLoaderFactory, @JacksonInject RetryPolicyFactory retryPolicyFactory, @JacksonInject AppenderatorsManager appenderatorsManager @@ -222,6 +224,7 @@ public CompactionTask( this.authorizerMapper = authorizerMapper; this.chatHandlerProvider = chatHandlerProvider; this.rowIngestionMetersFactory = rowIngestionMetersFactory; + this.indexingServiceClient = indexingServiceClient; this.coordinatorClient = coordinatorClient; this.segmentLoaderFactory = segmentLoaderFactory; this.retryPolicyFactory = retryPolicyFactory; @@ -258,7 +261,7 @@ public Granularity getSegmentGranularity() @Nullable @JsonProperty - public IndexTuningConfig getTuningConfig() + public ParallelIndexTuningConfig getTuningConfig() { return tuningConfig; } @@ -304,36 +307,36 @@ public boolean isPerfectRollup() @Override public TaskStatus runTask(TaskToolbox toolbox) throws Exception { - if (indexTaskSpecs == null) { - final List ingestionSpecs = createIngestionSchema( - toolbox, - segmentProvider, - partitionConfigurationManager, - dimensionsSpec, - metricsSpec, - segmentGranularity, - jsonMapper, - coordinatorClient, - segmentLoaderFactory, - retryPolicyFactory - ); - indexTaskSpecs = IntStream - .range(0, ingestionSpecs.size()) - .mapToObj(i -> new IndexTask( - createIndexTaskSpecId(i), - getGroupId(), - getTaskResource(), - getDataSource(), - ingestionSpecs.get(i), - createContextForSubtask(), - authorizerMapper, - chatHandlerProvider, - rowIngestionMetersFactory, - appenderatorsManager - - )) - .collect(Collectors.toList()); - } + final List ingestionSpecs = createIngestionSchema( + toolbox, + segmentProvider, + partitionConfigurationManager, + dimensionsSpec, + metricsSpec, + segmentGranularity, + jsonMapper, + coordinatorClient, + segmentLoaderFactory, + retryPolicyFactory + ); + final List indexTaskSpecs = IntStream + .range(0, ingestionSpecs.size()) + .mapToObj(i -> { + // taskId is used for different purposes in parallel indexing and local indexing. + // In parallel indexing, it's the taskId of the supervisor task. This supervisor taskId must be + // a valid taskId to communicate with sub tasks properly. 
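Sub tasks use this ID to report their pushed segments back to the supervisor task.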
We use the ID of the compaction task in this case. + // + // In local indexing, it's used as the sequence name for Appenderator. Even though a compaction task can run + // multiple index tasks (one per interval), the appenderator is not shared by those tasks. Each task creates + // a new Appenderator on its own instead. As a result, they should use different sequence names to allocate + // new segmentIds properly. See IndexerSQLMetadataStorageCoordinator.allocatePendingSegments() for details. + // In this case, we use different fake IDs for each created index task. + final String subtaskId = tuningConfig == null || tuningConfig.getMaxNumConcurrentSubTasks() == 1 + ? createIndexTaskSpecId(i) + : getId(); + return newTask(subtaskId, ingestionSpecs.get(i)); + }) + .collect(Collectors.toList()); if (indexTaskSpecs.isEmpty()) { log.warn("Can't find segments from inputSpec[%s], nothing to do.", ioConfig.getInputSpec()); @@ -344,7 +347,7 @@ public TaskStatus runTask(TaskToolbox toolbox) throws Exception log.info("Generated [%d] compaction task specs", totalNumSpecs); int failCnt = 0; - for (IndexTask eachSpec : indexTaskSpecs) { + for (ParallelIndexSupervisorTask eachSpec : indexTaskSpecs) { final String json = jsonMapper.writerWithDefaultPrettyPrinter().writeValueAsString(eachSpec); if (!currentSubTaskHolder.setTask(eachSpec)) { log.info("Task is asked to stop. Finish as failed."); @@ -374,10 +377,30 @@ public TaskStatus runTask(TaskToolbox toolbox) throws Exception } } - private Map createContextForSubtask() + @VisibleForTesting + ParallelIndexSupervisorTask newTask(String taskId, ParallelIndexIngestionSpec ingestionSpec) + { + return new ParallelIndexSupervisorTask( + taskId, + getGroupId(), + getTaskResource(), + ingestionSpec, + createContextForSubtask(), + indexingServiceClient, + chatHandlerProvider, + authorizerMapper, + rowIngestionMetersFactory, + appenderatorsManager + ); + } + + @VisibleForTesting + Map createContextForSubtask() { final Map newContext = new HashMap<>(getContext()); newContext.put(CTX_KEY_APPENDERATOR_TRACKING_TASK_ID, getId()); + // Set the priority of the compaction task. + newContext.put(Tasks.PRIORITY_KEY, getPriority()); return newContext; } @@ -387,12 +410,12 @@ private String createIndexTaskSpecId(int i) } /** - * Generate {@link IndexIngestionSpec} from input segments. + * Generate {@link ParallelIndexIngestionSpec} from input segments. * * @return an empty list if input segments don't exist. Otherwise, a generated ingestionSpec. 
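+ * One ingestionSpec is created per interval of the input segments when segmentGranularity is null; otherwise a single ingestionSpec covering all input segments is created.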
*/ @VisibleForTesting - static List createIngestionSchema( + static List createIngestionSchema( final TaskToolbox toolbox, final SegmentProvider segmentProvider, final PartitionConfigurationManager partitionConfigurationManager, @@ -424,7 +447,7 @@ static List createIngestionSchema( toolbox.getIndexIO() ); - final IndexTuningConfig compactionTuningConfig = partitionConfigurationManager.computeTuningConfig(); + final ParallelIndexTuningConfig compactionTuningConfig = partitionConfigurationManager.computeTuningConfig(); if (segmentGranularity == null) { // original granularity @@ -437,7 +460,7 @@ static List createIngestionSchema( .add(p) ); - final List specs = new ArrayList<>(intervalToSegments.size()); + final List specs = new ArrayList<>(intervalToSegments.size()); for (Entry>> entry : intervalToSegments.entrySet()) { final Interval interval = entry.getKey(); final List> segmentsToCompact = entry.getValue(); @@ -451,7 +474,7 @@ static List createIngestionSchema( ); specs.add( - new IndexIngestionSpec( + new ParallelIndexIngestionSpec( dataSchema, createIoConfig( toolbox, @@ -479,7 +502,7 @@ static List createIngestionSchema( ); return Collections.singletonList( - new IndexIngestionSpec( + new ParallelIndexIngestionSpec( dataSchema, createIoConfig( toolbox, @@ -495,7 +518,7 @@ static List createIngestionSchema( } } - private static IndexIOConfig createIoConfig( + private static ParallelIndexIOConfig createIoConfig( TaskToolbox toolbox, DataSchema dataSchema, Interval interval, @@ -504,7 +527,7 @@ private static IndexIOConfig createIoConfig( RetryPolicyFactory retryPolicyFactory ) { - return new IndexIOConfig( + return new ParallelIndexIOConfig( new IngestSegmentFirehoseFactory( dataSchema.getDataSource(), interval, @@ -787,18 +810,18 @@ List checkAndGetSegments(TaskActionClient actionClient) throws IOEx static class PartitionConfigurationManager { @Nullable - private final IndexTuningConfig tuningConfig; + private final ParallelIndexTuningConfig tuningConfig; - PartitionConfigurationManager(@Nullable IndexTuningConfig tuningConfig) + PartitionConfigurationManager(@Nullable ParallelIndexTuningConfig tuningConfig) { this.tuningConfig = tuningConfig; } @Nullable - IndexTuningConfig computeTuningConfig() + ParallelIndexTuningConfig computeTuningConfig() { - IndexTuningConfig newTuningConfig = tuningConfig == null - ? IndexTuningConfig.createDefault() + ParallelIndexTuningConfig newTuningConfig = tuningConfig == null + ? 
ParallelIndexTuningConfig.defaultConfig() : tuningConfig; PartitionsSpec partitionsSpec = newTuningConfig.getGivenOrDefaultPartitionsSpec(); if (partitionsSpec instanceof DynamicPartitionsSpec) { @@ -822,6 +845,7 @@ public static class Builder private final AuthorizerMapper authorizerMapper; private final ChatHandlerProvider chatHandlerProvider; private final RowIngestionMetersFactory rowIngestionMetersFactory; + private final IndexingServiceClient indexingServiceClient; private final CoordinatorClient coordinatorClient; private final SegmentLoaderFactory segmentLoaderFactory; private final RetryPolicyFactory retryPolicyFactory; @@ -835,7 +859,7 @@ public static class Builder @Nullable private Granularity segmentGranularity; @Nullable - private IndexTuningConfig tuningConfig; + private ParallelIndexTuningConfig tuningConfig; @Nullable private Map context; @@ -845,6 +869,7 @@ public Builder( AuthorizerMapper authorizerMapper, ChatHandlerProvider chatHandlerProvider, RowIngestionMetersFactory rowIngestionMetersFactory, + IndexingServiceClient indexingServiceClient, CoordinatorClient coordinatorClient, SegmentLoaderFactory segmentLoaderFactory, RetryPolicyFactory retryPolicyFactory, @@ -856,6 +881,7 @@ public Builder( this.authorizerMapper = authorizerMapper; this.chatHandlerProvider = chatHandlerProvider; this.rowIngestionMetersFactory = rowIngestionMetersFactory; + this.indexingServiceClient = indexingServiceClient; this.coordinatorClient = coordinatorClient; this.segmentLoaderFactory = segmentLoaderFactory; this.retryPolicyFactory = retryPolicyFactory; @@ -896,7 +922,7 @@ public Builder segmentGranularity(Granularity segmentGranularity) return this; } - public Builder tuningConfig(IndexTuningConfig tuningConfig) + public Builder tuningConfig(ParallelIndexTuningConfig tuningConfig) { this.tuningConfig = tuningConfig; return this; @@ -928,6 +954,7 @@ public CompactionTask build() chatHandlerProvider, rowIngestionMetersFactory, coordinatorClient, + indexingServiceClient, segmentLoaderFactory, retryPolicyFactory, appenderatorsManager diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexPhaseRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexPhaseRunner.java index 7f5697a9b6e1..e3a7632b48a2 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexPhaseRunner.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexPhaseRunner.java @@ -28,6 +28,7 @@ import org.apache.druid.data.input.FiniteFirehoseFactory; import org.apache.druid.data.input.FirehoseFactory; import org.apache.druid.data.input.InputSplit; +import org.apache.druid.data.input.SplitHintSpec; import org.apache.druid.indexer.TaskState; import org.apache.druid.indexer.TaskStatusPlus; import org.apache.druid.indexing.common.TaskToolbox; @@ -196,7 +197,10 @@ public TaskState run() throws Exception (SinglePhaseSubTaskSpec) taskCompleteEvent.getSpec(); LOG.error( "Failed to run sub tasks for inputSplits[%s]", - getSplitsIfSplittable(spec.getIngestionSpec().getIOConfig().getFirehoseFactory()) + getSplitsIfSplittable( + spec.getIngestionSpec().getIOConfig().getFirehoseFactory(), + tuningConfig.getSplitHintSpec() + ) ); } break; @@ -250,11 +254,14 @@ public void onFailure(Throwable t) ); } - private static List getSplitsIfSplittable(FirehoseFactory firehoseFactory) throws IOException + private static List 
getSplitsIfSplittable( + FirehoseFactory firehoseFactory, + @Nullable SplitHintSpec splitHintSpec + ) throws IOException { if (firehoseFactory instanceof FiniteFirehoseFactory) { final FiniteFirehoseFactory finiteFirehoseFactory = (FiniteFirehoseFactory) firehoseFactory; - return finiteFirehoseFactory.getSplits().collect(Collectors.toList()); + return finiteFirehoseFactory.getSplits(splitHintSpec).collect(Collectors.toList()); } else { throw new ISE("firehoseFactory[%s] is not splittable", firehoseFactory.getClass().getSimpleName()); } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java index 7f593aeddca0..265d38dd6053 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java @@ -158,6 +158,7 @@ public class ParallelIndexSupervisorTask extends AbstractBatchIndexTask implemen @JsonCreator public ParallelIndexSupervisorTask( @JsonProperty("id") String id, + @JsonProperty("groupId") @Nullable String groupId, @JsonProperty("resource") TaskResource taskResource, @JsonProperty("spec") ParallelIndexIngestionSpec ingestionSchema, @JsonProperty("context") Map<String, Object> context, @@ -170,7 +171,7 @@ public ParallelIndexSupervisorTask( { super( getOrMakeId(id, TYPE, ingestionSchema.getDataSchema().getDataSource()), - null, + groupId, taskResource, ingestionSchema.getDataSchema().getDataSource(), context ); @@ -396,10 +397,12 @@ public TaskStatus runTask(TaskToolbox toolbox) throws Exception "firehoseFactory[%s] is not splittable. Running sequentially.", baseFirehoseFactory.getClass().getSimpleName() ); - } else if (ingestionSchema.getTuningConfig().getMaxNumConcurrentSubTasks() == 1) { + } else if (ingestionSchema.getTuningConfig().getMaxNumConcurrentSubTasks() <= 1) { LOG.warn( - "maxNumConcurrentSubTasks is 1. Running sequentially. " - + "Please set maxNumConcurrentSubTasks to something higher than 1 if you want to run in parallel ingestion mode." + "maxNumConcurrentSubTasks[%s] is less than or equal to 1. Running sequentially. " + + "Please set maxNumConcurrentSubTasks to something higher than 1 if you want to run in parallel " + + "ingestion mode.", + ingestionSchema.getTuningConfig().getMaxNumConcurrentSubTasks() ); } else { throw new ISE("Unknown reason for sequential mode.
Failing this task."); diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexTuningConfig.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexTuningConfig.java index 64159d9f3f43..552c4048a80d 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexTuningConfig.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexTuningConfig.java @@ -23,6 +23,7 @@ import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.annotation.JsonTypeName; import com.google.common.base.Preconditions; +import org.apache.druid.data.input.SplitHintSpec; import org.apache.druid.indexer.partitions.PartitionsSpec; import org.apache.druid.indexing.common.task.IndexTask.IndexTuningConfig; import org.apache.druid.java.util.common.IAE; @@ -46,6 +47,8 @@ public class ParallelIndexTuningConfig extends IndexTuningConfig private static final int DEFAULT_MAX_NUM_SEGMENTS_TO_MERGE = 100; private static final int DEFAULT_TOTAL_NUM_MERGE_TASKS = 10; + private final SplitHintSpec splitHintSpec; + private final int maxNumConcurrentSubTasks; private final int maxRetry; private final long taskStatusCheckPeriodMs; @@ -67,7 +70,7 @@ public class ParallelIndexTuningConfig extends IndexTuningConfig */ private final int totalNumMergeTasks; - static ParallelIndexTuningConfig defaultConfig() + public static ParallelIndexTuningConfig defaultConfig() { return new ParallelIndexTuningConfig( null, @@ -94,6 +97,7 @@ static ParallelIndexTuningConfig defaultConfig() null, null, null, + null, null ); } @@ -106,6 +110,7 @@ public ParallelIndexTuningConfig( @JsonProperty("maxBytesInMemory") @Nullable Long maxBytesInMemory, @JsonProperty("maxTotalRows") @Deprecated @Nullable Long maxTotalRows, @JsonProperty("numShards") @Deprecated @Nullable Integer numShards, + @JsonProperty("splitHintSpec") @Nullable SplitHintSpec splitHintSpec, @JsonProperty("partitionsSpec") @Nullable PartitionsSpec partitionsSpec, @JsonProperty("indexSpec") @Nullable IndexSpec indexSpec, @JsonProperty("indexSpecForIntermediatePersists") @Nullable IndexSpec indexSpecForIntermediatePersists, @@ -117,7 +122,7 @@ public ParallelIndexTuningConfig( @JsonProperty("maxNumSubTasks") @Deprecated @Nullable Integer maxNumSubTasks, @JsonProperty("maxNumConcurrentSubTasks") @Nullable Integer maxNumConcurrentSubTasks, @JsonProperty("maxRetry") @Nullable Integer maxRetry, - @JsonProperty("taskStatusCheckPeriodMs") @Nullable Integer taskStatusCheckPeriodMs, + @JsonProperty("taskStatusCheckPeriodMs") @Nullable Long taskStatusCheckPeriodMs, @JsonProperty("chatHandlerTimeout") @Nullable Duration chatHandlerTimeout, @JsonProperty("chatHandlerNumRetries") @Nullable Integer chatHandlerNumRetries, @JsonProperty("maxNumSegmentsToMerge") @Nullable Integer maxNumSegmentsToMerge, @@ -154,6 +159,8 @@ public ParallelIndexTuningConfig( throw new IAE("Can't use both maxNumSubTasks and maxNumConcurrentSubTasks. Use maxNumConcurrentSubTasks instead"); } + this.splitHintSpec = splitHintSpec; + if (maxNumConcurrentSubTasks == null) { this.maxNumConcurrentSubTasks = maxNumSubTasks == null ? 
DEFAULT_MAX_NUM_CONCURRENT_SUB_TASKS : maxNumSubTasks; } else { @@ -182,6 +189,13 @@ public ParallelIndexTuningConfig( Preconditions.checkArgument(this.totalNumMergeTasks > 0, "totalNumMergeTasks must be positive"); } + @Nullable + @JsonProperty + public SplitHintSpec getSplitHintSpec() + { + return splitHintSpec; + } + @JsonProperty public int getMaxNumConcurrentSubTasks() { @@ -224,6 +238,39 @@ public int getTotalNumMergeTasks() return totalNumMergeTasks; } + @Override + public ParallelIndexTuningConfig withPartitionsSpec(PartitionsSpec partitionsSpec) + { + return new ParallelIndexTuningConfig( + null, + null, + getMaxRowsInMemory(), + getMaxBytesInMemory(), + null, + null, + getSplitHintSpec(), + partitionsSpec, + getIndexSpec(), + getIndexSpecForIntermediatePersists(), + getMaxPendingPersists(), + isForceGuaranteedRollup(), + isReportParseExceptions(), + getPushTimeout(), + getSegmentWriteOutMediumFactory(), + null, + getMaxNumConcurrentSubTasks(), + getMaxRetry(), + getTaskStatusCheckPeriodMs(), + getChatHandlerTimeout(), + getChatHandlerNumRetries(), + getMaxNumSegmentsToMerge(), + getTotalNumMergeTasks(), + isLogParseExceptions(), + getMaxParseExceptions(), + getMaxSavedParseExceptions() + ); + } + @Override public boolean equals(Object o) { @@ -243,6 +290,7 @@ public boolean equals(Object o) chatHandlerNumRetries == that.chatHandlerNumRetries && maxNumSegmentsToMerge == that.maxNumSegmentsToMerge && totalNumMergeTasks == that.totalNumMergeTasks && + Objects.equals(splitHintSpec, that.splitHintSpec) && Objects.equals(chatHandlerTimeout, that.chatHandlerTimeout); } @@ -251,6 +299,7 @@ public int hashCode() { return Objects.hash( super.hashCode(), + splitHintSpec, maxNumConcurrentSubTasks, maxRetry, taskStatusCheckPeriodMs, @@ -260,4 +309,19 @@ public int hashCode() totalNumMergeTasks ); } + + @Override + public String toString() + { + return "ParallelIndexTuningConfig{" + + "splitHintSpec=" + splitHintSpec + + ", maxNumConcurrentSubTasks=" + maxNumConcurrentSubTasks + + ", maxRetry=" + maxRetry + + ", taskStatusCheckPeriodMs=" + taskStatusCheckPeriodMs + + ", chatHandlerTimeout=" + chatHandlerTimeout + + ", chatHandlerNumRetries=" + chatHandlerNumRetries + + ", maxNumSegmentsToMerge=" + maxNumSegmentsToMerge + + ", totalNumMergeTasks=" + totalNumMergeTasks + + "} " + super.toString(); + } } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialSegmentGenerateParallelIndexTaskRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialSegmentGenerateParallelIndexTaskRunner.java index 35061a1df524..edb00aa100d3 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialSegmentGenerateParallelIndexTaskRunner.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialSegmentGenerateParallelIndexTaskRunner.java @@ -70,13 +70,13 @@ public String getName() @Override Iterator> subTaskSpecIterator() throws IOException { - return baseFirehoseFactory.getSplits().map(this::newTaskSpec).iterator(); + return baseFirehoseFactory.getSplits(getTuningConfig().getSplitHintSpec()).map(this::newTaskSpec).iterator(); } @Override int getTotalNumSubTasks() throws IOException { - return baseFirehoseFactory.getNumSplits(); + return baseFirehoseFactory.getNumSplits(getTuningConfig().getSplitHintSpec()); } @VisibleForTesting diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseParallelIndexTaskRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseParallelIndexTaskRunner.java index 9139e797def4..302f3fa6db1f 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseParallelIndexTaskRunner.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseParallelIndexTaskRunner.java @@ -79,13 +79,13 @@ ParallelIndexIngestionSpec getIngestionSchema() @Override Iterator<SubTaskSpec<SinglePhaseSubTask>> subTaskSpecIterator() throws IOException { - return baseFirehoseFactory.getSplits().map(this::newTaskSpec).iterator(); + return baseFirehoseFactory.getSplits(getTuningConfig().getSplitHintSpec()).map(this::newTaskSpec).iterator(); } @Override int getTotalNumSubTasks() throws IOException { - return baseFirehoseFactory.getNumSplits(); + return baseFirehoseFactory.getNumSplits(getTuningConfig().getSplitHintSpec()); } @VisibleForTesting diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/firehose/IngestSegmentFirehoseFactory.java b/indexing-service/src/main/java/org/apache/druid/indexing/firehose/IngestSegmentFirehoseFactory.java index 53969484128c..c8211511fd02 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/firehose/IngestSegmentFirehoseFactory.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/firehose/IngestSegmentFirehoseFactory.java @@ -34,6 +34,8 @@ import org.apache.druid.data.input.FiniteFirehoseFactory; import org.apache.druid.data.input.Firehose; import org.apache.druid.data.input.InputSplit; +import org.apache.druid.data.input.SegmentsSplitHintSpec; +import org.apache.druid.data.input.SplitHintSpec; import org.apache.druid.data.input.impl.InputRowParser; import org.apache.druid.indexing.common.RetryPolicy; import org.apache.druid.indexing.common.RetryPolicyFactory; @@ -78,7 +80,6 @@ public class IngestSegmentFirehoseFactory implements FiniteFirehoseFactory<InputRowParser, List<WindowedSegmentId>> { private static final EmittingLogger log = new EmittingLogger(IngestSegmentFirehoseFactory.class); - private static final long DEFAULT_MAX_INPUT_SEGMENT_BYTES_PER_TASK = 150 * 1024 * 1024; private final String dataSource; // Exactly one of interval and segmentIds should be non-null. Typically 'interval' is specified directly // by the user creating this firehose and 'segmentIds' is used for sub-tasks if it is split for parallel @@ -90,7 +91,8 @@ public class IngestSegmentFirehoseFactory implements FiniteFirehoseFactory<InputRowParser, List<WindowedSegmentId>> private final List<String> dimensions; private final List<String> metrics; - private final long maxInputSegmentBytesPerTask; + @Nullable + private final Long maxInputSegmentBytesPerTask; private final IndexIO indexIO; private final CoordinatorClient coordinatorClient; private final SegmentLoaderFactory segmentLoaderFactory; @@ -108,7 +110,7 @@ public IngestSegmentFirehoseFactory( @JsonProperty("filter") DimFilter dimFilter, @JsonProperty("dimensions") List<String> dimensions, @JsonProperty("metrics") List<String> metrics, - @JsonProperty("maxInputSegmentBytesPerTask") Long maxInputSegmentBytesPerTask, + @JsonProperty("maxInputSegmentBytesPerTask") @Deprecated @Nullable Long maxInputSegmentBytesPerTask, @JacksonInject IndexIO indexIO, @JacksonInject CoordinatorClient coordinatorClient, @JacksonInject SegmentLoaderFactory segmentLoaderFactory, @@ -125,9 +127,7 @@ public IngestSegmentFirehoseFactory( this.dimFilter = dimFilter; this.dimensions = dimensions; this.metrics = metrics; - this.maxInputSegmentBytesPerTask = maxInputSegmentBytesPerTask == null - ? DEFAULT_MAX_INPUT_SEGMENT_BYTES_PER_TASK - : maxInputSegmentBytesPerTask; + this.maxInputSegmentBytesPerTask = maxInputSegmentBytesPerTask; this.indexIO = Preconditions.checkNotNull(indexIO, "null IndexIO"); this.coordinatorClient = Preconditions.checkNotNull(coordinatorClient, "null CoordinatorClient"); this.segmentLoaderFactory = Preconditions.checkNotNull(segmentLoaderFactory, "null SegmentLoaderFactory"); @@ -190,8 +190,9 @@ public List<String> getMetrics() return metrics; } + @Nullable @JsonProperty - public long getMaxInputSegmentBytesPerTask() + public Long getMaxInputSegmentBytesPerTask() { return maxInputSegmentBytesPerTask; } @@ -388,12 +389,26 @@ private List<TimelineObjectHolder<String, DataSegment>> getTimelineForSegmentIds return new ArrayList<>(timeline.values()); } - private void initializeSplitsIfNeeded() + private void initializeSplitsIfNeeded(@Nullable SplitHintSpec splitHintSpec) { if (splits != null) { return; } + final SegmentsSplitHintSpec nonNullSplitHintSpec; + if (!(splitHintSpec instanceof SegmentsSplitHintSpec)) { + if (splitHintSpec != null) { + log.warn("Given splitHintSpec[%s] is not a SegmentsSplitHintSpec. Ignoring it.", splitHintSpec); + } + nonNullSplitHintSpec = new SegmentsSplitHintSpec(null); + } else { + nonNullSplitHintSpec = (SegmentsSplitHintSpec) splitHintSpec; + } + + final long maxInputSegmentBytesPerTask = this.maxInputSegmentBytesPerTask == null + ? nonNullSplitHintSpec.getMaxInputSegmentBytesPerTask() + : this.maxInputSegmentBytesPerTask; + // isSplittable() ensures this is only called when we have an interval.
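+ // The timeline segments fetched below are packed into splits of at most maxInputSegmentBytesPerTask bytes each; a single segment larger than that gets a split of its own.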
final List> timelineSegments = getTimelineForInterval(); @@ -456,16 +471,16 @@ public boolean isSplittable() } @Override - public Stream>> getSplits() + public Stream>> getSplits(@Nullable SplitHintSpec splitHintSpec) { - initializeSplitsIfNeeded(); + initializeSplitsIfNeeded(splitHintSpec); return splits.stream(); } @Override - public int getNumSplits() + public int getNumSplits(@Nullable SplitHintSpec splitHintSpec) { - initializeSplitsIfNeeded(); + initializeSplitsIfNeeded(splitHintSpec); return splits.size(); } diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/ClientCompactQuerySerdeTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/ClientCompactQuerySerdeTest.java index 079fb3155081..a4ac7672b257 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/ClientCompactQuerySerdeTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/ClientCompactQuerySerdeTest.java @@ -27,6 +27,9 @@ import org.apache.druid.client.indexing.ClientCompactQueryTuningConfig; import org.apache.druid.client.indexing.ClientCompactionIOConfig; import org.apache.druid.client.indexing.ClientCompactionIntervalSpec; +import org.apache.druid.client.indexing.IndexingServiceClient; +import org.apache.druid.client.indexing.NoopIndexingServiceClient; +import org.apache.druid.data.input.SegmentsSplitHintSpec; import org.apache.druid.guice.GuiceAnnotationIntrospector; import org.apache.druid.guice.GuiceInjectableValues; import org.apache.druid.guice.GuiceInjectors; @@ -74,6 +77,7 @@ public void testSerde() throws IOException 40000, 2000L, 30000L, + new SegmentsSplitHintSpec(100000L), new IndexSpec( new DefaultBitmapSerdeFactory(), CompressionStrategy.LZ4, @@ -81,7 +85,8 @@ public void testSerde() throws IOException LongEncodingStrategy.LONGS ), null, - 1000L + 1000L, + 100 ), new HashMap<>() ); @@ -100,22 +105,36 @@ public void testSerde() throws IOException ((CompactionIntervalSpec) task.getIoConfig().getInputSpec()).getSha256OfSortedSegmentIds() ); Assert.assertEquals( - query.getTuningConfig().getMaxRowsInMemory().intValue(), task.getTuningConfig().getMaxRowsInMemory() + query.getTuningConfig().getMaxRowsInMemory().intValue(), + task.getTuningConfig().getMaxRowsInMemory() ); Assert.assertEquals( - query.getTuningConfig().getMaxBytesInMemory().longValue(), task.getTuningConfig().getMaxBytesInMemory() + query.getTuningConfig().getMaxBytesInMemory().longValue(), + task.getTuningConfig().getMaxBytesInMemory() ); Assert.assertEquals( - query.getTuningConfig().getMaxRowsPerSegment(), task.getTuningConfig().getMaxRowsPerSegment() + query.getTuningConfig().getMaxRowsPerSegment(), + task.getTuningConfig().getMaxRowsPerSegment() ); Assert.assertEquals( - query.getTuningConfig().getMaxTotalRows(), task.getTuningConfig().getMaxTotalRows() + query.getTuningConfig().getMaxTotalRows(), + task.getTuningConfig().getMaxTotalRows() ); Assert.assertEquals( - query.getTuningConfig().getIndexSpec(), task.getTuningConfig().getIndexSpec() + query.getTuningConfig().getSplitHintSpec(), + task.getTuningConfig().getSplitHintSpec() ); Assert.assertEquals( - query.getTuningConfig().getPushTimeout().longValue(), task.getTuningConfig().getPushTimeout() + query.getTuningConfig().getIndexSpec(), + task.getTuningConfig().getIndexSpec() + ); + Assert.assertEquals( + query.getTuningConfig().getPushTimeout().longValue(), + task.getTuningConfig().getPushTimeout() + ); + Assert.assertEquals( + 
query.getTuningConfig().getMaxNumConcurrentSubTasks().intValue(), + task.getTuningConfig().getMaxNumConcurrentSubTasks() ); Assert.assertEquals(query.getContext(), task.getContext()); } @@ -143,6 +162,7 @@ private static ObjectMapper setupInjectablesInObjectMapper(ObjectMapper objectMa binder.bind(CoordinatorClient.class).toInstance(COORDINATOR_CLIENT); binder.bind(SegmentLoaderFactory.class).toInstance(new SegmentLoaderFactory(null, objectMapper)); binder.bind(AppenderatorsManager.class).toInstance(APPENDERATORS_MANAGER); + binder.bind(IndexingServiceClient.class).toInstance(new NoopIndexingServiceClient()); } ) ) diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskParallelRunTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskParallelRunTest.java new file mode 100644 index 000000000000..330fed26701d --- /dev/null +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskParallelRunTest.java @@ -0,0 +1,316 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.indexing.common.task; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.ImmutableList; +import com.google.common.io.Files; +import org.apache.druid.client.ImmutableDruidDataSource; +import org.apache.druid.client.coordinator.CoordinatorClient; +import org.apache.druid.client.indexing.IndexingServiceClient; +import org.apache.druid.data.input.impl.DimensionsSpec; +import org.apache.druid.indexer.TaskState; +import org.apache.druid.indexing.common.LockGranularity; +import org.apache.druid.indexing.common.RetryPolicyConfig; +import org.apache.druid.indexing.common.RetryPolicyFactory; +import org.apache.druid.indexing.common.SegmentLoaderFactory; +import org.apache.druid.indexing.common.TestUtils; +import org.apache.druid.indexing.common.stats.RowIngestionMetersFactory; +import org.apache.druid.indexing.common.task.batch.parallel.AbstractParallelIndexSupervisorTaskTest; +import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexIngestionSpec; +import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexSupervisorTask; +import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexTuningConfig; +import org.apache.druid.indexing.common.task.batch.parallel.SinglePhaseParallelIndexingTest.TestSupervisorTask; +import org.apache.druid.java.util.common.ISE; +import org.apache.druid.java.util.common.Intervals; +import org.apache.druid.java.util.common.granularity.Granularities; +import org.apache.druid.java.util.common.granularity.Granularity; +import org.apache.druid.query.aggregation.AggregatorFactory; +import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec; +import org.apache.druid.segment.realtime.appenderator.AppenderatorsManager; +import org.apache.druid.segment.realtime.firehose.ChatHandlerProvider; +import org.apache.druid.segment.realtime.firehose.NoopChatHandlerProvider; +import org.apache.druid.server.security.AuthTestUtils; +import org.apache.druid.server.security.AuthorizerMapper; +import org.apache.druid.timeline.DataSegment; +import org.apache.druid.timeline.SegmentId; +import org.joda.time.Interval; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import javax.annotation.Nullable; +import java.io.BufferedWriter; +import java.io.File; +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.List; +import java.util.Map; + +@RunWith(Parameterized.class) +public class CompactionTaskParallelRunTest extends AbstractParallelIndexSupervisorTaskTest +{ + @Parameterized.Parameters(name = "{0}") + public static Iterable constructorFeeder() + { + return ImmutableList.of( + new Object[]{LockGranularity.TIME_CHUNK}, + new Object[]{LockGranularity.SEGMENT} + ); + } + + private static final String DATA_SOURCE = "test"; + private static final RetryPolicyFactory RETRY_POLICY_FACTORY = new RetryPolicyFactory(new RetryPolicyConfig()); + + private final AppenderatorsManager appenderatorsManager = new TestAppenderatorsManager(); + private final LockGranularity lockGranularity; + private final RowIngestionMetersFactory rowIngestionMetersFactory; + private final CoordinatorClient coordinatorClient; + + public CompactionTaskParallelRunTest(LockGranularity lockGranularity) + { + this.lockGranularity = lockGranularity; + this.rowIngestionMetersFactory = new TestUtils().getRowIngestionMetersFactory(); + coordinatorClient = 
new CoordinatorClient(null, null) + { + @Override + public List getDatabaseSegmentDataSourceSegments(String dataSource, List intervals) + { + return getStorageCoordinator().getUsedSegmentsForIntervals(dataSource, intervals); + } + + @Override + public DataSegment getDatabaseSegmentDataSourceSegment(String dataSource, String segmentId) + { + ImmutableDruidDataSource druidDataSource = getMetadataSegmentManager().getImmutableDataSourceWithUsedSegments( + dataSource + ); + if (druidDataSource == null) { + throw new ISE("Unknown datasource[%s]", dataSource); + } + + for (SegmentId possibleSegmentId : SegmentId.iteratePossibleParsingsWithDataSource(dataSource, segmentId)) { + DataSegment segment = druidDataSource.getSegment(possibleSegmentId); + if (segment != null) { + return segment; + } + } + throw new ISE("Can't find segment for id[%s]", segmentId); + } + }; + } + + @Before + public void setup() throws IOException + { + indexingServiceClient = new LocalIndexingServiceClient(); + localDeepStorage = temporaryFolder.newFolder(); + } + + @After + public void teardown() + { + indexingServiceClient.shutdown(); + temporaryFolder.delete(); + } + + @Test + public void testRunParallel() throws Exception + { + runIndexTask(); + + final CompactionTask compactionTask = new TestCompactionTask( + null, + null, + DATA_SOURCE, + new CompactionIOConfig(new CompactionIntervalSpec(Intervals.of("2014-01-01/2014-01-02"), null)), + null, + null, + null, + null, + newTuningConfig(), + null, + getObjectMapper(), + AuthTestUtils.TEST_AUTHORIZER_MAPPER, + new NoopChatHandlerProvider(), + rowIngestionMetersFactory, + coordinatorClient, + indexingServiceClient, + getSegmentLoaderFactory(), + RETRY_POLICY_FACTORY, + appenderatorsManager + ); + + runTask(compactionTask); + } + + private void runIndexTask() throws Exception + { + File tmpDir = temporaryFolder.newFolder(); + File tmpFile = File.createTempFile("druid", "index", tmpDir); + + try (BufferedWriter writer = Files.newWriter(tmpFile, StandardCharsets.UTF_8)) { + writer.write("2014-01-01T00:00:10Z,a,1\n"); + writer.write("2014-01-01T00:00:10Z,b,2\n"); + writer.write("2014-01-01T00:00:10Z,c,3\n"); + writer.write("2014-01-01T01:00:20Z,a,1\n"); + writer.write("2014-01-01T01:00:20Z,b,2\n"); + writer.write("2014-01-01T01:00:20Z,c,3\n"); + writer.write("2014-01-01T02:00:30Z,a,1\n"); + writer.write("2014-01-01T02:00:30Z,b,2\n"); + writer.write("2014-01-01T02:00:30Z,c,3\n"); + } + + IndexTask indexTask = new IndexTask( + null, + null, + IndexTaskTest.createIngestionSpec( + getObjectMapper(), + tmpDir, + CompactionTaskRunTest.DEFAULT_PARSE_SPEC, + new UniformGranularitySpec( + Granularities.HOUR, + Granularities.MINUTE, + null + ), + IndexTaskTest.createTuningConfig(2, 2, null, 2L, null, null, false, true), + false + ), + null, + AuthTestUtils.TEST_AUTHORIZER_MAPPER, + new NoopChatHandlerProvider(), + rowIngestionMetersFactory, + appenderatorsManager + ); + + runTask(indexTask); + } + + private void runTask(Task task) throws Exception + { + actionClient = createActionClient(task); + toolbox = createTaskToolbox(task); + prepareTaskForLocking(task); + task.addToContext(Tasks.FORCE_TIME_CHUNK_LOCK_KEY, lockGranularity == LockGranularity.TIME_CHUNK); + Assert.assertTrue(task.isReady(actionClient)); + Assert.assertEquals(TaskState.SUCCESS, task.run(toolbox).getStatusCode()); + shutdownTask(task); + } + + private static ParallelIndexTuningConfig newTuningConfig() + { + return new ParallelIndexTuningConfig( + null, + null, + null, + null, + null, + null, + null, + null, + null, + 
null, + null, + null, + null, + null, + null, + null, + 2, + null, + null, + null, + null, + null, + null, + null, + null, + null + ); + } + + private static class TestCompactionTask extends CompactionTask + { + private final IndexingServiceClient indexingServiceClient; + + TestCompactionTask( + String id, + TaskResource taskResource, + String dataSource, + @Nullable CompactionIOConfig ioConfig, + @Nullable DimensionsSpec dimensions, + @Nullable DimensionsSpec dimensionsSpec, + @Nullable AggregatorFactory[] metricsSpec, + @Nullable Granularity segmentGranularity, + @Nullable ParallelIndexTuningConfig tuningConfig, + @Nullable Map context, + ObjectMapper jsonMapper, + AuthorizerMapper authorizerMapper, + ChatHandlerProvider chatHandlerProvider, + RowIngestionMetersFactory rowIngestionMetersFactory, + CoordinatorClient coordinatorClient, + @Nullable IndexingServiceClient indexingServiceClient, + SegmentLoaderFactory segmentLoaderFactory, + RetryPolicyFactory retryPolicyFactory, + AppenderatorsManager appenderatorsManager + ) + { + super( + id, + taskResource, + dataSource, + null, + null, + ioConfig, + dimensions, + dimensionsSpec, + metricsSpec, + segmentGranularity, + tuningConfig, + context, + jsonMapper, + authorizerMapper, + chatHandlerProvider, + rowIngestionMetersFactory, + coordinatorClient, + indexingServiceClient, + segmentLoaderFactory, + retryPolicyFactory, + appenderatorsManager + ); + this.indexingServiceClient = indexingServiceClient; + } + + @Override + ParallelIndexSupervisorTask newTask(String taskId, ParallelIndexIngestionSpec ingestionSpec) + { + return new TestSupervisorTask( + taskId, + null, + ingestionSpec, + createContextForSubtask(), + indexingServiceClient + ); + } + } +} diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskRunTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskRunTest.java index fd0e9aa5263a..9a7fa51bad8e 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskRunTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskRunTest.java @@ -25,6 +25,8 @@ import com.google.common.collect.ImmutableMap; import com.google.common.io.Files; import org.apache.druid.client.coordinator.CoordinatorClient; +import org.apache.druid.client.indexing.IndexingServiceClient; +import org.apache.druid.client.indexing.NoopIndexingServiceClient; import org.apache.druid.data.input.impl.CSVParseSpec; import org.apache.druid.data.input.impl.DimensionsSpec; import org.apache.druid.data.input.impl.ParseSpec; @@ -56,6 +58,7 @@ import org.apache.druid.segment.loading.SegmentLoaderLocalCacheManager; import org.apache.druid.segment.loading.StorageLocationConfig; import org.apache.druid.segment.realtime.appenderator.AppenderatorsManager; +import org.apache.druid.segment.realtime.firehose.NoopChatHandlerProvider; import org.apache.druid.server.security.AuthTestUtils; import org.apache.druid.timeline.CompactionState; import org.apache.druid.timeline.DataSegment; @@ -76,6 +79,7 @@ import javax.annotation.Nullable; import java.io.BufferedWriter; import java.io.File; +import java.io.IOException; import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.Arrays; @@ -90,15 +94,13 @@ @RunWith(Parameterized.class) public class CompactionTaskRunTest extends IngestionTestBase { - public static final String DATA_SOURCE = "test"; - @Rule public TemporaryFolder temporaryFolder = new TemporaryFolder(); 
@Rule public ExpectedException expectedException = ExpectedException.none(); - private static final ParseSpec DEFAULT_PARSE_SPEC = new CSVParseSpec( + public static final ParseSpec DEFAULT_PARSE_SPEC = new CSVParseSpec( new TimestampSpec( "ts", "auto", @@ -137,18 +139,23 @@ public static Iterable constructorFeeder() ); } + private static final String DATA_SOURCE = "test"; private static final RetryPolicyFactory RETRY_POLICY_FACTORY = new RetryPolicyFactory(new RetryPolicyConfig()); private final RowIngestionMetersFactory rowIngestionMetersFactory; + private final IndexingServiceClient indexingServiceClient; private final CoordinatorClient coordinatorClient; private final SegmentLoaderFactory segmentLoaderFactory; private final LockGranularity lockGranularity; + private final AppenderatorsManager appenderatorsManager; + private ExecutorService exec; - private AppenderatorsManager appenderatorsManager; + private File localDeepStorage; public CompactionTaskRunTest(LockGranularity lockGranularity) { TestUtils testUtils = new TestUtils(); rowIngestionMetersFactory = testUtils.getRowIngestionMetersFactory(); + indexingServiceClient = new NoopIndexingServiceClient(); coordinatorClient = new CoordinatorClient(null, null) { @Override @@ -163,15 +170,17 @@ public List getDatabaseSegmentDataSourceSegments(String dataSource, } @Before - public void setup() + public void setup() throws IOException { exec = Execs.multiThreaded(2, "compaction-task-run-test-%d"); + localDeepStorage = temporaryFolder.newFolder(); } @After public void teardown() { exec.shutdownNow(); + temporaryFolder.delete(); } @Test @@ -183,8 +192,9 @@ public void testRun() throws Exception DATA_SOURCE, getObjectMapper(), AuthTestUtils.TEST_AUTHORIZER_MAPPER, - null, + new NoopChatHandlerProvider(), rowIngestionMetersFactory, + indexingServiceClient, coordinatorClient, segmentLoaderFactory, RETRY_POLICY_FACTORY, @@ -228,8 +238,9 @@ public void testRunCompactionTwice() throws Exception DATA_SOURCE, getObjectMapper(), AuthTestUtils.TEST_AUTHORIZER_MAPPER, - null, + new NoopChatHandlerProvider(), rowIngestionMetersFactory, + indexingServiceClient, coordinatorClient, segmentLoaderFactory, RETRY_POLICY_FACTORY, @@ -306,8 +317,9 @@ public void testRunIndexAndCompactAtTheSameTimeForDifferentInterval() throws Exc DATA_SOURCE, getObjectMapper(), AuthTestUtils.TEST_AUTHORIZER_MAPPER, - null, + new NoopChatHandlerProvider(), rowIngestionMetersFactory, + indexingServiceClient, coordinatorClient, segmentLoaderFactory, RETRY_POLICY_FACTORY, @@ -350,7 +362,7 @@ public void testRunIndexAndCompactAtTheSameTimeForDifferentInterval() throws Exc ), null, AuthTestUtils.TEST_AUTHORIZER_MAPPER, - null, + new NoopChatHandlerProvider(), rowIngestionMetersFactory, appenderatorsManager ); @@ -404,8 +416,9 @@ public void testWithSegmentGranularity() throws Exception DATA_SOURCE, getObjectMapper(), AuthTestUtils.TEST_AUTHORIZER_MAPPER, - null, + new NoopChatHandlerProvider(), rowIngestionMetersFactory, + indexingServiceClient, coordinatorClient, segmentLoaderFactory, RETRY_POLICY_FACTORY, @@ -459,8 +472,9 @@ public void testCompactThenAppend() throws Exception DATA_SOURCE, getObjectMapper(), AuthTestUtils.TEST_AUTHORIZER_MAPPER, - null, + new NoopChatHandlerProvider(), rowIngestionMetersFactory, + indexingServiceClient, coordinatorClient, segmentLoaderFactory, RETRY_POLICY_FACTORY, @@ -506,8 +520,9 @@ public void testRunIndexAndCompactForSameSegmentAtTheSameTime() throws Exception DATA_SOURCE, getObjectMapper(), AuthTestUtils.TEST_AUTHORIZER_MAPPER, - null, + new 
NoopChatHandlerProvider(), rowIngestionMetersFactory, + indexingServiceClient, coordinatorClient, segmentLoaderFactory, RETRY_POLICY_FACTORY, @@ -564,8 +579,9 @@ public void testRunIndexAndCompactForSameSegmentAtTheSameTime2() throws Exceptio DATA_SOURCE, getObjectMapper(), AuthTestUtils.TEST_AUTHORIZER_MAPPER, - null, + new NoopChatHandlerProvider(), rowIngestionMetersFactory, + indexingServiceClient, coordinatorClient, segmentLoaderFactory, RETRY_POLICY_FACTORY, @@ -675,7 +691,7 @@ private Pair> runIndexTask( ), null, AuthTestUtils.TEST_AUTHORIZER_MAPPER, - null, + new NoopChatHandlerProvider(), rowIngestionMetersFactory, appenderatorsManager ); @@ -696,31 +712,54 @@ private Pair> runTask( { getLockbox().add(task); getTaskStorage().insert(task, TaskStatus.running(task.getId())); - final TestLocalTaskActionClient actionClient = createActionClient(task); - final File deepStorageDir = temporaryFolder.newFolder(); final ObjectMapper objectMapper = getObjectMapper(); objectMapper.registerSubtypes( new NamedType(LocalLoadSpec.class, "local") ); objectMapper.registerSubtypes(LocalDataSegmentPuller.class); + final TaskToolbox box = createTaskToolbox(objectMapper, task); + + task.addToContext(Tasks.FORCE_TIME_CHUNK_LOCK_KEY, lockGranularity == LockGranularity.TIME_CHUNK); + task.addToContext(Tasks.STORE_COMPACTION_STATE_KEY, true); + if (task.isReady(box.getTaskActionClient())) { + if (readyLatchToCountDown != null) { + readyLatchToCountDown.countDown(); + } + if (latchToAwaitBeforeRun != null) { + latchToAwaitBeforeRun.await(); + } + TaskStatus status = task.run(box); + shutdownTask(task); + final List segments = new ArrayList<>( + ((TestLocalTaskActionClient) box.getTaskActionClient()).getPublishedSegments() + ); + Collections.sort(segments); + return Pair.of(status, segments); + } else { + throw new ISE("task[%s] is not ready", task.getId()); + } + } + + private TaskToolbox createTaskToolbox(ObjectMapper objectMapper, Task task) throws IOException + { final SegmentLoader loader = new SegmentLoaderLocalCacheManager( getIndexIO(), new SegmentLoaderConfig() { @Override public List getLocations() { - return ImmutableList.of(new StorageLocationConfig(deepStorageDir, null, null)); + return ImmutableList.of(new StorageLocationConfig(localDeepStorage, null, null)); } }, objectMapper ); - final TaskToolbox box = new TaskToolbox( + return new TaskToolbox( null, null, - actionClient, + createActionClient(task), null, new LocalDataSegmentPusher(new LocalDataSegmentPusherConfig()), new NoopDataSegmentKiller(), @@ -747,23 +786,5 @@ public List getLocations() new NoopTestTaskReportFileWriter(), null ); - - task.addToContext(Tasks.FORCE_TIME_CHUNK_LOCK_KEY, lockGranularity == LockGranularity.TIME_CHUNK); - task.addToContext(Tasks.STORE_COMPACTION_STATE_KEY, true); - if (task.isReady(box.getTaskActionClient())) { - if (readyLatchToCountDown != null) { - readyLatchToCountDown.countDown(); - } - if (latchToAwaitBeforeRun != null) { - latchToAwaitBeforeRun.await(); - } - TaskStatus status = task.run(box); - shutdownTask(task); - final List segments = new ArrayList<>(actionClient.getPublishedSegments()); - Collections.sort(segments); - return Pair.of(status, segments); - } else { - throw new ISE("task[%s] is not ready", task.getId()); - } } } diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java index f8fe11a53a9f..3253fdc4f672 100644 --- 
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java @@ -29,6 +29,8 @@ import com.google.common.collect.Iterables; import com.google.common.collect.Lists; import org.apache.druid.client.coordinator.CoordinatorClient; +import org.apache.druid.client.indexing.IndexingServiceClient; +import org.apache.druid.client.indexing.NoopIndexingServiceClient; import org.apache.druid.data.input.FirehoseFactory; import org.apache.druid.data.input.impl.DimensionSchema; import org.apache.druid.data.input.impl.DimensionsSpec; @@ -55,9 +57,9 @@ import org.apache.druid.indexing.common.task.CompactionTask.Builder; import org.apache.druid.indexing.common.task.CompactionTask.PartitionConfigurationManager; import org.apache.druid.indexing.common.task.CompactionTask.SegmentProvider; -import org.apache.druid.indexing.common.task.IndexTask.IndexIOConfig; -import org.apache.druid.indexing.common.task.IndexTask.IndexIngestionSpec; -import org.apache.druid.indexing.common.task.IndexTask.IndexTuningConfig; +import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexIOConfig; +import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexIngestionSpec; +import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexTuningConfig; import org.apache.druid.indexing.firehose.IngestSegmentFirehoseFactory; import org.apache.druid.jackson.DefaultObjectMapper; import org.apache.druid.java.util.common.ISE; @@ -148,12 +150,13 @@ public class CompactionTaskTest Intervals.of("2017-06-01/2017-07-01") ); private static final Map MIXED_TYPE_COLUMN_MAP = new HashMap<>(); - private static final IndexTuningConfig TUNING_CONFIG = createTuningConfig(); + private static final ParallelIndexTuningConfig TUNING_CONFIG = createTuningConfig(); private static final RowIngestionMetersFactory ROW_INGESTION_METERS_FACTORY = new TestUtils() .getRowIngestionMetersFactory(); private static final Map SEGMENT_MAP = new HashMap<>(); private static final CoordinatorClient COORDINATOR_CLIENT = new TestCoordinatorClient(SEGMENT_MAP); + private static IndexingServiceClient INDEXING_SERVICE_CLIENT = new NoopIndexingServiceClient(); private static final AppenderatorsManager APPENDERATORS_MANAGER = new TestAppenderatorsManager(); private static final ObjectMapper OBJECT_MAPPER = setupInjectablesInObjectMapper(new DefaultObjectMapper()); private static final RetryPolicyFactory RETRY_POLICY_FACTORY = new RetryPolicyFactory(new RetryPolicyConfig()); @@ -250,6 +253,7 @@ private static ObjectMapper setupInjectablesInObjectMapper(ObjectMapper objectMa binder.bind(CoordinatorClient.class).toInstance(COORDINATOR_CLIENT); binder.bind(SegmentLoaderFactory.class).toInstance(new SegmentLoaderFactory(null, objectMapper)); binder.bind(AppenderatorsManager.class).toInstance(APPENDERATORS_MANAGER); + binder.bind(IndexingServiceClient.class).toInstance(INDEXING_SERVICE_CLIENT); } ) ) @@ -277,9 +281,9 @@ private static List findDimensions(int startIndex, Interval segmentInter return dimensions; } - private static IndexTuningConfig createTuningConfig() + private static ParallelIndexTuningConfig createTuningConfig() { - return new IndexTuningConfig( + return new ParallelIndexTuningConfig( null, null, // null to compute maxRowsPerSegment automatically 500000, @@ -288,7 +292,6 @@ private static IndexTuningConfig createTuningConfig() null, null, null, - null, new IndexSpec( new RoaringBitmapSerdeFactory(true), 
CompressionStrategy.LZ4, @@ -296,11 +299,18 @@ private static IndexTuningConfig createTuningConfig() LongEncodingStrategy.LONGS ), null, - 5000, + null, true, false, + 5000L, + null, + null, + null, + null, + null, + null, + null, null, - 100L, null, null, null, @@ -332,6 +342,7 @@ public void testSerdeWithInterval() throws IOException AuthTestUtils.TEST_AUTHORIZER_MAPPER, null, ROW_INGESTION_METERS_FACTORY, + INDEXING_SERVICE_CLIENT, COORDINATOR_CLIENT, segmentLoaderFactory, RETRY_POLICY_FACTORY, @@ -359,6 +370,7 @@ public void testSerdeWithSegments() throws IOException AuthTestUtils.TEST_AUTHORIZER_MAPPER, null, ROW_INGESTION_METERS_FACTORY, + INDEXING_SERVICE_CLIENT, COORDINATOR_CLIENT, segmentLoaderFactory, RETRY_POLICY_FACTORY, @@ -384,6 +396,7 @@ public void testSerdeWithDimensions() throws IOException AuthTestUtils.TEST_AUTHORIZER_MAPPER, null, ROW_INGESTION_METERS_FACTORY, + INDEXING_SERVICE_CLIENT, COORDINATOR_CLIENT, segmentLoaderFactory, RETRY_POLICY_FACTORY, @@ -424,7 +437,7 @@ private static void assertEquals(CompactionTask expected, CompactionTask actual) @Test public void testCreateIngestionSchema() throws IOException, SegmentLoadingException { - final List ingestionSpecs = CompactionTask.createIngestionSchema( + final List ingestionSpecs = CompactionTask.createIngestionSchema( toolbox, new SegmentProvider(DATA_SOURCE, new CompactionIntervalSpec(COMPACTION_INTERVAL, null)), new PartitionConfigurationManager(TUNING_CONFIG), @@ -457,8 +470,8 @@ public void testCreateIngestionSchema() throws IOException, SegmentLoadingExcept @Test public void testCreateIngestionSchemaWithTargetPartitionSize() throws IOException, SegmentLoadingException { - final IndexTuningConfig tuningConfig = new IndexTuningConfig( - null, + final ParallelIndexTuningConfig tuningConfig = new ParallelIndexTuningConfig( + 100000, null, 500000, 1000000L, @@ -466,7 +479,6 @@ public void testCreateIngestionSchemaWithTargetPartitionSize() throws IOExceptio null, null, null, - new HashedPartitionsSpec(6, null, null), new IndexSpec( new RoaringBitmapSerdeFactory(true), CompressionStrategy.LZ4, @@ -474,17 +486,24 @@ public void testCreateIngestionSchemaWithTargetPartitionSize() throws IOExceptio LongEncodingStrategy.LONGS ), null, - 5000, + null, true, false, null, - 100L, + null, + null, + 10, + null, + null, + null, + null, + null, null, null, null, null ); - final List ingestionSpecs = CompactionTask.createIngestionSchema( + final List ingestionSpecs = CompactionTask.createIngestionSchema( toolbox, new SegmentProvider(DATA_SOURCE, new CompactionIntervalSpec(COMPACTION_INTERVAL, null)), new PartitionConfigurationManager(tuningConfig), @@ -518,16 +537,15 @@ public void testCreateIngestionSchemaWithTargetPartitionSize() throws IOExceptio @Test public void testCreateIngestionSchemaWithMaxTotalRows() throws IOException, SegmentLoadingException { - final IndexTuningConfig tuningConfig = new IndexTuningConfig( + final ParallelIndexTuningConfig tuningConfig = new ParallelIndexTuningConfig( null, null, 500000, 1000000L, + 1000000L, null, null, null, - null, - new HashedPartitionsSpec(null, 6, null), new IndexSpec( new RoaringBitmapSerdeFactory(true), CompressionStrategy.LZ4, @@ -535,17 +553,24 @@ public void testCreateIngestionSchemaWithMaxTotalRows() throws IOException, Segm LongEncodingStrategy.LONGS ), null, - 5000, - true, + null, + false, false, + 5000L, + null, + null, + null, + null, + null, + null, + null, null, - 100L, null, null, null, null ); - final List ingestionSpecs = CompactionTask.createIngestionSchema( + 
final List ingestionSpecs = CompactionTask.createIngestionSchema( toolbox, new SegmentProvider(DATA_SOURCE, new CompactionIntervalSpec(COMPACTION_INTERVAL, null)), new PartitionConfigurationManager(tuningConfig), @@ -579,7 +604,7 @@ public void testCreateIngestionSchemaWithMaxTotalRows() throws IOException, Segm @Test public void testCreateIngestionSchemaWithNumShards() throws IOException, SegmentLoadingException { - final IndexTuningConfig tuningConfig = new IndexTuningConfig( + final ParallelIndexTuningConfig tuningConfig = new ParallelIndexTuningConfig( null, null, 500000, @@ -587,7 +612,6 @@ public void testCreateIngestionSchemaWithNumShards() throws IOException, Segment null, null, null, - null, new HashedPartitionsSpec(null, 3, null), new IndexSpec( new RoaringBitmapSerdeFactory(true), @@ -596,17 +620,24 @@ public void testCreateIngestionSchemaWithNumShards() throws IOException, Segment LongEncodingStrategy.LONGS ), null, - 5000, + null, true, false, + 5000L, + null, + null, + 10, + null, + null, + null, + null, null, - 100L, null, null, null, null ); - final List ingestionSpecs = CompactionTask.createIngestionSchema( + final List ingestionSpecs = CompactionTask.createIngestionSchema( toolbox, new SegmentProvider(DATA_SOURCE, new CompactionIntervalSpec(COMPACTION_INTERVAL, null)), new PartitionConfigurationManager(tuningConfig), @@ -667,7 +698,7 @@ public void testCreateIngestionSchemaWithCustomDimensionsSpec() throws IOExcepti ) ); - final List ingestionSpecs = CompactionTask.createIngestionSchema( + final List ingestionSpecs = CompactionTask.createIngestionSchema( toolbox, new SegmentProvider(DATA_SOURCE, new CompactionIntervalSpec(COMPACTION_INTERVAL, null)), new PartitionConfigurationManager(TUNING_CONFIG), @@ -708,7 +739,7 @@ public void testCreateIngestionSchemaWithCustomMetricsSpec() throws IOException, new DoubleMaxAggregatorFactory("custom_double_max", "agg_4") }; - final List ingestionSpecs = CompactionTask.createIngestionSchema( + final List ingestionSpecs = CompactionTask.createIngestionSchema( toolbox, new SegmentProvider(DATA_SOURCE, new CompactionIntervalSpec(COMPACTION_INTERVAL, null)), new PartitionConfigurationManager(TUNING_CONFIG), @@ -742,7 +773,7 @@ public void testCreateIngestionSchemaWithCustomMetricsSpec() throws IOException, @Test public void testCreateIngestionSchemaWithCustomSegments() throws IOException, SegmentLoadingException { - final List ingestionSpecs = CompactionTask.createIngestionSchema( + final List ingestionSpecs = CompactionTask.createIngestionSchema( toolbox, new SegmentProvider(DATA_SOURCE, SpecificSegmentsSpec.fromSegments(SEGMENTS)), new PartitionConfigurationManager(TUNING_CONFIG), @@ -831,6 +862,7 @@ public void testEmptyInterval() AuthTestUtils.TEST_AUTHORIZER_MAPPER, null, ROW_INGESTION_METERS_FACTORY, + INDEXING_SERVICE_CLIENT, COORDINATOR_CLIENT, segmentLoaderFactory, RETRY_POLICY_FACTORY, @@ -845,7 +877,7 @@ public void testEmptyInterval() @Test public void testSegmentGranularity() throws IOException, SegmentLoadingException { - final List ingestionSpecs = CompactionTask.createIngestionSchema( + final List ingestionSpecs = CompactionTask.createIngestionSchema( toolbox, new SegmentProvider(DATA_SOURCE, new CompactionIntervalSpec(COMPACTION_INTERVAL, null)), new PartitionConfigurationManager(TUNING_CONFIG), @@ -880,7 +912,7 @@ public void testSegmentGranularity() throws IOException, SegmentLoadingException @Test public void testNullSegmentGranularityAnd() throws IOException, SegmentLoadingException { - final List ingestionSpecs = 
CompactionTask.createIngestionSchema( + final List ingestionSpecs = CompactionTask.createIngestionSchema( toolbox, new SegmentProvider(DATA_SOURCE, new CompactionIntervalSpec(COMPACTION_INTERVAL, null)), new PartitionConfigurationManager(TUNING_CONFIG), @@ -955,7 +987,7 @@ private static List getDimensionSchema(DimensionSchema mixedTyp } private void assertIngestionSchema( - List ingestionSchemas, + List ingestionSchemas, List expectedDimensionsSpecs, List expectedMetricsSpec, List expectedSegmentIntervals, @@ -967,7 +999,7 @@ private void assertIngestionSchema( expectedDimensionsSpecs, expectedMetricsSpec, expectedSegmentIntervals, - new IndexTuningConfig( + new ParallelIndexTuningConfig( null, null, 500000, @@ -975,8 +1007,7 @@ private void assertIngestionSchema( Long.MAX_VALUE, null, null, - null, - new HashedPartitionsSpec(null, null, null), // automatically computed targetPartitionSize + new HashedPartitionsSpec(5000000, null, null), // automatically computed targetPartitionSize new IndexSpec( new RoaringBitmapSerdeFactory(true), CompressionStrategy.LZ4, @@ -984,11 +1015,18 @@ private void assertIngestionSchema( LongEncodingStrategy.LONGS ), null, - 5000, + null, true, false, + 5000L, + null, + null, + null, + null, + null, + null, + null, null, - 100L, null, null, null, @@ -999,11 +1037,11 @@ private void assertIngestionSchema( } private void assertIngestionSchema( - List ingestionSchemas, + List ingestionSchemas, List expectedDimensionsSpecs, List expectedMetricsSpec, List expectedSegmentIntervals, - IndexTuningConfig expectedTuningConfig, + ParallelIndexTuningConfig expectedTuningConfig, Granularity expectedSegmentGranularity ) { @@ -1015,7 +1053,7 @@ private void assertIngestionSchema( ); for (int i = 0; i < ingestionSchemas.size(); i++) { - final IndexIngestionSpec ingestionSchema = ingestionSchemas.get(i); + final ParallelIndexIngestionSpec ingestionSchema = ingestionSchemas.get(i); final DimensionsSpec expectedDimensionsSpec = expectedDimensionsSpecs.get(i); // assert dataSchema @@ -1048,7 +1086,7 @@ private void assertIngestionSchema( ); // assert ioConfig - final IndexIOConfig ioConfig = ingestionSchema.getIOConfig(); + final ParallelIndexIOConfig ioConfig = ingestionSchema.getIOConfig(); Assert.assertFalse(ioConfig.isAppendToExisting()); final FirehoseFactory firehoseFactory = ioConfig.getFirehoseFactory(); Assert.assertTrue(firehoseFactory instanceof IngestSegmentFirehoseFactory); diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IngestionTestBase.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IngestionTestBase.java index f49a90f27b10..dcaffb395ded 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IngestionTestBase.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IngestionTestBase.java @@ -24,6 +24,7 @@ import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import org.apache.druid.indexer.TaskStatus; +import org.apache.druid.indexing.common.SegmentLoaderFactory; import org.apache.druid.indexing.common.SingleFileTaskReportFileWriter; import org.apache.druid.indexing.common.TaskToolbox; import org.apache.druid.indexing.common.TestUtils; @@ -58,6 +59,7 @@ import org.apache.druid.segment.loading.LocalDataSegmentPusher; import org.apache.druid.segment.loading.LocalDataSegmentPusherConfig; import org.apache.druid.segment.loading.NoopDataSegmentKiller; +import 
org.apache.druid.segment.loading.SegmentLoader; import org.apache.druid.server.DruidNode; import org.apache.druid.server.metrics.NoopServiceEmitter; import org.apache.druid.timeline.DataSegment; @@ -86,6 +88,7 @@ public abstract class IngestionTestBase private final TestUtils testUtils = new TestUtils(); private final ObjectMapper objectMapper = testUtils.getTestObjectMapper(); + private SegmentLoaderFactory segmentLoaderFactory; private TaskStorage taskStorage; private IndexerSQLMetadataStorageCoordinator storageCoordinator; private MetadataSegmentManager segmentManager; @@ -112,6 +115,7 @@ public void setUp() throws IOException derbyConnectorRule.getConnector() ); lockbox = new TaskLockbox(taskStorage, storageCoordinator); + segmentLoaderFactory = new SegmentLoaderFactory(getIndexIO(), getObjectMapper()); } @After @@ -136,6 +140,11 @@ public void shutdownTask(Task task) lockbox.remove(task); } + public SegmentLoader newSegmentLoader(File storageDir) + { + return segmentLoaderFactory.manufacturate(storageDir); + } + public ObjectMapper getObjectMapper() { return objectMapper; @@ -146,6 +155,11 @@ public TaskStorage getTaskStorage() return taskStorage; } + public SegmentLoaderFactory getSegmentLoaderFactory() + { + return segmentLoaderFactory; + } + public IndexerMetadataStorageCoordinator getMetadataStorageCoordinator() { return storageCoordinator; diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractParallelIndexSupervisorTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractParallelIndexSupervisorTaskTest.java index a8e282ed58da..974cfb3687db 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractParallelIndexSupervisorTaskTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractParallelIndexSupervisorTaskTest.java @@ -99,10 +99,10 @@ public class AbstractParallelIndexSupervisorTaskTest extends IngestionTestBase 0 ); - TestLocalTaskActionClient actionClient; - LocalIndexingServiceClient indexingServiceClient; - TaskToolbox toolbox; - File localDeepStorage; + protected TestLocalTaskActionClient actionClient; + protected LocalIndexingServiceClient indexingServiceClient; + protected TaskToolbox toolbox; + protected File localDeepStorage; @Rule public final TemporaryFolder temporaryFolder = new TemporaryFolder(); @@ -128,7 +128,7 @@ protected void initializeIntermeidaryDataManager() throws IOException ); } - class LocalIndexingServiceClient extends NoopIndexingServiceClient + public class LocalIndexingServiceClient extends NoopIndexingServiceClient { private final ConcurrentMap> tasks = new ConcurrentHashMap<>(); private final ListeningExecutorService service = MoreExecutors.listeningDecorator( @@ -246,13 +246,13 @@ public String killTask(String taskId) } } - void shutdown() + public void shutdown() { service.shutdownNow(); } } - TaskToolbox createTaskToolbox(Task task) throws IOException + protected TaskToolbox createTaskToolbox(Task task) throws IOException { return new TaskToolbox( null, @@ -278,7 +278,7 @@ public File getStorageDirectory() null, null, null, - null, + newSegmentLoader(temporaryFolder.newFolder()), getObjectMapper(), temporaryFolder.newFolder(task.getId()), getIndexIO(), @@ -307,6 +307,7 @@ static class TestParallelIndexSupervisorTask extends ParallelIndexSupervisorTask { super( id, + null, taskResource, ingestionSchema, context, diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/MultiPhaseParallelIndexingTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/MultiPhaseParallelIndexingTest.java index eb984c879acc..6501479e0f5b 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/MultiPhaseParallelIndexingTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/MultiPhaseParallelIndexingTest.java @@ -192,6 +192,7 @@ private ParallelIndexSupervisorTask newTask( null, null, null, + null, partitionsSpec, null, null, diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskKillTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskKillTest.java index ee33a7ad9c53..463ef4db5e67 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskKillTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskKillTest.java @@ -23,6 +23,7 @@ import org.apache.druid.client.indexing.IndexingServiceClient; import org.apache.druid.data.input.FiniteFirehoseFactory; import org.apache.druid.data.input.InputSplit; +import org.apache.druid.data.input.SplitHintSpec; import org.apache.druid.data.input.impl.StringInputRowParser; import org.apache.druid.indexer.TaskState; import org.apache.druid.indexer.TaskStatus; @@ -161,7 +162,7 @@ private ParallelIndexSupervisorTask newTask( ) { final TestFirehoseFactory firehoseFactory = (TestFirehoseFactory) ioConfig.getFirehoseFactory(); - final int numTotalSubTasks = firehoseFactory.getNumSplits(); + final int numTotalSubTasks = firehoseFactory.getNumSplits(null); // set up ingestion spec final ParallelIndexIngestionSpec ingestionSpec = new ParallelIndexIngestionSpec( new DataSchema( @@ -201,6 +202,7 @@ private ParallelIndexSupervisorTask newTask( null, null, null, + null, numTotalSubTasks, null, null, @@ -256,13 +258,13 @@ private TestFirehoseFactory(InputSplit split) } @Override - public Stream> getSplits() + public Stream> getSplits(@Nullable SplitHintSpec splitHintSpec) { return splits.stream(); } @Override - public int getNumSplits() + public int getNumSplits(@Nullable SplitHintSpec splitHintSpec) { return splits.size(); } diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskResourceTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskResourceTest.java index 94f67781f807..d55ce70a61b1 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskResourceTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskResourceTest.java @@ -25,6 +25,7 @@ import org.apache.druid.data.input.FiniteFirehoseFactory; import org.apache.druid.data.input.InputSplit; import org.apache.druid.data.input.MapBasedInputRow; +import org.apache.druid.data.input.SplitHintSpec; import org.apache.druid.data.input.impl.StringInputRowParser; import org.apache.druid.indexer.RunnerTaskState; import org.apache.druid.indexer.TaskLocation; @@ -432,6 +433,7 @@ private TestSupervisorTask newTask( null, null, null, + null, NUM_SUB_TASKS, null, null, 
@@ -465,13 +467,13 @@ private static class TestFirehose implements FiniteFirehoseFactory> getSplits() + public Stream> getSplits(@Nullable SplitHintSpec splitHintSpec) { return ids.stream().map(InputSplit::new); } @Override - public int getNumSplits() + public int getNumSplits(@Nullable SplitHintSpec splitHintSpec) { return ids.size(); } @@ -596,7 +598,7 @@ public SinglePhaseSubTask newSubTask(int numAttempts) new LocalParallelIndexTaskClientFactory(supervisorTask) ); final TestFirehose firehose = (TestFirehose) getIngestionSpec().getIOConfig().getFirehoseFactory(); - final InputSplit split = firehose.getSplits().findFirst().orElse(null); + final InputSplit split = firehose.getSplits(null).findFirst().orElse(null); if (split == null) { throw new ISE("Split is null"); } diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskSerdeTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskSerdeTest.java index a5c1e97ba41d..c8857aaf105b 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskSerdeTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskSerdeTest.java @@ -156,6 +156,7 @@ ParallelIndexSupervisorTask build() { return new ParallelIndexSupervisorTask( ID, + null, taskResource, ingestionSpec, context, @@ -248,6 +249,7 @@ ParallelIndexIngestionSpec build() null, null, null, + null, partitionsSpec, null, null, diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexTuningConfigTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexTuningConfigTest.java index b8d081a2878b..f9577df97b6b 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexTuningConfigTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexTuningConfigTest.java @@ -71,6 +71,7 @@ public void testSerdeWithMaxRowsPerSegment() 1000L, null, null, + null, new DynamicPartitionsSpec(100, 100L), new IndexSpec( new RoaringBitmapSerdeFactory(true), @@ -87,7 +88,7 @@ public void testSerdeWithMaxRowsPerSegment() null, 250, 100, - 20, + 20L, new Duration(3600), 128, null, @@ -112,6 +113,7 @@ public void testSerdeWithMaxNumConcurrentSubTasks() throws IOException 1000L, null, null, + null, new DynamicPartitionsSpec(100, 100L), new IndexSpec( new RoaringBitmapSerdeFactory(true), @@ -128,7 +130,7 @@ public void testSerdeWithMaxNumConcurrentSubTasks() throws IOException null, maxNumConcurrentSubTasks, 100, - 20, + 20L, new Duration(3600), 128, null, @@ -153,6 +155,7 @@ public void testSerdeWithMaxNumSubTasks() throws IOException 1000L, null, null, + null, new DynamicPartitionsSpec(100, 100L), new IndexSpec( new RoaringBitmapSerdeFactory(true), @@ -169,7 +172,7 @@ public void testSerdeWithMaxNumSubTasks() throws IOException maxNumSubTasks, null, 100, - 20, + 20L, new Duration(3600), 128, null, @@ -196,6 +199,7 @@ public void testSerdeWithMaxNumSubTasksAndMaxNumConcurrentSubTasks() 1000L, null, null, + null, new DynamicPartitionsSpec(100, 100L), new IndexSpec( new RoaringBitmapSerdeFactory(true), @@ -212,7 +216,7 @@ public void testSerdeWithMaxNumSubTasksAndMaxNumConcurrentSubTasks() maxNumSubTasks, maxNumSubTasks, 100, - 20, + 20L, new Duration(3600), 128, 
null, diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseParallelIndexingTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseParallelIndexingTest.java index 24f19cd35039..0c17b8c77abf 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseParallelIndexingTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseParallelIndexingTest.java @@ -301,6 +301,7 @@ public void testWith1MaxNumConcurrentSubTasks() throws Exception null, null, null, + null, 1, null, null, @@ -373,6 +374,7 @@ private ParallelIndexSupervisorTask newTask( null, null, null, + null, 2, null, null, @@ -431,9 +433,9 @@ private ParallelIndexSupervisorTask newTask( ); } - private static class TestSupervisorTask extends TestParallelIndexSupervisorTask + public static class TestSupervisorTask extends TestParallelIndexSupervisorTask { - TestSupervisorTask( + public TestSupervisorTask( String id, TaskResource taskResource, ParallelIndexIngestionSpec ingestionSchema, @@ -451,7 +453,7 @@ SinglePhaseParallelIndexTaskRunner createSinglePhaseTaskRunner(TaskToolbox toolb } } - private static class TestSinglePhaseRunner extends TestSinglePhaseParallelIndexTaskRunner + public static class TestSinglePhaseRunner extends TestSinglePhaseParallelIndexTaskRunner { private final ParallelIndexSupervisorTask supervisorTask; @@ -496,7 +498,7 @@ SinglePhaseSubTaskSpec newTaskSpec(InputSplit split) } } - private static class TestSinglePhaseSubTaskSpec extends SinglePhaseSubTaskSpec + public static class TestSinglePhaseSubTaskSpec extends SinglePhaseSubTaskSpec { private final ParallelIndexSupervisorTask supervisorTask; diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/firehose/IngestSegmentFirehoseFactoryTimelineTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/firehose/IngestSegmentFirehoseFactoryTimelineTest.java index 4944d2765221..ec250c4217d3 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/firehose/IngestSegmentFirehoseFactoryTimelineTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/firehose/IngestSegmentFirehoseFactoryTimelineTest.java @@ -163,11 +163,11 @@ private void testSimple() throws Exception private void testSplit() throws Exception { Assert.assertTrue(factory.isSplittable()); - final int numSplits = factory.getNumSplits(); + final int numSplits = factory.getNumSplits(null); // We set maxInputSegmentBytesPerSplit to 2 so each segment should become a split. 
Assert.assertEquals(segmentCount, numSplits); final List>> splits = - factory.getSplits().collect(Collectors.toList()); + factory.getSplits(null).collect(Collectors.toList()); Assert.assertEquals(numSplits, splits.size()); int count = 0; diff --git a/server/src/main/java/org/apache/druid/client/indexing/ClientCompactQueryTuningConfig.java b/server/src/main/java/org/apache/druid/client/indexing/ClientCompactQueryTuningConfig.java index cedacc8498dc..c99a7cb0bf49 100644 --- a/server/src/main/java/org/apache/druid/client/indexing/ClientCompactQueryTuningConfig.java +++ b/server/src/main/java/org/apache/druid/client/indexing/ClientCompactQueryTuningConfig.java @@ -21,6 +21,7 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; +import org.apache.druid.data.input.SplitHintSpec; import org.apache.druid.segment.IndexSpec; import org.apache.druid.server.coordinator.DataSourceCompactionConfig.UserCompactTuningConfig; @@ -38,28 +39,46 @@ public class ClientCompactQueryTuningConfig @Nullable private final Long maxTotalRows; @Nullable + private final SplitHintSpec splitHintSpec; + @Nullable private final IndexSpec indexSpec; @Nullable private final Integer maxPendingPersists; @Nullable private final Long pushTimeout; + @Nullable + private final Integer maxNumConcurrentSubTasks; public static ClientCompactQueryTuningConfig from( @Nullable UserCompactTuningConfig userCompactionTaskQueryTuningConfig, @Nullable Integer maxRowsPerSegment ) { - return new ClientCompactQueryTuningConfig( - maxRowsPerSegment, - userCompactionTaskQueryTuningConfig == null ? null : userCompactionTaskQueryTuningConfig.getMaxRowsInMemory(), - userCompactionTaskQueryTuningConfig == null ? null : userCompactionTaskQueryTuningConfig.getMaxBytesInMemory(), - userCompactionTaskQueryTuningConfig == null ? null : userCompactionTaskQueryTuningConfig.getMaxTotalRows(), - userCompactionTaskQueryTuningConfig == null ? null : userCompactionTaskQueryTuningConfig.getIndexSpec(), - userCompactionTaskQueryTuningConfig == null - ? null - : userCompactionTaskQueryTuningConfig.getMaxPendingPersists(), - userCompactionTaskQueryTuningConfig == null ? 
null : userCompactionTaskQueryTuningConfig.getPushTimeout() - ); + if (userCompactionTaskQueryTuningConfig == null) { + return new ClientCompactQueryTuningConfig( + maxRowsPerSegment, + null, + null, + null, + null, + null, + null, + null, + null + ); + } else { + return new ClientCompactQueryTuningConfig( + maxRowsPerSegment, + userCompactionTaskQueryTuningConfig.getMaxRowsInMemory(), + userCompactionTaskQueryTuningConfig.getMaxBytesInMemory(), + userCompactionTaskQueryTuningConfig.getMaxTotalRows(), + userCompactionTaskQueryTuningConfig.getSplitHintSpec(), + userCompactionTaskQueryTuningConfig.getIndexSpec(), + userCompactionTaskQueryTuningConfig.getMaxPendingPersists(), + userCompactionTaskQueryTuningConfig.getPushTimeout(), + userCompactionTaskQueryTuningConfig.getMaxNumConcurrentSubTasks() + ); + } } @JsonCreator @@ -68,24 +87,28 @@ public ClientCompactQueryTuningConfig( @JsonProperty("maxRowsInMemory") @Nullable Integer maxRowsInMemory, @JsonProperty("maxBytesInMemory") @Nullable Long maxBytesInMemory, @JsonProperty("maxTotalRows") @Nullable Long maxTotalRows, + @JsonProperty("splitHintSpec") @Nullable SplitHintSpec splitHintSpec, @JsonProperty("indexSpec") @Nullable IndexSpec indexSpec, @JsonProperty("maxPendingPersists") @Nullable Integer maxPendingPersists, - @JsonProperty("pushTimeout") @Nullable Long pushTimeout + @JsonProperty("pushTimeout") @Nullable Long pushTimeout, + @JsonProperty("maxNumConcurrentSubTasks") @Nullable Integer maxNumConcurrentSubTasks ) { this.maxRowsPerSegment = maxRowsPerSegment; this.maxBytesInMemory = maxBytesInMemory; this.maxRowsInMemory = maxRowsInMemory; this.maxTotalRows = maxTotalRows; + this.splitHintSpec = splitHintSpec; this.indexSpec = indexSpec; this.maxPendingPersists = maxPendingPersists; this.pushTimeout = pushTimeout; + this.maxNumConcurrentSubTasks = maxNumConcurrentSubTasks; } @JsonProperty public String getType() { - return "index"; + return "index_parallel"; } @JsonProperty @@ -116,6 +139,13 @@ public Long getMaxTotalRows() return maxTotalRows; } + @Nullable + @JsonProperty + public SplitHintSpec getSplitHintSpec() + { + return splitHintSpec; + } + public long getMaxTotalRowsOr(long defaultMaxTotalRows) { return maxTotalRows == null ? 
defaultMaxTotalRows : maxTotalRows; @@ -142,6 +172,13 @@ public Long getPushTimeout() return pushTimeout; } + @JsonProperty + @Nullable + public Integer getMaxNumConcurrentSubTasks() + { + return maxNumConcurrentSubTasks; + } + @Override public boolean equals(Object o) { @@ -158,7 +195,8 @@ public boolean equals(Object o) Objects.equals(maxTotalRows, that.maxTotalRows) && Objects.equals(indexSpec, that.indexSpec) && Objects.equals(maxPendingPersists, that.maxPendingPersists) && - Objects.equals(pushTimeout, that.pushTimeout); + Objects.equals(pushTimeout, that.pushTimeout) && + Objects.equals(maxNumConcurrentSubTasks, that.maxNumConcurrentSubTasks); } @Override @@ -171,7 +209,8 @@ public int hashCode() maxTotalRows, indexSpec, maxPendingPersists, - pushTimeout + pushTimeout, + maxNumConcurrentSubTasks ); } @@ -186,6 +225,7 @@ public String toString() ", indexSpec=" + indexSpec + ", maxPendingPersists=" + maxPendingPersists + ", pushTimeout=" + pushTimeout + + ", maxNumConcurrentSubTasks=" + maxNumConcurrentSubTasks + '}'; } } diff --git a/server/src/main/java/org/apache/druid/segment/realtime/firehose/InlineFirehoseFactory.java b/server/src/main/java/org/apache/druid/segment/realtime/firehose/InlineFirehoseFactory.java index dec2ab045bbe..31cb38c8a58f 100644 --- a/server/src/main/java/org/apache/druid/segment/realtime/firehose/InlineFirehoseFactory.java +++ b/server/src/main/java/org/apache/druid/segment/realtime/firehose/InlineFirehoseFactory.java @@ -25,6 +25,7 @@ import org.apache.druid.data.input.FiniteFirehoseFactory; import org.apache.druid.data.input.Firehose; import org.apache.druid.data.input.InputSplit; +import org.apache.druid.data.input.SplitHintSpec; import org.apache.druid.data.input.impl.StringInputRowParser; import javax.annotation.Nullable; @@ -84,13 +85,13 @@ public boolean isSplittable() } @Override - public Stream> getSplits() + public Stream> getSplits(@Nullable SplitHintSpec splitHintSpec) { return Stream.of(new InputSplit<>(data)); } @Override - public int getNumSplits() + public int getNumSplits(@Nullable SplitHintSpec splitHintSpec) { return 1; } diff --git a/server/src/main/java/org/apache/druid/server/coordinator/DataSourceCompactionConfig.java b/server/src/main/java/org/apache/druid/server/coordinator/DataSourceCompactionConfig.java index da5cde019a38..c59de7f11028 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/DataSourceCompactionConfig.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/DataSourceCompactionConfig.java @@ -24,6 +24,7 @@ import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.base.Preconditions; import org.apache.druid.client.indexing.ClientCompactQueryTuningConfig; +import org.apache.druid.data.input.SplitHintSpec; import org.apache.druid.segment.IndexSpec; import org.joda.time.Period; @@ -158,9 +159,11 @@ public UserCompactTuningConfig( @JsonProperty("maxRowsInMemory") @Nullable Integer maxRowsInMemory, @JsonProperty("maxBytesInMemory") @Nullable Long maxBytesInMemory, @JsonProperty("maxTotalRows") @Nullable Long maxTotalRows, + @JsonProperty("splitHintSpec") @Nullable SplitHintSpec splitHintSpec, @JsonProperty("indexSpec") @Nullable IndexSpec indexSpec, @JsonProperty("maxPendingPersists") @Nullable Integer maxPendingPersists, - @JsonProperty("pushTimeout") @Nullable Long pushTimeout + @JsonProperty("pushTimeout") @Nullable Long pushTimeout, + @JsonProperty("maxNumConcurrentSubTasks") @Nullable Integer maxNumConcurrentSubTasks ) { super( @@ -168,9 +171,11 @@ public 
UserCompactTuningConfig( maxRowsInMemory, maxBytesInMemory, maxTotalRows, + splitHintSpec, indexSpec, maxPendingPersists, - pushTimeout + pushTimeout, + maxNumConcurrentSubTasks ); } diff --git a/server/src/main/java/org/apache/druid/server/coordinator/helper/DruidCoordinatorSegmentCompactor.java b/server/src/main/java/org/apache/druid/server/coordinator/helper/DruidCoordinatorSegmentCompactor.java index e6a6705385b7..ae9a9ac88fb3 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/helper/DruidCoordinatorSegmentCompactor.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/helper/DruidCoordinatorSegmentCompactor.java @@ -92,6 +92,7 @@ public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params) final List<TaskStatusPlus> compactTasks = filterNonCompactTasks(indexingServiceClient.getActiveTasks()); // dataSource -> list of intervals of compact tasks final Map<String, List<Interval>> compactTaskIntervals = new HashMap<>(compactionConfigList.size()); + int numEstimatedNonCompleteCompactionTasks = 0; for (TaskStatusPlus status : compactTasks) { final TaskPayloadResponse response = indexingServiceClient.getTaskPayload(status.getId()); if (response == null) { @@ -101,6 +102,8 @@ public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params) final ClientCompactQuery compactQuery = (ClientCompactQuery) response.getPayload(); final Interval interval = compactQuery.getIoConfig().getInputSpec().getInterval(); compactTaskIntervals.computeIfAbsent(status.getDataSource(), k -> new ArrayList<>()).add(interval); + final int numSubTasks = findNumMaxConcurrentSubTasks(compactQuery.getTuningConfig()); + numEstimatedNonCompleteCompactionTasks += numSubTasks + 1; // count the compaction task itself } else { throw new ISE("WTH? task[%s] is not a compactTask?", status.getId()); } @@ -112,13 +115,19 @@ public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params) indexingServiceClient.getTotalWorkerCapacity() * dynamicConfig.getCompactionTaskSlotRatio(), dynamicConfig.getMaxCompactionTaskSlots() ); - final int numNonCompleteCompactionTasks = compactTasks.size(); - final int numAvailableCompactionTaskSlots = numNonCompleteCompactionTasks > 0 - ? Math.max(0, compactionTaskCapacity - numNonCompleteCompactionTasks) - // compactionTaskCapacity might be 0 if totalWorkerCapacity is low. - // This guarantees that at least one slot is available if - // compaction is enabled and numRunningCompactTasks is 0. - : Math.max(1, compactionTaskCapacity); + final int numAvailableCompactionTaskSlots; + if (numEstimatedNonCompleteCompactionTasks > 0) { + numAvailableCompactionTaskSlots = Math.max( + 0, + compactionTaskCapacity - numEstimatedNonCompleteCompactionTasks + ); + } else { + // compactionTaskCapacity might be 0 if totalWorkerCapacity is low. + // This guarantees that at least one slot is available if + // compaction is enabled and numRunningCompactTasks is 0. + numAvailableCompactionTaskSlots = Math.max(1, compactionTaskCapacity); + } + LOG.info( "Found [%d] available task slots for compaction out of [%d] max compaction task capacity", numAvailableCompactionTaskSlots, @@ -141,6 +150,25 @@ public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params) .build(); } + /** + * Each compaction task can run a parallel indexing task. When we count the number of currently running + * compaction tasks, we should count the sub tasks of the parallel indexing task as well. 
However, we currently + don't have a good way to get the number of currently running sub tasks except poking each supervisor task, + which is complex because it would have to handle all kinds of failures. Here, we simply return {@code maxNumConcurrentSubTasks} instead + to estimate the number of sub tasks conservatively. This should be OK since it won't affect the performance of + other ingestion types. + */ + private int findNumMaxConcurrentSubTasks(@Nullable ClientCompactQueryTuningConfig tuningConfig) + { + if (tuningConfig != null && tuningConfig.getMaxNumConcurrentSubTasks() != null) { + // The actual number of subtasks might be smaller than the configured max. + // However, we use the max to simplify the estimation here. + return tuningConfig.getMaxNumConcurrentSubTasks(); + } else { + return 0; + } + } + private static List<TaskStatusPlus> filterNonCompactTasks(List<TaskStatusPlus> taskStatuses) { return taskStatuses @@ -164,7 +192,7 @@ private CoordinatorStats doRun( { int numSubmittedTasks = 0; - for (; iterator.hasNext() && numSubmittedTasks < numAvailableCompactionTaskSlots; numSubmittedTasks++) { + for (; iterator.hasNext() && numSubmittedTasks < numAvailableCompactionTaskSlots;) { final List<DataSegment> segmentsToCompact = iterator.next(); if (!segmentsToCompact.isEmpty()) { @@ -182,6 +210,8 @@ private CoordinatorStats doRun( taskId, Iterables.transform(segmentsToCompact, DataSegment::getId) ); + // Count the compaction task itself + its sub tasks + numSubmittedTasks += findNumMaxConcurrentSubTasks(config.getTuningConfig()) + 1; } else { throw new ISE("segmentsToCompact is empty?"); } diff --git a/server/src/test/java/org/apache/druid/segment/realtime/firehose/InlineFirehoseFactoryTest.java b/server/src/test/java/org/apache/druid/segment/realtime/firehose/InlineFirehoseFactoryTest.java index 6bc9a0e12136..58671e82a525 100644 --- a/server/src/test/java/org/apache/druid/segment/realtime/firehose/InlineFirehoseFactoryTest.java +++ b/server/src/test/java/org/apache/druid/segment/realtime/firehose/InlineFirehoseFactoryTest.java @@ -85,7 +85,7 @@ public void testInterfaceImplementation() { Assert.assertTrue(target instanceof FiniteFirehoseFactory); Assert.assertFalse(target.isSplittable()); - Assert.assertEquals(1, target.getNumSplits()); + Assert.assertEquals(1, target.getNumSplits(null)); } @Test(expected = NullPointerException.class) @@ -115,7 +115,7 @@ public void testConnect() throws IOException @Test public void testForcedSplitAndClone() { - Optional<InputSplit<String>> inputSplitOptional = target.getSplits().findFirst(); + Optional<InputSplit<String>> inputSplitOptional = target.getSplits(null).findFirst(); Assert.assertTrue(inputSplitOptional.isPresent()); FiniteFirehoseFactory cloneWithSplit = target.withSplit(inputSplitOptional.get()); Assert.assertTrue(cloneWithSplit instanceof InlineFirehoseFactory); diff --git a/server/src/test/java/org/apache/druid/server/coordinator/DataSourceCompactionConfigTest.java b/server/src/test/java/org/apache/druid/server/coordinator/DataSourceCompactionConfigTest.java index 5d5b9df3a098..7b9e3b2698bd 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/DataSourceCompactionConfigTest.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/DataSourceCompactionConfigTest.java @@ -21,6 +21,7 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.collect.ImmutableMap; +import org.apache.druid.data.input.SegmentsSplitHintSpec; import org.apache.druid.jackson.DefaultObjectMapper; import org.apache.druid.segment.IndexSpec; import 
org.apache.druid.segment.data.CompressionFactory.LongEncodingStrategy; @@ -93,7 +94,7 @@ public void testSerdeWithMaxRowsPerSegment() throws IOException @Test public void testSerdeUserCompactTuningConfig() throws IOException { - final UserCompactTuningConfig config = new UserCompactTuningConfig(null, null, null, null, null, null); + final UserCompactTuningConfig config = new UserCompactTuningConfig(null, null, null, null, null, null, null, null); final String json = OBJECT_MAPPER.writeValueAsString(config); // Check maxRowsPerSegment doesn't exist in the JSON string Assert.assertFalse(json.contains("maxRowsPerSegment")); @@ -116,6 +117,8 @@ public void testSerdeWithMaxTotalRows() throws IOException 10000L, null, null, + null, + null, null ), ImmutableMap.of("key", "val") @@ -147,6 +150,8 @@ public void testSerdeMaxTotalRowsWithMaxRowsPerSegment() throws IOException 10000L, null, null, + null, + null, null ), ImmutableMap.of("key", "val") @@ -171,6 +176,7 @@ public void testSerdeUserCompactionTuningConfig() throws IOException 1000, 10000L, 2000L, + new SegmentsSplitHintSpec(10000L), new IndexSpec( new RoaringBitmapSerdeFactory(false), CompressionStrategy.LZF, @@ -178,7 +184,8 @@ public void testSerdeUserCompactionTuningConfig() throws IOException LongEncodingStrategy.LONGS ), 1, - 3000L + 3000L, + null ); final String json = OBJECT_MAPPER.writeValueAsString(tuningConfig); diff --git a/website/.spelling b/website/.spelling index 0ed9a2649027..f1fdcdb6fdb7 100644 --- a/website/.spelling +++ b/website/.spelling @@ -918,6 +918,8 @@ InlineFirehose LocalFirehose PartitionsSpec PasswordProviders +SegmentsSplitHintSpec +SplitHintSpec appendToExisting baseDir chatHandlerNumRetries @@ -938,6 +940,7 @@ reportParseExceptions segmentWriteOutMediumFactory sql sqls +splitHintSpec taskStatusCheckPeriodMs timeChunk totalNumMergeTasks
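To make the new split-hint plumbing concrete: the nullable SplitHintSpec now flows from the tuning config through FiniteFirehoseFactory.getSplits(...) and getNumSplits(...), and the coordinator budgets compaction slots as maxNumConcurrentSubTasks + 1 per submitted task. Below is a minimal, self-contained Java sketch (not code from this patch) of the grouping behavior that a segments-based hint implies: input segments are packed into one split until the per-task byte budget would be exceeded, and a segment larger than the budget becomes a split of its own. SegmentMeta and group(...) are hypothetical stand-ins, not Druid APIs.

import java.util.ArrayList;
import java.util.List;

public class SplitGroupingSketch
{
  // Default mirrors the patch's DEFAULT_MAX_INPUT_SEGMENT_BYTES_PER_TASK (500MB).
  static final long DEFAULT_BYTES_PER_TASK = 500L * 1024 * 1024;

  // Hypothetical stand-in for a segment's metadata: an id and its size in bytes.
  record SegmentMeta(String id, long bytes) {}

  // Pack segments into splits without letting a split exceed the byte budget;
  // a single segment larger than the budget still becomes its own split.
  static List<List<SegmentMeta>> group(List<SegmentMeta> segments, Long maxBytesPerTask)
  {
    final long budget = maxBytesPerTask == null ? DEFAULT_BYTES_PER_TASK : maxBytesPerTask;
    final List<List<SegmentMeta>> splits = new ArrayList<>();
    List<SegmentMeta> current = new ArrayList<>();
    long currentBytes = 0;
    for (SegmentMeta segment : segments) {
      if (!current.isEmpty() && currentBytes + segment.bytes() > budget) {
        splits.add(current);
        current = new ArrayList<>();
        currentBytes = 0;
      }
      current.add(segment);
      currentBytes += segment.bytes();
    }
    if (!current.isEmpty()) {
      splits.add(current);
    }
    return splits;
  }

  public static void main(String[] args)
  {
    final List<SegmentMeta> segments = List.of(
        new SegmentMeta("seg1", 300L << 20), // 300MB
        new SegmentMeta("seg2", 300L << 20), // 300MB
        new SegmentMeta("seg3", 700L << 20)  // 700MB, larger than the default budget
    );
    // With the default 500MB budget this prints three splits: [seg1], [seg2], [seg3].
    for (List<SegmentMeta> split : group(segments, null)) {
      System.out.println(split);
    }
  }
}

In JSON form, the same hint would ride in the tuning config alongside the new parallelism knob, e.g. "splitHintSpec": {"type": "segments", "maxInputSegmentBytesPerTask": 10000} next to "maxNumConcurrentSubTasks" (the "segments" type name matching SegmentsSplitHintSpec.TYPE), which is what the DataSourceCompactionConfigTest serde changes above exercise.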