diff --git a/docs/data-management/manual-compaction.md b/docs/data-management/manual-compaction.md index e6e34dba82ef..4c13d4b40f24 100644 --- a/docs/data-management/manual-compaction.md +++ b/docs/data-management/manual-compaction.md @@ -117,9 +117,9 @@ The compaction `ioConfig` requires specifying `inputSpec` as follows: |Field|Description|Default|Required| |-----|-----------|-------|--------| |`type`|Task type. Set the value to `compact`.|none|Yes| -|`inputSpec`|Specification of the target [interval](#interval-inputspec) or [segments](#segments-inputspec).|none|Yes| -|`dropExisting`|If `true`, the task replaces all existing segments fully contained by either of the following:
- the `interval` in the `interval` type `inputSpec`.
- the umbrella interval of the `segments` in the `segment` type `inputSpec`.
If compaction fails, Druid does not change any of the existing segments.
**WARNING**: `dropExisting` in `ioConfig` is a beta feature. |false|No| -|`allowNonAlignedInterval`|If `true`, the task allows an explicit [`segmentGranularity`](#compaction-granularity-spec) that is not aligned with the provided [interval](#interval-inputspec) or [segments](#segments-inputspec). This parameter is only used if [`segmentGranularity`](#compaction-granularity-spec) is explicitly provided.

This parameter is provided for backwards compatibility. In most scenarios it should not be set, as it can lead to data being accidentally overshadowed. This parameter may be removed in a future release.|false|No|
+|`inputSpec`|Specification of the target [interval](#interval-inputspec) or [minor segments](#minor-inputspec).|none|Yes|
+|`dropExisting`|If `true`, the task replaces all existing segments fully contained by the `interval` in the `interval` type `inputSpec`. The [minor](#minor-inputspec) `inputSpec` requires `dropExisting` to be `true`. If compaction fails, Druid does not change any of the existing segments.
**WARNING**: `dropExisting` in `ioConfig` is a beta feature. |false|No|
+|`allowNonAlignedInterval`|If `true`, the task allows an explicit [`segmentGranularity`](#compaction-granularity-spec) that is not aligned with the provided [interval](#interval-inputspec) or [minor segments](#minor-inputspec). This parameter is only used if [`segmentGranularity`](#compaction-granularity-spec) is explicitly provided.

This parameter is provided for backwards compatibility. In most scenarios it should not be set, as it can lead to data being accidentally overshadowed. This parameter may be removed in a future release.|false|No|
 
 The compaction task has two kinds of `inputSpec`:
 
@@ -127,15 +127,55 @@ The compaction task has two kinds of `inputSpec`:
 
 ### Interval `inputSpec`
 
 |Field|Description|Required|
 |-----|-----------|--------|
-|`type`|Task type. Set the value to `interval`.|Yes|
+|`type`|Task type. Set the value to `interval` to trigger major compaction.|Yes|
 |`interval`|Interval to compact.|Yes|
 
-### Segments `inputSpec`
+### Minor `inputSpec`
 
 |Field|Description|Required|
 |-----|-----------|--------|
-|`type`|Task type. Set the value to `segments`.|Yes|
-|`segments`|A list of segment IDs.|Yes|
+|`type`|Task type. Set the value to `minor` to trigger minor compaction.|Yes|
+|`interval`|Interval to compact.|Yes|
+|`segments`|A list of descriptors for the segments to compact.|Yes|
+
+You can retrieve the required segment descriptor fields from the "Segments" section in the web console, or from the `sys.segments` system table (`start`, `end`, `version`, and `partition_num` columns).
+
+|Field|Description|Required|
+|-----|-----------|--------|
+|`itvl`|Interval of the segment to compact.|Yes|
+|`ver`|Version of the segment.|Yes|
+|`part`|Partition number of the segment.|Yes|
+
+#### Example minor inputSpec
+
+```json
+{
+  "type": "minor",
+  "interval": "2020-01-01T00:00:00.000Z/2020-01-01T01:00:00.000Z",
+  "segments": [
+    {
+      "itvl": "2020-01-01T00:00:00.000Z/2020-01-01T01:00:00.000Z",
+      "ver": "2020-01-01T00:07:18.186Z",
+      "part": 0
+    },
+    {
+      "itvl": "2020-01-01T00:00:00.000Z/2020-01-01T01:00:00.000Z",
+      "ver": "2020-01-01T00:07:18.186Z",
+      "part": 1
+    }
+  ]
+}
+```
+
+When using the minor `inputSpec`, the task compacts only the specified segments. Segments in the same interval that are not in the spec are upgraded in place rather than compacted. This lets you compact a subset of segments in an interval while preserving the rest.
+
+Minor compaction has the following requirements; a complete example task payload follows this list:
+- Set `useConcurrentLocks: true` in the task context. Minor compaction acquires REPLACE locks over the entire interval.
+- Set `dropExisting: true` in the `ioConfig`.
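+
+#### Example minor compaction task
+
+A minimal sketch of a full task payload that satisfies both requirements above. The datasource name `wikipedia_hour` and the segment descriptor values are illustrative placeholders; substitute values from your own cluster.
+
+```json
+{
+  "type": "compact",
+  "dataSource": "wikipedia_hour",
+  "ioConfig": {
+    "type": "compact",
+    "inputSpec": {
+      "type": "minor",
+      "interval": "2020-01-01T00:00:00.000Z/2020-01-01T01:00:00.000Z",
+      "segments": [
+        {
+          "itvl": "2020-01-01T00:00:00.000Z/2020-01-01T01:00:00.000Z",
+          "ver": "2020-01-01T00:07:18.186Z",
+          "part": 0
+        }
+      ]
+    },
+    "dropExisting": true
+  },
+  "context": {
+    "useConcurrentLocks": true
+  }
+}
+```
+
+Submit the payload to the Overlord task endpoint as you would any other compaction task.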
+
+### Segments `inputSpec`
+
+The `segments` `inputSpec` is deprecated and intentionally no longer documented. Use the [interval](#interval-inputspec) or [minor](#minor-inputspec) spec instead.
 
 ## Compaction dimensions spec
 
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java
index 6cbdb1dc6a40..82310272b369 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java
@@ -223,6 +223,23 @@ public CompactionTask(
       //noinspection ConstantConditions
       this.ioConfig = new CompactionIOConfig(SpecificSegmentsSpec.fromSegments(segments), false, null);
     }
+
+    if (ioConfig != null && ioConfig.getInputSpec() instanceof MinorCompactionInputSpec) {
+      if (computeCompactionIngestionMode(ioConfig) != IngestionMode.REPLACE) {
+        throw DruidException.forPersona(DruidException.Persona.USER)
+                            .ofCategory(DruidException.Category.INVALID_INPUT)
+                            .build("Minor compaction can only be used with the REPLACE ingestion mode. Please set ioConfig[dropExisting] to true.");
+      }
+
+      boolean usingConcurrentLocks = this.getContextValue(Tasks.USE_CONCURRENT_LOCKS, Tasks.DEFAULT_USE_CONCURRENT_LOCKS);
+
+      if (!usingConcurrentLocks) {
+        throw DruidException.forPersona(DruidException.Persona.USER)
+                            .ofCategory(DruidException.Category.INVALID_INPUT)
+                            .build("Minor compaction requires concurrent locks. Please set context[%s] to true.", Tasks.USE_CONCURRENT_LOCKS);
+      }
+    }
+
     this.dimensionsSpec = dimensionsSpec == null ? dimensions : dimensionsSpec;
     this.transformSpec = transformSpec;
     this.metricsSpec = metricsSpec;
@@ -1284,6 +1301,11 @@ static Granularity compareWithCurrent(Granularity queryGranularity, Granularity
     }
   }
 
+  /**
+   * Provides segment discovery and validation for compaction.
+   * For minor compaction ({@link MinorCompactionInputSpec}), finds all segments in the interval and,
+   * via {@link #shouldUpgradeSegment}, partitions them into segments to compact and segments whose
+   * metadata is only upgraded.
+   */
   @VisibleForTesting
   static class SegmentProvider
   {
@@ -1292,7 +1314,7 @@ static class SegmentProvider
     private final Interval interval;
     private final boolean minorCompaction;
-    private final Set<SegmentDescriptor> uncompactedSegments;
+    private final Set<SegmentDescriptor> segmentsToCompact;
 
     SegmentProvider(String dataSource, CompactionInputSpec inputSpec)
     {
@@ -1301,17 +1323,17 @@ static class SegmentProvider
       this.interval = inputSpec.findInterval(dataSource);
       if (inputSpec instanceof MinorCompactionInputSpec) {
         minorCompaction = true;
-        uncompactedSegments = Set.copyOf(((MinorCompactionInputSpec) inputSpec).getUncompactedSegments());
+        segmentsToCompact = Set.copyOf(((MinorCompactionInputSpec) inputSpec).getSegments());
       } else {
         minorCompaction = false;
-        uncompactedSegments = null;
+        segmentsToCompact = null;
       }
     }
 
     private boolean shouldUpgradeSegment(DataSegment s)
     {
       if (minorCompaction) {
-        return !uncompactedSegments.contains(s.toDescriptor()) && this.interval.contains(s.getInterval());
+        return !segmentsToCompact.contains(s.toDescriptor()) && this.interval.contains(s.getInterval());
       } else {
         return false;
       }
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/MinorCompactionInputSpec.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/MinorCompactionInputSpec.java
index 0176386bf075..16fbcd4e5ca7 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/MinorCompactionInputSpec.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/MinorCompactionInputSpec.java
@@ -38,40 +38,40 @@
  */
 public class MinorCompactionInputSpec implements CompactionInputSpec
 {
-  public static final String TYPE = "uncompacted";
+  public static final String TYPE = "minor";
 
   private final Interval interval;
-  private final List<SegmentDescriptor> uncompactedSegments;
+  private final List<SegmentDescriptor> segments;
 
   @JsonCreator
   public MinorCompactionInputSpec(
       @JsonProperty("interval") Interval interval,
-      @JsonProperty("uncompactedSegments") List<SegmentDescriptor> uncompactedSegments
+      @JsonProperty("segments") List<SegmentDescriptor> segments
   )
   {
-    InvalidInput.conditionalException(interval != null, "Uncompacted interval must not be null");
+    InvalidInput.conditionalException(interval != null, "Minor compaction interval must not be null");
     InvalidInput.conditionalException(
         interval.toDurationMillis() > 0,
-        "Uncompacted interval[%s] is empty, must specify a nonempty interval",
+        "Minor compaction interval[%s] is empty, must specify a nonempty interval",
         interval
     );
     InvalidInput.conditionalException(
-        uncompactedSegments != null && !uncompactedSegments.isEmpty(),
-        "Uncompacted segments must not be null or empty"
+        segments != null && !segments.isEmpty(),
+        "Minor compaction segments must not be null or empty"
     );
 
     // Validate that all segments are within the interval
     List<SegmentDescriptor> segmentsNotInInterval =
-        uncompactedSegments.stream().filter(s -> !interval.contains(s.getInterval())).collect(Collectors.toList());
+        segments.stream().filter(s -> !interval.contains(s.getInterval())).collect(Collectors.toList());
     InvalidInput.conditionalException(
         segmentsNotInInterval.isEmpty(),
-        "All uncompacted segments must be within interval[%s], got segments outside interval: %s",
+        "All segments must be within interval[%s], got segments outside interval: %s",
         interval,
         segmentsNotInInterval
     );
     this.interval = interval;
-    this.uncompactedSegments = uncompactedSegments;
+    this.segments = segments;
   }
 
   @JsonProperty
@@ -81,9 +81,9 @@ public Interval getInterval()
   {
     return interval;
   }
 
   @JsonProperty
-  public List<SegmentDescriptor> getUncompactedSegments()
+  public List<SegmentDescriptor> getSegments()
   {
-    return uncompactedSegments;
+    return segments;
   }
 
   @Override
@@ -112,13 +112,13 @@ public boolean equals(Object o)
     }
     MinorCompactionInputSpec that = (MinorCompactionInputSpec) o;
     return Objects.equals(interval, that.interval) &&
-           Objects.equals(uncompactedSegments, that.uncompactedSegments);
+           Objects.equals(segments, that.segments);
   }
 
   @Override
   public int hashCode()
   {
-    return Objects.hash(interval, uncompactedSegments);
+    return Objects.hash(interval, segments);
   }
 
   @Override
   public String toString()
   {
     return "MinorCompactionInputSpec{" +
            "interval=" + interval +
-           ", uncompactedSegments=" + uncompactedSegments +
+           ", segments=" + segments +
            '}';
   }
 }
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/NativeCompactionRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/NativeCompactionRunner.java
index afde45cdb136..74a9b0aaef44 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/NativeCompactionRunner.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/NativeCompactionRunner.java
@@ -35,20 +35,24 @@
 import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexIngestionSpec;
 import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexSupervisorTask;
 import org.apache.druid.indexing.input.DruidInputSource;
+import org.apache.druid.indexing.input.WindowedSegmentId;
 import org.apache.druid.java.util.common.IAE;
 import org.apache.druid.java.util.common.Intervals;
 import org.apache.druid.java.util.common.JodaUtils;
 import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.common.granularity.Granularity;
 import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.query.SegmentDescriptor;
 import org.apache.druid.query.spec.QuerySegmentSpec;
 import org.apache.druid.segment.indexing.DataSchema;
 import org.apache.druid.server.coordinator.CompactionConfigValidationResult;
 import org.apache.druid.server.coordinator.duty.CompactSegments;
+import org.apache.druid.timeline.SegmentId;
 import org.apache.druid.utils.CollectionUtils;
 import org.joda.time.Interval;
 
 import javax.annotation.Nullable;
+import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -99,9 +103,14 @@ public CompactionConfigValidationResult validateCompactionTask(
         "Virtual columns in filter rules are not supported by the Native compaction engine. Use MSQ compaction engine instead."
); } + if (compactionTask.getIoConfig().getInputSpec() instanceof MinorCompactionInputSpec) { - return CompactionConfigValidationResult.failure( - "Minor compaction is not supported by Native compaction engine. Use MSQ compaction engine instead."); + boolean usingConcurrentLocks = compactionTask.getContextValue(Tasks.USE_CONCURRENT_LOCKS, Tasks.DEFAULT_USE_CONCURRENT_LOCKS); + if (!usingConcurrentLocks) { + return CompactionConfigValidationResult.failure( + "Task context[%s] must be true when using native minor compaction", Tasks.USE_CONCURRENT_LOCKS + ); + } } return CompactionConfigValidationResult.success(); } @@ -153,19 +162,32 @@ private static ParallelIndexIOConfig createIoConfig( SegmentCacheManagerFactory segmentCacheManagerFactory, CompactionIOConfig compactionIOConfig ) + { + return (compactionIOConfig.getInputSpec() instanceof MinorCompactionInputSpec) + ? createMinorCompactionIoConfig(toolbox, dataSchema, interval, coordinatorClient, segmentCacheManagerFactory, compactionIOConfig) + : createMajorCompactionIoConfig(toolbox, dataSchema, interval, coordinatorClient, segmentCacheManagerFactory, compactionIOConfig); + } + + private static ParallelIndexIOConfig createMajorCompactionIoConfig( + TaskToolbox toolbox, + DataSchema dataSchema, + Interval inputInterval, + CoordinatorClient coordinatorClient, + SegmentCacheManagerFactory segmentCacheManagerFactory, + CompactionIOConfig compactionIOConfig + ) { if (!compactionIOConfig.isAllowNonAlignedInterval()) { - // Validate interval alignment. final Granularity segmentGranularity = dataSchema.getGranularitySpec().getSegmentGranularity(); final Interval widenedInterval = Intervals.utc( - segmentGranularity.bucketStart(interval.getStart()).getMillis(), - segmentGranularity.bucketEnd(interval.getEnd().minus(1)).getMillis() + segmentGranularity.bucketStart(inputInterval.getStart()).getMillis(), + segmentGranularity.bucketEnd(inputInterval.getEnd().minus(1)).getMillis() ); - if (!interval.equals(widenedInterval)) { + if (!inputInterval.equals(widenedInterval)) { throw new IAE( "Interval[%s] to compact is not aligned with segmentGranularity[%s]", - interval, + inputInterval, segmentGranularity ); } @@ -174,7 +196,7 @@ private static ParallelIndexIOConfig createIoConfig( return new ParallelIndexIOConfig( new DruidInputSource( dataSchema.getDataSource(), - interval, + inputInterval, null, null, null, @@ -190,6 +212,67 @@ private static ParallelIndexIOConfig createIoConfig( ); } + private static ParallelIndexIOConfig createMinorCompactionIoConfig( + TaskToolbox toolbox, + DataSchema dataSchema, + Interval interval, + CoordinatorClient coordinatorClient, + SegmentCacheManagerFactory segmentCacheManagerFactory, + CompactionIOConfig compactionIOConfig + ) + { + final List segmentIds = resolveSegmentIdsForMinorCompaction( + (MinorCompactionInputSpec) compactionIOConfig.getInputSpec(), + dataSchema.getDataSource(), + interval + ); + + return new ParallelIndexIOConfig( + new DruidInputSource( + dataSchema.getDataSource(), + null, + segmentIds, + null, + null, + null, + toolbox.getIndexIO(), + coordinatorClient, + segmentCacheManagerFactory, + toolbox.getConfig() + ).withTaskToolbox(toolbox), + null, + false, + compactionIOConfig.isDropExisting() + ); + } + + /** + * When using {@link MinorCompactionInputSpec}, resolves segment descriptors to compact that belong + * to the given interval and returns them as {@link WindowedSegmentId} objects. 
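+   * Descriptors outside the given interval are skipped; the {@link MinorCompactionInputSpec} constructor has already validated containment.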
+ */ + private static List resolveSegmentIdsForMinorCompaction( + MinorCompactionInputSpec inputSpec, + String dataSource, + Interval interval + ) + { + final List segmentIds = new ArrayList<>(); + for (SegmentDescriptor desc : inputSpec.getSegments()) { + if (interval.contains(desc.getInterval())) { + final SegmentId segmentId = SegmentId.of( + dataSource, + desc.getInterval(), + desc.getVersion(), + desc.getPartitionNumber() + ); + segmentIds.add( + new WindowedSegmentId(segmentId.toString(), List.of(desc.getInterval())) + ); + } + } + return segmentIds; + } + @Override public TaskStatus runCompactionTasks( CompactionTask compactionTask, @@ -318,6 +401,10 @@ Map createContextForSubtask(CompactionTask compactionTask) newContext.putIfAbsent(CompactSegments.STORE_COMPACTION_STATE_KEY, STORE_COMPACTION_STATE); // Set the priority of the compaction task. newContext.put(Tasks.PRIORITY_KEY, compactionTask.getPriority()); + // Native minor compaction uses REPLACE ingestion mode, which uses time chunk lock. + if (compactionTask.getIoConfig().getInputSpec() instanceof MinorCompactionInputSpec) { + newContext.put(Tasks.FORCE_TIME_CHUNK_LOCK_KEY, true); + } return newContext; } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/SpecificSegmentsSpec.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/SpecificSegmentsSpec.java index 7b26c22a4a54..93f181d0dcaf 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/SpecificSegmentsSpec.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/SpecificSegmentsSpec.java @@ -33,6 +33,10 @@ import java.util.Objects; import java.util.stream.Collectors; +/** + * @deprecated Use {@link MinorCompactionInputSpec} for minor compaction instead. 
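+ * Existing task payloads that use the {@code segments} type are still accepted for backwards compatibility.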
+ */ +@Deprecated public class SpecificSegmentsSpec implements CompactionInputSpec { public static final String TYPE = "segments"; diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java index 3a51d5b9fe21..10c27c0b268c 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java @@ -29,9 +29,6 @@ import com.fasterxml.jackson.databind.module.SimpleModule; import com.google.common.base.Preconditions; import com.google.common.base.Supplier; -import com.google.common.collect.ImmutableList; -import com.google.common.collect.ImmutableMap; -import com.google.common.collect.ImmutableSet; import com.google.common.collect.Iterables; import com.google.common.collect.Lists; import com.google.common.collect.Maps; @@ -50,6 +47,7 @@ import org.apache.druid.data.input.impl.LongDimensionSchema; import org.apache.druid.data.input.impl.StringDimensionSchema; import org.apache.druid.data.input.impl.TimestampSpec; +import org.apache.druid.error.DruidException; import org.apache.druid.guice.GuiceAnnotationIntrospector; import org.apache.druid.guice.GuiceInjectableValues; import org.apache.druid.guice.GuiceInjectors; @@ -62,9 +60,12 @@ import org.apache.druid.indexing.common.SegmentCacheManagerFactory; import org.apache.druid.indexing.common.TaskToolbox; import org.apache.druid.indexing.common.TestUtils; +import org.apache.druid.indexing.common.actions.LocalTaskActionClient; +import org.apache.druid.indexing.common.actions.MarkSegmentToUpgradeAction; import org.apache.druid.indexing.common.actions.RetrieveUsedSegmentsAction; import org.apache.druid.indexing.common.actions.TaskAction; import org.apache.druid.indexing.common.actions.TaskActionClient; +import org.apache.druid.indexing.common.actions.TaskActionTestKit; import org.apache.druid.indexing.common.config.TaskConfig; import org.apache.druid.indexing.common.config.TaskConfigBuilder; import org.apache.druid.indexing.common.task.CompactionTask.Builder; @@ -73,6 +74,7 @@ import org.apache.druid.indexing.common.task.NativeCompactionRunner.PartitionConfigurationManager; import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexIOConfig; import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexIngestionSpec; +import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexSupervisorTask; import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexTuningConfig; import org.apache.druid.indexing.input.DruidInputSource; import org.apache.druid.jackson.DefaultObjectMapper; @@ -97,6 +99,7 @@ import org.apache.druid.query.aggregation.firstlast.last.DoubleLastAggregatorFactory; import org.apache.druid.query.expression.TestExprMacroTable; import org.apache.druid.query.filter.SelectorDimFilter; +import org.apache.druid.query.spec.MultipleIntervalSegmentSpec; import org.apache.druid.query.spec.QuerySegmentSpec; import org.apache.druid.segment.AutoTypeColumnSchema; import org.apache.druid.segment.IndexIO; @@ -186,7 +189,7 @@ public class CompactionTaskTest private static final String TIMESTAMP_COLUMN = "timestamp"; private static final String MIXED_TYPE_COLUMN = "string_to_double"; private static final Interval COMPACTION_INTERVAL = Intervals.of("2017-01-01/2017-07-01"); - private static final List SEGMENT_INTERVALS = ImmutableList.of( + 
private static final List SEGMENT_INTERVALS = List.of( Intervals.of("2017-01-01/2017-02-01"), Intervals.of("2017-02-01/2017-03-01"), Intervals.of("2017-03-01/2017-04-01"), @@ -271,7 +274,7 @@ public static void setupClass() DATA_SOURCE, SEGMENT_INTERVALS.get(i), "version_" + i, - ImmutableMap.of(), + Map.of(), findDimensions(i, SEGMENT_INTERVALS.get(i)), AGGREGATORS.stream().map(AggregatorFactory::getName).collect(Collectors.toList()), new NumberedShardSpec(0, 1), @@ -299,7 +302,7 @@ private static ObjectMapper setupInjectablesInObjectMapper(ObjectMapper objectMa ); GuiceInjectableValues injectableValues = new GuiceInjectableValues( GuiceInjectors.makeStartupInjectorWithModules( - ImmutableList.of( + List.of( binder -> { binder.bind(AuthorizerMapper.class).toInstance(AuthTestUtils.TEST_AUTHORIZER_MAPPER); binder.bind(ChatHandlerProvider.class).toInstance(new NoopChatHandlerProvider()); @@ -359,6 +362,9 @@ private static CompactionTask.CompactionTuningConfig createTuningConfig() @Rule public ExpectedException expectedException = ExpectedException.none(); + @Rule + public TaskActionTestKit taskActionTestKit = new TaskActionTestKit(); + private StubServiceEmitter emitter; @Before @@ -520,7 +526,7 @@ public void testSerdeWithInterval() throws IOException new CompactionIntervalSpec(COMPACTION_INTERVAL, SegmentUtils.hashIds(SEGMENTS)) ) .tuningConfig(createTuningConfig()) - .context(ImmutableMap.of("testKey", "testContext")) + .context(Map.of("testKey", "testContext")) .build(); final byte[] bytes = OBJECT_MAPPER.writeValueAsBytes(task); @@ -538,7 +544,7 @@ public void testSerdeWithSegments() throws IOException final CompactionTask task = builder .segments(SEGMENTS) .tuningConfig(createTuningConfig()) - .context(ImmutableMap.of("testKey", "testContext")) + .context(Map.of("testKey", "testContext")) .build(); final byte[] bytes = OBJECT_MAPPER.writeValueAsBytes(task); @@ -558,7 +564,7 @@ public void testSerdeWithDimensions() throws IOException .segments(SEGMENTS) .dimensionsSpec( new DimensionsSpec( - ImmutableList.of( + List.of( new StringDimensionSchema("dim1"), new StringDimensionSchema("dim2"), new StringDimensionSchema("dim3") @@ -566,7 +572,7 @@ public void testSerdeWithDimensions() throws IOException ) ) .tuningConfig(createTuningConfig()) - .context(ImmutableMap.of("testKey", "testVal")) + .context(Map.of("testKey", "testVal")) .build(); final byte[] bytes = OBJECT_MAPPER.writeValueAsBytes(task); @@ -675,7 +681,7 @@ public void testInputSourceResources() new CompactionIntervalSpec(COMPACTION_INTERVAL, SegmentUtils.hashIds(SEGMENTS)) ) .tuningConfig(createTuningConfig()) - .context(ImmutableMap.of("testKey", "testContext")) + .context(Map.of("testKey", "testContext")) .build(); Assert.assertTrue(task.getInputSourceResources().isEmpty()); @@ -783,7 +789,7 @@ public void testSegmentProviderFindSegmentsWithEmptySegmentsThrowException() expectedException.expectMessage( "No segments found for compaction. Please check that datasource name and interval are correct." 
); - provider.checkSegments(LockGranularity.TIME_CHUNK, ImmutableList.of()); + provider.checkSegments(LockGranularity.TIME_CHUNK, List.of()); } @Test @@ -1233,11 +1239,10 @@ public void testMissingMetadata() throws IOException final TestIndexIO indexIO = (TestIndexIO) toolbox.getIndexIO(); indexIO.removeMetadata(Iterables.getFirst(indexIO.getQueryableIndexMap().keySet(), null)); - final List segments = new ArrayList<>(SEGMENTS); final Map inputSchemas = CompactionTask.createInputDataSchemas( toolbox, LockGranularity.TIME_CHUNK, - new SegmentProvider(DATA_SOURCE, SpecificSegmentsSpec.fromSegments(segments)), + new SegmentProvider(DATA_SOURCE, new CompactionIntervalSpec(COMPACTION_INTERVAL, null)), null, null, null, @@ -1298,7 +1303,7 @@ public void testSegmentGranularityAndNullQueryGranularity() throws IOException COORDINATOR_CLIENT, segmentCacheManagerFactory ); - final List expectedDimensionsSpec = ImmutableList.of( + final List expectedDimensionsSpec = List.of( new DimensionsSpec(getDimensionSchema(new DoubleDimensionSchema("string_to_double"))) ); @@ -1393,7 +1398,7 @@ public void testQueryGranularityAndSegmentGranularityNonNull() throws IOExceptio ); - final List expectedDimensionsSpec = ImmutableList.of( + final List expectedDimensionsSpec = List.of( new DimensionsSpec(getDimensionSchema(new DoubleDimensionSchema("string_to_double"))) ); @@ -1626,7 +1631,7 @@ public void testMSQRangePartitionWithNoDimensionsSpecNeedsMVDInfo() new DimensionRangePartitionsSpec( 3, null, - ImmutableList.of( + List.of( "string_dim_1"), false )) @@ -1647,7 +1652,7 @@ public void testMSQRollupOnStringNeedsMVDInfo() builder.granularitySpec(new ClientCompactionTaskGranularitySpec(null, null, true)); DimensionSchema stringDim = new StringDimensionSchema("string_dim_1", null, null); - builder.dimensionsSpec(new DimensionsSpec(ImmutableList.of(stringDim))); + builder.dimensionsSpec(new DimensionsSpec(List.of(stringDim))); final CompactionTask compactionTask = builder.build(); // A string dimension with rollup=true should need MVD info Assert.assertTrue(compactionTask.identifyMultiValuedDimensions()); @@ -1670,12 +1675,12 @@ public void testMSQRangePartitionOnStringNeedsMVDInfo() new DimensionRangePartitionsSpec( 3, null, - ImmutableList.of( + List.of( stringDim.getName()), false )) .build()); - builder.dimensionsSpec(new DimensionsSpec(ImmutableList.of(stringDim))); + builder.dimensionsSpec(new DimensionsSpec(List.of(stringDim))); CompactionTask compactionTask = builder.build(); Assert.assertTrue(compactionTask.identifyMultiValuedDimensions()); } @@ -1702,7 +1707,7 @@ public void testMSQRangePartitionOnAutoStringDoesNotNeedMVDInfo() ) ) .build()); - builder.dimensionsSpec(new DimensionsSpec(ImmutableList.of(stringDim))); + builder.dimensionsSpec(new DimensionsSpec(List.of(stringDim))); CompactionTask compactionTask = builder.build(); Assert.assertFalse(compactionTask.identifyMultiValuedDimensions()); } @@ -1727,7 +1732,7 @@ public void testChooseFinestGranularityWithNulls() @Test public void testChooseFinestGranularityNone() { - List input = ImmutableList.of( + List input = List.of( Granularities.DAY, Granularities.SECOND, Granularities.MINUTE, @@ -1792,7 +1797,7 @@ private Granularity chooseFinestGranularityHelper(List granularitie private static List getExpectedDimensionsSpecForAutoGeneration() { - return ImmutableList.of( + return List.of( new DimensionsSpec(getDimensionSchema(new StringDimensionSchema("string_to_double", DimensionSchema.MultiValueHandling.ARRAY, null))), new 
DimensionsSpec(getDimensionSchema(new StringDimensionSchema("string_to_double", DimensionSchema.MultiValueHandling.ARRAY, null))), new DimensionsSpec(getDimensionSchema(new StringDimensionSchema("string_to_double", DimensionSchema.MultiValueHandling.ARRAY, null))), @@ -1955,7 +1960,7 @@ public ListenableFuture> fetchUsedSegments( List intervals ) { - return Futures.immediateFuture(ImmutableList.copyOf(segmentMap.keySet())); + return Futures.immediateFuture(List.copyOf(segmentMap.keySet())); } } @@ -2032,6 +2037,236 @@ public void drop(DataSegment segment) .build(); } + @Test + public void testMinorCompactionChecksIfSegmentsToCompactIsEmpty() + { + Assert.assertThrows( + DruidException.class, + () -> new MinorCompactionInputSpec(COMPACTION_INTERVAL, List.of()) + ); + } + + @Test + public void testMinorCompactionShouldAlwaysUseReplaceIngestionMode() + { + final Interval testInterval = Intervals.of("2024-11-18T00:00:00.000Z/2024-11-25T00:00:00.000Z"); + final String version = "2024-11-17T23:49:06.823Z"; + final DataSegment segment = createSegmentWithPartition(testInterval, version, 1); + + final MinorCompactionInputSpec minorSpec = new MinorCompactionInputSpec( + testInterval, + List.of(segment.toDescriptor()) + ); + + Assert.assertThrows( + DruidException.class, + // Setting dropExisting == false disables REPLACE mode. + () -> new Builder(DATA_SOURCE, segmentCacheManagerFactory) + .inputSpec(minorSpec, false) + .context(Map.of(Tasks.USE_CONCURRENT_LOCKS, true)).build() + ); + } + + @Test + public void testMinorCompactionUsesTimeChunkLockWithConcurrentLocks() throws Exception + { + final Interval testInterval = Intervals.of("2024-11-18T00:00:00.000Z/2024-11-25T00:00:00.000Z"); + final List segments = List.of( + createSegmentWithPartition(testInterval, "v1", 0), + createSegmentWithPartition(testInterval, "v1", 1) + ); + final MinorCompactionInputSpec spec = new MinorCompactionInputSpec( + testInterval, + segments.stream().map(DataSegment::toDescriptor).collect(Collectors.toList()) + ); + + final CompactionTask task = new Builder(DATA_SOURCE, segmentCacheManagerFactory) + .inputSpec(spec, true) + .context(Map.of(Tasks.USE_CONCURRENT_LOCKS, true)) + .build(); + + taskActionTestKit.getTaskLockbox().add(task); + final TaskActionClient taskActionClient = new LocalTaskActionClient( + task, + taskActionTestKit.getTaskActionToolbox() + ); + // Use a client that returns segments for RetrieveUsedSegmentsAction - wrap to inject segments + final TaskActionClient segmentAwareClient = new TaskActionClient() + { + @Override + public RetType submit(TaskAction action) throws IOException + { + if (action instanceof RetrieveUsedSegmentsAction) { + @SuppressWarnings("unchecked") + RetType retVal = (RetType) segments; + return retVal; + } + return taskActionClient.submit(action); + } + }; + + task.determineLockGranularityAndTryLock(segmentAwareClient, List.of(testInterval)); + Assert.assertEquals(LockGranularity.TIME_CHUNK, task.getTaskLockHelper().getLockGranularityToUse()); + } + + @Test + public void testNativeMinorCompactionSubtaskUsesTimeChunkLock() throws Exception + { + final Interval testInterval = Intervals.of("2024-11-18T00:00:00.000Z/2024-11-25T00:00:00.000Z"); + final List segments = List.of( + createSegmentWithPartition(testInterval, "v1", 0), + createSegmentWithPartition(testInterval, "v1", 1) + ); + final MinorCompactionInputSpec spec = new MinorCompactionInputSpec( + testInterval, + segments.stream().map(DataSegment::toDescriptor).collect(Collectors.toList()) + ); + + final CompactionTask 
compactionTask = new Builder(DATA_SOURCE, segmentCacheManagerFactory) + .inputSpec(spec, true) + .context(Map.of(Tasks.USE_CONCURRENT_LOCKS, true)) + .build(); + + final NativeCompactionRunner runner = new NativeCompactionRunner(segmentCacheManagerFactory); + final Map subtaskContext = runner.createContextForSubtask(compactionTask); + + final DataSchema dataSchema = DataSchema.builder() + .withDataSource(DATA_SOURCE) + .withTimestamp(new TimestampSpec(TIMESTAMP_COLUMN, null, null)) + .withDimensions( + new DimensionsSpec(List.of(new StringDimensionSchema("dim1"), new StringDimensionSchema("dim2"))) + ) + .withGranularity( + new UniformGranularitySpec(Granularities.DAY, Granularities.HOUR, false, List.of(testInterval)) + ) + .build(); + + final List ingestionSpecs = NativeCompactionRunner.createIngestionSpecs( + Map.of(new MultipleIntervalSegmentSpec(List.of(testInterval)), dataSchema), + toolbox, + new CompactionIOConfig(spec, false, null), + new PartitionConfigurationManager(null), + COORDINATOR_CLIENT, + segmentCacheManagerFactory + ); + + final ParallelIndexSupervisorTask subtask = new ParallelIndexSupervisorTask( + "test_subtask", + "test_group", + null, + ingestionSpecs.get(0), + "base_0", + subtaskContext, + true + ); + + taskActionTestKit.getTaskLockbox().add(subtask); + final TaskActionClient taskActionClient = new LocalTaskActionClient( + subtask, + taskActionTestKit.getTaskActionToolbox() + ); + final TaskActionClient segmentAwareClient = new TaskActionClient() + { + @Override + public RetType submit(TaskAction action) throws IOException + { + if (action instanceof RetrieveUsedSegmentsAction) { + @SuppressWarnings("unchecked") + RetType retVal = (RetType) segments; + return retVal; + } + return taskActionClient.submit(action); + } + }; + subtask.determineLockGranularityAndTryLock(segmentAwareClient, List.of(testInterval)); + + Assert.assertEquals( + LockGranularity.TIME_CHUNK, + subtask.getTaskLockHelper().getLockGranularityToUse() + ); + } + + @Test + public void testSegmentProviderCheckSegmentsAllowsSubsetForTimeChunk() throws Exception + { + final Interval testInterval = Intervals.of("2024-11-18T00:00:00.000Z/2024-11-25T00:00:00.000Z"); + final List allSegments = List.of( + createSegmentWithPartition(testInterval, "v1", 0), + createSegmentWithPartition(testInterval, "v1", 1), + createSegmentWithPartition(testInterval, "v1", 2) + ); + final MinorCompactionInputSpec spec = new MinorCompactionInputSpec( + testInterval, + List.of(allSegments.get(0).toDescriptor(), allSegments.get(1).toDescriptor()) + ); + + final SegmentProvider provider = new SegmentProvider(DATA_SOURCE, spec); + final TestTaskActionClient client = new TestTaskActionClient(allSegments); + provider.findSegments(client); + + // Should not throw: specified segments (0,1) exist; segment 2 is not in spec but is in interval (will be upgraded) + provider.checkSegments(LockGranularity.TIME_CHUNK, allSegments); + } + + @Test + public void testDruidInputSourceReceivesSegmentIdsForMinorCompaction() + { + final Interval interval = Intervals.of("2024-01-01/2024-01-02"); + final List segments = List.of( + createSegmentWithPartition(interval, "v1", 0), + createSegmentWithPartition(interval, "v1", 1) + ); + final MinorCompactionInputSpec spec = new MinorCompactionInputSpec( + interval, + segments.stream().map(DataSegment::toDescriptor).collect(Collectors.toList()) + ); + + final DataSchema dataSchema = DataSchema.builder() + .withDataSource(DATA_SOURCE) + .withTimestamp(new TimestampSpec(TIMESTAMP_COLUMN, null, null)) + 
.withDimensions( + new DimensionsSpec( + List.of( + new StringDimensionSchema("dim1"), + new StringDimensionSchema("dim2") + ) + ) + ) + .withGranularity( + new UniformGranularitySpec( + Granularities.DAY, + Granularities.HOUR, + false, + List.of(interval) + ) + ) + .build(); + + final List ingestionSpecs = NativeCompactionRunner.createIngestionSpecs( + Map.of(new MultipleIntervalSegmentSpec(List.of(interval)), dataSchema), + toolbox, + new CompactionIOConfig(spec, false, null), + new PartitionConfigurationManager(null), + COORDINATOR_CLIENT, + segmentCacheManagerFactory + ); + + Assert.assertEquals(1, ingestionSpecs.size()); + final InputSource inputSource = ingestionSpecs.get(0).getIOConfig().getInputSource(); + Assert.assertTrue(inputSource instanceof DruidInputSource); + final DruidInputSource druidInputSource = (DruidInputSource) inputSource; + Assert.assertNotNull(druidInputSource.getSegmentIds()); + Assert.assertEquals(2, druidInputSource.getSegmentIds().size()); + } + + private DataSegment createSegmentWithPartition(Interval interval, String version, int partitionNum) + { + return DataSegment.builder(SegmentId.of(DATA_SOURCE, interval, version, partitionNum)) + .shardSpec(new NumberedShardSpec(partitionNum, 0)) + .size(100) + .build(); + } + private static class TestTaskActionClient implements TaskActionClient { private final List segments; @@ -2045,10 +2280,13 @@ private static class TestTaskActionClient implements TaskActionClient @Override public RetType submit(TaskAction taskAction) { - if (!(taskAction instanceof RetrieveUsedSegmentsAction)) { - throw new ISE("action[%s] is not supported", taskAction); + if (taskAction instanceof RetrieveUsedSegmentsAction) { + return (RetType) segments; + } + if (taskAction instanceof MarkSegmentToUpgradeAction) { + return (RetType) Integer.valueOf(0); } - return (RetType) segments; + throw new ISE("action[%s] is not supported", taskAction); } } @@ -2315,7 +2553,7 @@ public String getType() @Override public Set getInputSourceResources() { - return ImmutableSet.of(); + return Set.of(); } @JsonProperty diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/MinorCompactionInputSpecTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/MinorCompactionInputSpecTest.java index c5a6f2fda033..71c59bc4a4aa 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/MinorCompactionInputSpecTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/MinorCompactionInputSpecTest.java @@ -47,7 +47,7 @@ public void testSerde() throws Exception Assert.assertEquals(spec, deserialized); Assert.assertEquals(interval, deserialized.getInterval()); - Assert.assertEquals(segments, deserialized.getUncompactedSegments()); + Assert.assertEquals(segments, deserialized.getSegments()); } @Test @@ -55,18 +55,18 @@ public void testDeserializeFromClientFormat() throws Exception { ObjectMapper mapper = new DefaultObjectMapper(); String clientJson = "{" - + "\"type\":\"uncompacted\"," + + "\"type\":\"minor\"," + "\"interval\":\"2015-04-11/2015-04-12\"," - + "\"uncompactedSegments\":[{\"itvl\":\"2015-04-11/2015-04-12\",\"ver\":\"v1\",\"part\":0}]" + + "\"segments\":[{\"itvl\":\"2015-04-11/2015-04-12\",\"ver\":\"v1\",\"part\":0}]" + "}"; MinorCompactionInputSpec deserialized = mapper.readValue(clientJson, MinorCompactionInputSpec.class); Assert.assertEquals(Intervals.of("2015-04-11/2015-04-12"), deserialized.getInterval()); - Assert.assertEquals(1, 
deserialized.getUncompactedSegments().size()); + Assert.assertEquals(1, deserialized.getSegments().size()); Assert.assertEquals( new SegmentDescriptor(Intervals.of("2015-04-11/2015-04-12"), "v1", 0), - deserialized.getUncompactedSegments().get(0) + deserialized.getSegments().get(0) ); } diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TaskLockHelperTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TaskLockHelperTest.java new file mode 100644 index 000000000000..3a47c5a0ab71 --- /dev/null +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TaskLockHelperTest.java @@ -0,0 +1,224 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.indexing.common.task; + +import com.google.common.collect.ImmutableList; +import org.apache.druid.java.util.common.DateTimes; +import org.apache.druid.java.util.common.ISE; +import org.apache.druid.java.util.common.Intervals; +import org.apache.druid.timeline.DataSegment; +import org.apache.druid.timeline.SegmentId; +import org.apache.druid.timeline.partition.NumberedOverwriteShardSpec; +import org.apache.druid.timeline.partition.PartitionIds; +import org.joda.time.Interval; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +import java.util.Collections; +import java.util.List; + +public class TaskLockHelperTest +{ + private static final String DATA_SOURCE = "test_datasource"; + private static final Interval TEST_INTERVAL = Intervals.of("2017-01-01/2017-01-02"); + private static final String TEST_VERSION = DateTimes.nowUtc().toString(); + + @Test + public void testVerifyNonConsecutiveSegmentsInInputFails() + { + // Test that non-consecutive segments within the input list fail. + // Compacting segments {0, 1, 3} should fail because root partition 2 is missing. 
+ final List segments = ImmutableList.of( + createSegment(0, 0, 1, (short) 1, (short) 1), // rootPartitionRange [0, 1) + createSegment(1, 1, 2, (short) 1, (short) 1), // rootPartitionRange [1, 2) + createSegment(3, 3, 4, (short) 1, (short) 1) // rootPartitionRange [3, 4) + ); + + Assertions.assertThrows( + ISE.class, + () -> TaskLockHelper.verifyRootPartitionIsAdjacentAndAtomicUpdateGroupIsFull(segments) + ); + } + + @Test + public void testVerifyConsecutiveSegmentsSucceedEvenIfOtherSegmentsMissing() + { + final List segments = ImmutableList.of( + createSegment(3, 3, 4, (short) 1, (short) 1), // rootPartitionRange [3, 4) + createSegment(4, 4, 5, (short) 1, (short) 1), // rootPartitionRange [4, 5) + createSegment(5, 5, 6, (short) 1, (short) 1) // rootPartitionRange [5, 6) + ); + + TaskLockHelper.verifyRootPartitionIsAdjacentAndAtomicUpdateGroupIsFull(segments); + } + + @Test + public void testVerifyConsecutiveSegmentsStillWorks() + { + final List segments = ImmutableList.of( + createSegment(0, 0, 1, (short) 1, (short) 1), + createSegment(1, 1, 2, (short) 1, (short) 1), + createSegment(2, 2, 3, (short) 1, (short) 1) + ); + + TaskLockHelper.verifyRootPartitionIsAdjacentAndAtomicUpdateGroupIsFull(segments); + } + + @Test + public void testVerifyLargeGapSegmentsFails() + { + final List segments = ImmutableList.of( + createSegment(0, 0, 1, (short) 1, (short) 1), + createSegment(1, 1, 2, (short) 1, (short) 1), + createSegment(10, 10, 11, (short) 1, (short) 1) + ); + + Assertions.assertThrows( + ISE.class, + () -> TaskLockHelper.verifyRootPartitionIsAdjacentAndAtomicUpdateGroupIsFull(segments) + ); + } + + @Test + public void testVerifyAtomicUpdateGroupValidationStillWorks() + { + final List segments = ImmutableList.of( + createSegment(0, 0, 1, (short) 1, (short) 2), + createSegment(1, 0, 1, (short) 1, (short) 2) + ); + + TaskLockHelper.verifyRootPartitionIsAdjacentAndAtomicUpdateGroupIsFull(segments); + } + + @Test + public void testVerifyAtomicUpdateGroupIncompleteFails() + { + final List segments = ImmutableList.of( + createSegment(0, 0, 1, (short) 1, (short) 3), + createSegment(1, 0, 1, (short) 1, (short) 3) + ); + + // Should throw ISE because atomicUpdateGroupSize is 3 but we only have 2 segments + Assertions.assertThrows( + ISE.class, + () -> TaskLockHelper.verifyRootPartitionIsAdjacentAndAtomicUpdateGroupIsFull(segments) + ); + } + + @Test + public void testVerifyDifferentMinorVersionsFail() + { + // Test that segments with same root partition range but different minor versions fail + final List segments = ImmutableList.of( + createSegment(0, 0, 1, (short) 1, (short) 2), + createSegment(1, 0, 1, (short) 2, (short) 2) // Different minor version + ); + Assertions.assertThrows( + ISE.class, + () -> TaskLockHelper.verifyRootPartitionIsAdjacentAndAtomicUpdateGroupIsFull(segments) + ); + } + + @Test + public void testVerifyDifferentAtomicUpdateGroupSizesFail() + { + // Test that segments with same root partition range but different atomicUpdateGroupSize fail + final List segments = ImmutableList.of( + createSegment(0, 0, 1, (short) 1, (short) 2), + createSegment(1, 0, 1, (short) 1, (short) 3) + ); + Assertions.assertThrows( + ISE.class, + () -> TaskLockHelper.verifyRootPartitionIsAdjacentAndAtomicUpdateGroupIsFull(segments) + ); + } + + @Test + public void testVerifyEmptySegmentsList() + { + final List segments = Collections.emptyList(); + TaskLockHelper.verifyRootPartitionIsAdjacentAndAtomicUpdateGroupIsFull(segments); + } + + @Test + public void testVerifySingleSegment() + { + final List 
segments = ImmutableList.of( + createSegment(0, 0, 1, (short) 1, (short) 1) + ); + TaskLockHelper.verifyRootPartitionIsAdjacentAndAtomicUpdateGroupIsFull(segments); + } + + @Test + public void testVerifyDifferentIntervalsFail() + { + final Interval interval1 = Intervals.of("2017-01-01/2017-01-02"); + final Interval interval2 = Intervals.of("2017-01-02/2017-01-03"); + final List segments = ImmutableList.of( + DataSegment.builder(SegmentId.of(DATA_SOURCE, interval1, TEST_VERSION, 0)) + .shardSpec(new NumberedOverwriteShardSpec( + PartitionIds.NON_ROOT_GEN_START_PARTITION_ID, + 0, + 1, + (short) 1, + (short) 1 + )) + .size(0) + .build(), + DataSegment.builder(SegmentId.of(DATA_SOURCE, interval2, TEST_VERSION, 0)) + .shardSpec(new NumberedOverwriteShardSpec( + PartitionIds.NON_ROOT_GEN_START_PARTITION_ID + 1, + 1, + 2, + (short) 1, + (short) 1 + )) + .size(0) + .build() + ); + Assertions.assertThrows( + IllegalArgumentException.class, + () -> TaskLockHelper.verifyRootPartitionIsAdjacentAndAtomicUpdateGroupIsFull(segments) + ); + } + + /** + * Helper method to create a test segment with NumberedOverwriteShardSpec + */ + private DataSegment createSegment( + int partitionId, + int startRootPartitionId, + int endRootPartitionId, + short minorVersion, + short atomicUpdateGroupSize + ) + { + return DataSegment.builder(SegmentId.of(DATA_SOURCE, TEST_INTERVAL, TEST_VERSION, partitionId)) + .shardSpec(new NumberedOverwriteShardSpec( + PartitionIds.NON_ROOT_GEN_START_PARTITION_ID + partitionId, + startRootPartitionId, + endRootPartitionId, + minorVersion, + atomicUpdateGroupSize + )) + .size(0) + .build(); + } +} diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/PartialCompactionTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/PartialCompactionTest.java index 9ef316e8aaa2..882c1c73bb1b 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/PartialCompactionTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/PartialCompactionTest.java @@ -31,7 +31,8 @@ import org.apache.druid.indexing.common.LockGranularity; import org.apache.druid.indexing.common.task.CompactionTask; import org.apache.druid.indexing.common.task.CompactionTask.Builder; -import org.apache.druid.indexing.common.task.SpecificSegmentsSpec; +import org.apache.druid.indexing.common.task.MinorCompactionInputSpec; +import org.apache.druid.indexing.common.task.Tasks; import org.apache.druid.java.util.common.Intervals; import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.segment.DataSegmentsWithSchemas; @@ -50,8 +51,11 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Comparator; +import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; public class PartialCompactionTest extends AbstractMultiPhaseParallelIndexingTest { @@ -73,7 +77,7 @@ public class PartialCompactionTest extends AbstractMultiPhaseParallelIndexingTes public PartialCompactionTest() { - super(LockGranularity.SEGMENT, true, DEFAULT_TRANSIENT_TASK_FAILURE_RATE, DEFAULT_TRANSIENT_API_FAILURE_RATE); + super(LockGranularity.TIME_CHUNK, true, DEFAULT_TRANSIENT_TASK_FAILURE_RATE, DEFAULT_TRANSIENT_API_FAILURE_RATE); } @Before @@ -137,8 +141,14 @@ public void testPartialCompactHashAndDynamicPartitionedSegments() ); } final CompactionTask compactionTask = newCompactionTaskBuilder() - 
.inputSpec(SpecificSegmentsSpec.fromSegments(segmentsToCompact)) + .inputSpec( + new MinorCompactionInputSpec( + INTERVAL_TO_INDEX, + segmentsToCompact.stream().map(DataSegment::toDescriptor).collect(Collectors.toList()) + ), true + ) .tuningConfig(newTuningConfig(new DynamicPartitionsSpec(20, null), 2, false)) + .context(Map.of(Tasks.USE_CONCURRENT_LOCKS, true)) .build(); dataSegmentsWithSchemas = runTask(compactionTask, TaskState.SUCCESS); verifySchema(dataSegmentsWithSchemas); @@ -197,8 +207,14 @@ public void testPartialCompactRangeAndDynamicPartitionedSegments() ); } final CompactionTask compactionTask = newCompactionTaskBuilder() - .inputSpec(SpecificSegmentsSpec.fromSegments(segmentsToCompact)) + .inputSpec( + new MinorCompactionInputSpec( + INTERVAL_TO_INDEX, + segmentsToCompact.stream().map(DataSegment::toDescriptor).collect(Collectors.toList()) + ), true + ) .tuningConfig(newTuningConfig(new DynamicPartitionsSpec(20, null), 2, false)) + .context(Map.of(Tasks.USE_CONCURRENT_LOCKS, true)) .build(); dataSegmentsWithSchemas = runTask(compactionTask, TaskState.SUCCESS); @@ -213,6 +229,138 @@ public void testPartialCompactRangeAndDynamicPartitionedSegments() } } + @Test + public void testMinorCompactionUpgradesNonCompactedSegments() + { + DataSegmentsWithSchemas dataSegmentsWithSchemas = runTestTask( + new HashedPartitionsSpec(null, 4, null), + TaskState.SUCCESS, + false + ); + verifySchema(dataSegmentsWithSchemas); + final Map> hashPartitionedSegments = + SegmentUtils.groupSegmentsByInterval(dataSegmentsWithSchemas.getSegments()); + hashPartitionedSegments.values().forEach( + segmentsInInterval -> segmentsInInterval.sort( + Comparator.comparing(segment -> segment.getShardSpec().getPartitionNum()) + ) + ); + + final List segmentsToCompact = new ArrayList<>(); + for (List segmentsInInterval : hashPartitionedSegments.values()) { + segmentsToCompact.addAll(segmentsInInterval.subList(0, Math.min(2, segmentsInInterval.size()))); + } + final Set originalSegments = dataSegmentsWithSchemas.getSegments(); + final Set compactedSegmentIds = segmentsToCompact.stream() + .map(segment -> segment.getId().toString()) + .collect(Collectors.toSet()); + final Set nonCompactedSegmentIds = + originalSegments.stream() + .map(segment -> segment.getId().toString()) + .filter(segmentId -> !compactedSegmentIds.contains(segmentId)) + .collect(Collectors.toSet()); + Assert.assertFalse(nonCompactedSegmentIds.isEmpty()); + final Set originalSegmentIds = new HashSet<>(compactedSegmentIds); + originalSegmentIds.addAll(nonCompactedSegmentIds); + + final CompactionTask compactionTask = newCompactionTaskBuilder() + .inputSpec( + new MinorCompactionInputSpec( + INTERVAL_TO_INDEX, + segmentsToCompact.stream().map(DataSegment::toDescriptor).collect(Collectors.toList()) + ), true + ) + .tuningConfig(newTuningConfig(new DynamicPartitionsSpec(20, null), 2, false)) + .context(Map.of(Tasks.USE_CONCURRENT_LOCKS, true)) + .build(); + dataSegmentsWithSchemas = runTask(compactionTask, TaskState.SUCCESS); + verifySchema(dataSegmentsWithSchemas); + + // Check published segment set after compaction + final Set publishedAfterCompaction = dataSegmentsWithSchemas.getSegments(); + Assert.assertFalse(SegmentUtils.groupSegmentsByInterval(publishedAfterCompaction).isEmpty()); + + final Set finalSegmentIds = publishedAfterCompaction.stream() + .map(segment -> segment.getId().toString()) + .collect(Collectors.toSet()); + + final Map upgradedFromSegmentIdMap = + getStorageCoordinator().retrieveUpgradedFromSegmentIds(DATASOURCE, finalSegmentIds); 
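+    // Each entry maps a newly published segment ID to the original segment it was upgraded from.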
+ Assert.assertFalse(upgradedFromSegmentIdMap.isEmpty()); + Assert.assertTrue(upgradedFromSegmentIdMap.values().stream().noneMatch(compactedSegmentIds::contains)); + Assert.assertTrue(originalSegmentIds.containsAll(upgradedFromSegmentIdMap.values())); + for (final String successorSegmentId : upgradedFromSegmentIdMap.keySet()) { + Assert.assertTrue(finalSegmentIds.contains(successorSegmentId)); + } + + // Validate new segment ids (replacements and/or upgraded replicas) + final Set newPublishedSegmentIds = new HashSet<>(finalSegmentIds); + newPublishedSegmentIds.removeAll(originalSegmentIds); + Assert.assertFalse(newPublishedSegmentIds.isEmpty()); + Assert.assertTrue( + newPublishedSegmentIds.stream().anyMatch(id -> !upgradedFromSegmentIdMap.containsKey(id)) + ); + + // Index newly published ids by day for compacted-source checks below. + final Map> newSegmentIdsByInterval = + publishedAfterCompaction.stream() + .filter(segment -> !segment.isTombstone() + && newPublishedSegmentIds.contains(segment.getId().toString()) + ) + .collect(Collectors.groupingBy( + DataSegment::getInterval, + Collectors.mapping( + segment -> segment.getId().toString(), + Collectors.toCollection(HashSet::new) + ) + )); + + // Verify non-compacted segments are being replaced + for (final String parentSegmentId : nonCompactedSegmentIds) { + final List successorSegmentIds = upgradedFromSegmentIdMap.entrySet() + .stream() + .filter(e -> parentSegmentId.equals(e.getValue())) + .map(Map.Entry::getKey) + .toList(); + if (finalSegmentIds.contains(parentSegmentId)) { + Assert.assertTrue(successorSegmentIds.isEmpty()); + } else if (!successorSegmentIds.isEmpty()) { + Assert.assertEquals(1, successorSegmentIds.size()); + Assert.assertTrue(finalSegmentIds.contains(successorSegmentIds.get(0))); + } + } + + // Verify compacted segments have new published ID + for (final DataSegment compactedSource : segmentsToCompact) { + final String compactedSourceId = compactedSource.getId().toString(); + Assert.assertFalse(finalSegmentIds.contains(compactedSourceId)); + final Set newIdsInSameInterval = newSegmentIdsByInterval.getOrDefault(compactedSource.getInterval(), Set.of()); + Assert.assertFalse(newIdsInSameInterval.isEmpty()); + } + + // non-compacted parents removed from published set match retrieveUpgradedToSegmentIds + final Set removedNonCompactedParentIds = + nonCompactedSegmentIds.stream().filter(id -> !finalSegmentIds.contains(id)).collect(Collectors.toSet()); + if (!removedNonCompactedParentIds.isEmpty()) { + final Map> upgradedToSegmentIdsByParent = + getStorageCoordinator().retrieveUpgradedToSegmentIds(DATASOURCE, removedNonCompactedParentIds); + for (final String parentSegmentId : removedNonCompactedParentIds) { + final Set expectedSuccessorIds = upgradedFromSegmentIdMap.entrySet() + .stream() + .filter(e -> parentSegmentId.equals(e.getValue())) + .map(Map.Entry::getKey) + .collect(Collectors.toSet()); + if (expectedSuccessorIds.isEmpty()) { + continue; + } + final Set coordinatorSuccessorIds = + new HashSet<>(upgradedToSegmentIdsByParent.getOrDefault(parentSegmentId, Set.of())); + coordinatorSuccessorIds.remove(parentSegmentId); + Assert.assertTrue(coordinatorSuccessorIds.containsAll(expectedSuccessorIds)); + } + } + } + private DataSegmentsWithSchemas runTestTask( PartitionsSpec partitionsSpec, TaskState expectedTaskState, diff --git a/server/src/main/java/org/apache/druid/client/indexing/ClientMinorCompactionInputSpec.java b/server/src/main/java/org/apache/druid/client/indexing/ClientMinorCompactionInputSpec.java index 
751ec8d462a1..50e5dd44bd5e 100644 --- a/server/src/main/java/org/apache/druid/client/indexing/ClientMinorCompactionInputSpec.java +++ b/server/src/main/java/org/apache/druid/client/indexing/ClientMinorCompactionInputSpec.java @@ -35,22 +35,22 @@ */ public class ClientMinorCompactionInputSpec extends ClientCompactionIntervalSpec { - public static final String TYPE = "uncompacted"; + public static final String TYPE = "minor"; - private final List uncompactedSegments; + private final List segments; @JsonCreator public ClientMinorCompactionInputSpec( @JsonProperty("interval") Interval interval, - @JsonProperty("uncompactedSegments") List uncompactedSegments + @JsonProperty("segments") List segments ) { super(interval, null); - if (uncompactedSegments == null || uncompactedSegments.isEmpty()) { - throw InvalidInput.exception("'uncompactedSegments' must be non-empty."); + if (segments == null || segments.isEmpty()) { + throw InvalidInput.exception("'segments' must be non-empty."); } else if (interval != null) { List segmentsNotInInterval = - uncompactedSegments.stream().filter(s -> !interval.contains(s.getInterval())).collect(Collectors.toList()); + segments.stream().filter(s -> !interval.contains(s.getInterval())).collect(Collectors.toList()); if (!segmentsNotInInterval.isEmpty()) { throw new IAE( "Can not supply segments outside interval[%s], got segments[%s].", @@ -59,13 +59,13 @@ public ClientMinorCompactionInputSpec( ); } } - this.uncompactedSegments = uncompactedSegments; + this.segments = segments; } @JsonProperty - public List getUncompactedSegments() + public List getSegments() { - return uncompactedSegments; + return segments; } @Override @@ -81,13 +81,13 @@ public boolean equals(Object object) return false; } ClientMinorCompactionInputSpec that = (ClientMinorCompactionInputSpec) object; - return Objects.equals(uncompactedSegments, that.uncompactedSegments); + return Objects.equals(segments, that.segments); } @Override public int hashCode() { - return Objects.hash(super.hashCode(), uncompactedSegments); + return Objects.hash(super.hashCode(), segments); } @Override @@ -95,7 +95,7 @@ public String toString() { return "ClientMinorCompactionInputSpec{" + "interval=" + getInterval() + - ",uncompactedSegments=" + uncompactedSegments + + ",segments=" + segments + '}'; } } diff --git a/server/src/test/java/org/apache/druid/client/indexing/ClientCompactionIntervalSpecTest.java b/server/src/test/java/org/apache/druid/client/indexing/ClientCompactionIntervalSpecTest.java index 7e116a271eb0..c6a41d7fe811 100644 --- a/server/src/test/java/org/apache/druid/client/indexing/ClientCompactionIntervalSpecTest.java +++ b/server/src/test/java/org/apache/druid/client/indexing/ClientCompactionIntervalSpecTest.java @@ -136,7 +136,7 @@ public void testClientCompactionIntervalSpec_serde() throws Exception ObjectMapper mapper = new DefaultObjectMapper(); Interval interval = Intervals.of("2015-04-11/2015-04-12"); - // Test without uncompactedSegments (full compaction) + // Test without segments (full compaction) ClientCompactionIntervalSpec withoutSegments = new ClientCompactionIntervalSpec(interval, null); String json2 = mapper.writeValueAsString(withoutSegments); ClientCompactionIntervalSpec deserialized2 = mapper.readValue(json2, ClientCompactionIntervalSpec.class); @@ -152,12 +152,12 @@ public void testClientMinorCompactionInputSpec_serde() throws Exception new SegmentDescriptor(Intervals.of("2015-04-11/2015-04-12"), "v1", 0) ); - // Test with uncompactedSegments (minor compaction) + // Test with segments 
(minor compaction) ClientCompactionInputSpec withSegments = new ClientMinorCompactionInputSpec(interval, segments); String json1 = mapper.writeValueAsString(withSegments); ClientCompactionInputSpec deserialized1 = mapper.readValue(json1, ClientCompactionIntervalSpec.class); Assert.assertTrue(deserialized1 instanceof ClientMinorCompactionInputSpec); Assert.assertEquals(withSegments, deserialized1); - Assert.assertEquals(segments, ((ClientMinorCompactionInputSpec) deserialized1).getUncompactedSegments()); + Assert.assertEquals(segments, ((ClientMinorCompactionInputSpec) deserialized1).getSegments()); } }