From 51d3e4d2ae80c058774960fc9d9d561e942ecfd4 Mon Sep 17 00:00:00 2001 From: Maytas Monsereenusorn Date: Mon, 5 Apr 2021 14:55:36 -0700 Subject: [PATCH 01/11] Make dropExisting flag for Compaction configurable --- docs/ingestion/compaction.md | 10 ++- .../common/task/CompactionIOConfig.java | 20 ++++- .../indexing/common/task/CompactionTask.java | 23 ++++-- .../druid/indexing/common/task/IndexTask.java | 1 - .../indexing/ClientCompactionIOConfig.java | 22 ++++- .../indexing/HttpIndexingServiceClient.java | 3 +- .../indexing/IndexingServiceClient.java | 1 + .../druid/segment/indexing/IOConfig.java | 1 + .../DataSourceCompactionConfig.java | 12 +++ .../UserCompactionTaskIOConfig.java | 82 +++++++++++++++++++ .../coordinator/duty/CompactSegments.java | 6 ++ .../indexing/NoopIndexingServiceClient.java | 1 + .../DataSourceCompactionConfigTest.java | 8 ++ .../coordinator/duty/CompactSegmentsTest.java | 10 ++- .../duty/NewestSegmentFirstIteratorTest.java | 9 ++ .../duty/NewestSegmentFirstPolicyTest.java | 1 + 16 files changed, 188 insertions(+), 22 deletions(-) create mode 100644 server/src/main/java/org/apache/druid/server/coordinator/UserCompactionTaskIOConfig.java diff --git a/docs/ingestion/compaction.md b/docs/ingestion/compaction.md index 239523d401be..394da254ee2b 100644 --- a/docs/ingestion/compaction.md +++ b/docs/ingestion/compaction.md @@ -158,10 +158,12 @@ This task doesn't specify a `granularitySpec` so Druid retains the original segm The compaction `ioConfig` requires specifying `inputSpec` as follows: -|Field|Description|Required| -|-----|-----------|--------| -|`type`|Task type. Should be `compact`|Yes| -|`inputSpec`|Input specification|Yes| +|Field|Description|Default|Required?| +|-----|-----------|-------|--------| +|`type`|Task type. 
Should be `compact`|none|Yes| +|`inputSpec`|Input specification|none|Yes| +|`dropExisting`|If `true`, then the compaction task drops (mark unused) all existing segments fully contained by either the `interval` in the `interval` type `inputSpec` or the umbrella interval of the `segments` in the `segment` type `inputSpec` when the task publishes new compacted segments. If compaction fails, Druid does not drop or mark unused any segments.|false|no| + There are two supported `inputSpec`s for now. diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionIOConfig.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionIOConfig.java index 60972de2673a..faedd3da700b 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionIOConfig.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionIOConfig.java @@ -24,6 +24,7 @@ import com.fasterxml.jackson.annotation.JsonTypeName; import org.apache.druid.segment.indexing.IOConfig; +import javax.annotation.Nullable; import java.util.Objects; /** @@ -36,11 +37,16 @@ public class CompactionIOConfig implements IOConfig { private final CompactionInputSpec inputSpec; + private final boolean dropExisting; @JsonCreator - public CompactionIOConfig(@JsonProperty("inputSpec") CompactionInputSpec inputSpec) + public CompactionIOConfig( + @JsonProperty("inputSpec") CompactionInputSpec inputSpec, + @JsonProperty("dropExisting") @Nullable Boolean dropExisting + ) { this.inputSpec = inputSpec; + this.dropExisting = dropExisting == null ? 
DEFAULT_DROP_EXISTING : dropExisting; } @JsonProperty @@ -49,6 +55,12 @@ public CompactionInputSpec getInputSpec() return inputSpec; } + @JsonProperty + public boolean isDropExisting() + { + return dropExisting; + } + @Override public boolean equals(Object o) { @@ -59,13 +71,14 @@ public boolean equals(Object o) return false; } CompactionIOConfig that = (CompactionIOConfig) o; - return Objects.equals(inputSpec, that.inputSpec); + return dropExisting == that.dropExisting && + Objects.equals(inputSpec, that.inputSpec); } @Override public int hashCode() { - return Objects.hash(inputSpec); + return Objects.hash(inputSpec, dropExisting); } @Override @@ -73,6 +86,7 @@ public String toString() { return "CompactionIOConfig{" + "inputSpec=" + inputSpec + + ", dropExisting=" + dropExisting + '}'; } } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java index ed4bf7eb2b3a..49f9b24dbb8a 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java @@ -198,11 +198,11 @@ public CompactionTask( if (ioConfig != null) { this.ioConfig = ioConfig; } else if (interval != null) { - this.ioConfig = new CompactionIOConfig(new CompactionIntervalSpec(interval, null)); + this.ioConfig = new CompactionIOConfig(new CompactionIntervalSpec(interval, null), null); } else { // We already checked segments is not null or empty above. //noinspection ConstantConditions - this.ioConfig = new CompactionIOConfig(SpecificSegmentsSpec.fromSegments(segments)); + this.ioConfig = new CompactionIOConfig(SpecificSegmentsSpec.fromSegments(segments), null); } this.dimensionsSpec = dimensionsSpec == null ? 
dimensions : dimensionsSpec; @@ -383,7 +383,8 @@ public TaskStatus runTask(TaskToolbox toolbox) throws Exception granularitySpec, toolbox.getCoordinatorClient(), segmentLoaderFactory, - retryPolicyFactory + retryPolicyFactory, + ioConfig ); final List indexTaskSpecs = IntStream .range(0, ingestionSpecs.size()) @@ -491,7 +492,8 @@ static List createIngestionSchema( @Nullable final ClientCompactionTaskGranularitySpec granularitySpec, final CoordinatorClient coordinatorClient, final SegmentLoaderFactory segmentLoaderFactory, - final RetryPolicyFactory retryPolicyFactory + final RetryPolicyFactory retryPolicyFactory, + final CompactionIOConfig ioConfig ) throws IOException, SegmentLoadingException { NonnullPair, List>> pair = prepareSegments( @@ -573,7 +575,8 @@ static List createIngestionSchema( interval, coordinatorClient, segmentLoaderFactory, - retryPolicyFactory + retryPolicyFactory, + ioConfig ), compactionTuningConfig ) @@ -600,7 +603,8 @@ static List createIngestionSchema( segmentProvider.interval, coordinatorClient, segmentLoaderFactory, - retryPolicyFactory + retryPolicyFactory, + ioConfig ), compactionTuningConfig ) @@ -614,7 +618,8 @@ private static ParallelIndexIOConfig createIoConfig( Interval interval, CoordinatorClient coordinatorClient, SegmentLoaderFactory segmentLoaderFactory, - RetryPolicyFactory retryPolicyFactory + RetryPolicyFactory retryPolicyFactory, + CompactionIOConfig ioConfig ) { return new ParallelIndexIOConfig( @@ -634,7 +639,7 @@ private static ParallelIndexIOConfig createIoConfig( ), null, false, - true + ioConfig.isDropExisting() ); } @@ -1021,7 +1026,7 @@ public Builder segments(List segments) public Builder inputSpec(CompactionInputSpec inputSpec) { - this.ioConfig = new CompactionIOConfig(inputSpec); + this.ioConfig = new CompactionIOConfig(inputSpec, null); return this; } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java index bb1ee7714633..fe7ea7ff5e55 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java @@ -1033,7 +1033,6 @@ public IndexTuningConfig getTuningConfig() public static class IndexIOConfig implements BatchIOConfig { private static final boolean DEFAULT_APPEND_TO_EXISTING = false; - private static final boolean DEFAULT_DROP_EXISTING = false; private final FirehoseFactory firehoseFactory; private final InputSource inputSource; diff --git a/server/src/main/java/org/apache/druid/client/indexing/ClientCompactionIOConfig.java b/server/src/main/java/org/apache/druid/client/indexing/ClientCompactionIOConfig.java index eec09545ec7f..4aefcd6a9111 100644 --- a/server/src/main/java/org/apache/druid/client/indexing/ClientCompactionIOConfig.java +++ b/server/src/main/java/org/apache/druid/client/indexing/ClientCompactionIOConfig.java @@ -22,8 +22,11 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; +import javax.annotation.Nullable; import java.util.Objects; +import static org.apache.druid.segment.indexing.IOConfig.DEFAULT_DROP_EXISTING; + /** * IOConfig for {@link ClientCompactionTaskQuery}. * @@ -34,11 +37,16 @@ public class ClientCompactionIOConfig private static final String TYPE = "compact"; private final ClientCompactionIntervalSpec inputSpec; + private final boolean dropExisting; @JsonCreator - public ClientCompactionIOConfig(@JsonProperty("inputSpec") ClientCompactionIntervalSpec inputSpec) + public ClientCompactionIOConfig( + @JsonProperty("inputSpec") ClientCompactionIntervalSpec inputSpec, + @JsonProperty("dropExisting") @Nullable Boolean dropExisting + ) { this.inputSpec = inputSpec; + this.dropExisting = dropExisting == null ? 
DEFAULT_DROP_EXISTING : dropExisting; } @JsonProperty @@ -53,6 +61,12 @@ public ClientCompactionIntervalSpec getInputSpec() return inputSpec; } + @JsonProperty + public boolean isDropExisting() + { + return dropExisting; + } + @Override public boolean equals(Object o) { @@ -63,13 +77,14 @@ public boolean equals(Object o) return false; } ClientCompactionIOConfig that = (ClientCompactionIOConfig) o; - return Objects.equals(inputSpec, that.inputSpec); + return dropExisting == that.dropExisting && + Objects.equals(inputSpec, that.inputSpec); } @Override public int hashCode() { - return Objects.hash(inputSpec); + return Objects.hash(inputSpec, dropExisting); } @Override @@ -77,6 +92,7 @@ public String toString() { return "ClientCompactionIOConfig{" + "inputSpec=" + inputSpec + + ", dropExisting=" + dropExisting + '}'; } } diff --git a/server/src/main/java/org/apache/druid/client/indexing/HttpIndexingServiceClient.java b/server/src/main/java/org/apache/druid/client/indexing/HttpIndexingServiceClient.java index bc9f9c56da9c..d8f8f35448a9 100644 --- a/server/src/main/java/org/apache/druid/client/indexing/HttpIndexingServiceClient.java +++ b/server/src/main/java/org/apache/druid/client/indexing/HttpIndexingServiceClient.java @@ -80,6 +80,7 @@ public String compactSegments( int compactionTaskPriority, @Nullable ClientCompactionTaskQueryTuningConfig tuningConfig, @Nullable ClientCompactionTaskGranularitySpec granularitySpec, + @Nullable Boolean dropExisting, @Nullable Map context ) { @@ -98,7 +99,7 @@ public String compactSegments( final ClientTaskQuery taskQuery = new ClientCompactionTaskQuery( taskId, dataSource, - new ClientCompactionIOConfig(ClientCompactionIntervalSpec.fromSegments(segments)), + new ClientCompactionIOConfig(ClientCompactionIntervalSpec.fromSegments(segments), dropExisting), tuningConfig, granularitySpec, context diff --git a/server/src/main/java/org/apache/druid/client/indexing/IndexingServiceClient.java 
b/server/src/main/java/org/apache/druid/client/indexing/IndexingServiceClient.java index f7c9712a2250..14dfcfe1114a 100644 --- a/server/src/main/java/org/apache/druid/client/indexing/IndexingServiceClient.java +++ b/server/src/main/java/org/apache/druid/client/indexing/IndexingServiceClient.java @@ -41,6 +41,7 @@ String compactSegments( int compactionTaskPriority, @Nullable ClientCompactionTaskQueryTuningConfig tuningConfig, @Nullable ClientCompactionTaskGranularitySpec granularitySpec, + @Nullable Boolean dropExisting, @Nullable Map context ); diff --git a/server/src/main/java/org/apache/druid/segment/indexing/IOConfig.java b/server/src/main/java/org/apache/druid/segment/indexing/IOConfig.java index b1784806e7e3..58f84c2bc4c0 100644 --- a/server/src/main/java/org/apache/druid/segment/indexing/IOConfig.java +++ b/server/src/main/java/org/apache/druid/segment/indexing/IOConfig.java @@ -30,4 +30,5 @@ }) public interface IOConfig { + boolean DEFAULT_DROP_EXISTING = false; } diff --git a/server/src/main/java/org/apache/druid/server/coordinator/DataSourceCompactionConfig.java b/server/src/main/java/org/apache/druid/server/coordinator/DataSourceCompactionConfig.java index d64eb08de38a..e7a4240b06db 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/DataSourceCompactionConfig.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/DataSourceCompactionConfig.java @@ -47,6 +47,7 @@ public class DataSourceCompactionConfig private final Period skipOffsetFromLatest; private final UserCompactionTaskQueryTuningConfig tuningConfig; private final UserCompactionTaskGranularityConfig granularitySpec; + private final UserCompactionTaskIOConfig ioConfig; private final Map taskContext; @JsonCreator @@ -58,6 +59,7 @@ public DataSourceCompactionConfig( @JsonProperty("skipOffsetFromLatest") @Nullable Period skipOffsetFromLatest, @JsonProperty("tuningConfig") @Nullable UserCompactionTaskQueryTuningConfig tuningConfig, @JsonProperty("granularitySpec") 
@Nullable UserCompactionTaskGranularityConfig granularitySpec, + @JsonProperty("ioConfig") @Nullable UserCompactionTaskIOConfig ioConfig, @JsonProperty("taskContext") @Nullable Map taskContext ) { @@ -71,6 +73,7 @@ public DataSourceCompactionConfig( this.maxRowsPerSegment = maxRowsPerSegment; this.skipOffsetFromLatest = skipOffsetFromLatest == null ? DEFAULT_SKIP_OFFSET_FROM_LATEST : skipOffsetFromLatest; this.tuningConfig = tuningConfig; + this.ioConfig = ioConfig; if (granularitySpec != null) { Preconditions.checkArgument( granularitySpec.getQueryGranularity() == null, @@ -119,6 +122,13 @@ public UserCompactionTaskQueryTuningConfig getTuningConfig() return tuningConfig; } + @JsonProperty + @Nullable + public UserCompactionTaskIOConfig getIoConfig() + { + return ioConfig; + } + @JsonProperty @Nullable public UserCompactionTaskGranularityConfig getGranularitySpec() @@ -150,6 +160,7 @@ public boolean equals(Object o) Objects.equals(skipOffsetFromLatest, that.skipOffsetFromLatest) && Objects.equals(tuningConfig, that.tuningConfig) && Objects.equals(granularitySpec, that.granularitySpec) && + Objects.equals(ioConfig, that.ioConfig) && Objects.equals(taskContext, that.taskContext); } @@ -164,6 +175,7 @@ public int hashCode() skipOffsetFromLatest, tuningConfig, granularitySpec, + ioConfig, taskContext ); } diff --git a/server/src/main/java/org/apache/druid/server/coordinator/UserCompactionTaskIOConfig.java b/server/src/main/java/org/apache/druid/server/coordinator/UserCompactionTaskIOConfig.java new file mode 100644 index 000000000000..0cedbe446465 --- /dev/null +++ b/server/src/main/java/org/apache/druid/server/coordinator/UserCompactionTaskIOConfig.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.server.coordinator; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; + +import javax.annotation.Nullable; +import java.util.Objects; + +import static org.apache.druid.segment.indexing.IOConfig.DEFAULT_DROP_EXISTING; + +/** + * Spec containing IO configs for Auto Compaction. + * This class mimics JSON field names for fields supported in auto compaction with + * the corresponding fields in {@link org.apache.druid.segment.indexing.IOConfig}. + * This is done for end-user ease of use. Basically, end-user will use the same syntax / JSON structure to set + * IO configs for Auto Compaction as they would for any other ingestion task. + * Note that this class simply holds IO configs and pass it to compaction task spec. + */ +public class UserCompactionTaskIOConfig +{ + private final boolean dropExisting; + + @JsonCreator + public UserCompactionTaskIOConfig( + @JsonProperty("dropExisting") @Nullable Boolean dropExisting + ) + { + this.dropExisting = dropExisting == null ? 
DEFAULT_DROP_EXISTING : dropExisting; + } + + @JsonProperty + public boolean isDropExisting() + { + return dropExisting; + } + + @Override + public boolean equals(Object o) + { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + UserCompactionTaskIOConfig that = (UserCompactionTaskIOConfig) o; + return dropExisting == that.dropExisting; + } + + @Override + public int hashCode() + { + return Objects.hash(dropExisting); + } + + @Override + public String toString() + { + return "UserCompactionTaskIOConfig{" + + "dropExisting=" + dropExisting + + '}'; + } +} diff --git a/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java b/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java index b0477586e627..5d3175104a21 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java @@ -320,6 +320,11 @@ private CoordinatorStats doRun( queryGranularitySpec = null; } + Boolean dropExisting = null; + if (config.getIoConfig() != null) { + dropExisting = config.getIoConfig().isDropExisting(); + } + // make tuningConfig final String taskId = indexingServiceClient.compactSegments( "coordinator-issued", @@ -327,6 +332,7 @@ private CoordinatorStats doRun( config.getTaskPriority(), ClientCompactionTaskQueryTuningConfig.from(config.getTuningConfig(), config.getMaxRowsPerSegment()), queryGranularitySpec, + dropExisting, newAutoCompactionContext(config.getTaskContext()) ); diff --git a/server/src/test/java/org/apache/druid/client/indexing/NoopIndexingServiceClient.java b/server/src/test/java/org/apache/druid/client/indexing/NoopIndexingServiceClient.java index c5a85fc4dcd3..600b4bed7bff 100644 --- a/server/src/test/java/org/apache/druid/client/indexing/NoopIndexingServiceClient.java +++ 
b/server/src/test/java/org/apache/druid/client/indexing/NoopIndexingServiceClient.java @@ -51,6 +51,7 @@ public String compactSegments( int compactionTaskPriority, @Nullable ClientCompactionTaskQueryTuningConfig tuningConfig, @Nullable ClientCompactionTaskGranularitySpec granularitySpec, + @Nullable Boolean dropExisting, @Nullable Map context ) { diff --git a/server/src/test/java/org/apache/druid/server/coordinator/DataSourceCompactionConfigTest.java b/server/src/test/java/org/apache/druid/server/coordinator/DataSourceCompactionConfigTest.java index efe04e37ccbb..11154f4cc0d9 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/DataSourceCompactionConfigTest.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/DataSourceCompactionConfigTest.java @@ -58,6 +58,7 @@ public void testSerdeBasic() throws IOException new Period(3600), null, null, + null, ImmutableMap.of("key", "val") ); final String json = OBJECT_MAPPER.writeValueAsString(config); @@ -84,6 +85,7 @@ public void testSerdeWithMaxRowsPerSegment() throws IOException new Period(3600), null, null, + null, ImmutableMap.of("key", "val") ); final String json = OBJECT_MAPPER.writeValueAsString(config); @@ -127,6 +129,7 @@ public void testSerdeWithMaxTotalRows() throws IOException null ), null, + null, ImmutableMap.of("key", "val") ); final String json = OBJECT_MAPPER.writeValueAsString(config); @@ -170,6 +173,7 @@ public void testSerdeMaxTotalRowsWithMaxRowsPerSegment() throws IOException null ), null, + null, ImmutableMap.of("key", "val") ); @@ -235,6 +239,7 @@ public void testSerdeGranularitySpec() throws IOException new Period(3600), null, new UserCompactionTaskGranularityConfig(Granularities.HOUR, null), + null, ImmutableMap.of("key", "val") ); final String json = OBJECT_MAPPER.writeValueAsString(config); @@ -261,6 +266,7 @@ public void testFailIfGranularitySpecContainsNonDefaultQueryGranularity() new Period(3600), null, new UserCompactionTaskGranularityConfig(Granularities.HOUR, 
Granularities.MONTH), + null, ImmutableMap.of("key", "val") ); } @@ -276,6 +282,7 @@ public void testSerdeWithNullGranularitySpec() throws IOException new Period(3600), null, null, + null, ImmutableMap.of("key", "val") ); final String json = OBJECT_MAPPER.writeValueAsString(config); @@ -302,6 +309,7 @@ public void testSerdeGranularitySpecWithNullValues() throws IOException new Period(3600), null, new UserCompactionTaskGranularityConfig(null, null), + null, ImmutableMap.of("key", "val") ); final String json = OBJECT_MAPPER.writeValueAsString(config); diff --git a/server/src/test/java/org/apache/druid/server/coordinator/duty/CompactSegmentsTest.java b/server/src/test/java/org/apache/druid/server/coordinator/duty/CompactSegmentsTest.java index 4bec733940cf..6594ac3ca609 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/duty/CompactSegmentsTest.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/duty/CompactSegmentsTest.java @@ -592,6 +592,7 @@ public void testCompactWithoutGranularitySpec() null ), null, + null, null ) ); @@ -605,6 +606,7 @@ public void testCompactWithoutGranularitySpec() ArgumentMatchers.anyInt(), ArgumentMatchers.any(), granularitySpecArgumentCaptor.capture(), + ArgumentMatchers.any(), ArgumentMatchers.any() ); // Only the same amount of segments as the original PARTITION_PER_TIME_INTERVAL since segment granulartity is the same @@ -646,6 +648,7 @@ public void testCompactWithGranularitySpec() null ), new UserCompactionTaskGranularityConfig(Granularities.YEAR, null), + null, null ) ); @@ -659,6 +662,7 @@ public void testCompactWithGranularitySpec() ArgumentMatchers.anyInt(), ArgumentMatchers.any(), granularitySpecArgumentCaptor.capture(), + ArgumentMatchers.any(), ArgumentMatchers.any() ); // All segments is compact at the same time since we changed the segment granularity to YEAR and all segment @@ -698,7 +702,8 @@ public void testCompactWithGranularitySpecConflictWithActiveCompactionTask() new 
ClientCompactionIntervalSpec( Intervals.of("2000/2099"), "testSha256OfSortedSegmentIds" - ) + ), + null ), null, new ClientCompactionTaskGranularitySpec(Granularities.DAY, null), @@ -737,6 +742,7 @@ public void testCompactWithGranularitySpecConflictWithActiveCompactionTask() null ), new UserCompactionTaskGranularityConfig(Granularities.YEAR, null), + null, null ) ); @@ -755,6 +761,7 @@ public void testCompactWithGranularitySpecConflictWithActiveCompactionTask() ArgumentMatchers.anyInt(), ArgumentMatchers.any(), granularitySpecArgumentCaptor.capture(), + ArgumentMatchers.any(), ArgumentMatchers.any() ); // All segments is compact at the same time since we changed the segment granularity to YEAR and all segment @@ -1059,6 +1066,7 @@ private List createCompactionConfigs(@Nullable Integ null ), null, + null, null ) ); diff --git a/server/src/test/java/org/apache/druid/server/coordinator/duty/NewestSegmentFirstIteratorTest.java b/server/src/test/java/org/apache/druid/server/coordinator/duty/NewestSegmentFirstIteratorTest.java index b32a7b7eb818..0cf0dededce1 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/duty/NewestSegmentFirstIteratorTest.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/duty/NewestSegmentFirstIteratorTest.java @@ -92,6 +92,7 @@ public void testFindPartitionsSpecFromConfigWithNullTuningConfigReturnDynamicPar null, null, null, + null, null ); Assert.assertEquals( @@ -131,6 +132,7 @@ public void testFindPartitionsSpecFromConfigWithNullMaxTotalRowsReturnLongMaxVal null ), null, + null, null ); Assert.assertEquals( @@ -170,6 +172,7 @@ public void testFindPartitionsSpecFromConfigWithNonNullMaxTotalRowsReturnGivenVa null ), null, + null, null ); Assert.assertEquals( @@ -209,6 +212,7 @@ public void testFindPartitionsSpecFromConfigWithNonNullMaxRowsPerSegmentReturnGi null ), null, + null, null ); Assert.assertEquals( @@ -248,6 +252,7 @@ public void testFindPartitionsSpecFromConfigWithDeprecatedMaxRowsPerSegmentAndMa 
null ), null, + null, null ); Assert.assertEquals( @@ -287,6 +292,7 @@ public void testFindPartitionsSpecFromConfigWithDeprecatedMaxRowsPerSegmentAndPa null ), null, + null, null ); Assert.assertEquals( @@ -326,6 +332,7 @@ public void testFindPartitionsSpecFromConfigWithDeprecatedMaxTotalRowsAndPartiti null ), null, + null, null ); Assert.assertEquals( @@ -365,6 +372,7 @@ public void testFindPartitionsSpecFromConfigWithHashPartitionsSpec() null ), null, + null, null ); Assert.assertEquals( @@ -404,6 +412,7 @@ public void testFindPartitionsSpecFromConfigWithRangePartitionsSpec() null ), null, + null, null ); Assert.assertEquals( diff --git a/server/src/test/java/org/apache/druid/server/coordinator/duty/NewestSegmentFirstPolicyTest.java b/server/src/test/java/org/apache/druid/server/coordinator/duty/NewestSegmentFirstPolicyTest.java index 9b7f600c736c..5ccd52c23003 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/duty/NewestSegmentFirstPolicyTest.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/duty/NewestSegmentFirstPolicyTest.java @@ -1068,6 +1068,7 @@ private DataSourceCompactionConfig createCompactionConfig( skipOffsetFromLatest, null, granularitySpec, + null, null ); } From 0ce03a270fa8c6495f118f7243a8d7d2df7e0d1f Mon Sep 17 00:00:00 2001 From: Maytas Monsereenusorn Date: Mon, 5 Apr 2021 14:57:11 -0700 Subject: [PATCH 02/11] fix checkstyle --- .../druid/client/indexing/ClientCompactionIOConfig.java | 5 ++--- .../druid/server/coordinator/UserCompactionTaskIOConfig.java | 5 ++--- 2 files changed, 4 insertions(+), 6 deletions(-) diff --git a/server/src/main/java/org/apache/druid/client/indexing/ClientCompactionIOConfig.java b/server/src/main/java/org/apache/druid/client/indexing/ClientCompactionIOConfig.java index 4aefcd6a9111..0419bc2b5f29 100644 --- a/server/src/main/java/org/apache/druid/client/indexing/ClientCompactionIOConfig.java +++ b/server/src/main/java/org/apache/druid/client/indexing/ClientCompactionIOConfig.java 
@@ -21,12 +21,11 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; +import org.apache.druid.segment.indexing.IOConfig; import javax.annotation.Nullable; import java.util.Objects; -import static org.apache.druid.segment.indexing.IOConfig.DEFAULT_DROP_EXISTING; - /** * IOConfig for {@link ClientCompactionTaskQuery}. * @@ -46,7 +45,7 @@ public ClientCompactionIOConfig( ) { this.inputSpec = inputSpec; - this.dropExisting = dropExisting == null ? DEFAULT_DROP_EXISTING : dropExisting; + this.dropExisting = dropExisting == null ? IOConfig.DEFAULT_DROP_EXISTING : dropExisting; } @JsonProperty diff --git a/server/src/main/java/org/apache/druid/server/coordinator/UserCompactionTaskIOConfig.java b/server/src/main/java/org/apache/druid/server/coordinator/UserCompactionTaskIOConfig.java index 0cedbe446465..c162a29393ce 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/UserCompactionTaskIOConfig.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/UserCompactionTaskIOConfig.java @@ -21,12 +21,11 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; +import org.apache.druid.segment.indexing.IOConfig; import javax.annotation.Nullable; import java.util.Objects; -import static org.apache.druid.segment.indexing.IOConfig.DEFAULT_DROP_EXISTING; - /** * Spec containing IO configs for Auto Compaction. * This class mimics JSON field names for fields supported in auto compaction with @@ -44,7 +43,7 @@ public UserCompactionTaskIOConfig( @JsonProperty("dropExisting") @Nullable Boolean dropExisting ) { - this.dropExisting = dropExisting == null ? DEFAULT_DROP_EXISTING : dropExisting; + this.dropExisting = dropExisting == null ? 
IOConfig.DEFAULT_DROP_EXISTING : dropExisting; } @JsonProperty From 8193a192eae2797f159aa6f9c7da542cf3597c08 Mon Sep 17 00:00:00 2001 From: Maytas Monsereenusorn Date: Mon, 5 Apr 2021 14:59:54 -0700 Subject: [PATCH 03/11] fix checkstyle --- .../ClientCompactionTaskQuerySerdeTest.java | 6 ++- .../common/task/CompactionTaskTest.java | 42 ++++++++++++------- 2 files changed, 32 insertions(+), 16 deletions(-) diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/ClientCompactionTaskQuerySerdeTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/ClientCompactionTaskQuerySerdeTest.java index a73e44b7dc36..1a714f715ddd 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/ClientCompactionTaskQuerySerdeTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/ClientCompactionTaskQuerySerdeTest.java @@ -83,7 +83,8 @@ public void testClientCompactionTaskQueryToCompactionTask() throws IOException new ClientCompactionIntervalSpec( Intervals.of("2019/2020"), "testSha256OfSortedSegmentIds" - ) + ), + null ), new ClientCompactionTaskQueryTuningConfig( null, @@ -268,7 +269,8 @@ public void testCompactionTaskToClientCompactionTaskQuery() throws IOException new ClientCompactionIntervalSpec( Intervals.of("2019/2020"), "testSha256OfSortedSegmentIds" - ) + ), + null ), new ClientCompactionTaskQueryTuningConfig( 100, diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java index 4f386abe23d9..48f759f5ebfe 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java @@ -687,7 +687,8 @@ public void testCreateIngestionSchema() throws IOException, SegmentLoadingExcept null, COORDINATOR_CLIENT, 
segmentLoaderFactory, - RETRY_POLICY_FACTORY + RETRY_POLICY_FACTORY, + null ); final List expectedDimensionsSpec = getExpectedDimensionsSpecForAutoGeneration(); @@ -757,7 +758,8 @@ public void testCreateIngestionSchemaWithTargetPartitionSize() throws IOExceptio null, COORDINATOR_CLIENT, segmentLoaderFactory, - RETRY_POLICY_FACTORY + RETRY_POLICY_FACTORY, + null ); final List expectedDimensionsSpec = getExpectedDimensionsSpecForAutoGeneration(); @@ -828,7 +830,8 @@ public void testCreateIngestionSchemaWithMaxTotalRows() throws IOException, Segm null, COORDINATOR_CLIENT, segmentLoaderFactory, - RETRY_POLICY_FACTORY + RETRY_POLICY_FACTORY, + null ); final List expectedDimensionsSpec = getExpectedDimensionsSpecForAutoGeneration(); @@ -899,7 +902,8 @@ public void testCreateIngestionSchemaWithNumShards() throws IOException, Segment null, COORDINATOR_CLIENT, segmentLoaderFactory, - RETRY_POLICY_FACTORY + RETRY_POLICY_FACTORY, + null ); final List expectedDimensionsSpec = getExpectedDimensionsSpecForAutoGeneration(); @@ -961,7 +965,8 @@ public void testCreateIngestionSchemaWithCustomDimensionsSpec() throws IOExcepti null, COORDINATOR_CLIENT, segmentLoaderFactory, - RETRY_POLICY_FACTORY + RETRY_POLICY_FACTORY, + null ); ingestionSpecs.sort( @@ -1003,7 +1008,8 @@ public void testCreateIngestionSchemaWithCustomMetricsSpec() throws IOException, null, COORDINATOR_CLIENT, segmentLoaderFactory, - RETRY_POLICY_FACTORY + RETRY_POLICY_FACTORY, + null ); final List expectedDimensionsSpec = getExpectedDimensionsSpecForAutoGeneration(); @@ -1038,7 +1044,8 @@ public void testCreateIngestionSchemaWithCustomSegments() throws IOException, Se null, COORDINATOR_CLIENT, segmentLoaderFactory, - RETRY_POLICY_FACTORY + RETRY_POLICY_FACTORY, + null ); final List expectedDimensionsSpec = getExpectedDimensionsSpecForAutoGeneration(); @@ -1079,7 +1086,8 @@ public void testCreateIngestionSchemaWithDifferentSegmentSet() throws IOExceptio null, COORDINATOR_CLIENT, segmentLoaderFactory, - 
RETRY_POLICY_FACTORY + RETRY_POLICY_FACTORY, + null ); } @@ -1102,7 +1110,8 @@ public void testMissingMetadata() throws IOException, SegmentLoadingException null, COORDINATOR_CLIENT, segmentLoaderFactory, - RETRY_POLICY_FACTORY + RETRY_POLICY_FACTORY, + null ); } @@ -1136,7 +1145,8 @@ public void testSegmentGranularityAndNullQueryGranularity() throws IOException, new ClientCompactionTaskGranularitySpec(new PeriodGranularity(Period.months(3), null, null), null), COORDINATOR_CLIENT, segmentLoaderFactory, - RETRY_POLICY_FACTORY + RETRY_POLICY_FACTORY, + null ); final List expectedDimensionsSpec = ImmutableList.of( new DimensionsSpec(getDimensionSchema(new DoubleDimensionSchema("string_to_double"))) @@ -1172,7 +1182,8 @@ public void testQueryGranularityAndNullSegmentGranularity() throws IOException, new ClientCompactionTaskGranularitySpec(null, new PeriodGranularity(Period.months(3), null, null)), COORDINATOR_CLIENT, segmentLoaderFactory, - RETRY_POLICY_FACTORY + RETRY_POLICY_FACTORY, + null ); final List expectedDimensionsSpec = getExpectedDimensionsSpecForAutoGeneration(); @@ -1209,7 +1220,8 @@ public void testQueryGranularityAndSegmentGranularityNonNull() throws IOExceptio ), COORDINATOR_CLIENT, segmentLoaderFactory, - RETRY_POLICY_FACTORY + RETRY_POLICY_FACTORY, + null ); final List expectedDimensionsSpec = ImmutableList.of( new DimensionsSpec(getDimensionSchema(new DoubleDimensionSchema("string_to_double"))) @@ -1245,7 +1257,8 @@ public void testNullGranularitySpec() throws IOException, SegmentLoadingExceptio null, COORDINATOR_CLIENT, segmentLoaderFactory, - RETRY_POLICY_FACTORY + RETRY_POLICY_FACTORY, + null ); final List expectedDimensionsSpec = getExpectedDimensionsSpecForAutoGeneration(); @@ -1280,7 +1293,8 @@ public void testGranularitySpecWithNullQueryGranularityAndNullSegmentGranularity new ClientCompactionTaskGranularitySpec(null, null), COORDINATOR_CLIENT, segmentLoaderFactory, - RETRY_POLICY_FACTORY + RETRY_POLICY_FACTORY, + null ); final List 
expectedDimensionsSpec = getExpectedDimensionsSpecForAutoGeneration(); From eaea9952034db2daa533b84b6284cb2e08335a5a Mon Sep 17 00:00:00 2001 From: Maytas Monsereenusorn Date: Mon, 5 Apr 2021 19:58:38 -0700 Subject: [PATCH 04/11] fix test --- .../server/coordinator/NewestSegmentFirstPolicyBenchmark.java | 1 + 1 file changed, 1 insertion(+) diff --git a/benchmarks/src/test/java/org/apache/druid/server/coordinator/NewestSegmentFirstPolicyBenchmark.java b/benchmarks/src/test/java/org/apache/druid/server/coordinator/NewestSegmentFirstPolicyBenchmark.java index e744bf92d2b9..b93053e63fe8 100644 --- a/benchmarks/src/test/java/org/apache/druid/server/coordinator/NewestSegmentFirstPolicyBenchmark.java +++ b/benchmarks/src/test/java/org/apache/druid/server/coordinator/NewestSegmentFirstPolicyBenchmark.java @@ -100,6 +100,7 @@ public void setup() null, null, null, + null, null ) ); From 502686bf3d139f8666e6aa1b352b3203f0602e3d Mon Sep 17 00:00:00 2001 From: Maytas Monsereenusorn Date: Mon, 5 Apr 2021 21:58:21 -0700 Subject: [PATCH 05/11] add tests --- docs/configuration/index.md | 10 ++ docs/ingestion/compaction.md | 4 +- docs/ingestion/native-batch.md | 7 +- .../indexing/common/task/CompactionTask.java | 18 ++- .../ClientCompactionTaskQuerySerdeTest.java | 10 +- .../task/CompactionTaskParallelRunTest.java | 46 +++++++- .../common/task/CompactionTaskTest.java | 75 ++++++++----- .../duty/ITAutoCompactionTest.java | 1 + .../DataSourceCompactionConfigTest.java | 28 +++++ .../coordinator/duty/CompactSegmentsTest.java | 105 ++++++++++++++++++ 10 files changed, 259 insertions(+), 45 deletions(-) diff --git a/docs/configuration/index.md b/docs/configuration/index.md index 5a91a772b861..e31150b2da67 100644 --- a/docs/configuration/index.md +++ b/docs/configuration/index.md @@ -848,6 +848,7 @@ A description of the compaction config is: |`tuningConfig`|Tuning config for compaction tasks. 
See below [Compaction Task TuningConfig](#automatic-compaction-tuningconfig).|no|
 |`taskContext`|[Task context](../ingestion/tasks.md#context) for compaction tasks.|no|
 |`granularitySpec`|Custom `granularitySpec` to describe the `segmentGranularity` for the compacted segments. See [Automatic compaction granularitySpec](#automatic-compaction-granularityspec)|No|
+|`ioConfig`|IO config for compaction tasks. See below [Compaction Task IOConfig](#automatic-compaction-ioconfig).|no|
 
 An example of compaction config is:
 
@@ -899,6 +900,15 @@ You can optionally use the `granularitySpec` object to configure the segment gra
 
 > Unlike manual compaction, automatic compaction does not support query granularity.
 
+###### Automatic compaction IOConfig
+
+Auto compaction supports a subset of the [IOConfig for Parallel task](../ingestion/native-batch.md#ioConfig).
+The below is a list of the supported configurations for auto compaction.
+
+|Property|Description|Default|Required|
+|--------|-----------|-------|--------|
+|`dropExisting`|If `true`, then the generated compaction task drops (mark unused) all existing segments fully contained by the umbrella interval of the compacted segments when the task publishes new segments. If compaction fails, Druid does not drop or mark unused any segments. WARNING: this functionality is still in beta and can result in temporary data unavailability for data within the compacted `interval`. Note that changing this config does not cause intervals to be recompacted.|false|no|
+
 ### Overlord
 
 For general Overlord Process information, see [here](../design/overlord.md).
diff --git a/docs/ingestion/compaction.md b/docs/ingestion/compaction.md
index 394da254ee2b..7a6340d376a6 100644
--- a/docs/ingestion/compaction.md
+++ b/docs/ingestion/compaction.md
@@ -54,7 +54,7 @@ See [Setting up a manual compaction task](#setting-up-manual-compaction) for mor
 ## Data handling with compaction
 During compaction, Druid overwrites the original set of segments with the compacted set. Druid also locks the segments for the time interval being compacted to ensure data consistency. By default, compaction tasks do not modify the underlying data. You can configure the compaction task to change the query granularity or add or remove dimensions in the compaction task. This means that the only changes to query results should be the result of intentional, not automatic, changes.
-For compaction tasks, `dropExisting` for underlying ingestion tasks is "true". This means that Druid can drop (mark unused) all the existing segments fully within interval for the compaction task. For an example of why this is important, see the suggestion for reindexing with finer granularity under [Implementation considerations](native-batch.md#implementation-considerations).
+For compaction tasks, `dropExisting` in `ioConfig` can be set to "true" for Druid to drop (mark unused) all existing segments fully within interval for the compaction task. For an example of why this is important, see the suggestion for reindexing with finer granularity under [Implementation considerations](native-batch.md#implementation-considerations). WARNING: this functionality is still in beta and can result in temporary data unavailability for data within the specified `interval`.
 
 If an ingestion task needs to write data to a segment for a time interval locked for compaction, by default the ingestion task supersedes the compaction task and the compaction task fails without finishing. For manual compaction tasks you can adjust the input spec interval to avoid conflicts between ingestion and compaction.
For automatic compaction, you can set the `skipOffsetFromLatest` key to adjustment the auto compaction starting point from the current time to reduce the chance of conflicts between ingestion and compaction. See [Compaction dynamic configuration](../configuration/index.md#compaction-dynamic-configuration) for more information. Another option is to set the compaction task to higher priority than the ingestion task. @@ -162,7 +162,7 @@ The compaction `ioConfig` requires specifying `inputSpec` as follows: |-----|-----------|-------|--------| |`type`|Task type. Should be `compact`|none|Yes| |`inputSpec`|Input specification|none|Yes| -|`dropExisting`|If `true`, then the compaction task drops (mark unused) all existing segments fully contained by either the `interval` in the `interval` type `inputSpec` or the umbrella interval of the `segments` in the `segment` type `inputSpec` when the task publishes new compacted segments. If compaction fails, Druid does not drop or mark unused any segments.|false|no| +|`dropExisting`|If `true`, then the compaction task drops (mark unused) all existing segments fully contained by either the `interval` in the `interval` type `inputSpec` or the umbrella interval of the `segments` in the `segment` type `inputSpec` when the task publishes new compacted segments. If compaction fails, Druid does not drop or mark unused any segments. WARNING: this functionaility is still in beta and can result in temporary data unavailability for data within the specified `interval`.|false|no| There are two supported `inputSpec`s for now. diff --git a/docs/ingestion/native-batch.md b/docs/ingestion/native-batch.md index 3b0d2a9b254f..bdcb6f8b72a5 100644 --- a/docs/ingestion/native-batch.md +++ b/docs/ingestion/native-batch.md @@ -91,7 +91,8 @@ The supported compression formats for native batch ingestion are `bz2`, `gz`, `x `granularitySpec`'s intervals, the portion of those segments outside the new segments' intervals will still be visible. 
- You can set `dropExisting` flag in the `ioConfig` to true if you want the ingestion task to drop all existing segments that start and end within your `granularitySpec`'s intervals. This applies whether or not the new data covers all existing segments. - `dropExisting` only applies when `appendToExisting` is false and the `granularitySpec` contains an `interval`. + `dropExisting` only applies when `appendToExisting` is false and the `granularitySpec` contains an `interval`. WARNING: this + functionaility is still in beta and can result in temporary data unavailability for data within the specified `interval` The following examples demonstrate when to set the `dropExisting` property to true in the `ioConfig`: @@ -219,7 +220,7 @@ that range if there's some stray data with unexpected timestamps. |type|The task type, this should always be `index_parallel`.|none|yes| |inputFormat|[`inputFormat`](./data-formats.md#input-format) to specify how to parse input data.|none|yes| |appendToExisting|Creates segments as additional shards of the latest version, effectively appending to the segment set instead of replacing it. This means that you can append new segments to any datasource regardless of its original partitioning scheme. You must use the `dynamic` partitioning type for the appended segments. If you specify a different partitioning type, the task fails with an error.|false|no| -|dropExisting|If `true` and `appendToExisting` is `false` and the `granularitySpec` contains an`interval`, then the ingestion task drops (mark unused) all existing segments fully contained by the specified `interval` when the task publishes new segments. If ingestion fails, Druid does not drop or mark unused any segments. 
In the case of misconfiguration where either `appendToExisting` is `true` or `interval` is not specified in `granularitySpec`, Druid does not drop any segments even if `dropExisting` is `true`.|false|no| +|dropExisting|If `true` and `appendToExisting` is `false` and the `granularitySpec` contains an`interval`, then the ingestion task drops (mark unused) all existing segments fully contained by the specified `interval` when the task publishes new segments. If ingestion fails, Druid does not drop or mark unused any segments. In the case of misconfiguration where either `appendToExisting` is `true` or `interval` is not specified in `granularitySpec`, Druid does not drop any segments even if `dropExisting` is `true`. WARNING: this functionaility is still in beta and can result in temporary data unavailability for data within the specified `interval`.|false|no| ### `tuningConfig` @@ -747,7 +748,7 @@ that range if there's some stray data with unexpected timestamps. |type|The task type, this should always be "index".|none|yes| |inputFormat|[`inputFormat`](./data-formats.md#input-format) to specify how to parse input data.|none|yes| |appendToExisting|Creates segments as additional shards of the latest version, effectively appending to the segment set instead of replacing it. This means that you can append new segments to any datasource regardless of its original partitioning scheme. You must use the `dynamic` partitioning type for the appended segments. If you specify a different partitioning type, the task fails with an error.|false|no| -|dropExisting|If `true` and `appendToExisting` is `false` and the `granularitySpec` contains an`interval`, then the ingestion task drops (mark unused) all existing segments fully contained by the specified `interval` when the task publishes new segments. If ingestion fails, Druid does not drop or mark unused any segments. 
In the case of misconfiguration where either `appendToExisting` is `true` or `interval` is not specified in `granularitySpec`, Druid does not drop any segments even if `dropExisting` is `true`.|false|no| +|dropExisting|If `true` and `appendToExisting` is `false` and the `granularitySpec` contains an`interval`, then the ingestion task drops (mark unused) all existing segments fully contained by the specified `interval` when the task publishes new segments. If ingestion fails, Druid does not drop or mark unused any segments. In the case of misconfiguration where either `appendToExisting` is `true` or `interval` is not specified in `granularitySpec`, Druid does not drop any segments even if `dropExisting` is `true`. WARNING: this functionaility is still in beta and can result in temporary data unavailability for data within the specified `interval`.|false|no| ### `tuningConfig` diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java index 49f9b24dbb8a..e27cd8fa2655 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java @@ -384,7 +384,7 @@ public TaskStatus runTask(TaskToolbox toolbox) throws Exception toolbox.getCoordinatorClient(), segmentLoaderFactory, retryPolicyFactory, - ioConfig + ioConfig.isDropExisting() ); final List indexTaskSpecs = IntStream .range(0, ingestionSpecs.size()) @@ -493,7 +493,7 @@ static List createIngestionSchema( final CoordinatorClient coordinatorClient, final SegmentLoaderFactory segmentLoaderFactory, final RetryPolicyFactory retryPolicyFactory, - final CompactionIOConfig ioConfig + final boolean dropExisting ) throws IOException, SegmentLoadingException { NonnullPair, List>> pair = prepareSegments( @@ -576,7 +576,7 @@ static List createIngestionSchema( 
coordinatorClient, segmentLoaderFactory, retryPolicyFactory, - ioConfig + dropExisting ), compactionTuningConfig ) @@ -604,7 +604,7 @@ static List createIngestionSchema( coordinatorClient, segmentLoaderFactory, retryPolicyFactory, - ioConfig + dropExisting ), compactionTuningConfig ) @@ -619,7 +619,7 @@ private static ParallelIndexIOConfig createIoConfig( CoordinatorClient coordinatorClient, SegmentLoaderFactory segmentLoaderFactory, RetryPolicyFactory retryPolicyFactory, - CompactionIOConfig ioConfig + boolean dropExisting ) { return new ParallelIndexIOConfig( @@ -639,7 +639,7 @@ private static ParallelIndexIOConfig createIoConfig( ), null, false, - ioConfig.isDropExisting() + dropExisting ); } @@ -1030,6 +1030,12 @@ public Builder inputSpec(CompactionInputSpec inputSpec) return this; } + public Builder inputSpec(CompactionInputSpec inputSpec, Boolean dropExisting) + { + this.ioConfig = new CompactionIOConfig(inputSpec, dropExisting); + return this; + } + public Builder dimensionsSpec(DimensionsSpec dimensionsSpec) { this.dimensionsSpec = dimensionsSpec; diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/ClientCompactionTaskQuerySerdeTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/ClientCompactionTaskQuerySerdeTest.java index 1a714f715ddd..bb5576f107e8 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/ClientCompactionTaskQuerySerdeTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/ClientCompactionTaskQuerySerdeTest.java @@ -84,7 +84,7 @@ public void testClientCompactionTaskQueryToCompactionTask() throws IOException Intervals.of("2019/2020"), "testSha256OfSortedSegmentIds" ), - null + true ), new ClientCompactionTaskQueryTuningConfig( null, @@ -202,6 +202,10 @@ public void testClientCompactionTaskQueryToCompactionTask() throws IOException query.getGranularitySpec().getSegmentGranularity(), 
task.getGranularitySpec().getSegmentGranularity() ); + Assert.assertEquals( + query.getIoConfig().isDropExisting(), + task.getIoConfig().isDropExisting() + ); Assert.assertEquals(query.getContext(), task.getContext()); } @@ -215,7 +219,7 @@ public void testCompactionTaskToClientCompactionTaskQuery() throws IOException new RetryPolicyFactory(new RetryPolicyConfig()) ); final CompactionTask task = builder - .inputSpec(new CompactionIntervalSpec(Intervals.of("2019/2020"), "testSha256OfSortedSegmentIds")) + .inputSpec(new CompactionIntervalSpec(Intervals.of("2019/2020"), "testSha256OfSortedSegmentIds"), true) .tuningConfig( new ParallelIndexTuningConfig( null, @@ -270,7 +274,7 @@ public void testCompactionTaskToClientCompactionTaskQuery() throws IOException Intervals.of("2019/2020"), "testSha256OfSortedSegmentIds" ), - null + true ), new ClientCompactionTaskQueryTuningConfig( 100, diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskParallelRunTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskParallelRunTest.java index 4693f3ad485c..8219eb443077 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskParallelRunTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskParallelRunTest.java @@ -442,7 +442,7 @@ public void testDruidInputSourceCreateSplitsWithIndividualSplits() } @Test - public void testCompactionDropSegmentsOfInputInterval() + public void testCompactionDropSegmentsOfInputIntervalIfDropFlagIsSet() { runIndexTask(null, true); @@ -458,7 +458,8 @@ public void testCompactionDropSegmentsOfInputInterval() RETRY_POLICY_FACTORY ); final CompactionTask compactionTask = builder - .inputSpec(new CompactionIntervalSpec(INTERVAL_TO_INDEX, null)) + // Set the dropExisting flag to true in the IOConfig of the compaction task + .inputSpec(new CompactionIntervalSpec(INTERVAL_TO_INDEX, null), true) 
.tuningConfig(AbstractParallelIndexSupervisorTaskTest.DEFAULT_TUNING_CONFIG_FOR_PARALLEL_INDEXING) .granularitySpec(new ClientCompactionTaskGranularitySpec(Granularities.MINUTE, null)) .build(); @@ -474,6 +475,47 @@ public void testCompactionDropSegmentsOfInputInterval() } } + @Test + public void testCompactionDoesNotDropSegmentsIfDropFlagNotSet() + { + runIndexTask(null, true); + + Collection usedSegments = getCoordinatorClient().fetchUsedSegmentsInDataSourceForIntervals(DATA_SOURCE, ImmutableList.of(INTERVAL_TO_INDEX)); + Assert.assertEquals(3, usedSegments.size()); + for (DataSegment segment : usedSegments) { + Assert.assertTrue(Granularities.HOUR.isAligned(segment.getInterval())); + } + + final Builder builder = new Builder( + DATA_SOURCE, + getSegmentLoaderFactory(), + RETRY_POLICY_FACTORY + ); + final CompactionTask compactionTask = builder + .inputSpec(new CompactionIntervalSpec(INTERVAL_TO_INDEX, null)) + .tuningConfig(AbstractParallelIndexSupervisorTaskTest.DEFAULT_TUNING_CONFIG_FOR_PARALLEL_INDEXING) + .granularitySpec(new ClientCompactionTaskGranularitySpec(Granularities.MINUTE, null)) + .build(); + + final Set compactedSegments = runTask(compactionTask); + + usedSegments = getCoordinatorClient().fetchUsedSegmentsInDataSourceForIntervals(DATA_SOURCE, ImmutableList.of(INTERVAL_TO_INDEX)); + // All the HOUR segments did not get dropped since MINUTES segments did not fully covering the 3 HOURS interval. 
+    Assert.assertEquals(6, usedSegments.size());
+    int hourSegmentCount = 0;
+    int minuteSegmentCount = 0;
+    for (DataSegment segment : usedSegments) {
+      if (Granularities.MINUTE.isAligned(segment.getInterval())) {
+        minuteSegmentCount++;
+      }
+      if (Granularities.HOUR.isAligned(segment.getInterval())) {
+        hourSegmentCount++;
+      }
+    }
+    Assert.assertEquals(3, hourSegmentCount);
+    Assert.assertEquals(3, minuteSegmentCount);
+  }
+
   private void runIndexTask(@Nullable PartitionsSpec partitionsSpec, boolean appendToExisting)
   {
     ParallelIndexIOConfig ioConfig = new ParallelIndexIOConfig(
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java
index 48f759f5ebfe..acb18f2a8b4e 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java
@@ -109,6 +109,7 @@
 import org.apache.druid.segment.data.RoaringBitmapSerdeFactory;
 import org.apache.druid.segment.incremental.RowIngestionMetersFactory;
 import org.apache.druid.segment.indexing.DataSchema;
+import org.apache.druid.segment.indexing.IOConfig;
 import org.apache.druid.segment.indexing.RealtimeTuningConfig;
 import org.apache.druid.segment.indexing.TuningConfig;
 import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec;
@@ -688,7 +689,7 @@ public void testCreateIngestionSchema() throws IOException, SegmentLoadingExcept
       COORDINATOR_CLIENT,
       segmentLoaderFactory,
       RETRY_POLICY_FACTORY,
-      null
+      IOConfig.DEFAULT_DROP_EXISTING
     );
 
     final List expectedDimensionsSpec = getExpectedDimensionsSpecForAutoGeneration();
@@ -705,7 +706,8 @@ public void testCreateIngestionSchema() throws IOException, SegmentLoadingExcept
       AGGREGATORS,
       SEGMENT_INTERVALS,
       Granularities.MONTH,
-      Granularities.NONE
+      Granularities.NONE,
+      IOConfig.DEFAULT_DROP_EXISTING
); } @@ -759,7 +761,7 @@ public void testCreateIngestionSchemaWithTargetPartitionSize() throws IOExceptio COORDINATOR_CLIENT, segmentLoaderFactory, RETRY_POLICY_FACTORY, - null + IOConfig.DEFAULT_DROP_EXISTING ); final List expectedDimensionsSpec = getExpectedDimensionsSpecForAutoGeneration(); @@ -777,7 +779,8 @@ public void testCreateIngestionSchemaWithTargetPartitionSize() throws IOExceptio SEGMENT_INTERVALS, tuningConfig, Granularities.MONTH, - Granularities.NONE + Granularities.NONE, + IOConfig.DEFAULT_DROP_EXISTING ); } @@ -831,7 +834,7 @@ public void testCreateIngestionSchemaWithMaxTotalRows() throws IOException, Segm COORDINATOR_CLIENT, segmentLoaderFactory, RETRY_POLICY_FACTORY, - null + IOConfig.DEFAULT_DROP_EXISTING ); final List expectedDimensionsSpec = getExpectedDimensionsSpecForAutoGeneration(); @@ -849,7 +852,8 @@ public void testCreateIngestionSchemaWithMaxTotalRows() throws IOException, Segm SEGMENT_INTERVALS, tuningConfig, Granularities.MONTH, - Granularities.NONE + Granularities.NONE, + IOConfig.DEFAULT_DROP_EXISTING ); } @@ -903,7 +907,7 @@ public void testCreateIngestionSchemaWithNumShards() throws IOException, Segment COORDINATOR_CLIENT, segmentLoaderFactory, RETRY_POLICY_FACTORY, - null + IOConfig.DEFAULT_DROP_EXISTING ); final List expectedDimensionsSpec = getExpectedDimensionsSpecForAutoGeneration(); @@ -921,7 +925,8 @@ public void testCreateIngestionSchemaWithNumShards() throws IOException, Segment SEGMENT_INTERVALS, tuningConfig, Granularities.MONTH, - Granularities.NONE + Granularities.NONE, + IOConfig.DEFAULT_DROP_EXISTING ); } @@ -966,7 +971,7 @@ public void testCreateIngestionSchemaWithCustomDimensionsSpec() throws IOExcepti COORDINATOR_CLIENT, segmentLoaderFactory, RETRY_POLICY_FACTORY, - null + IOConfig.DEFAULT_DROP_EXISTING ); ingestionSpecs.sort( @@ -984,7 +989,8 @@ public void testCreateIngestionSchemaWithCustomDimensionsSpec() throws IOExcepti AGGREGATORS, SEGMENT_INTERVALS, Granularities.MONTH, - Granularities.NONE + 
Granularities.NONE, + IOConfig.DEFAULT_DROP_EXISTING ); } @@ -1009,7 +1015,7 @@ public void testCreateIngestionSchemaWithCustomMetricsSpec() throws IOException, COORDINATOR_CLIENT, segmentLoaderFactory, RETRY_POLICY_FACTORY, - null + IOConfig.DEFAULT_DROP_EXISTING ); final List expectedDimensionsSpec = getExpectedDimensionsSpecForAutoGeneration(); @@ -1027,7 +1033,8 @@ public void testCreateIngestionSchemaWithCustomMetricsSpec() throws IOException, Arrays.asList(customMetricsSpec), SEGMENT_INTERVALS, Granularities.MONTH, - Granularities.NONE + Granularities.NONE, + IOConfig.DEFAULT_DROP_EXISTING ); } @@ -1045,7 +1052,7 @@ public void testCreateIngestionSchemaWithCustomSegments() throws IOException, Se COORDINATOR_CLIENT, segmentLoaderFactory, RETRY_POLICY_FACTORY, - null + IOConfig.DEFAULT_DROP_EXISTING ); final List expectedDimensionsSpec = getExpectedDimensionsSpecForAutoGeneration(); @@ -1062,7 +1069,8 @@ public void testCreateIngestionSchemaWithCustomSegments() throws IOException, Se AGGREGATORS, SEGMENT_INTERVALS, Granularities.MONTH, - Granularities.NONE + Granularities.NONE, + IOConfig.DEFAULT_DROP_EXISTING ); } @@ -1087,7 +1095,7 @@ public void testCreateIngestionSchemaWithDifferentSegmentSet() throws IOExceptio COORDINATOR_CLIENT, segmentLoaderFactory, RETRY_POLICY_FACTORY, - null + IOConfig.DEFAULT_DROP_EXISTING ); } @@ -1111,7 +1119,7 @@ public void testMissingMetadata() throws IOException, SegmentLoadingException COORDINATOR_CLIENT, segmentLoaderFactory, RETRY_POLICY_FACTORY, - null + IOConfig.DEFAULT_DROP_EXISTING ); } @@ -1146,7 +1154,7 @@ public void testSegmentGranularityAndNullQueryGranularity() throws IOException, COORDINATOR_CLIENT, segmentLoaderFactory, RETRY_POLICY_FACTORY, - null + IOConfig.DEFAULT_DROP_EXISTING ); final List expectedDimensionsSpec = ImmutableList.of( new DimensionsSpec(getDimensionSchema(new DoubleDimensionSchema("string_to_double"))) @@ -1165,7 +1173,8 @@ public void testSegmentGranularityAndNullQueryGranularity() throws 
IOException, AGGREGATORS, Collections.singletonList(COMPACTION_INTERVAL), new PeriodGranularity(Period.months(3), null, null), - Granularities.NONE + Granularities.NONE, + IOConfig.DEFAULT_DROP_EXISTING ); } @@ -1183,7 +1192,7 @@ public void testQueryGranularityAndNullSegmentGranularity() throws IOException, COORDINATOR_CLIENT, segmentLoaderFactory, RETRY_POLICY_FACTORY, - null + IOConfig.DEFAULT_DROP_EXISTING ); final List expectedDimensionsSpec = getExpectedDimensionsSpecForAutoGeneration(); @@ -1200,7 +1209,8 @@ public void testQueryGranularityAndNullSegmentGranularity() throws IOException, AGGREGATORS, SEGMENT_INTERVALS, Granularities.MONTH, - new PeriodGranularity(Period.months(3), null, null) + new PeriodGranularity(Period.months(3), null, null), + IOConfig.DEFAULT_DROP_EXISTING ); } @@ -1221,7 +1231,7 @@ public void testQueryGranularityAndSegmentGranularityNonNull() throws IOExceptio COORDINATOR_CLIENT, segmentLoaderFactory, RETRY_POLICY_FACTORY, - null + IOConfig.DEFAULT_DROP_EXISTING ); final List expectedDimensionsSpec = ImmutableList.of( new DimensionsSpec(getDimensionSchema(new DoubleDimensionSchema("string_to_double"))) @@ -1240,7 +1250,8 @@ public void testQueryGranularityAndSegmentGranularityNonNull() throws IOExceptio AGGREGATORS, Collections.singletonList(COMPACTION_INTERVAL), new PeriodGranularity(Period.months(3), null, null), - new PeriodGranularity(Period.months(3), null, null) + new PeriodGranularity(Period.months(3), null, null), + IOConfig.DEFAULT_DROP_EXISTING ); } @@ -1258,7 +1269,7 @@ public void testNullGranularitySpec() throws IOException, SegmentLoadingExceptio COORDINATOR_CLIENT, segmentLoaderFactory, RETRY_POLICY_FACTORY, - null + IOConfig.DEFAULT_DROP_EXISTING ); final List expectedDimensionsSpec = getExpectedDimensionsSpecForAutoGeneration(); @@ -1275,7 +1286,8 @@ public void testNullGranularitySpec() throws IOException, SegmentLoadingExceptio AGGREGATORS, SEGMENT_INTERVALS, Granularities.MONTH, - Granularities.NONE + 
Granularities.NONE, + IOConfig.DEFAULT_DROP_EXISTING ); } @@ -1294,7 +1306,7 @@ public void testGranularitySpecWithNullQueryGranularityAndNullSegmentGranularity COORDINATOR_CLIENT, segmentLoaderFactory, RETRY_POLICY_FACTORY, - null + IOConfig.DEFAULT_DROP_EXISTING ); final List expectedDimensionsSpec = getExpectedDimensionsSpecForAutoGeneration(); @@ -1311,7 +1323,8 @@ public void testGranularitySpecWithNullQueryGranularityAndNullSegmentGranularity AGGREGATORS, SEGMENT_INTERVALS, Granularities.MONTH, - Granularities.NONE + Granularities.NONE, + IOConfig.DEFAULT_DROP_EXISTING ); } @@ -1419,7 +1432,8 @@ private void assertIngestionSchema( List expectedMetricsSpec, List expectedSegmentIntervals, Granularity expectedSegmentGranularity, - Granularity expectedQueryGranularity + Granularity expectedQueryGranularity, + boolean expectedDropExisting ) { assertIngestionSchema( @@ -1464,7 +1478,8 @@ private void assertIngestionSchema( null ), expectedSegmentGranularity, - expectedQueryGranularity + expectedQueryGranularity, + expectedDropExisting ); } @@ -1475,7 +1490,8 @@ private void assertIngestionSchema( List expectedSegmentIntervals, ParallelIndexTuningConfig expectedTuningConfig, Granularity expectedSegmentGranularity, - Granularity expectedQueryGranularity + Granularity expectedQueryGranularity, + boolean expectedDropExisting ) { Preconditions.checkArgument( @@ -1522,6 +1538,7 @@ private void assertIngestionSchema( // assert ioConfig final ParallelIndexIOConfig ioConfig = ingestionSchema.getIOConfig(); Assert.assertFalse(ioConfig.isAppendToExisting()); + Assert.assertEquals(expectedDropExisting, ioConfig.isDropExisting()); final InputSource inputSource = ioConfig.getInputSource(); Assert.assertTrue(inputSource instanceof DruidInputSource); final DruidInputSource druidInputSource = (DruidInputSource) inputSource; diff --git a/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionTest.java 
b/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionTest.java index cad6e2a73f79..dd914a6b3ca2 100644 --- a/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionTest.java +++ b/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionTest.java @@ -568,6 +568,7 @@ private void submitCompactionConfig( 1 ), granularitySpec, + null, null ); compactionResource.submitCompactionConfig(compactionConfig); diff --git a/server/src/test/java/org/apache/druid/server/coordinator/DataSourceCompactionConfigTest.java b/server/src/test/java/org/apache/druid/server/coordinator/DataSourceCompactionConfigTest.java index 11154f4cc0d9..8d2e02c63987 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/DataSourceCompactionConfigTest.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/DataSourceCompactionConfigTest.java @@ -324,4 +324,32 @@ public void testSerdeGranularitySpecWithNullValues() throws IOException Assert.assertEquals(config.getTaskContext(), fromJson.getTaskContext()); Assert.assertEquals(config.getGranularitySpec(), fromJson.getGranularitySpec()); } + + @Test + public void testSerdeIOConfig() throws IOException + { + final DataSourceCompactionConfig config = new DataSourceCompactionConfig( + "dataSource", + null, + 500L, + null, + new Period(3600), + null, + new UserCompactionTaskGranularityConfig(Granularities.HOUR, null), + new UserCompactionTaskIOConfig(true), + ImmutableMap.of("key", "val") + ); + final String json = OBJECT_MAPPER.writeValueAsString(config); + final DataSourceCompactionConfig fromJson = OBJECT_MAPPER.readValue(json, DataSourceCompactionConfig.class); + + Assert.assertEquals(config.getDataSource(), fromJson.getDataSource()); + Assert.assertEquals(25, fromJson.getTaskPriority()); + Assert.assertEquals(config.getInputSegmentSizeBytes(), fromJson.getInputSegmentSizeBytes()); + 
Assert.assertEquals(config.getMaxRowsPerSegment(), fromJson.getMaxRowsPerSegment()); + Assert.assertEquals(config.getSkipOffsetFromLatest(), fromJson.getSkipOffsetFromLatest()); + Assert.assertEquals(config.getTuningConfig(), fromJson.getTuningConfig()); + Assert.assertEquals(config.getTaskContext(), fromJson.getTaskContext()); + Assert.assertEquals(config.getGranularitySpec(), fromJson.getGranularitySpec()); + Assert.assertEquals(config.getIoConfig(), fromJson.getIoConfig()); + } } diff --git a/server/src/test/java/org/apache/druid/server/coordinator/duty/CompactSegmentsTest.java b/server/src/test/java/org/apache/druid/server/coordinator/duty/CompactSegmentsTest.java index 6594ac3ca609..90b3b5db9529 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/duty/CompactSegmentsTest.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/duty/CompactSegmentsTest.java @@ -66,6 +66,7 @@ import org.apache.druid.server.coordinator.DataSourceCompactionConfig; import org.apache.druid.server.coordinator.DruidCoordinatorRuntimeParams; import org.apache.druid.server.coordinator.UserCompactionTaskGranularityConfig; +import org.apache.druid.server.coordinator.UserCompactionTaskIOConfig; import org.apache.druid.server.coordinator.UserCompactionTaskQueryTuningConfig; import org.apache.druid.timeline.CompactionState; import org.apache.druid.timeline.DataSegment; @@ -614,6 +615,110 @@ public void testCompactWithoutGranularitySpec() Assert.assertNull(granularitySpecArgumentCaptor.getValue()); } + @Test + public void testCompactWithNotNullIOConfig() + { + final HttpIndexingServiceClient mockIndexingServiceClient = Mockito.mock(HttpIndexingServiceClient.class); + final CompactSegments compactSegments = new CompactSegments(JSON_MAPPER, mockIndexingServiceClient); + final List compactionConfigs = new ArrayList<>(); + final String dataSource = DATA_SOURCE_PREFIX + 0; + compactionConfigs.add( + new DataSourceCompactionConfig( + dataSource, + 0, + 500L, + null, + 
new Period("PT0H"), // smaller than segment interval + new UserCompactionTaskQueryTuningConfig( + null, + null, + null, + null, + partitionsSpec, + null, + null, + null, + null, + null, + 3, + null, + null, + null, + null, + null, + null + ), + null, + new UserCompactionTaskIOConfig(true), + null + ) + ); + doCompactSegments(compactSegments, compactionConfigs); + ArgumentCaptor dropExistingCapture = ArgumentCaptor.forClass(Boolean.class); + Mockito.verify(mockIndexingServiceClient).compactSegments( + ArgumentMatchers.anyString(), + ArgumentMatchers.any(), + ArgumentMatchers.anyInt(), + ArgumentMatchers.any(), + ArgumentMatchers.any(), + dropExistingCapture.capture(), + ArgumentMatchers.any() + ); + Assert.assertEquals(true, dropExistingCapture.getValue()); + } + + @Test + public void testCompactWithNullIOConfig() + { + final HttpIndexingServiceClient mockIndexingServiceClient = Mockito.mock(HttpIndexingServiceClient.class); + final CompactSegments compactSegments = new CompactSegments(JSON_MAPPER, mockIndexingServiceClient); + final List compactionConfigs = new ArrayList<>(); + final String dataSource = DATA_SOURCE_PREFIX + 0; + compactionConfigs.add( + new DataSourceCompactionConfig( + dataSource, + 0, + 500L, + null, + new Period("PT0H"), // smaller than segment interval + new UserCompactionTaskQueryTuningConfig( + null, + null, + null, + null, + partitionsSpec, + null, + null, + null, + null, + null, + 3, + null, + null, + null, + null, + null, + null + ), + null, + null, + null + ) + ); + doCompactSegments(compactSegments, compactionConfigs); + ArgumentCaptor dropExistingCapture = ArgumentCaptor.forClass(Boolean.class); + Mockito.verify(mockIndexingServiceClient).compactSegments( + ArgumentMatchers.anyString(), + ArgumentMatchers.any(), + ArgumentMatchers.anyInt(), + ArgumentMatchers.any(), + ArgumentMatchers.any(), + dropExistingCapture.capture(), + ArgumentMatchers.any() + ); + Assert.assertNull(dropExistingCapture.getValue()); + } + @Test public void 
testCompactWithGranularitySpec() { From fe8b8f4cac051dc5e4ddc5fa276b56acc7ef99c0 Mon Sep 17 00:00:00 2001 From: Maytas Monsereenusorn Date: Mon, 5 Apr 2021 22:04:43 -0700 Subject: [PATCH 06/11] fix spelling --- docs/configuration/index.md | 2 +- docs/ingestion/compaction.md | 4 ++-- docs/ingestion/native-batch.md | 6 +++--- 3 files changed, 6 insertions(+), 6 deletions(-) diff --git a/docs/configuration/index.md b/docs/configuration/index.md index e31150b2da67..ee98ec9ac8a0 100644 --- a/docs/configuration/index.md +++ b/docs/configuration/index.md @@ -907,7 +907,7 @@ The below is a list of the supported configurations for auto compaction. |Property|Description|Default|Required| |--------|-----------|-------|--------| -|`dropExisting`|If `true`, then the generated compaction task drops (mark unused) all existing segments fully contained by the umbrella interval of the compacted segments when the task publishes new segments. If compaction fails, Druid does not drop or mark unused any segments. WARNING: this functionaility is still in beta and can result in temporary data unavailability for data within the compacted `interval`. Note that chaning this config does not cause intervals to be recompacted.|false|no| +|`dropExisting`|If `true`, then the generated compaction task drops (mark unused) all existing segments fully contained by the umbrella interval of the compacted segments when the task publishes new segments. If compaction fails, Druid does not drop or mark unused any segments. WARNING: this functionality is still in beta and can result in temporary data unavailability for data within the compacted `interval`. 
Note that chaning this config does not cause intervals to be recompacted.|false|no| ### Overlord diff --git a/docs/ingestion/compaction.md b/docs/ingestion/compaction.md index 7a6340d376a6..e5180e8826ec 100644 --- a/docs/ingestion/compaction.md +++ b/docs/ingestion/compaction.md @@ -54,7 +54,7 @@ See [Setting up a manual compaction task](#setting-up-manual-compaction) for mor ## Data handling with compaction During compaction, Druid overwrites the original set of segments with the compacted set. Druid also locks the segments for the time interval being compacted to ensure data consistency. By default, compaction tasks do not modify the underlying data. You can configure the compaction task to change the query granularity or add or remove dimensions in the compaction task. This means that the only changes to query results should be the result of intentional, not automatic, changes. -For compaction tasks, `dropExisting` in `ioConfig` can be set to "true" for Druid to drop (mark unused) all existing segments fully within interval for the compaction task. For an example of why this is important, see the suggestion for reindexing with finer granularity under [Implementation considerations](native-batch.md#implementation-considerations). WARNING: this functionaility is still in beta and can result in temporary data unavailability for data within the specified `interval` +For compaction tasks, `dropExisting` in `ioConfig` can be set to "true" for Druid to drop (mark unused) all existing segments fully within interval for the compaction task. For an example of why this is important, see the suggestion for reindexing with finer granularity under [Implementation considerations](native-batch.md#implementation-considerations). 
WARNING: this functionality is still in beta and can result in temporary data unavailability for data within the specified `interval` If an ingestion task needs to write data to a segment for a time interval locked for compaction, by default the ingestion task supersedes the compaction task and the compaction task fails without finishing. For manual compaction tasks you can adjust the input spec interval to avoid conflicts between ingestion and compaction. For automatic compaction, you can set the `skipOffsetFromLatest` key to adjustment the auto compaction starting point from the current time to reduce the chance of conflicts between ingestion and compaction. See [Compaction dynamic configuration](../configuration/index.md#compaction-dynamic-configuration) for more information. Another option is to set the compaction task to higher priority than the ingestion task. @@ -162,7 +162,7 @@ The compaction `ioConfig` requires specifying `inputSpec` as follows: |-----|-----------|-------|--------| |`type`|Task type. Should be `compact`|none|Yes| |`inputSpec`|Input specification|none|Yes| -|`dropExisting`|If `true`, then the compaction task drops (mark unused) all existing segments fully contained by either the `interval` in the `interval` type `inputSpec` or the umbrella interval of the `segments` in the `segment` type `inputSpec` when the task publishes new compacted segments. If compaction fails, Druid does not drop or mark unused any segments. WARNING: this functionaility is still in beta and can result in temporary data unavailability for data within the specified `interval`.|false|no| +|`dropExisting`|If `true`, then the compaction task drops (mark unused) all existing segments fully contained by either the `interval` in the `interval` type `inputSpec` or the umbrella interval of the `segments` in the `segment` type `inputSpec` when the task publishes new compacted segments. If compaction fails, Druid does not drop or mark unused any segments. 
WARNING: this functionality is still in beta and can result in temporary data unavailability for data within the specified `interval`.|false|no| There are two supported `inputSpec`s for now. diff --git a/docs/ingestion/native-batch.md b/docs/ingestion/native-batch.md index bdcb6f8b72a5..e08ec36d5c90 100644 --- a/docs/ingestion/native-batch.md +++ b/docs/ingestion/native-batch.md @@ -92,7 +92,7 @@ The supported compression formats for native batch ingestion are `bz2`, `gz`, `x - You can set `dropExisting` flag in the `ioConfig` to true if you want the ingestion task to drop all existing segments that start and end within your `granularitySpec`'s intervals. This applies whether or not the new data covers all existing segments. `dropExisting` only applies when `appendToExisting` is false and the `granularitySpec` contains an `interval`. WARNING: this - functionaility is still in beta and can result in temporary data unavailability for data within the specified `interval` + functionality is still in beta and can result in temporary data unavailability for data within the specified `interval` The following examples demonstrate when to set the `dropExisting` property to true in the `ioConfig`: @@ -220,7 +220,7 @@ that range if there's some stray data with unexpected timestamps. |type|The task type, this should always be `index_parallel`.|none|yes| |inputFormat|[`inputFormat`](./data-formats.md#input-format) to specify how to parse input data.|none|yes| |appendToExisting|Creates segments as additional shards of the latest version, effectively appending to the segment set instead of replacing it. This means that you can append new segments to any datasource regardless of its original partitioning scheme. You must use the `dynamic` partitioning type for the appended segments. 
If you specify a different partitioning type, the task fails with an error.|false|no| -|dropExisting|If `true` and `appendToExisting` is `false` and the `granularitySpec` contains an`interval`, then the ingestion task drops (mark unused) all existing segments fully contained by the specified `interval` when the task publishes new segments. If ingestion fails, Druid does not drop or mark unused any segments. In the case of misconfiguration where either `appendToExisting` is `true` or `interval` is not specified in `granularitySpec`, Druid does not drop any segments even if `dropExisting` is `true`. WARNING: this functionaility is still in beta and can result in temporary data unavailability for data within the specified `interval`.|false|no| +|dropExisting|If `true` and `appendToExisting` is `false` and the `granularitySpec` contains an`interval`, then the ingestion task drops (mark unused) all existing segments fully contained by the specified `interval` when the task publishes new segments. If ingestion fails, Druid does not drop or mark unused any segments. In the case of misconfiguration where either `appendToExisting` is `true` or `interval` is not specified in `granularitySpec`, Druid does not drop any segments even if `dropExisting` is `true`. WARNING: this functionality is still in beta and can result in temporary data unavailability for data within the specified `interval`.|false|no| ### `tuningConfig` @@ -748,7 +748,7 @@ that range if there's some stray data with unexpected timestamps. |type|The task type, this should always be "index".|none|yes| |inputFormat|[`inputFormat`](./data-formats.md#input-format) to specify how to parse input data.|none|yes| |appendToExisting|Creates segments as additional shards of the latest version, effectively appending to the segment set instead of replacing it. This means that you can append new segments to any datasource regardless of its original partitioning scheme. 
You must use the `dynamic` partitioning type for the appended segments. If you specify a different partitioning type, the task fails with an error.|false|no| -|dropExisting|If `true` and `appendToExisting` is `false` and the `granularitySpec` contains an`interval`, then the ingestion task drops (mark unused) all existing segments fully contained by the specified `interval` when the task publishes new segments. If ingestion fails, Druid does not drop or mark unused any segments. In the case of misconfiguration where either `appendToExisting` is `true` or `interval` is not specified in `granularitySpec`, Druid does not drop any segments even if `dropExisting` is `true`. WARNING: this functionaility is still in beta and can result in temporary data unavailability for data within the specified `interval`.|false|no| +|dropExisting|If `true` and `appendToExisting` is `false` and the `granularitySpec` contains an`interval`, then the ingestion task drops (mark unused) all existing segments fully contained by the specified `interval` when the task publishes new segments. If ingestion fails, Druid does not drop or mark unused any segments. In the case of misconfiguration where either `appendToExisting` is `true` or `interval` is not specified in `granularitySpec`, Druid does not drop any segments even if `dropExisting` is `true`. 
WARNING: this functionality is still in beta and can result in temporary data unavailability for data within the specified `interval`.|false|no| ### `tuningConfig` From c1d17e7d68c33ec7fd6d69afdf14b2fd4089be77 Mon Sep 17 00:00:00 2001 From: Maytas Monsereenusorn Date: Mon, 5 Apr 2021 23:38:41 -0700 Subject: [PATCH 07/11] fix docs --- docs/configuration/index.md | 2 +- docs/ingestion/compaction.md | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/docs/configuration/index.md b/docs/configuration/index.md index ee98ec9ac8a0..641779b32924 100644 --- a/docs/configuration/index.md +++ b/docs/configuration/index.md @@ -907,7 +907,7 @@ The below is a list of the supported configurations for auto compaction. |Property|Description|Default|Required| |--------|-----------|-------|--------| -|`dropExisting`|If `true`, then the generated compaction task drops (mark unused) all existing segments fully contained by the umbrella interval of the compacted segments when the task publishes new segments. If compaction fails, Druid does not drop or mark unused any segments. WARNING: this functionality is still in beta and can result in temporary data unavailability for data within the compacted `interval`. Note that chaning this config does not cause intervals to be recompacted.|false|no| +|`dropExisting`|If `true`, then the generated compaction task drops (mark unused) all existing segments fully contained by the umbrella interval of the compacted segments when the task publishes new segments. If compaction fails, Druid does not drop or mark unused any segments. WARNING: this functionality is still in beta and can result in temporary data unavailability for data within the compacted `interval`. 
Note that changing this config does not cause intervals to be recompacted.|false|no| ### Overlord diff --git a/docs/ingestion/compaction.md b/docs/ingestion/compaction.md index e5180e8826ec..637861bd67ad 100644 --- a/docs/ingestion/compaction.md +++ b/docs/ingestion/compaction.md @@ -54,7 +54,7 @@ See [Setting up a manual compaction task](#setting-up-manual-compaction) for mor ## Data handling with compaction During compaction, Druid overwrites the original set of segments with the compacted set. Druid also locks the segments for the time interval being compacted to ensure data consistency. By default, compaction tasks do not modify the underlying data. You can configure the compaction task to change the query granularity or add or remove dimensions in the compaction task. This means that the only changes to query results should be the result of intentional, not automatic, changes. -For compaction tasks, `dropExisting` in `ioConfig` can be set to "true" for Druid to drop (mark unused) all existing segments fully within interval for the compaction task. For an example of why this is important, see the suggestion for reindexing with finer granularity under [Implementation considerations](native-batch.md#implementation-considerations). WARNING: this functionality is still in beta and can result in temporary data unavailability for data within the specified `interval` +For compaction tasks, `dropExisting` in `ioConfig` can be set to "true" for Druid to drop (mark unused) all existing segments fully contained by the interval of the compaction task. For an example of why this is important, see the suggestion for reindexing with finer granularity under [Implementation considerations](native-batch.md#implementation-considerations). WARNING: this functionality is still in beta and can result in temporary data unavailability for data within the compaction task interval. 
If an ingestion task needs to write data to a segment for a time interval locked for compaction, by default the ingestion task supersedes the compaction task and the compaction task fails without finishing. For manual compaction tasks you can adjust the input spec interval to avoid conflicts between ingestion and compaction. For automatic compaction, you can set the `skipOffsetFromLatest` key to adjustment the auto compaction starting point from the current time to reduce the chance of conflicts between ingestion and compaction. See [Compaction dynamic configuration](../configuration/index.md#compaction-dynamic-configuration) for more information. Another option is to set the compaction task to higher priority than the ingestion task. @@ -162,7 +162,7 @@ The compaction `ioConfig` requires specifying `inputSpec` as follows: |-----|-----------|-------|--------| |`type`|Task type. Should be `compact`|none|Yes| |`inputSpec`|Input specification|none|Yes| -|`dropExisting`|If `true`, then the compaction task drops (mark unused) all existing segments fully contained by either the `interval` in the `interval` type `inputSpec` or the umbrella interval of the `segments` in the `segment` type `inputSpec` when the task publishes new compacted segments. If compaction fails, Druid does not drop or mark unused any segments. WARNING: this functionality is still in beta and can result in temporary data unavailability for data within the specified `interval`.|false|no| +|`dropExisting`|If `true`, then the compaction task drops (mark unused) all existing segments fully contained by either the `interval` in the `interval` type `inputSpec` or the umbrella interval of the `segments` in the `segment` type `inputSpec` when the task publishes new compacted segments. If compaction fails, Druid does not drop or mark unused any segments. 
WARNING: this functionality is still in beta and can result in temporary data unavailability for data within the compaction task interval.|false|no| There are two supported `inputSpec`s for now. From 536e65744f1e6d729e9891d8f18b97dd78f333e5 Mon Sep 17 00:00:00 2001 From: Maytas Monsereenusorn Date: Mon, 5 Apr 2021 23:52:25 -0700 Subject: [PATCH 08/11] add IT --- .../duty/ITAutoCompactionTest.java | 152 ++++++++++++++++-- 1 file changed, 141 insertions(+), 11 deletions(-) diff --git a/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionTest.java b/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionTest.java index dd914a6b3ca2..75dd064d3884 100644 --- a/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionTest.java +++ b/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionTest.java @@ -35,6 +35,7 @@ import org.apache.druid.server.coordinator.CoordinatorCompactionConfig; import org.apache.druid.server.coordinator.DataSourceCompactionConfig; import org.apache.druid.server.coordinator.UserCompactionTaskGranularityConfig; +import org.apache.druid.server.coordinator.UserCompactionTaskIOConfig; import org.apache.druid.server.coordinator.UserCompactionTaskQueryTuningConfig; import org.apache.druid.testing.IntegrationTestingConfig; import org.apache.druid.testing.clients.CompactionResourceTestClient; @@ -168,7 +169,7 @@ public void testAutoCompactionDutyCanUpdateCompactionConfig() throws Exception LOG.info("Auto compaction test with hash partitioning"); final HashedPartitionsSpec hashedPartitionsSpec = new HashedPartitionsSpec(null, 3, null); - submitCompactionConfig(hashedPartitionsSpec, NO_SKIP_OFFSET, 1, null); + submitCompactionConfig(hashedPartitionsSpec, NO_SKIP_OFFSET, 1, null, false); // 2 segments published per day after compaction. 
forceTriggerAutoCompaction(4); verifyQuery(INDEX_QUERIES_RESOURCE); @@ -183,7 +184,7 @@ public void testAutoCompactionDutyCanUpdateCompactionConfig() throws Exception "city", false ); - submitCompactionConfig(rangePartitionsSpec, NO_SKIP_OFFSET, 1, null); + submitCompactionConfig(rangePartitionsSpec, NO_SKIP_OFFSET, 1, null, false); forceTriggerAutoCompaction(2); verifyQuery(INDEX_QUERIES_RESOURCE); verifySegmentsCompacted(rangePartitionsSpec, 2); @@ -287,7 +288,7 @@ public void testAutoCompactionDutyCanUpdateTaskSlots() throws Exception } @Test - public void testAutoCompactionDutyWithSegmentGranularity() throws Exception + public void testAutoCompactionDutyWithSegmentGranularityAndWithDropExistingTrue() throws Exception { loadData(INDEX_TASK); try (final Closeable ignored = unloader(fullDatasourceName)) { @@ -298,7 +299,8 @@ public void testAutoCompactionDutyWithSegmentGranularity() throws Exception verifyQuery(INDEX_QUERIES_RESOURCE); Granularity newGranularity = Granularities.YEAR; - submitCompactionConfig(1000, NO_SKIP_OFFSET, new UserCompactionTaskGranularityConfig(newGranularity, null)); + // Set dropExisting to true + submitCompactionConfig(1000, NO_SKIP_OFFSET, new UserCompactionTaskGranularityConfig(newGranularity, null), true); LOG.info("Auto compaction test with YEAR segment granularity"); @@ -314,10 +316,12 @@ public void testAutoCompactionDutyWithSegmentGranularity() throws Exception checkCompactionIntervals(expectedIntervalAfterCompaction); newGranularity = Granularities.DAY; - submitCompactionConfig(1000, NO_SKIP_OFFSET, new UserCompactionTaskGranularityConfig(newGranularity, null)); + // Set dropExisting to true + submitCompactionConfig(1000, NO_SKIP_OFFSET, new UserCompactionTaskGranularityConfig(newGranularity, null), true); LOG.info("Auto compaction test with DAY segment granularity"); + // Since dropExisting is set to true... 
// The earlier segment with YEAR granularity will be dropped post-compaction
+ // Hence, we will only have 2013-08-31 to 2013-09-01 and 2013-09-01 to 2013-09-02.
+ expectedIntervalAfterCompaction = new ArrayList<>();
@@ -333,6 +337,58 @@
}
}
+ @Test
+ public void testAutoCompactionDutyWithSegmentGranularityAndWithDropExistingFalse() throws Exception
+ {
+ loadData(INDEX_TASK);
+ try (final Closeable ignored = unloader(fullDatasourceName)) {
+ final List intervalsBeforeCompaction = coordinator.getSegmentIntervals(fullDatasourceName);
+ intervalsBeforeCompaction.sort(null);
+ // 4 segments across 2 days (4 total)...
+ verifySegmentsCount(4);
+ verifyQuery(INDEX_QUERIES_RESOURCE);
+
+ Granularity newGranularity = Granularities.YEAR;
+ // Set dropExisting to false
+ submitCompactionConfig(1000, NO_SKIP_OFFSET, new UserCompactionTaskGranularityConfig(newGranularity, null), false);
+
+ LOG.info("Auto compaction test with YEAR segment granularity");
+
+ List expectedIntervalAfterCompaction = new ArrayList<>();
+ for (String interval : intervalsBeforeCompaction) {
+ for (Interval newinterval : newGranularity.getIterable(new Interval(interval, ISOChronology.getInstanceUTC()))) {
+ expectedIntervalAfterCompaction.add(newinterval.toString());
+ }
+ }
+ forceTriggerAutoCompaction(1);
+ verifyQuery(INDEX_QUERIES_RESOURCE);
+ verifySegmentsCompacted(1, 1000);
+ checkCompactionIntervals(expectedIntervalAfterCompaction);
+
+ newGranularity = Granularities.DAY;
+ // Set dropExisting to false
+ submitCompactionConfig(1000, NO_SKIP_OFFSET, new UserCompactionTaskGranularityConfig(newGranularity, null), false);
+
+ LOG.info("Auto compaction test with DAY segment granularity");
+
+ // Since dropExisting is set to false...
+ // The earlier segment with YEAR granularity is still 'used' as it’s not fully overshadowed. 
+ // This is because we only have newer version on 2013-08-31 to 2013-09-01 and 2013-09-01 to 2013-09-02. + // The version for the YEAR segment is still the latest for 2013-01-01 to 2013-08-31 and 2013-09-02 to 2014-01-01. + // Hence, all three segments are available and the expected intervals are combined from the DAY and YEAR segment granularities + // (which are 2013-08-31 to 2013-09-01, 2013-09-01 to 2013-09-02 and 2013-01-01 to 2014-01-01) + for (String interval : intervalsBeforeCompaction) { + for (Interval newinterval : newGranularity.getIterable(new Interval(interval, ISOChronology.getInstanceUTC()))) { + expectedIntervalAfterCompaction.add(newinterval.toString()); + } + } + forceTriggerAutoCompaction(3); + verifyQuery(INDEX_QUERIES_RESOURCE); + verifySegmentsCompacted(3, 1000); + checkCompactionIntervals(expectedIntervalAfterCompaction); + } + } + @Test public void testAutoCompactionDutyWithSegmentGranularityAndMixedVersion() throws Exception { @@ -437,7 +493,7 @@ public void testAutoCompactionDutyWithSegmentGranularityAndExistingCompactedSegm } @Test - public void testAutoCompactionDutyWithSegmentGranularityAndSmallerSegmentGranularityCoveringMultipleSegmentsInTimeline() throws Exception + public void testAutoCompactionDutyWithSegmentGranularityAndSmallerSegmentGranularityCoveringMultipleSegmentsInTimelineAndDropExistingTrue() throws Exception { loadData(INDEX_TASK); try (final Closeable ignored = unloader(fullDatasourceName)) { @@ -448,7 +504,8 @@ public void testAutoCompactionDutyWithSegmentGranularityAndSmallerSegmentGranula verifyQuery(INDEX_QUERIES_RESOURCE); Granularity newGranularity = Granularities.YEAR; - submitCompactionConfig(MAX_ROWS_PER_SEGMENT_COMPACTED, NO_SKIP_OFFSET, new UserCompactionTaskGranularityConfig(newGranularity, null)); + // Set dropExisting to true + submitCompactionConfig(MAX_ROWS_PER_SEGMENT_COMPACTED, NO_SKIP_OFFSET, new UserCompactionTaskGranularityConfig(newGranularity, null), true); List expectedIntervalAfterCompaction = 
new ArrayList<>(); // We wil have one segment with interval of 2013-01-01/2014-01-01 (compacted with YEAR) @@ -473,7 +530,9 @@ public void testAutoCompactionDutyWithSegmentGranularityAndSmallerSegmentGranula checkCompactionIntervals(expectedIntervalAfterCompaction); newGranularity = Granularities.MONTH; - submitCompactionConfig(MAX_ROWS_PER_SEGMENT_COMPACTED, NO_SKIP_OFFSET, new UserCompactionTaskGranularityConfig(newGranularity, null)); + // Set dropExisting to true + submitCompactionConfig(MAX_ROWS_PER_SEGMENT_COMPACTED, NO_SKIP_OFFSET, new UserCompactionTaskGranularityConfig(newGranularity, null), true); + // Since dropExisting is set to true... // This will submit a single compaction task for interval of 2013-01-01/2014-01-01 with MONTH granularity expectedIntervalAfterCompaction = new ArrayList<>(); // The previous segment with interval of 2013-01-01/2014-01-01 (compacted with YEAR) will be dropped @@ -491,6 +550,71 @@ public void testAutoCompactionDutyWithSegmentGranularityAndSmallerSegmentGranula } } + @Test + public void testAutoCompactionDutyWithSegmentGranularityAndSmallerSegmentGranularityCoveringMultipleSegmentsInTimelineAndDropExistingFalse() throws Exception + { + loadData(INDEX_TASK); + try (final Closeable ignored = unloader(fullDatasourceName)) { + final List intervalsBeforeCompaction = coordinator.getSegmentIntervals(fullDatasourceName); + intervalsBeforeCompaction.sort(null); + // 4 segments across 2 days (4 total)... 
+ verifySegmentsCount(4);
+ verifyQuery(INDEX_QUERIES_RESOURCE);
+
+ Granularity newGranularity = Granularities.YEAR;
+ // Set dropExisting to false
+ submitCompactionConfig(MAX_ROWS_PER_SEGMENT_COMPACTED, NO_SKIP_OFFSET, new UserCompactionTaskGranularityConfig(newGranularity, null), false);
+
+ List expectedIntervalAfterCompaction = new ArrayList<>();
+ // We will have one segment with interval of 2013-01-01/2014-01-01 (compacted with YEAR)
+ for (String interval : intervalsBeforeCompaction) {
+ for (Interval newinterval : newGranularity.getIterable(new Interval(interval, ISOChronology.getInstanceUTC()))) {
+ expectedIntervalAfterCompaction.add(newinterval.toString());
+ }
+ }
+ forceTriggerAutoCompaction(1);
+ verifyQuery(INDEX_QUERIES_RESOURCE);
+ verifySegmentsCompacted(1, MAX_ROWS_PER_SEGMENT_COMPACTED);
+ checkCompactionIntervals(expectedIntervalAfterCompaction);
+
+ loadData(INDEX_TASK);
+ verifySegmentsCount(5);
+ verifyQuery(INDEX_QUERIES_RESOURCE);
+ // 5 segments. 1 compacted YEAR segment and 4 newly ingested DAY segments across 2 days
+ // We will have one segment with interval of 2013-01-01/2014-01-01 (compacted with YEAR) from the compaction earlier
+ // two segments with interval of 2013-08-31/2013-09-01 (newly ingested with DAY)
+ // and two segments with interval of 2013-09-01/2013-09-02 (newly ingested with DAY)
+ expectedIntervalAfterCompaction.addAll(intervalsBeforeCompaction);
+ checkCompactionIntervals(expectedIntervalAfterCompaction);
+
+ newGranularity = Granularities.MONTH;
+ // Set dropExisting to false
+ submitCompactionConfig(MAX_ROWS_PER_SEGMENT_COMPACTED, NO_SKIP_OFFSET, new UserCompactionTaskGranularityConfig(newGranularity, null), false);
+ // Since dropExisting is set to false...
+ // This will submit a single compaction task for interval of 2013-01-01/2014-01-01 with MONTH granularity
+ expectedIntervalAfterCompaction = new ArrayList<>();
+ // Since dropExisting is set to false... 
+ // We will have one segment with interval of 2013-01-01/2014-01-01 (compacted with YEAR) from before the compaction
+ for (String interval : intervalsBeforeCompaction) {
+ for (Interval newinterval : Granularities.YEAR.getIterable(new Interval(interval, ISOChronology.getInstanceUTC()))) {
+ expectedIntervalAfterCompaction.add(newinterval.toString());
+ }
+ }
+ // one segment with interval of 2013-09-01/2013-10-01 (compacted with MONTH)
+ // and one segment with interval of 2013-10-01/2013-11-01 (compacted with MONTH)
+ for (String interval : intervalsBeforeCompaction) {
+ for (Interval newinterval : Granularities.MONTH.getIterable(new Interval(interval, ISOChronology.getInstanceUTC()))) {
+ expectedIntervalAfterCompaction.add(newinterval.toString());
+ }
+ }
+
+ forceTriggerAutoCompaction(3);
+ verifyQuery(INDEX_QUERIES_RESOURCE);
+ verifySegmentsCompacted(3, MAX_ROWS_PER_SEGMENT_COMPACTED);
+ checkCompactionIntervals(expectedIntervalAfterCompaction);
+ }
+ }
+
private void loadData(String indexTask) throws Exception
{
String taskSpec = getResourceAsString(indexTask);
@@ -532,14 +656,20 @@ private void submitCompactionConfig(Integer maxRowsPerSegment, Period skipOffset
private void submitCompactionConfig(Integer maxRowsPerSegment, Period skipOffsetFromLatest, UserCompactionTaskGranularityConfig granularitySpec) throws Exception
{
- submitCompactionConfig(new DynamicPartitionsSpec(maxRowsPerSegment, null), skipOffsetFromLatest, 1, granularitySpec);
+ submitCompactionConfig(maxRowsPerSegment, skipOffsetFromLatest, granularitySpec, false);
+ }
+
+ private void submitCompactionConfig(Integer maxRowsPerSegment, Period skipOffsetFromLatest, UserCompactionTaskGranularityConfig granularitySpec, boolean dropExisting) throws Exception
+ {
+ submitCompactionConfig(new DynamicPartitionsSpec(maxRowsPerSegment, null), skipOffsetFromLatest, 1, granularitySpec, dropExisting);
+ }
+
+ private void submitCompactionConfig(
+ PartitionsSpec partitionsSpec,
+ Period skipOffsetFromLatest,
+ int 
maxNumConcurrentSubTasks, - UserCompactionTaskGranularityConfig granularitySpec + UserCompactionTaskGranularityConfig granularitySpec, + boolean dropExisting ) throws Exception { DataSourceCompactionConfig compactionConfig = new DataSourceCompactionConfig( @@ -568,7 +698,7 @@ private void submitCompactionConfig( 1 ), granularitySpec, - null, + !dropExisting ? null : new UserCompactionTaskIOConfig(true), null ); compactionResource.submitCompactionConfig(compactionConfig); From e6f2b628407806eabfb03f0b018a55e17f61d216 Mon Sep 17 00:00:00 2001 From: Maytas Monsereenusorn Date: Tue, 6 Apr 2021 13:05:55 -0700 Subject: [PATCH 09/11] fix test --- .../common/task/CompactionTaskRunTest.java | 83 ++++++++++++++++++- .../UserCompactionTaskIOConfig.java | 2 +- .../DataSourceCompactionConfigTest.java | 30 ++++++- 3 files changed, 110 insertions(+), 5 deletions(-) diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskRunTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskRunTest.java index 5eadc388db0f..df2485b86ad9 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskRunTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskRunTest.java @@ -802,7 +802,7 @@ public void testCompactThenAppend() throws Exception } @Test - public void testPartialIntervalCompactWithFinerSegmentGranularityThenFullIntervalCompact() throws Exception + public void testPartialIntervalCompactWithFinerSegmentGranularityThenFullIntervalCompactWithDropExistingTrue() throws Exception { // This test fails with segment lock because of the bug reported in https://github.com/apache/druid/issues/10911. 
if (lockGranularity == LockGranularity.SEGMENT) { @@ -841,8 +841,9 @@ public void testPartialIntervalCompactWithFinerSegmentGranularityThenFullInterva ); final CompactionTask partialCompactionTask = builder - .interval(compactionPartialInterval) .segmentGranularity(Granularities.MINUTE) + // Set dropExisting to true + .inputSpec(new CompactionIntervalSpec(compactionPartialInterval, null), true) .build(); final Pair> partialCompactionResult = runTask(partialCompactionTask); @@ -864,8 +865,9 @@ public void testPartialIntervalCompactWithFinerSegmentGranularityThenFullInterva Assert.assertEquals(expectedSegments, segmentsAfterPartialCompaction); final CompactionTask fullCompactionTask = builder - .interval(Intervals.of("2014-01-01/2014-01-02")) .segmentGranularity(null) + // Set dropExisting to true + .inputSpec(new CompactionIntervalSpec(Intervals.of("2014-01-01/2014-01-02"), null), true) .build(); final Pair> fullCompactionResult = runTask(fullCompactionTask); @@ -902,6 +904,81 @@ public void testPartialIntervalCompactWithFinerSegmentGranularityThenFullInterva ); } + @Test + public void testPartialIntervalCompactWithFinerSegmentGranularityThenFullIntervalCompactWithDropExistingFalse() throws Exception + { + // This test fails with segment lock because of the bug reported in https://github.com/apache/druid/issues/10911. 
+ if (lockGranularity == LockGranularity.SEGMENT) { + return; + } + + runIndexTask(); + + final Set expectedSegments = new HashSet<>( + getStorageCoordinator().retrieveUsedSegmentsForIntervals( + DATA_SOURCE, + Collections.singletonList(Intervals.of("2014-01-01/2014-01-02")), + Segments.ONLY_VISIBLE + ) + ); + + final Builder builder = new Builder( + DATA_SOURCE, + segmentLoaderFactory, + RETRY_POLICY_FACTORY + ); + + final Interval partialInterval = Intervals.of("2014-01-01T01:00:00/2014-01-01T02:00:00"); + final CompactionTask partialCompactionTask = builder + .segmentGranularity(Granularities.MINUTE) + // Set dropExisting to false + .inputSpec(new CompactionIntervalSpec(partialInterval, null), false) + .build(); + + final Pair> partialCompactionResult = runTask(partialCompactionTask); + Assert.assertTrue(partialCompactionResult.lhs.isSuccess()); + // All segments in the previous expectedSegments should still appear as they have larger segment granularity. + expectedSegments.addAll(partialCompactionResult.rhs); + + final Set segmentsAfterPartialCompaction = new HashSet<>( + getStorageCoordinator().retrieveUsedSegmentsForIntervals( + DATA_SOURCE, + Collections.singletonList(Intervals.of("2014-01-01/2014-01-02")), + Segments.ONLY_VISIBLE + ) + ); + + Assert.assertEquals(expectedSegments, segmentsAfterPartialCompaction); + + final CompactionTask fullCompactionTask = builder + .segmentGranularity(null) + // Set dropExisting to false + .inputSpec(new CompactionIntervalSpec(Intervals.of("2014-01-01/2014-01-02"), null), false) + .build(); + + final Pair> fullCompactionResult = runTask(fullCompactionTask); + Assert.assertTrue(fullCompactionResult.lhs.isSuccess()); + + final List segmentsAfterFullCompaction = new ArrayList<>( + getStorageCoordinator().retrieveUsedSegmentsForIntervals( + DATA_SOURCE, + Collections.singletonList(Intervals.of("2014-01-01/2014-01-02")), + Segments.ONLY_VISIBLE + ) + ); + segmentsAfterFullCompaction.sort( + (s1, s2) -> 
Comparators.intervalsByStartThenEnd().compare(s1.getInterval(), s2.getInterval()) + ); + + Assert.assertEquals(3, segmentsAfterFullCompaction.size()); + for (int i = 0; i < segmentsAfterFullCompaction.size(); i++) { + Assert.assertEquals( + Intervals.of(StringUtils.format("2014-01-01T%02d/2014-01-01T%02d", i, i + 1)), + segmentsAfterFullCompaction.get(i).getInterval() + ); + } + } + @Test public void testRunIndexAndCompactForSameSegmentAtTheSameTime() throws Exception { diff --git a/server/src/main/java/org/apache/druid/server/coordinator/UserCompactionTaskIOConfig.java b/server/src/main/java/org/apache/druid/server/coordinator/UserCompactionTaskIOConfig.java index c162a29393ce..df5af287acad 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/UserCompactionTaskIOConfig.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/UserCompactionTaskIOConfig.java @@ -29,7 +29,7 @@ /** * Spec containing IO configs for Auto Compaction. * This class mimics JSON field names for fields supported in auto compaction with - * the corresponding fields in {@link org.apache.druid.segment.indexing.IOConfig}. + * the corresponding fields in {@link IOConfig}. * This is done for end-user ease of use. Basically, end-user will use the same syntax / JSON structure to set * IO configs for Auto Compaction as they would for any other ingestion task. * Note that this class simply holds IO configs and pass it to compaction task spec. 
diff --git a/server/src/test/java/org/apache/druid/server/coordinator/DataSourceCompactionConfigTest.java b/server/src/test/java/org/apache/druid/server/coordinator/DataSourceCompactionConfigTest.java index 8d2e02c63987..c798d29e1ea9 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/DataSourceCompactionConfigTest.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/DataSourceCompactionConfigTest.java @@ -326,7 +326,7 @@ public void testSerdeGranularitySpecWithNullValues() throws IOException } @Test - public void testSerdeIOConfig() throws IOException + public void testSerdeIOConfigWithNonNullDropExisting() throws IOException { final DataSourceCompactionConfig config = new DataSourceCompactionConfig( "dataSource", @@ -352,4 +352,32 @@ public void testSerdeIOConfig() throws IOException Assert.assertEquals(config.getGranularitySpec(), fromJson.getGranularitySpec()); Assert.assertEquals(config.getIoConfig(), fromJson.getIoConfig()); } + + @Test + public void testSerdeIOConfigWithNullDropExisting() throws IOException + { + final DataSourceCompactionConfig config = new DataSourceCompactionConfig( + "dataSource", + null, + 500L, + null, + new Period(3600), + null, + new UserCompactionTaskGranularityConfig(Granularities.HOUR, null), + new UserCompactionTaskIOConfig(null), + ImmutableMap.of("key", "val") + ); + final String json = OBJECT_MAPPER.writeValueAsString(config); + final DataSourceCompactionConfig fromJson = OBJECT_MAPPER.readValue(json, DataSourceCompactionConfig.class); + + Assert.assertEquals(config.getDataSource(), fromJson.getDataSource()); + Assert.assertEquals(25, fromJson.getTaskPriority()); + Assert.assertEquals(config.getInputSegmentSizeBytes(), fromJson.getInputSegmentSizeBytes()); + Assert.assertEquals(config.getMaxRowsPerSegment(), fromJson.getMaxRowsPerSegment()); + Assert.assertEquals(config.getSkipOffsetFromLatest(), fromJson.getSkipOffsetFromLatest()); + Assert.assertEquals(config.getTuningConfig(), 
fromJson.getTuningConfig()); + Assert.assertEquals(config.getTaskContext(), fromJson.getTaskContext()); + Assert.assertEquals(config.getGranularitySpec(), fromJson.getGranularitySpec()); + Assert.assertEquals(config.getIoConfig(), fromJson.getIoConfig()); + } } From 8747a180b088471d0ee0be379a78aa3a84194699 Mon Sep 17 00:00:00 2001 From: Maytas Monsereenusorn Date: Tue, 6 Apr 2021 15:19:35 -0700 Subject: [PATCH 10/11] fix doc --- docs/configuration/index.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/configuration/index.md b/docs/configuration/index.md index 641779b32924..59931732cde5 100644 --- a/docs/configuration/index.md +++ b/docs/configuration/index.md @@ -902,7 +902,7 @@ You can optionally use the `granularitySpec` object to configure the segment gra ###### Automatic compaction IOConfig -Auto compaction supports a subset of the [IOConfig for Parallel task](../ingestion/native-batch.md#ioConfig). +Auto compaction supports a subset of the [IOConfig for Parallel task](../ingestion/native-batch.md). The below is a list of the supported configurations for auto compaction. |Property|Description|Default|Required| From 1613173d37a28c20a5a7b5ff93bfd3b78d7876d2 Mon Sep 17 00:00:00 2001 From: Maytas Monsereenusorn Date: Tue, 6 Apr 2021 21:23:07 -0700 Subject: [PATCH 11/11] fix doc --- docs/configuration/index.md | 2 +- website/.spelling | 1 + 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/docs/configuration/index.md b/docs/configuration/index.md index 59931732cde5..8e5ca9b7e58f 100644 --- a/docs/configuration/index.md +++ b/docs/configuration/index.md @@ -907,7 +907,7 @@ The below is a list of the supported configurations for auto compaction. |Property|Description|Default|Required| |--------|-----------|-------|--------| -|`dropExisting`|If `true`, then the generated compaction task drops (mark unused) all existing segments fully contained by the umbrella interval of the compacted segments when the task publishes new segments. 
If compaction fails, Druid does not drop or mark unused any segments. WARNING: this functionality is still in beta and can result in temporary data unavailability for data within the compacted `interval`. Note that changing this config does not cause intervals to be recompacted.|false|no| +|`dropExisting`|If `true`, then the generated compaction task drops (mark unused) all existing segments fully contained by the umbrella interval of the compacted segments when the task publishes new segments. If compaction fails, Druid does not drop or mark unused any segments. WARNING: this functionality is still in beta and can result in temporary data unavailability for data within the compacted `interval`. Note that changing this config does not cause intervals to be compacted again.|false|no| ### Overlord diff --git a/website/.spelling b/website/.spelling index 017a1a91c125..55e970dd1f00 100644 --- a/website/.spelling +++ b/website/.spelling @@ -1647,6 +1647,7 @@ HadoopIndexTasks HttpEmitter HttpPostEmitter InetAddress.getLocalHost +IOConfig JRE8u60 KeyManager L1