diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SurrogateTaskActionClient.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SurrogateTaskActionClient.java
new file mode 100644
index 000000000000..deafd20d7563
--- /dev/null
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SurrogateTaskActionClient.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.common.actions;
+
+import java.io.IOException;
+
+/**
+ * A {@link TaskActionClient} that wraps a given {@link TaskAction} with {@link SurrogateAction}.
+ * All subtasks of {@link org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexSupervisorTask} must
+ * use this client or wrap taskActions manually.
+ */
+public class SurrogateTaskActionClient implements TaskActionClient
+{
+  private final String supervisorTaskId;
+  private final TaskActionClient delegate;
+
+  public SurrogateTaskActionClient(String supervisorTaskId, TaskActionClient delegate)
+  {
+    this.supervisorTaskId = supervisorTaskId;
+    this.delegate = delegate;
+  }
+
+  @Override
+  public <RetType> RetType submit(TaskAction<RetType> taskAction) throws IOException
+  {
+    return delegate.submit(new SurrogateAction<>(supervisorTaskId, taskAction));
+  }
+}
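For context, a minimal sketch of how a subtask-side caller might use this decorator. This is illustrative only, not part of the patch; the toolbox, supervisor id, and interval are assumed to come from the subtask's runtime context, and TimeChunkLockTryAcquireAction is the existing lock action used by tryTimeChunkLock.

// Sketch only (not Druid source). Every action submitted through the wrapped
// client is rewritten as a SurrogateAction attributed to the supervisor task,
// so the Overlord executes it under the supervisor's identity and locks.
static TaskLock tryLockViaSupervisor(TaskToolbox toolbox, String supervisorTaskId, Interval interval)
    throws IOException
{
  TaskActionClient client = new SurrogateTaskActionClient(supervisorTaskId, toolbox.getTaskActionClient());
  // Equivalent to toolbox.getTaskActionClient()
  //     .submit(new SurrogateAction<>(supervisorTaskId, action))
  return client.submit(new TimeChunkLockTryAcquireAction(TaskLockType.EXCLUSIVE, interval));
}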
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/OverlordCoordinatingSegmentAllocator.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/OverlordCoordinatingSegmentAllocator.java
index 87daaa865506..6efd39ae0ed6 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/OverlordCoordinatingSegmentAllocator.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/OverlordCoordinatingSegmentAllocator.java
@@ -25,7 +25,8 @@
 import org.apache.druid.indexing.appenderator.ActionBasedSegmentAllocator;
 import org.apache.druid.indexing.common.TaskToolbox;
 import org.apache.druid.indexing.common.actions.SegmentAllocateAction;
-import org.apache.druid.indexing.common.actions.SurrogateAction;
+import org.apache.druid.indexing.common.actions.SurrogateTaskActionClient;
+import org.apache.druid.indexing.common.actions.TaskActionClient;
 import org.apache.druid.indexing.common.task.TaskLockHelper.OverwritingRootGenerationPartitions;
 import org.apache.druid.indexing.common.task.batch.parallel.SupervisorTaskAccess;
 import org.apache.druid.java.util.common.ISE;
@@ -58,8 +59,12 @@ public class OverlordCoordinatingSegmentAllocator implements SegmentAllocatorFor
       final PartitionsSpec partitionsSpec
   )
   {
+    final TaskActionClient taskActionClient =
+        supervisorTaskAccess == null
+        ? toolbox.getTaskActionClient()
+        : new SurrogateTaskActionClient(supervisorTaskAccess.getSupervisorTaskId(), toolbox.getTaskActionClient());
     this.internalAllocator = new ActionBasedSegmentAllocator(
-        toolbox.getTaskActionClient(),
+        taskActionClient,
         dataSchema,
         (schema, row, sequenceName, previousSegmentId, skipSegmentLineageCheck) -> {
           final GranularitySpec granularitySpec = schema.getGranularitySpec();
@@ -72,34 +77,17 @@ public class OverlordCoordinatingSegmentAllocator implements SegmentAllocatorFor
               taskLockHelper,
               interval
           );
-          if (supervisorTaskAccess != null) {
-            return new SurrogateAction<>(
-                supervisorTaskAccess.getSupervisorTaskId(),
-                new SegmentAllocateAction(
-                    schema.getDataSource(),
-                    row.getTimestamp(),
-                    schema.getGranularitySpec().getQueryGranularity(),
-                    schema.getGranularitySpec().getSegmentGranularity(),
-                    sequenceName,
-                    previousSegmentId,
-                    skipSegmentLineageCheck,
-                    partialShardSpec,
-                    taskLockHelper.getLockGranularityToUse()
-                )
-            );
-          } else {
-            return new SegmentAllocateAction(
-                schema.getDataSource(),
-                row.getTimestamp(),
-                schema.getGranularitySpec().getQueryGranularity(),
-                schema.getGranularitySpec().getSegmentGranularity(),
-                sequenceName,
-                previousSegmentId,
-                skipSegmentLineageCheck,
-                partialShardSpec,
-                taskLockHelper.getLockGranularityToUse()
-            );
-          }
+          return new SegmentAllocateAction(
+              schema.getDataSource(),
+              row.getTimestamp(),
+              schema.getGranularitySpec().getQueryGranularity(),
+              schema.getGranularitySpec().getSegmentGranularity(),
+              sequenceName,
+              previousSegmentId,
+              skipSegmentLineageCheck,
+              partialShardSpec,
+              taskLockHelper.getLockGranularityToUse()
+          );
         }
     );
     this.sequenceNameFunction = new LinearlyPartitionedSequenceNameFunction(taskId);
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexIngestionSpec.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexIngestionSpec.java
index 06baae4b7dd3..5d057cded060 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexIngestionSpec.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexIngestionSpec.java
@@ -63,6 +63,11 @@ public ParallelIndexIngestionSpec(
     this.tuningConfig = tuningConfig == null ? ParallelIndexTuningConfig.defaultConfig() : tuningConfig;
   }
 
+  public ParallelIndexIngestionSpec withDataSchema(DataSchema dataSchema)
+  {
+    return new ParallelIndexIngestionSpec(dataSchema, ioConfig, tuningConfig);
+  }
+
   @Override
   @JsonProperty("dataSchema")
   public DataSchema getDataSchema()
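The copy helper above is what lets the supervisor swap in a rewritten schema without touching the ioConfig or tuningConfig. A hypothetical caller, for illustration only (newGranularitySpec is a placeholder; withGranularitySpec is the existing DataSchema copy helper used later in this patch):

// Sketch: produce a spec identical to `spec` except for its granularity spec.
ParallelIndexIngestionSpec updated = spec.withDataSchema(
    spec.getDataSchema().withGranularitySpec(newGranularitySpec)
);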
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java
index 5cbfc01635d9..3d272b43813c 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java
@@ -188,10 +188,6 @@ public ParallelIndexSupervisorTask(
 
     if (isGuaranteedRollup(ingestionSchema.getIOConfig(), ingestionSchema.getTuningConfig())) {
       checkPartitionsSpecForForceGuaranteedRollup(ingestionSchema.getTuningConfig().getGivenOrDefaultPartitionsSpec());
-
-      if (ingestionSchema.getDataSchema().getGranularitySpec().inputIntervals().isEmpty()) {
-        throw new ISE("forceGuaranteedRollup is set but intervals is missing in granularitySpec");
-      }
     }
 
     this.baseInputSource = ingestionSchema.getIOConfig().getNonNullInputSource(
@@ -290,7 +286,8 @@ PartialDimensionCardinalityParallelIndexTaskRunner createPartialDimensionCardina
   @VisibleForTesting
   PartialHashSegmentGenerateParallelIndexTaskRunner createPartialHashSegmentGenerateRunner(
       TaskToolbox toolbox,
-      Integer numShardsOverride
+      ParallelIndexIngestionSpec ingestionSchema,
+      @Nullable Map<Interval, Integer> intervalToNumShardsOverride
   )
   {
     return new PartialHashSegmentGenerateParallelIndexTaskRunner(
@@ -299,7 +296,7 @@ PartialHashSegmentGenerateParallelIndexTaskRunner createPartialHashSegmentGenera
         getGroupId(),
         ingestionSchema,
         getContext(),
-        numShardsOverride
+        intervalToNumShardsOverride
     );
   }
@@ -318,7 +315,8 @@ PartialDimensionDistributionParallelIndexTaskRunner createPartialDimensionDistri
   @VisibleForTesting
   PartialRangeSegmentGenerateParallelIndexTaskRunner createPartialRangeSegmentGenerateRunner(
       TaskToolbox toolbox,
-      Map<Interval, PartitionBoundaries> intervalToPartitions
+      Map<Interval, PartitionBoundaries> intervalToPartitions,
+      ParallelIndexIngestionSpec ingestionSchema
   )
   {
     return new PartialRangeSegmentGenerateParallelIndexTaskRunner(
@@ -334,16 +332,17 @@ PartialRangeSegmentGenerateParallelIndexTaskRunner createPartialRangeSegmentGene
   @VisibleForTesting
   PartialGenericSegmentMergeParallelIndexTaskRunner createPartialGenericSegmentMergeRunner(
       TaskToolbox toolbox,
-      List<PartialGenericSegmentMergeIOConfig> ioConfigs
+      List<PartialGenericSegmentMergeIOConfig> ioConfigs,
+      ParallelIndexIngestionSpec ingestionSchema
   )
   {
     return new PartialGenericSegmentMergeParallelIndexTaskRunner(
         toolbox,
         getId(),
         getGroupId(),
-        getIngestionSchema().getDataSchema(),
+        ingestionSchema.getDataSchema(),
         ioConfigs,
-        getIngestionSchema().getTuningConfig(),
+        ingestionSchema.getTuningConfig(),
         getContext()
     );
   }
@@ -529,9 +528,30 @@ private TaskStatus runMultiPhaseParallel(TaskToolbox toolbox) throws Exception
            : runHashPartitionMultiPhaseParallel(toolbox);
   }
 
+  private static ParallelIndexIngestionSpec rewriteIngestionSpecWithIntervalsIfMissing(
+      ParallelIndexIngestionSpec ingestionSchema,
+      Collection<Interval> intervals
+  )
+  {
+    if (ingestionSchema.getDataSchema().getGranularitySpec().inputIntervals().isEmpty()) {
+      return ingestionSchema
+          .withDataSchema(
+              ingestionSchema.getDataSchema().withGranularitySpec(
+                  ingestionSchema
+                      .getDataSchema()
+                      .getGranularitySpec()
+                      .withIntervals(new ArrayList<>(intervals))
+              )
+          );
+    } else {
+      return ingestionSchema;
+    }
+  }
+
   private TaskStatus runHashPartitionMultiPhaseParallel(TaskToolbox toolbox) throws Exception
   {
     TaskState state;
+    ParallelIndexIngestionSpec ingestionSchemaToUse = ingestionSchema;
 
     if (!(ingestionSchema.getTuningConfig().getPartitionsSpec() instanceof HashedPartitionsSpec)) {
       // only range and hash partitioning is supported for multiphase parallel ingestion, see runMultiPhaseParallel()
@@ -541,49 +561,64 @@ private TaskStatus runHashPartitionMultiPhaseParallel(TaskToolbox toolbox) throw
       );
     }
 
-    final Integer numShardsOverride;
+    final Map<Interval, Integer> intervalToNumShards;
     HashedPartitionsSpec partitionsSpec = (HashedPartitionsSpec) ingestionSchema.getTuningConfig().getPartitionsSpec();
-    if (partitionsSpec.getNumShards() == null) {
-      // 0. need to determine numShards by scanning the data
-      LOG.info("numShards is unspecified, beginning %s phase.", PartialDimensionCardinalityTask.TYPE);
+    final boolean needsInputSampling =
+        partitionsSpec.getNumShards() == null
+        || ingestionSchemaToUse.getDataSchema().getGranularitySpec().inputIntervals().isEmpty();
+    if (needsInputSampling) {
+      // 0. need to determine intervals and numShards by scanning the data
+      LOG.info("Needs to determine intervals or numShards, beginning %s phase.", PartialDimensionCardinalityTask.TYPE);
       ParallelIndexTaskRunner<PartialDimensionCardinalityTask, DimensionCardinalityReport> cardinalityRunner =
           createRunner(
               toolbox,
               this::createPartialDimensionCardinalityRunner
           );
 
-      if (cardinalityRunner == null) {
-        throw new ISE("Could not create cardinality runner for hash partitioning.");
-      }
-
       state = runNextPhase(cardinalityRunner);
       if (state.isFailure()) {
         return TaskStatus.failure(getId());
       }
 
-      int effectiveMaxRowsPerSegment = partitionsSpec.getMaxRowsPerSegment() == null
-                                       ? PartitionsSpec.DEFAULT_MAX_ROWS_PER_SEGMENT
-                                       : partitionsSpec.getMaxRowsPerSegment();
-      LOG.info("effective maxRowsPerSegment is: " + effectiveMaxRowsPerSegment);
+      if (cardinalityRunner.getReports().isEmpty()) {
+        String msg = "No valid rows for hash partitioning."
+                     + " All rows may have invalid timestamps or have been filtered out.";
+        LOG.warn(msg);
+        return TaskStatus.success(getId(), msg);
+      }
+
+      if (partitionsSpec.getNumShards() == null) {
+        int effectiveMaxRowsPerSegment = partitionsSpec.getMaxRowsPerSegment() == null
+                                         ? PartitionsSpec.DEFAULT_MAX_ROWS_PER_SEGMENT
+                                         : partitionsSpec.getMaxRowsPerSegment();
+        LOG.info("effective maxRowsPerSegment is: " + effectiveMaxRowsPerSegment);
 
-      if (cardinalityRunner.getReports() == null) {
-        throw new ISE("Could not determine cardinalities for hash partitioning.");
+        intervalToNumShards = determineNumShardsFromCardinalityReport(
+            cardinalityRunner.getReports().values(),
+            effectiveMaxRowsPerSegment
+        );
+      } else {
+        intervalToNumShards = CollectionUtils.mapValues(
+            mergeCardinalityReports(cardinalityRunner.getReports().values()),
+            k -> partitionsSpec.getNumShards()
+        );
       }
 
-      numShardsOverride = determineNumShardsFromCardinalityReport(
-          cardinalityRunner.getReports().values(),
-          effectiveMaxRowsPerSegment
-      );
-
-      LOG.info("Automatically determined numShards: " + numShardsOverride);
+      ingestionSchemaToUse = rewriteIngestionSpecWithIntervalsIfMissing(
+          ingestionSchemaToUse,
+          intervalToNumShards.keySet()
+      );
     } else {
-      numShardsOverride = null;
+      // numShards will be determined in PartialHashSegmentGenerateTask
+      intervalToNumShards = null;
     }
 
     // 1. Partial segment generation phase
+    final ParallelIndexIngestionSpec segmentCreateIngestionSpec = ingestionSchemaToUse;
     ParallelIndexTaskRunner<PartialHashSegmentGenerateTask, GeneratedPartitionsReport<GenericPartitionStat>> indexingRunner =
         createRunner(
             toolbox,
-            f -> createPartialHashSegmentGenerateRunner(toolbox, numShardsOverride)
+            f -> createPartialHashSegmentGenerateRunner(toolbox, segmentCreateIngestionSpec, intervalToNumShards)
         );
 
     state = runNextPhase(indexingRunner);
@@ -600,9 +635,10 @@ private TaskStatus runHashPartitionMultiPhaseParallel(TaskToolbox toolbox) throw
         partitionToLocations
     );
 
+    final ParallelIndexIngestionSpec segmentMergeIngestionSpec = ingestionSchemaToUse;
    final ParallelIndexTaskRunner<PartialGenericSegmentMergeTask, PushedSegmentsReport> mergeRunner = createRunner(
        toolbox,
-       tb -> createPartialGenericSegmentMergeRunner(tb, ioConfigs)
+       tb -> createPartialGenericSegmentMergeRunner(tb, ioConfigs, segmentMergeIngestionSpec)
    );
    state = runNextPhase(mergeRunner);
    if (state.isSuccess()) {
@@ -615,6 +651,7 @@ private TaskStatus runHashPartitionMultiPhaseParallel(TaskToolbox toolbox) throw
 
   private TaskStatus runRangePartitionMultiPhaseParallel(TaskToolbox toolbox) throws Exception
   {
+    ParallelIndexIngestionSpec ingestionSchemaToUse = ingestionSchema;
     ParallelIndexTaskRunner<PartialDimensionDistributionTask, DimensionDistributionReport> distributionRunner =
         createRunner(
             toolbox,
@@ -631,13 +668,22 @@ private TaskStatus runRangePartitionMultiPhaseParallel(TaskToolbox toolbox) thro
 
     if (intervalToPartitions.isEmpty()) {
       String msg = "No valid rows for single dimension partitioning."
-          + " All rows may have invalid timestamps or multiple dimension values.";
+                   + " All rows may have invalid timestamps or multiple dimension values.";
       LOG.warn(msg);
       return TaskStatus.success(getId(), msg);
     }
 
+    ingestionSchemaToUse = rewriteIngestionSpecWithIntervalsIfMissing(
+        ingestionSchemaToUse,
+        intervalToPartitions.keySet()
+    );
+
+    final ParallelIndexIngestionSpec segmentCreateIngestionSpec = ingestionSchemaToUse;
     ParallelIndexTaskRunner<PartialRangeSegmentGenerateTask, GeneratedPartitionsReport<GenericPartitionStat>> indexingRunner =
-        createRunner(toolbox, tb -> createPartialRangeSegmentGenerateRunner(tb, intervalToPartitions));
+        createRunner(
+            toolbox,
+            tb -> createPartialRangeSegmentGenerateRunner(tb, intervalToPartitions, segmentCreateIngestionSpec)
+        );
 
     TaskState indexingState = runNextPhase(indexingRunner);
     if (indexingState.isFailure()) {
@@ -652,9 +698,10 @@ private TaskStatus runRangePartitionMultiPhaseParallel(TaskToolbox toolbox) thro
         partitionToLocations
     );
 
+    final ParallelIndexIngestionSpec segmentMergeIngestionSpec = ingestionSchemaToUse;
     ParallelIndexTaskRunner<PartialGenericSegmentMergeTask, PushedSegmentsReport> mergeRunner = createRunner(
         toolbox,
-        tb -> createPartialGenericSegmentMergeRunner(tb, ioConfigs)
+        tb -> createPartialGenericSegmentMergeRunner(tb, ioConfigs, segmentMergeIngestionSpec)
     );
     TaskState mergeState = runNextPhase(mergeRunner);
     if (mergeState.isSuccess()) {
@@ -664,48 +711,45 @@ private TaskStatus runRangePartitionMultiPhaseParallel(TaskToolbox toolbox) thro
     return TaskStatus.fromCode(getId(), mergeState);
   }
 
-  @VisibleForTesting
-  public static int determineNumShardsFromCardinalityReport(
-      Collection<DimensionCardinalityReport> reports,
-      int maxRowsPerSegment
-  )
+  private static Map<Interval, Union> mergeCardinalityReports(Collection<DimensionCardinalityReport> reports)
   {
-    // aggregate all the sub-reports
     Map<Interval, Union> finalCollectors = new HashMap<>();
     reports.forEach(report -> {
       Map<Interval, byte[]> intervalToCardinality = report.getIntervalToCardinalities();
       for (Map.Entry<Interval, byte[]> entry : intervalToCardinality.entrySet()) {
-        Union union = finalCollectors.computeIfAbsent(
-            entry.getKey(),
-            (key) -> {
-              return new Union(DimensionCardinalityReport.HLL_SKETCH_LOG_K);
-            }
-        );
         HllSketch entryHll = HllSketch.wrap(Memory.wrap(entry.getValue()));
-        union.update(entryHll);
+        finalCollectors.computeIfAbsent(
+            entry.getKey(),
+            k -> new Union(DimensionCardinalityReport.HLL_SKETCH_LOG_K)
+        ).update(entryHll);
       }
     });
+    return finalCollectors;
+  }
 
-    // determine the highest cardinality in any interval
-    long maxCardinality = 0;
-    for (Union union : finalCollectors.values()) {
-      maxCardinality = Math.max(maxCardinality, (long) union.getEstimate());
-    }
-
-    LOG.info("Estimated max cardinality: " + maxCardinality);
-
-    // determine numShards based on maxRowsPerSegment and the highest per-interval cardinality
-    long numShards = maxCardinality / maxRowsPerSegment;
-    if (maxCardinality % maxRowsPerSegment != 0) {
-      // if there's a remainder add 1 so we stay under maxRowsPerSegment
-      numShards += 1;
-    }
-    try {
-      return Math.toIntExact(numShards);
-    }
-    catch (ArithmeticException ae) {
-      throw new ISE("Estimated numShards [%s] exceeds integer bounds.", numShards);
-    }
+  @VisibleForTesting
+  public static Map<Interval, Integer> determineNumShardsFromCardinalityReport(
+      Collection<DimensionCardinalityReport> reports,
+      int maxRowsPerSegment
+  )
+  {
+    // aggregate all the sub-reports
+    Map<Interval, Union> finalCollectors = mergeCardinalityReports(reports);
+
+    return CollectionUtils.mapValues(
+        finalCollectors,
+        union -> {
+          final double estimatedCardinality = union.getEstimate();
+          // determine numShards based on maxRowsPerSegment and the cardinality
+          final long estimatedNumShards = Math.round(estimatedCardinality / maxRowsPerSegment);
+          try {
+            return Math.max(Math.toIntExact(estimatedNumShards), 1);
+          }
+          catch (ArithmeticException ae) {
+            throw new ISE("Estimated numShards [%s] exceeds integer bounds.", estimatedNumShards);
+          }
+        }
+    );
   }
 
   private Map<Interval, PartitionBoundaries> determineAllRangePartitions(Collection<DimensionDistributionReport> reports)
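A worked check of the per-interval shard math introduced above (numbers invented for illustration, not Druid code): each interval's shard count is round(estimatedCardinality / maxRowsPerSegment), clamped to at least 1.

// Standalone sketch of the rounding behavior.
public class NumShardsMath
{
  public static void main(String[] args)
  {
    int maxRowsPerSegment = 2;
    double[] cardinalities = {7.0, 1.0}; // e.g., one HLL estimate per interval
    for (double cardinality : cardinalities) {
      long rounded = Math.round(cardinality / maxRowsPerSegment);
      int numShards = Math.max(Math.toIntExact(rounded), 1);
      System.out.println(cardinality + " -> " + numShards); // 7.0 -> 4, 1.0 -> 1
    }
  }
}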
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialDimensionCardinalityTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialDimensionCardinalityTask.java
index f6759ecd272d..48be3e7059b0 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialDimensionCardinalityTask.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialDimensionCardinalityTask.java
@@ -24,6 +24,7 @@
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
 import org.apache.datasketches.hll.HllSketch;
 import org.apache.druid.data.input.InputFormat;
@@ -32,12 +33,12 @@
 import org.apache.druid.indexer.TaskStatus;
 import org.apache.druid.indexer.partitions.HashedPartitionsSpec;
 import org.apache.druid.indexing.common.TaskToolbox;
+import org.apache.druid.indexing.common.actions.SurrogateTaskActionClient;
 import org.apache.druid.indexing.common.actions.TaskActionClient;
 import org.apache.druid.indexing.common.task.AbstractBatchIndexTask;
 import org.apache.druid.indexing.common.task.ClientBasedTaskInfoProvider;
 import org.apache.druid.indexing.common.task.TaskResource;
 import org.apache.druid.java.util.common.granularity.Granularity;
-import org.apache.druid.java.util.common.logger.Logger;
 import org.apache.druid.java.util.common.parsers.CloseableIterator;
 import org.apache.druid.segment.incremental.ParseExceptionHandler;
 import org.apache.druid.segment.incremental.RowIngestionMeters;
@@ -52,11 +53,11 @@
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 
 public class PartialDimensionCardinalityTask extends PerfectRollupWorkerTask
 {
   public static final String TYPE = "partial_dimension_cardinality";
-  private static final Logger LOG = new Logger(PartialDimensionCardinalityTask.class);
 
   private final int numAttempts;
   private final ParallelIndexIngestionSpec ingestionSchema;
@@ -125,10 +126,14 @@ public String getType()
   @Override
   public boolean isReady(TaskActionClient taskActionClient) throws Exception
   {
-    return tryTimeChunkLock(
-        taskActionClient,
-        getIngestionSchema().getDataSchema().getGranularitySpec().inputIntervals()
-    );
+    if (!getIngestionSchema().getDataSchema().getGranularitySpec().inputIntervals().isEmpty()) {
+      return tryTimeChunkLock(
+          new SurrogateTaskActionClient(supervisorTaskId, taskActionClient),
+          getIngestionSchema().getDataSchema().getGranularitySpec().inputIntervals()
+      );
+    } else {
+      return true;
+    }
   }
 
   @Override
@@ -159,6 +164,7 @@ public TaskStatus runTask(TaskToolbox toolbox) throws Exception
         tuningConfig.getMaxParseExceptions(),
         tuningConfig.getMaxSavedParseExceptions()
     );
+    final boolean determineIntervals = granularitySpec.inputIntervals().isEmpty();
     try (
         final CloseableIterator<InputRow> inputRowIterator = AbstractBatchIndexTask.inputSourceReader(
             toolbox.getIndexingTmpDir(),
@@ -166,7 +172,7 @@ public TaskStatus runTask(TaskToolbox toolbox) throws Exception
             dataSchema,
             inputSource,
             inputFormat,
-            AbstractBatchIndexTask.defaultRowFilter(granularitySpec),
+            determineIntervals ? Objects::nonNull : AbstractBatchIndexTask.defaultRowFilter(granularitySpec),
             buildSegmentsMeters,
             parseExceptionHandler
         );
@@ -197,8 +203,15 @@ private Map<Interval, byte[]> determineCardinalities(
       InputRow inputRow = inputRowIterator.next();
       // null rows are filtered out by FilteringCloseableInputRowIterator
       DateTime timestamp = inputRow.getTimestamp();
-      //noinspection OptionalGetWithoutIsPresent (InputRowIterator returns rows with present intervals)
-      Interval interval = granularitySpec.bucketInterval(timestamp).get();
+      final Interval interval;
+      if (granularitySpec.inputIntervals().isEmpty()) {
+        interval = granularitySpec.getSegmentGranularity().bucket(inputRow.getTimestamp());
+      } else {
+        final Optional<Interval> optInterval = granularitySpec.bucketInterval(inputRow.getTimestamp());
+        // this interval must exist since it passed the rowFilter
+        assert optInterval.isPresent();
+        interval = optInterval.get();
+      }
       Granularity queryGranularity = granularitySpec.getQueryGranularity();
 
       HllSketch hllSketch = intervalToCardinalities.computeIfAbsent(
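The fallback added above relies on Granularity.bucket, the existing Druid java-util API that maps any timestamp to its enclosing interval without consulting the configured input intervals. A quick illustration (sketch, not part of the patch):

// With DAY segment granularity, a timestamp buckets into its calendar day,
// so a missing inputIntervals list no longer makes bucketInterval() absent.
Granularity day = Granularities.DAY;
Interval bucket = day.bucket(DateTimes.of("2017-12-01T23:15:00Z"));
// bucket == Intervals.of("2017-12-01/2017-12-02")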
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialDimensionDistributionTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialDimensionDistributionTask.java
index 53019dda5b4b..6bec35d303f5 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialDimensionDistributionTask.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialDimensionDistributionTask.java
@@ -22,6 +22,7 @@
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Iterables;
 import com.google.common.hash.BloomFilter;
@@ -34,6 +35,7 @@
 import org.apache.druid.indexer.TaskStatus;
 import org.apache.druid.indexer.partitions.SingleDimensionPartitionsSpec;
 import org.apache.druid.indexing.common.TaskToolbox;
+import org.apache.druid.indexing.common.actions.SurrogateTaskActionClient;
 import org.apache.druid.indexing.common.actions.TaskActionClient;
 import org.apache.druid.indexing.common.task.AbstractBatchIndexTask;
 import org.apache.druid.indexing.common.task.ClientBasedTaskInfoProvider;
@@ -55,6 +57,7 @@
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.function.Supplier;
 
 /**
@@ -163,10 +166,14 @@ public String getType()
   @Override
   public boolean isReady(TaskActionClient taskActionClient) throws Exception
   {
-    return tryTimeChunkLock(
-        taskActionClient,
-        getIngestionSchema().getDataSchema().getGranularitySpec().inputIntervals()
-    );
+    if (!getIngestionSchema().getDataSchema().getGranularitySpec().inputIntervals().isEmpty()) {
+      return tryTimeChunkLock(
+          new SurrogateTaskActionClient(supervisorTaskId, taskActionClient),
+          getIngestionSchema().getDataSchema().getGranularitySpec().inputIntervals()
+      );
+    } else {
+      return true;
+    }
   }
 
   @Override
@@ -195,6 +202,7 @@ public TaskStatus runTask(TaskToolbox toolbox) throws Exception
         tuningConfig.getMaxParseExceptions(),
         tuningConfig.getMaxSavedParseExceptions()
     );
+    final boolean determineIntervals = granularitySpec.inputIntervals().isEmpty();
     try (
         final CloseableIterator<InputRow> inputRowIterator = AbstractBatchIndexTask.inputSourceReader(
             toolbox.getIndexingTmpDir(),
@@ -202,7 +210,7 @@ public TaskStatus runTask(TaskToolbox toolbox) throws Exception
             dataSchema,
             inputSource,
             inputFormat,
-            AbstractBatchIndexTask.defaultRowFilter(granularitySpec),
+            determineIntervals ? Objects::nonNull : AbstractBatchIndexTask.defaultRowFilter(granularitySpec),
             buildSegmentsMeters,
             parseExceptionHandler
         );
@@ -243,10 +251,15 @@ private Map<Interval, StringDistribution> determineDistribution(
         continue;
       }
 
-      DateTime timestamp = inputRow.getTimestamp();
-
-      //noinspection OptionalGetWithoutIsPresent (InputRowIterator returns rows with present intervals)
-      Interval interval = granularitySpec.bucketInterval(timestamp).get();
+      final Interval interval;
+      if (granularitySpec.inputIntervals().isEmpty()) {
+        interval = granularitySpec.getSegmentGranularity().bucket(inputRow.getTimestamp());
+      } else {
+        final Optional<Interval> optInterval = granularitySpec.bucketInterval(inputRow.getTimestamp());
+        // this interval must exist since it passed the rowFilter
+        assert optInterval.isPresent();
+        interval = optInterval.get();
+      }
 
       String partitionDimensionValue = Iterables.getOnlyElement(inputRow.getDimension(partitionDimension));
 
       if (inputRowFilter.accept(interval, partitionDimensionValue, inputRow)) {
segment generation"; - private Integer numShardsOverride; + @Nullable + private final Map intervalToNumShardsOverride; PartialHashSegmentGenerateParallelIndexTaskRunner( TaskToolbox toolbox, @@ -40,11 +43,11 @@ class PartialHashSegmentGenerateParallelIndexTaskRunner String groupId, ParallelIndexIngestionSpec ingestionSchema, Map context, - Integer numShardsOverride + @Nullable Map intervalToNumShardsOverride ) { super(toolbox, taskId, groupId, ingestionSchema, context); - this.numShardsOverride = numShardsOverride; + this.intervalToNumShardsOverride = intervalToNumShardsOverride; } @Override @@ -82,7 +85,7 @@ public PartialHashSegmentGenerateTask newSubTask(int numAttempts) numAttempts, subTaskIngestionSpec, context, - numShardsOverride + intervalToNumShardsOverride ); } }; diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialHashSegmentGenerateTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialHashSegmentGenerateTask.java index 3b1f7ba4cd07..b252e5dfb5ee 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialHashSegmentGenerateTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialHashSegmentGenerateTask.java @@ -23,6 +23,7 @@ import com.fasterxml.jackson.annotation.JsonProperty; import org.apache.druid.indexer.partitions.HashedPartitionsSpec; import org.apache.druid.indexing.common.TaskToolbox; +import org.apache.druid.indexing.common.actions.SurrogateTaskActionClient; import org.apache.druid.indexing.common.actions.TaskActionClient; import org.apache.druid.indexing.common.task.SegmentAllocatorForBatch; import org.apache.druid.indexing.common.task.SegmentAllocators; @@ -57,7 +58,8 @@ public class PartialHashSegmentGenerateTask extends PartialSegmentGenerateTask intervalToNumShardsOverride; @JsonCreator public PartialHashSegmentGenerateTask( @@ -69,7 +71,7 @@ public PartialHashSegmentGenerateTask( @JsonProperty("numAttempts") final int numAttempts, // zero-based counting @JsonProperty(PROP_SPEC) final ParallelIndexIngestionSpec ingestionSchema, @JsonProperty("context") final Map context, - @Nullable @JsonProperty("numShardsOverride") final Integer numShardsOverride + @JsonProperty("intervalToNumShardsOverride") @Nullable final Map intervalToNumShardsOverride ) { super( @@ -85,7 +87,7 @@ public PartialHashSegmentGenerateTask( this.numAttempts = numAttempts; this.ingestionSchema = ingestionSchema; this.supervisorTaskId = supervisorTaskId; - this.numShardsOverride = numShardsOverride; + this.intervalToNumShardsOverride = intervalToNumShardsOverride; } @JsonProperty @@ -106,6 +108,13 @@ public String getSupervisorTaskId() return supervisorTaskId; } + @Nullable + @JsonProperty + public Map getIntervalToNumShardsOverride() + { + return intervalToNumShardsOverride; + } + @Override public String getType() { @@ -116,7 +125,7 @@ public String getType() public boolean isReady(TaskActionClient taskActionClient) throws Exception { return tryTimeChunkLock( - taskActionClient, + new SurrogateTaskActionClient(supervisorTaskId, taskActionClient), getIngestionSchema().getDataSchema().getGranularitySpec().inputIntervals() ); } @@ -134,7 +143,11 @@ SegmentAllocatorForBatch createSegmentAllocator(TaskToolbox toolbox, ParallelInd getId(), granularitySpec, new SupervisorTaskAccess(supervisorTaskId, taskClient), - createHashPartitionAnalysisFromPartitionsSpec(granularitySpec, partitionsSpec, numShardsOverride) + 
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialHashSegmentGenerateTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialHashSegmentGenerateTask.java
index 3b1f7ba4cd07..b252e5dfb5ee 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialHashSegmentGenerateTask.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialHashSegmentGenerateTask.java
@@ -23,6 +23,7 @@
 import com.fasterxml.jackson.annotation.JsonProperty;
 import org.apache.druid.indexer.partitions.HashedPartitionsSpec;
 import org.apache.druid.indexing.common.TaskToolbox;
+import org.apache.druid.indexing.common.actions.SurrogateTaskActionClient;
 import org.apache.druid.indexing.common.actions.TaskActionClient;
 import org.apache.druid.indexing.common.task.SegmentAllocatorForBatch;
 import org.apache.druid.indexing.common.task.SegmentAllocators;
@@ -57,7 +58,8 @@ public class PartialHashSegmentGenerateTask extends PartialSegmentGenerateTask<G
   private final int numAttempts;
   private final ParallelIndexIngestionSpec ingestionSchema;
   private final String supervisorTaskId;
-  private final Integer numShardsOverride;
+  @Nullable
+  private final Map<Interval, Integer> intervalToNumShardsOverride;
 
   @JsonCreator
   public PartialHashSegmentGenerateTask(
@@ -69,7 +71,7 @@ public PartialHashSegmentGenerateTask(
       @JsonProperty("numAttempts") final int numAttempts, // zero-based counting
       @JsonProperty(PROP_SPEC) final ParallelIndexIngestionSpec ingestionSchema,
       @JsonProperty("context") final Map<String, Object> context,
-      @Nullable @JsonProperty("numShardsOverride") final Integer numShardsOverride
+      @JsonProperty("intervalToNumShardsOverride") @Nullable final Map<Interval, Integer> intervalToNumShardsOverride
   )
   {
     super(
@@ -85,7 +87,7 @@ public PartialHashSegmentGenerateTask(
     this.numAttempts = numAttempts;
     this.ingestionSchema = ingestionSchema;
     this.supervisorTaskId = supervisorTaskId;
-    this.numShardsOverride = numShardsOverride;
+    this.intervalToNumShardsOverride = intervalToNumShardsOverride;
   }
 
   @JsonProperty
@@ -106,6 +108,13 @@ public String getSupervisorTaskId()
     return supervisorTaskId;
   }
 
+  @Nullable
+  @JsonProperty
+  public Map<Interval, Integer> getIntervalToNumShardsOverride()
+  {
+    return intervalToNumShardsOverride;
+  }
+
   @Override
   public String getType()
   {
@@ -116,7 +125,7 @@ public String getType()
   public boolean isReady(TaskActionClient taskActionClient) throws Exception
   {
     return tryTimeChunkLock(
-        taskActionClient,
+        new SurrogateTaskActionClient(supervisorTaskId, taskActionClient),
         getIngestionSchema().getDataSchema().getGranularitySpec().inputIntervals()
     );
   }
@@ -134,7 +143,11 @@ SegmentAllocatorForBatch createSegmentAllocator(TaskToolbox toolbox, ParallelInd
         getId(),
         granularitySpec,
         new SupervisorTaskAccess(supervisorTaskId, taskClient),
-        createHashPartitionAnalysisFromPartitionsSpec(granularitySpec, partitionsSpec, numShardsOverride)
+        createHashPartitionAnalysisFromPartitionsSpec(
+            granularitySpec,
+            partitionsSpec,
+            intervalToNumShardsOverride
+        )
     );
   }
 
@@ -170,22 +183,24 @@ private GenericPartitionStat createPartitionStat(TaskToolbox toolbox, DataSegmen
   public static HashPartitionAnalysis createHashPartitionAnalysisFromPartitionsSpec(
       GranularitySpec granularitySpec,
       @Nonnull HashedPartitionsSpec partitionsSpec,
-      @Nullable Integer numShardsOverride
+      @Nullable Map<Interval, Integer> intervalToNumShardsOverride
   )
   {
-    final SortedSet<Interval> intervals = granularitySpec.bucketIntervals().get();
+    final HashPartitionAnalysis partitionAnalysis = new HashPartitionAnalysis(partitionsSpec);
 
-    final int numBucketsPerInterval;
-    if (numShardsOverride != null) {
-      numBucketsPerInterval = numShardsOverride;
+    if (intervalToNumShardsOverride != null) {
+      // Some intervals populated from granularitySpec can be missing in intervalToNumShardsOverride
+      // because intervalToNumShardsOverride contains only the intervals which exist in input data.
+      // We only care about the intervals in intervalToNumShardsOverride here.
+      intervalToNumShardsOverride.forEach(partitionAnalysis::updateBucket);
     } else {
-      numBucketsPerInterval = partitionsSpec.getNumShards() == null
-                              ? 1
-                              : partitionsSpec.getNumShards();
-    }
+      final SortedSet<Interval> intervals = granularitySpec.bucketIntervals().get();
+      final int numBucketsPerInterval = partitionsSpec.getNumShards() == null
+                                        ? 1
+                                        : partitionsSpec.getNumShards();
 
-    final HashPartitionAnalysis partitionAnalysis = new HashPartitionAnalysis(partitionsSpec);
-    intervals.forEach(interval -> partitionAnalysis.updateBucket(interval, numBucketsPerInterval));
+      intervals.forEach(interval -> partitionAnalysis.updateBucket(interval, numBucketsPerInterval));
+    }
     return partitionAnalysis;
   }
 }
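To make the override path above concrete, a sketch of feeding a per-interval shard map into the analysis. This mirrors the new unit test further below; the HashedPartitionsSpec arguments are placeholders and the snippet is illustrative, not patch code.

// Only intervals actually observed in the input appear in the override map,
// so hash buckets are created for exactly those intervals.
Map<Interval, Integer> intervalToNumShards = ImmutableMap.of(
    Intervals.of("2020-01-01/2020-01-02"), 1,
    Intervals.of("2020-01-02/2020-01-03"), 2
);
HashPartitionAnalysis analysis = new HashPartitionAnalysis(new HashedPartitionsSpec(null, null, null));
intervalToNumShards.forEach(analysis::updateBucket); // one bucket count per observed interval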
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialRangeSegmentGenerateTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialRangeSegmentGenerateTask.java
index 57978f4b8afc..98c84cf4fe53 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialRangeSegmentGenerateTask.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialRangeSegmentGenerateTask.java
@@ -25,6 +25,7 @@
 import org.apache.druid.indexer.partitions.PartitionsSpec;
 import org.apache.druid.indexer.partitions.SingleDimensionPartitionsSpec;
 import org.apache.druid.indexing.common.TaskToolbox;
+import org.apache.druid.indexing.common.actions.SurrogateTaskActionClient;
 import org.apache.druid.indexing.common.actions.TaskActionClient;
 import org.apache.druid.indexing.common.task.SegmentAllocatorForBatch;
 import org.apache.druid.indexing.common.task.SegmentAllocators;
@@ -135,9 +136,12 @@ public String getType()
   }
 
   @Override
-  public boolean isReady(TaskActionClient taskActionClient)
+  public boolean isReady(TaskActionClient taskActionClient) throws IOException
   {
-    return true;
+    return tryTimeChunkLock(
+        new SurrogateTaskActionClient(supervisorTaskId, taskActionClient),
+        getIngestionSchema().getDataSchema().getGranularitySpec().inputIntervals()
+    );
   }
 
   @Override
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialSegmentGenerateTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialSegmentGenerateTask.java
index ec8530bcf201..a69827cfb6b3 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialSegmentGenerateTask.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialSegmentGenerateTask.java
@@ -19,6 +19,7 @@
 
 package org.apache.druid.indexing.common.task.batch.parallel;
 
+import com.google.common.base.Preconditions;
 import org.apache.druid.data.input.InputSource;
 import org.apache.druid.indexer.TaskStatus;
 import org.apache.druid.indexer.partitions.PartitionsSpec;
@@ -81,6 +82,10 @@ abstract class PartialSegmentGenerateTask<T extends GeneratedPartitionsReport> e
         context
     );
 
+    Preconditions.checkArgument(
+        !ingestionSchema.getDataSchema().getGranularitySpec().inputIntervals().isEmpty(),
+        "Missing intervals in granularitySpec"
+    );
     this.ingestionSchema = ingestionSchema;
     this.supervisorTaskId = supervisorTaskId;
     this.inputRowIteratorBuilder = inputRowIteratorBuilder;
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialSegmentMergeTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialSegmentMergeTask.java
index 099b24af37ca..fad7b07f8314 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialSegmentMergeTask.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialSegmentMergeTask.java
@@ -101,6 +101,10 @@ abstract class PartialSegmentMergeTask<S extends ShardSpec> e
         context
     );
 
+    Preconditions.checkArgument(
+        !dataSchema.getGranularitySpec().inputIntervals().isEmpty(),
+        "Missing intervals in granularitySpec"
+    );
     this.dataSchema = dataSchema;
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskTest.java
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskTest.java
@@ ... @@
-    long numShards = ParallelIndexSupervisorTask.determineNumShardsFromCardinalityReport(
+    Map<Interval, Integer> intervalToNumShards = ParallelIndexSupervisorTask.determineNumShardsFromCardinalityReport(
         reports,
         1
     );
-    Assert.assertEquals(4L, numShards);
+    Assert.assertEquals(
+        ImmutableMap.of(
+            Intervals.of("1970-01-01/P1D"),
+            4,
+            Intervals.of("1970-01-02/P1D"),
+            1
+        ),
+        intervalToNumShards
+    );
 
-    numShards = ParallelIndexSupervisorTask.determineNumShardsFromCardinalityReport(
+    intervalToNumShards = ParallelIndexSupervisorTask.determineNumShardsFromCardinalityReport(
         reports,
         2
     );
-    Assert.assertEquals(2L, numShards);
+    Assert.assertEquals(
+        ImmutableMap.of(
+            Intervals.of("1970-01-01/P1D"),
+            2,
+            Intervals.of("1970-01-02/P1D"),
+            1
+        ),
+        intervalToNumShards
+    );
 
-    numShards = ParallelIndexSupervisorTask.determineNumShardsFromCardinalityReport(
+    intervalToNumShards = ParallelIndexSupervisorTask.determineNumShardsFromCardinalityReport(
         reports,
         3
     );
-    Assert.assertEquals(2L, numShards);
+    Assert.assertEquals(
+        ImmutableMap.of(
+            Intervals.of("1970-01-01/P1D"),
+            1,
+            Intervals.of("1970-01-02/P1D"),
+            1
+        ),
+        intervalToNumShards
+    );
 
-    numShards = ParallelIndexSupervisorTask.determineNumShardsFromCardinalityReport(
+    intervalToNumShards = ParallelIndexSupervisorTask.determineNumShardsFromCardinalityReport(
         reports,
         4
     );
-    Assert.assertEquals(1L, numShards);
+    Assert.assertEquals(
+        ImmutableMap.of(
+            Intervals.of("1970-01-01/P1D"),
+            1,
+            Intervals.of("1970-01-02/P1D"),
+            1
+        ),
+        intervalToNumShards
+    );
 
-    numShards = ParallelIndexSupervisorTask.determineNumShardsFromCardinalityReport(
+    intervalToNumShards = ParallelIndexSupervisorTask.determineNumShardsFromCardinalityReport(
         reports,
         5
     );
-    Assert.assertEquals(1L, numShards);
+    Assert.assertEquals(
+        ImmutableMap.of(
+            Intervals.of("1970-01-01/P1D"),
+            1,
+            Intervals.of("1970-01-02/P1D"),
+            1
+        ),
+        intervalToNumShards
+    );
   }
 }
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/HashPartitionMultiPhaseParallelIndexingTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/HashPartitionMultiPhaseParallelIndexingTest.java
index 268958444261..e725ab9a25a1 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/HashPartitionMultiPhaseParallelIndexingTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/HashPartitionMultiPhaseParallelIndexingTest.java
@@ -31,8 +31,10 @@
 import org.apache.druid.indexer.partitions.HashedPartitionsSpec;
 import org.apache.druid.indexer.partitions.PartitionsSpec;
 import org.apache.druid.indexing.common.LockGranularity;
+import org.apache.druid.java.util.common.DateTimes;
 import org.apache.druid.java.util.common.Intervals;
 import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.guava.Comparators;
 import org.apache.druid.query.scan.ScanResultValue;
 import org.apache.druid.timeline.DataSegment;
 import org.apache.druid.timeline.partition.HashBasedNumberedShardSpec;
@@ -86,35 +88,44 @@ public class HashPartitionMultiPhaseParallelIndexingTest extends AbstractMultiPh
   private static final Interval INTERVAL_TO_INDEX = Intervals.of("2017-12/P1M");
 
   @Parameterized.Parameters(
-      name = "lockGranularity={0}, useInputFormatApi={1}, maxNumConcurrentSubTasks={2}, numShards={3}"
+      name = "lockGranularity={0}, useInputFormatApi={1}, maxNumConcurrentSubTasks={2}, intervalToIndex={3}, numShards={4}"
   )
   public static Iterable<Object[]> constructorFeeder()
   {
     return ImmutableList.of(
-        new Object[]{LockGranularity.TIME_CHUNK, false, 2, 2},
-        new Object[]{LockGranularity.TIME_CHUNK, true, 2, 2},
-        new Object[]{LockGranularity.TIME_CHUNK, true, 1, 2},
-        new Object[]{LockGranularity.SEGMENT, true, 2, 2},
-        new Object[]{LockGranularity.TIME_CHUNK, true, 2, null},
-        new Object[]{LockGranularity.TIME_CHUNK, true, 1, null},
-        new Object[]{LockGranularity.SEGMENT, true, 2, null}
+        new Object[]{LockGranularity.TIME_CHUNK, false, 2, INTERVAL_TO_INDEX, 2},
+        new Object[]{LockGranularity.TIME_CHUNK, true, 2, INTERVAL_TO_INDEX, 2},
+        new Object[]{LockGranularity.TIME_CHUNK, true, 2, null, 2},
+        new Object[]{LockGranularity.TIME_CHUNK, true, 1, INTERVAL_TO_INDEX, 2},
+        new Object[]{LockGranularity.SEGMENT, true, 2, INTERVAL_TO_INDEX, 2},
+        new Object[]{LockGranularity.TIME_CHUNK, true, 2, INTERVAL_TO_INDEX, null},
+        new Object[]{LockGranularity.TIME_CHUNK, true, 2, null, null},
+        new Object[]{LockGranularity.TIME_CHUNK, true, 1, INTERVAL_TO_INDEX, null},
+        new Object[]{LockGranularity.SEGMENT, true, 2, INTERVAL_TO_INDEX, null}
     );
   }
 
   private final int maxNumConcurrentSubTasks;
+  @Nullable
+  private final Interval intervalToIndex;
+  @Nullable
   private final Integer numShards;
 
   private File inputDir;
+  // sorted input intervals
+  private List<Interval> inputIntervals;
 
   public HashPartitionMultiPhaseParallelIndexingTest(
       LockGranularity lockGranularity,
       boolean useInputFormatApi,
       int maxNumConcurrentSubTasks,
+      @Nullable Interval intervalToIndex,
       @Nullable Integer numShards
   )
   {
     super(lockGranularity, useInputFormatApi);
     this.maxNumConcurrentSubTasks = maxNumConcurrentSubTasks;
+    this.intervalToIndex = intervalToIndex;
     this.numShards = numShards;
   }
 
@@ -122,6 +133,7 @@ public HashPartitionMultiPhaseParallelIndexingTest(
   public void setup() throws IOException
   {
     inputDir = temporaryFolder.newFolder("data");
+    final Set<Interval> intervals = new HashSet<>();
     // set up data
     for (int i = 0; i < 10; i++) {
       try (final Writer writer =
@@ -129,6 +141,8 @@ public void setup() throws IOException
         for (int j = 0; j < 10; j++) {
           writer.write(StringUtils.format("2017-12-%d,%d,%d th test file\n", j + 1, i + 10, i));
           writer.write(StringUtils.format("2017-12-%d,%d,%d th test file\n", j + 2, i + 11, i));
+          intervals.add(SEGMENT_GRANULARITY.bucket(DateTimes.of(StringUtils.format("2017-12-%d", j + 1))));
+          intervals.add(SEGMENT_GRANULARITY.bucket(DateTimes.of(StringUtils.format("2017-12-%d", j + 2))));
         }
       }
     }
@@ -139,33 +153,70 @@ public void setup() throws IOException
         writer.write(StringUtils.format("2017-12-%d,%d,%d th test file\n", i + 1, i + 10, i));
       }
     }
+    inputIntervals = new ArrayList<>(intervals);
+    inputIntervals.sort(Comparators.intervalsByStartThenEnd());
   }
 
   @Test
   public void testRun() throws Exception
   {
+    final Integer maxRowsPerSegment = numShards == null ? 10 : null;
     final Set<DataSegment> publishedSegments = runTestTask(
-        new HashedPartitionsSpec(null, numShards, ImmutableList.of("dim1", "dim2")),
+        new HashedPartitionsSpec(
+            maxRowsPerSegment,
+            numShards,
+            ImmutableList.of("dim1", "dim2")
+        ),
         TaskState.SUCCESS,
         false
     );
 
-    // we don't specify maxRowsPerSegment so it defaults to DEFAULT_MAX_ROWS_PER_SEGMENT,
-    // which is 5 million, so assume that there will only be 1 shard if numShards is not set.
-    int expectedSegmentCount = numShards != null ? numShards : 1;
-
-    assertHashedPartition(publishedSegments, expectedSegmentCount);
+    final Map<Interval, Integer> expectedIntervalToNumSegments = computeExpectedIntervalToNumSegments(
+        maxRowsPerSegment,
+        numShards
+    );
+    assertHashedPartition(publishedSegments, expectedIntervalToNumSegments);
   }
 
   @Test
   public void testRunWithHashPartitionFunction() throws Exception
   {
+    final Integer maxRowsPerSegment = numShards == null ? 10 : null;
     final Set<DataSegment> publishedSegments = runTestTask(
-        new HashedPartitionsSpec(null, 2, ImmutableList.of("dim1", "dim2"), HashPartitionFunction.MURMUR3_32_ABS),
+        new HashedPartitionsSpec(
+            maxRowsPerSegment,
+            numShards,
+            ImmutableList.of("dim1", "dim2"),
+            HashPartitionFunction.MURMUR3_32_ABS
+        ),
         TaskState.SUCCESS,
         false
     );
-    assertHashedPartition(publishedSegments, 2);
+    final Map<Interval, Integer> expectedIntervalToNumSegments = computeExpectedIntervalToNumSegments(
+        maxRowsPerSegment,
+        numShards
+    );
+    assertHashedPartition(publishedSegments, expectedIntervalToNumSegments);
+  }
+
+  private Map<Interval, Integer> computeExpectedIntervalToNumSegments(
+      @Nullable Integer maxRowsPerSegment,
+      @Nullable Integer numShards
+  )
+  {
+    final Map<Interval, Integer> expectedIntervalToNumSegments = new HashMap<>();
+    for (int i = 0; i < inputIntervals.size(); i++) {
+      if (numShards == null) {
+        if (i == 0 || i == inputIntervals.size() - 1) {
+          expectedIntervalToNumSegments.put(inputIntervals.get(i), Math.round((float) 10 / maxRowsPerSegment));
+        } else {
+          expectedIntervalToNumSegments.put(inputIntervals.get(i), Math.round((float) 20 / maxRowsPerSegment));
+        }
+      } else {
+        expectedIntervalToNumSegments.put(inputIntervals.get(i), numShards);
+      }
+    }
+    return expectedIntervalToNumSegments;
   }
 
   @Test
@@ -236,7 +287,7 @@ private Set<DataSegment> runTestTask(
           DIMENSIONS_SPEC,
           INPUT_FORMAT,
           null,
-          INTERVAL_TO_INDEX,
+          intervalToIndex,
           inputDir,
           "test_*",
           partitionsSpec,
@@ -250,7 +301,7 @@ private Set<DataSegment> runTestTask(
           null,
           null,
           PARSE_SPEC,
-          INTERVAL_TO_INDEX,
+          intervalToIndex,
           inputDir,
           "test_*",
           partitionsSpec,
@@ -261,15 +312,21 @@ private Set<DataSegment> runTestTask(
     }
   }
 
-  private void assertHashedPartition(Set<DataSegment> publishedSegments, int expectedNumSegments) throws IOException
+  private void assertHashedPartition(
+      Set<DataSegment> publishedSegments,
+      Map<Interval, Integer> expectedIntervalToNumSegments
+  ) throws IOException
   {
     final Map<Interval, List<DataSegment>> intervalToSegments = new HashMap<>();
     publishedSegments.forEach(
        segment -> intervalToSegments.computeIfAbsent(segment.getInterval(), k -> new ArrayList<>()).add(segment)
    );
+    Assert.assertEquals(new HashSet<>(inputIntervals), intervalToSegments.keySet());
    final File tempSegmentDir = temporaryFolder.newFolder();
-    for (List<DataSegment> segmentsInInterval : intervalToSegments.values()) {
-      Assert.assertEquals(expectedNumSegments, segmentsInInterval.size());
+    for (Entry<Interval, List<DataSegment>> entry : intervalToSegments.entrySet()) {
+      Interval interval = entry.getKey();
+      List<DataSegment> segmentsInInterval = entry.getValue();
+      Assert.assertEquals(expectedIntervalToNumSegments.get(interval).intValue(), segmentsInInterval.size());
       for (DataSegment segment : segmentsInInterval) {
         Assert.assertSame(HashBasedNumberedShardSpec.class, segment.getShardSpec().getClass());
         final HashBasedNumberedShardSpec shardSpec = (HashBasedNumberedShardSpec) segment.getShardSpec();
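A sanity check of computeExpectedIntervalToNumSegments in the test above: each of the ten input files writes one row for day j+1 and one for day j+2, so the first and last day buckets receive roughly 10 rows and interior days roughly 20; with maxRowsPerSegment = 10 that yields 1 and 2 segments respectively. Illustrative arithmetic only, not test code:

int maxRowsPerSegment = 10;
System.out.println(Math.round((float) 10 / maxRowsPerSegment)); // first/last interval -> 1
System.out.println(Math.round((float) 20 / maxRowsPerSegment)); // interior intervals -> 2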
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskSerdeTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskSerdeTest.java
index a80e3a8452b7..b4a712580062 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskSerdeTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskSerdeTest.java
@@ -91,25 +91,6 @@ public void serde() throws IOException
     Assert.assertEquals(task, OBJECT_MAPPER.readValue(json, Task.class));
   }
 
-  @Test
-  public void forceGuaranteedRollupWithMissingIntervals()
-  {
-    expectedException.expect(IllegalStateException.class);
-    expectedException.expectMessage(
-        "forceGuaranteedRollup is set but intervals is missing in granularitySpec"
-    );
-
-    Integer numShards = 2;
-    new ParallelIndexSupervisorTaskBuilder()
-        .ingestionSpec(
-            new ParallelIndexIngestionSpecBuilder()
-                .forceGuaranteedRollup(true)
-                .partitionsSpec(new HashedPartitionsSpec(null, numShards, null))
-                .build()
-        )
-        .build();
-  }
-
   @Test
   public void forceGuaranteedRollupWithHashPartitionsMissingNumShards()
   {
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/PartialDimensionCardinalityTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/PartialDimensionCardinalityTaskTest.java
index 0ad4ddea2b55..0d65f861ec7a 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/PartialDimensionCardinalityTaskTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/PartialDimensionCardinalityTaskTest.java
@@ -108,19 +108,6 @@ public void requiresHashedPartitions()
         .build();
   }
 
-  @Test
-  public void requiresGranularitySpecInputIntervals()
-  {
-    exception.expect(IllegalArgumentException.class);
-    exception.expectMessage("Missing intervals in granularitySpec");
-
-    DataSchema dataSchema = ParallelIndexTestingFactory.createDataSchema(Collections.emptyList());
-
-    new PartialDimensionCardinalityTaskBuilder()
-        .dataSchema(dataSchema)
-        .build();
-  }
-
   @Test
   public void serializesDeserializes()
   {
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/PartialDimensionDistributionTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/PartialDimensionDistributionTaskTest.java
index 87de8c21e240..7dbb6e3450c8 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/PartialDimensionDistributionTaskTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/PartialDimensionDistributionTaskTest.java
@@ -111,19 +111,6 @@ public void requiresSingleDimensionPartitions()
         .build();
   }
 
-  @Test
-  public void requiresGranularitySpecInputIntervals()
-  {
-    exception.expect(IllegalArgumentException.class);
-    exception.expectMessage("Missing intervals in granularitySpec");
-
-    DataSchema dataSchema = ParallelIndexTestingFactory.createDataSchema(Collections.emptyList());
-
-    new PartialDimensionDistributionTaskBuilder()
-        .dataSchema(dataSchema)
-        .build();
-  }
-
   @Test
   public void serializesDeserializes()
   {
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/PartialGenericSegmentMergeTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/PartialGenericSegmentMergeTaskTest.java
index 71a48589fce5..a579c18db92d 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/PartialGenericSegmentMergeTaskTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/PartialGenericSegmentMergeTaskTest.java
@@ -24,7 +24,9 @@
 import org.hamcrest.Matchers;
 import org.junit.Assert;
 import org.junit.Before;
+import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.ExpectedException;
 
 import java.util.Collections;
 
@@ -54,6 +56,9 @@ public class PartialGenericSegmentMergeTaskTest extends AbstractParallelIndexSup
           .build()
   );
 
+  @Rule
+  public ExpectedException exception = ExpectedException.none();
+
   private PartialGenericSegmentMergeTask target;
 
   @Before
@@ -82,4 +87,27 @@ public void hasCorrectPrefixForAutomaticId()
     String id = target.getId();
     Assert.assertThat(id, Matchers.startsWith(PartialGenericSegmentMergeTask.TYPE));
   }
+
+  @Test
+  public void requiresGranularitySpecInputIntervals()
+  {
+    exception.expect(IllegalArgumentException.class);
+    exception.expectMessage("Missing intervals in granularitySpec");
+
+    new PartialGenericSegmentMergeTask(
+        ParallelIndexTestingFactory.AUTOMATIC_ID,
+        ParallelIndexTestingFactory.GROUP_ID,
+        ParallelIndexTestingFactory.TASK_RESOURCE,
+        ParallelIndexTestingFactory.SUPERVISOR_TASK_ID,
+        ParallelIndexTestingFactory.NUM_ATTEMPTS,
+        new PartialGenericSegmentMergeIngestionSpec(
+            ParallelIndexTestingFactory.createDataSchema(null),
+            IO_CONFIG,
+            new ParallelIndexTestingFactory.TuningConfigBuilder()
+                .partitionsSpec(PARTITIONS_SPEC)
+                .build()
+        ),
+        ParallelIndexTestingFactory.CONTEXT
+    );
+  }
 }
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/PartialHashSegmentGenerateTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/PartialHashSegmentGenerateTaskTest.java
index ac32c8135f53..845fc44bda5b 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/PartialHashSegmentGenerateTaskTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/PartialHashSegmentGenerateTaskTest.java
@@ -21,6 +21,7 @@
 
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
 import org.apache.druid.data.input.impl.JsonInputFormat;
 import org.apache.druid.data.input.impl.LocalInputSource;
 import org.apache.druid.indexer.partitions.HashedPartitionsSpec;
@@ -33,10 +34,13 @@
 import org.joda.time.Interval;
 import org.junit.Assert;
 import org.junit.Before;
+import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.ExpectedException;
 
 import java.io.File;
 import java.util.List;
+import java.util.Map;
 
 public class PartialHashSegmentGenerateTaskTest
 {
@@ -48,6 +52,9 @@ public class PartialHashSegmentGenerateTaskTest
       ParallelIndexTestingFactory.createDataSchema(ParallelIndexTestingFactory.INPUT_INTERVALS)
   );
 
+  @Rule
+  public ExpectedException expectedException = ExpectedException.none();
+
   private PartialHashSegmentGenerateTask target;
 
   @Before
@@ -102,4 +109,62 @@ public void testCreateHashPartitionAnalysisFromPartitionsSpecWithNumShardsReturn
       Assert.assertEquals(expectedNumBuckets, partitionAnalysis.getBucketAnalysis(interval).intValue());
     }
   }
+
+  @Test
+  public void testCreateHashPartitionAnalysisFromPartitionsSpecWithNumShardsMap()
+  {
+    final List<Interval> intervals = ImmutableList.of(
+        Intervals.of("2020-01-01/2020-01-02"),
+        Intervals.of("2020-01-02/2020-01-03"),
+        Intervals.of("2020-01-03/2020-01-04")
+    );
+    final Map<Interval, Integer> intervalToNumShards = ImmutableMap.of(
+        Intervals.of("2020-01-01/2020-01-02"),
+        1,
+        Intervals.of("2020-01-02/2020-01-03"),
+        2,
+        Intervals.of("2020-01-03/2020-01-04"),
+        3
+    );
+    final HashPartitionAnalysis partitionAnalysis = PartialHashSegmentGenerateTask
+        .createHashPartitionAnalysisFromPartitionsSpec(
+            new UniformGranularitySpec(
+                Granularities.DAY,
+                Granularities.NONE,
+                intervals
+            ),
+            new HashedPartitionsSpec(null, null, null),
+            intervalToNumShards
+        );
+    Assert.assertEquals(intervals.size(), partitionAnalysis.getNumTimePartitions());
+    for (Interval interval : intervals) {
+      Assert.assertEquals(
+          intervalToNumShards.get(interval).intValue(),
+          partitionAnalysis.getBucketAnalysis(interval).intValue()
+      );
+    }
+  }
+
+  @Test
+  public void requiresGranularitySpecInputIntervals()
+  {
+    expectedException.expect(IllegalArgumentException.class);
+    expectedException.expectMessage("Missing intervals in granularitySpec");
+
+    new PartialHashSegmentGenerateTask(
+        ParallelIndexTestingFactory.AUTOMATIC_ID,
+        ParallelIndexTestingFactory.GROUP_ID,
+        ParallelIndexTestingFactory.TASK_RESOURCE,
+        ParallelIndexTestingFactory.SUPERVISOR_TASK_ID,
+        ParallelIndexTestingFactory.NUM_ATTEMPTS,
+        ParallelIndexTestingFactory.createIngestionSpec(
+            new LocalInputSource(new File("baseDir"), "filer"),
+            new JsonInputFormat(null, null, null),
+            new ParallelIndexTestingFactory.TuningConfigBuilder().build(),
+            ParallelIndexTestingFactory.createDataSchema(null)
+        ),
+        ParallelIndexTestingFactory.CONTEXT,
+        null
+    );
+  }
 }
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/PerfectRollupWorkerTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/PerfectRollupWorkerTaskTest.java
index af4cdf31107b..783aadcc12f3 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/PerfectRollupWorkerTaskTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/PerfectRollupWorkerTaskTest.java
@@ -63,17 +63,6 @@ public void succeedsWithUnspecifiedNumShards()
         .build();
   }
 
-  @Test
-  public void requiresGranularitySpecInputIntervals()
-  {
-    exception.expect(IllegalArgumentException.class);
-    exception.expectMessage("Missing intervals in granularitySpec");
-
-    new PerfectRollupWorkerTaskBuilder()
-        .granularitySpecInputIntervals(Collections.emptyList())
-        .build();
-  }
-
   @Test
   public void succeedsWithValidPartitionsSpec()
   {
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/RangePartitionMultiPhaseParallelIndexingTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/RangePartitionMultiPhaseParallelIndexingTest.java
index 20e4f347de79..598aed906db7 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/RangePartitionMultiPhaseParallelIndexingTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/RangePartitionMultiPhaseParallelIndexingTest.java
@@ -108,15 +108,16 @@ public class RangePartitionMultiPhaseParallelIndexingTest extends AbstractMultiP
       0
   );
 
-  @Parameterized.Parameters(name = "{0}, useInputFormatApi={1}, maxNumConcurrentSubTasks={2}, useMultiValueDim={3}")
+  @Parameterized.Parameters(name = "{0}, useInputFormatApi={1}, maxNumConcurrentSubTasks={2}, useMultiValueDim={3}, intervalToIndex={4}")
   public static Iterable<Object[]> constructorFeeder()
   {
     return ImmutableList.of(
-        new Object[]{LockGranularity.TIME_CHUNK, !USE_INPUT_FORMAT_API, 2, !USE_MULTIVALUE_DIM},
-        new Object[]{LockGranularity.TIME_CHUNK, USE_INPUT_FORMAT_API, 2, !USE_MULTIVALUE_DIM},
-        new Object[]{LockGranularity.SEGMENT, USE_INPUT_FORMAT_API, 2, !USE_MULTIVALUE_DIM},
-        new Object[]{LockGranularity.SEGMENT, USE_INPUT_FORMAT_API, 1, !USE_MULTIVALUE_DIM}, // will spawn subtask
-        new Object[]{LockGranularity.SEGMENT, USE_INPUT_FORMAT_API, 2, USE_MULTIVALUE_DIM} // expected to fail
+        new Object[]{LockGranularity.TIME_CHUNK, !USE_INPUT_FORMAT_API, 2, !USE_MULTIVALUE_DIM, INTERVAL_TO_INDEX},
+        new Object[]{LockGranularity.TIME_CHUNK, USE_INPUT_FORMAT_API, 2, !USE_MULTIVALUE_DIM, INTERVAL_TO_INDEX},
+        new Object[]{LockGranularity.TIME_CHUNK, USE_INPUT_FORMAT_API, 2, !USE_MULTIVALUE_DIM, null},
+        new Object[]{LockGranularity.SEGMENT, USE_INPUT_FORMAT_API, 2, !USE_MULTIVALUE_DIM, INTERVAL_TO_INDEX},
+        new Object[]{LockGranularity.SEGMENT, USE_INPUT_FORMAT_API, 1, !USE_MULTIVALUE_DIM, INTERVAL_TO_INDEX}, // will spawn subtask
+        new Object[]{LockGranularity.SEGMENT, USE_INPUT_FORMAT_API, 2, USE_MULTIVALUE_DIM, INTERVAL_TO_INDEX} // expected to fail
     );
   }
 
@@ -132,17 +133,21 @@ public static Iterable<Object[]> constructorFeeder()
 
   private final int maxNumConcurrentSubTasks;
   private final boolean useMultivalueDim;
+  @Nullable
+  private final Interval intervalToIndex;
 
   public RangePartitionMultiPhaseParallelIndexingTest(
       LockGranularity lockGranularity,
       boolean useInputFormatApi,
       int maxNumConcurrentSubTasks,
-      boolean useMultivalueDim
+      boolean useMultivalueDim,
+      @Nullable Interval intervalToIndex
   )
   {
     super(lockGranularity, useInputFormatApi);
     this.maxNumConcurrentSubTasks = maxNumConcurrentSubTasks;
     this.useMultivalueDim = useMultivalueDim;
+    this.intervalToIndex = intervalToIndex;
   }
 
   @Before
@@ -309,7 +314,7 @@ private Set<DataSegment> runTestTask(
           DIMENSIONS_SPEC,
           INPUT_FORMAT,
           null,
-          INTERVAL_TO_INDEX,
+          intervalToIndex,
           inputDir,
           TEST_FILE_NAME_PREFIX + "*",
           partitionsSpec,
@@ -323,7 +328,7 @@ private Set<DataSegment> runTestTask(
           null,
           null,
           PARSE_SPEC,
-          INTERVAL_TO_INDEX,
+          intervalToIndex,
           inputDir,
           TEST_FILE_NAME_PREFIX + "*",
           partitionsSpec,