diff --git a/benchmarks/src/main/java/org/apache/druid/server/coordinator/NewestSegmentFirstPolicyBenchmark.java b/benchmarks/src/main/java/org/apache/druid/server/coordinator/NewestSegmentFirstPolicyBenchmark.java
index dfef1283a2a3..40fa0c31a7b5 100644
--- a/benchmarks/src/main/java/org/apache/druid/server/coordinator/NewestSegmentFirstPolicyBenchmark.java
+++ b/benchmarks/src/main/java/org/apache/druid/server/coordinator/NewestSegmentFirstPolicyBenchmark.java
@@ -99,7 +99,6 @@ public void setup()
             null,
             null,
             null,
-            null,
             null
         )
     );
diff --git a/core/src/main/java/org/apache/druid/indexer/partitions/Checks.java b/core/src/main/java/org/apache/druid/indexer/Checks.java
similarity index 57%
rename from core/src/main/java/org/apache/druid/indexer/partitions/Checks.java
rename to core/src/main/java/org/apache/druid/indexer/Checks.java
index b3b174ac0399..424ca6e79ac8 100644
--- a/core/src/main/java/org/apache/druid/indexer/partitions/Checks.java
+++ b/core/src/main/java/org/apache/druid/indexer/Checks.java
@@ -17,20 +17,41 @@
  * under the License.
  */

-package org.apache.druid.indexer.partitions;
+package org.apache.druid.indexer;
+
+import org.apache.druid.java.util.common.IAE;
+
+import java.util.List;

 /**
  * Various helper methods useful for checking the validity of arguments to spec constructors.
  */
-class Checks
+public final class Checks
 {
+  public static Property<?> checkOneNotNullOrEmpty(List<Property<?>> properties)
+  {
+    Property<?> nonNullProperty = null;
+    for (Property<?> property : properties) {
+      if (!property.isValueNullOrEmptyCollection()) {
+        if (nonNullProperty == null) {
+          nonNullProperty = property;
+        } else {
+          throw new IAE("At most one of %s must be present", properties);
+        }
+      }
+    }
+    if (nonNullProperty == null) {
+      throw new IAE("At most one of %s must be present", properties);
+    }
+    return nonNullProperty;
+  }
+
   /**
    * @return Non-null value, or first one if both are null. -1 is interpreted as null for historical reasons.
    */
-  @SuppressWarnings("VariableNotUsedInsideIf") // false positive: checked for 'null' not used inside 'if'
-  static Property<Integer> checkAtMostOneNotNull(Property<Integer> property1, Property<Integer> property2)
+  public static <T> Property<T> checkAtMostOneNotNull(Property<T> property1, Property<T> property2)
   {
-    final Property<Integer> property;
+    final Property<T> property;

     boolean isNull1 = property1.getValue() == null;
     boolean isNull2 = property2.getValue() == null;
@@ -42,9 +63,7 @@ static Property<Integer> checkAtMostOneNotNull(Property<Integer> property1, Prop
     } else if (isNull2) {
       property = property1;
     } else {
-      throw new IllegalArgumentException(
-          "At most one of " + property1.getName() + " or " + property2.getName() + " must be present"
-      );
+      throw new IAE("At most one of [%s] or [%s] must be present", property1, property2);
     }

     return property;
@@ -53,10 +72,14 @@ static Property<Integer> checkAtMostOneNotNull(Property<Integer> property1, Prop
   /**
    * @return Non-null value, or first one if both are null. -1 is interpreted as null for historical reasons.
    */
-  static Property<Integer> checkAtMostOneNotNull(String name1, Integer value1, String name2, Integer value2)
+  public static <T> Property<T> checkAtMostOneNotNull(String name1, T value1, String name2, T value2)
   {
-    Property<Integer> property1 = new Property<>(name1, value1);
-    Property<Integer> property2 = new Property<>(name2, value2);
+    Property<T> property1 = new Property<>(name1, value1);
+    Property<T> property2 = new Property<>(name2, value2);
     return checkAtMostOneNotNull(property1, property2);
   }
+
+  private Checks()
+  {
+  }
 }
diff --git a/core/src/main/java/org/apache/druid/indexer/partitions/Property.java b/core/src/main/java/org/apache/druid/indexer/Property.java
similarity index 75%
rename from core/src/main/java/org/apache/druid/indexer/partitions/Property.java
rename to core/src/main/java/org/apache/druid/indexer/Property.java
index 6d4715edd1b5..9f9467ff9741 100644
--- a/core/src/main/java/org/apache/druid/indexer/partitions/Property.java
+++ b/core/src/main/java/org/apache/druid/indexer/Property.java
@@ -17,19 +17,20 @@
  * under the License.
  */

-package org.apache.druid.indexer.partitions;
+package org.apache.druid.indexer;

+import java.util.Collection;
 import java.util.Objects;

 /**
  * Convenience class for holding a pair of string key and templated value.
  */
-class Property<T>
+public class Property<T>
 {
   private final String name;
   private final T value;

-  Property(String name, T value)
+  public Property(String name, T value)
   {
     this.name = name;
     this.value = value;
@@ -45,6 +46,17 @@ public T getValue()
     return value;
   }

+  public boolean isValueNullOrEmptyCollection()
+  {
+    if (value == null) {
+      return true;
+    }
+    if (value instanceof Collection) {
+      return ((Collection) value).isEmpty();
+    }
+    return false;
+  }
+
   @Override
   public boolean equals(Object o)
   {
@@ -64,4 +76,13 @@ public int hashCode()
   {
     return Objects.hash(name, value);
   }
+
+  @Override
+  public String toString()
+  {
+    return "Property{" +
+           "name='" + name + '\'' +
+           ", value=" + value +
+           '}';
+  }
 }
diff --git a/core/src/main/java/org/apache/druid/indexer/partitions/HashedPartitionsSpec.java b/core/src/main/java/org/apache/druid/indexer/partitions/HashedPartitionsSpec.java
index f9f639681586..7e8c066a761a 100644
--- a/core/src/main/java/org/apache/druid/indexer/partitions/HashedPartitionsSpec.java
+++ b/core/src/main/java/org/apache/druid/indexer/partitions/HashedPartitionsSpec.java
@@ -23,6 +23,8 @@
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
+import org.apache.druid.indexer.Checks;
+import org.apache.druid.indexer.Property;

 import javax.annotation.Nullable;
 import java.util.Collections;
diff --git a/core/src/main/java/org/apache/druid/indexer/partitions/SingleDimensionPartitionsSpec.java b/core/src/main/java/org/apache/druid/indexer/partitions/SingleDimensionPartitionsSpec.java
index 3c8e506b56c5..b9f0132788a7 100644
--- a/core/src/main/java/org/apache/druid/indexer/partitions/SingleDimensionPartitionsSpec.java
+++ b/core/src/main/java/org/apache/druid/indexer/partitions/SingleDimensionPartitionsSpec.java
@@ -25,6 +25,8 @@
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
+import org.apache.druid.indexer.Checks;
+import org.apache.druid.indexer.Property;

 import javax.annotation.Nullable;
 import javax.validation.constraints.NotNull;
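Reviewer note: the generified `Checks` helpers above are easiest to read with a concrete call site. A minimal sketch, not part of the patch; the property names and values are illustrative only:

```java
import com.google.common.collect.ImmutableList;
import org.apache.druid.indexer.Checks;
import org.apache.druid.indexer.Property;

class ChecksUsageSketch
{
  static Property<?> pickExactlyOne()
  {
    // Nulls and empty collections both count as "absent" via
    // isValueNullOrEmptyCollection(); exactly one property may be present,
    // otherwise checkOneNotNullOrEmpty throws IAE. Here "segments" is returned.
    return Checks.checkOneNotNullOrEmpty(
        ImmutableList.of(
            new Property<>("ioConfig", null),
            new Property<>("interval", null),
            new Property<>("segments", ImmutableList.of("someSegmentId"))
        )
    );
  }
}
```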
diff --git a/core/src/main/java/org/apache/druid/segment/SegmentUtils.java b/core/src/main/java/org/apache/druid/segment/SegmentUtils.java
index 1f610651b8ac..10d673002811 100644
--- a/core/src/main/java/org/apache/druid/segment/SegmentUtils.java
+++ b/core/src/main/java/org/apache/druid/segment/SegmentUtils.java
@@ -19,15 +19,23 @@

 package org.apache.druid.segment;

+import com.google.common.hash.HashFunction;
+import com.google.common.hash.Hasher;
+import com.google.common.hash.Hashing;
 import com.google.common.io.Files;
 import com.google.common.primitives.Ints;
 import org.apache.druid.guice.annotations.PublicApi;
 import org.apache.druid.java.util.common.IOE;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.timeline.DataSegment;

 import java.io.File;
 import java.io.FileInputStream;
 import java.io.IOException;
 import java.io.InputStream;
+import java.nio.charset.StandardCharsets;
+import java.util.Collections;
+import java.util.List;

 /**
  * Utility methods useful for implementing deep storage extensions.
@@ -35,6 +43,19 @@
 @PublicApi
 public class SegmentUtils
 {
+  private static final HashFunction HASH_FUNCTION = Hashing.sha256();
+
+  /**
+   * Hash the IDs of the given segments based on SHA-256 algorithm.
+   */
+  public static String hashIds(List<DataSegment> segments)
+  {
+    Collections.sort(segments);
+    final Hasher hasher = HASH_FUNCTION.newHasher();
+    segments.forEach(segment -> hasher.putString(segment.getId().toString(), StandardCharsets.UTF_8));
+    return StringUtils.fromUtf8(hasher.hash().asBytes());
+  }
+
   public static int getVersionFromDir(File inDir) throws IOException
   {
     File versionFile = new File(inDir, "version.bin");
@@ -53,4 +74,8 @@ public static int getVersionFromDir(File inDir) throws IOException

     throw new IOE("Invalid segment dir [%s]. Can't find either of version.bin or index.drd.", inDir);
   }
+
+  private SegmentUtils()
+  {
+  }
 }
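Reviewer note: one subtlety of the new `hashIds()` worth calling out is that `Collections.sort()` mutates its argument, so the fingerprint is order-independent but the caller's list gets reordered. A hedged usage sketch; `fingerprint` is a hypothetical caller, not in the patch:

```java
import java.util.ArrayList;
import java.util.List;
import org.apache.druid.segment.SegmentUtils;
import org.apache.druid.timeline.DataSegment;

class HashIdsUsageSketch
{
  // Produces the same SHA-256 fingerprint for any permutation of the same segments.
  static String fingerprint(List<DataSegment> segments)
  {
    // Pass a mutable copy: hashIds() sorts the list in place before hashing.
    return SegmentUtils.hashIds(new ArrayList<>(segments));
  }
}
```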
diff --git a/core/src/test/java/org/apache/druid/indexer/ChecksTest.java b/core/src/test/java/org/apache/druid/indexer/ChecksTest.java
new file mode 100644
index 000000000000..63dcd0d84a6f
--- /dev/null
+++ b/core/src/test/java/org/apache/druid/indexer/ChecksTest.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexer;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+import org.apache.druid.java.util.common.StringUtils;
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+import java.util.Collections;
+import java.util.List;
+
+public class ChecksTest
+{
+  private static final String NAME1 = "name1";
+  private static final Integer VALUE1 = 1;
+  private static final String NAME2 = "name2";
+  private static final Integer VALUE2 = 2;
+  private static final Integer NULL = null;
+
+  @Rule
+  public ExpectedException exception = ExpectedException.none();
+
+  @Test
+  public void checkAtMostOneNotNullFirstNull()
+  {
+    Property<Integer> result = Checks.checkAtMostOneNotNull(NAME1, NULL, NAME2, VALUE2);
+    Assert.assertEquals(NAME2, result.getName());
+    Assert.assertEquals(VALUE2, result.getValue());
+  }
+
+  @Test
+  public void checkAtMostOneNotNullSecondNull()
+  {
+    Property<Integer> result = Checks.checkAtMostOneNotNull(NAME1, VALUE1, NAME2, NULL);
+    Assert.assertEquals(NAME1, result.getName());
+    Assert.assertEquals(VALUE1, result.getValue());
+  }
+
+  @Test
+  public void checkAtMostOneNotNullBothNull()
+  {
+    Property<Integer> result = Checks.checkAtMostOneNotNull(NAME1, NULL, NAME2, NULL);
+    Assert.assertEquals(NAME1, result.getName());
+    Assert.assertEquals(NULL, result.getValue());
+  }
+
+  @Test
+  public void checkAtMostOneNotNullNeitherNull()
+  {
+    exception.expect(IllegalArgumentException.class);
+    exception.expectMessage(
+        StringUtils.format(
+            "[Property{name='%s', value=%s}] or [Property{name='%s', value=%s}]",
+            NAME1,
+            VALUE1,
+            NAME2,
+            VALUE2
+        )
+    );
+
+    Checks.checkAtMostOneNotNull(NAME1, VALUE1, NAME2, VALUE2);
+  }
+
+  @Test
+  public void testCheckOneNotNullOrEmpty()
+  {
+    final List<Property<?>> properties = ImmutableList.of(
+        new Property<>("p1", null),
+        new Property<>("p2", 2),
+        new Property<>("p3", null),
+        new Property<>("p4", Collections.emptyList())
+    );
+    final Property<?> property = Checks.checkOneNotNullOrEmpty(properties);
+    Assert.assertEquals(new Property<>("p2", 2), property);
+  }
+
+  @Test
+  public void testCheckOneNotNullOrEmptyWithTwoNonNulls()
+  {
+    final List<Property<?>> properties = ImmutableList.of(
+        new Property<>("p1", null),
+        new Property<>("p2", 2),
+        new Property<>("p3", 3),
+        new Property<>("p4", Collections.emptyList())
+    );
+    exception.expect(IllegalArgumentException.class);
+    exception.expectMessage(
+        "At most one of [Property{name='p1', value=null}, Property{name='p2', value=2}, Property{name='p3', value=3}, "
+        + "Property{name='p4', value=[]}] must be present"
+    );
+    Checks.checkOneNotNullOrEmpty(properties);
+  }
+
+  @Test
+  public void testCheckOneNotNullOrEmptyWithNonNullAndNonEmpty()
+  {
+    final List<Property<?>> properties = ImmutableList.of(
+        new Property<>("p1", null),
+        new Property<>("p2", 2),
+        new Property<>("p3", null),
+        new Property<>("p4", Lists.newArrayList(1, 2))
+    );
+    exception.expect(IllegalArgumentException.class);
+    exception.expectMessage(
+        "At most one of [Property{name='p1', value=null}, Property{name='p2', value=2}, Property{name='p3', value=null}, "
+        + "Property{name='p4', value=[1, 2]}] must be present"
+    );
+    Checks.checkOneNotNullOrEmpty(properties);
+  }
+
+  @Test
+  public void testCheckOneNotNullOrEmptyWithAllNulls()
+  {
+    final List<Property<?>> properties = ImmutableList.of(
+        new Property<>("p1", null),
+        new Property<>("p2", null),
+        new Property<>("p3", null),
+        new Property<>("p4", null)
+    );
+    exception.expect(IllegalArgumentException.class);
+    exception.expectMessage(
+        "At most one of [Property{name='p1', value=null}, Property{name='p2', value=null}, "
+        + "Property{name='p3', value=null}, Property{name='p4', value=null}] must be present"
+    );
+    Checks.checkOneNotNullOrEmpty(properties);
+  }
+}
diff --git a/core/src/test/java/org/apache/druid/indexer/partitions/ChecksTest.java b/core/src/test/java/org/apache/druid/indexer/partitions/ChecksTest.java
deleted file mode 100644
index 97d1a69602db..000000000000
--- a/core/src/test/java/org/apache/druid/indexer/partitions/ChecksTest.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.druid.indexer.partitions;
-
-import org.junit.Assert;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.ExpectedException;
-
-public class ChecksTest
-{
-  private static final String NAME1 = "name1";
-  private static final Integer VALUE1 = 1;
-  private static final String NAME2 = "name2";
-  private static final Integer VALUE2 = 2;
-  private static final Integer NULL = null;
-
-  @Rule
-  public ExpectedException exception = ExpectedException.none();
-
-  @Test
-  public void checkAtMostOneNotNullFirstNull()
-  {
-    Property<Integer> result = Checks.checkAtMostOneNotNull(NAME1, NULL, NAME2, VALUE2);
-    Assert.assertEquals(NAME2, result.getName());
-    Assert.assertEquals(VALUE2, result.getValue());
-  }
-
-  @Test
-  public void checkAtMostOneNotNullSecondNull()
-  {
-    Property<Integer> result = Checks.checkAtMostOneNotNull(NAME1, VALUE1, NAME2, NULL);
-    Assert.assertEquals(NAME1, result.getName());
-    Assert.assertEquals(VALUE1, result.getValue());
-  }
-
-  @Test
-  public void checkAtMostOneNotNullBothNull()
-  {
-    Property<Integer> result = Checks.checkAtMostOneNotNull(NAME1, NULL, NAME2, NULL);
-    Assert.assertEquals(NAME1, result.getName());
-    Assert.assertEquals(NULL, result.getValue());
-  }
-
-  @Test
-  public void checkAtMostOneNotNullNeitherNull()
-  {
-    exception.expect(IllegalArgumentException.class);
-    exception.expectMessage("At most one of " + NAME1 + " or " + NAME2 + " must be present");
-
-    //noinspection ConstantConditions (expected to fail)
-    Checks.checkAtMostOneNotNull(NAME1, VALUE1, NAME2, VALUE2);
-  }
-}
diff --git a/core/src/test/java/org/apache/druid/indexer/partitions/SingleDimensionPartitionsSpecTest.java b/core/src/test/java/org/apache/druid/indexer/partitions/SingleDimensionPartitionsSpecTest.java
index 2b572afcbc7c..51883a605e03 100644
--- a/core/src/test/java/org/apache/druid/indexer/partitions/SingleDimensionPartitionsSpecTest.java
+++ b/core/src/test/java/org/apache/druid/indexer/partitions/SingleDimensionPartitionsSpecTest.java
@@ -74,7 +74,7 @@ public void havingBothTargetForbidden()
     new Tester()
         .targetRowsPerSegment(1)
         .targetPartitionSize(1)
-        .testIllegalArgumentException("At most one of targetRowsPerSegment or targetPartitionSize must be present");
+        .testIllegalArgumentException("At most one of [Property{name='targetRowsPerSegment', value=1}] or [Property{name='targetPartitionSize', value=1}] must be present");
   }

   @Test
@@ -83,7 +83,7 @@ public void havingBothMaxForbidden()
     new Tester()
         .maxRowsPerSegment(1)
         .maxPartitionSize(1)
-        .testIllegalArgumentException("At most one of maxRowsPerSegment or maxPartitionSize must be present");
+        .testIllegalArgumentException("At most one of [Property{name='maxRowsPerSegment', value=1}] or [Property{name='maxPartitionSize', value=1}] must be present");
   }

   @Test
diff --git a/docs/configuration/index.md b/docs/configuration/index.md
index f44d5aef9ac7..21bd60e927a5 100644
--- a/docs/configuration/index.md
+++ b/docs/configuration/index.md
@@ -788,7 +788,6 @@ A description of the compaction config is:
 |`inputSegmentSizeBytes`|Maximum number of total segment bytes processed per compaction task. Since a time chunk must be processed in its entirety, if the segments for a particular time chunk have a total size in bytes greater than this parameter, compaction will not run for that time chunk. Because each compaction task runs with a single thread, setting this value too far above 1–2GB will result in compaction tasks taking an excessive amount of time.|no (default = 419430400)|
 |`targetCompactionSizeBytes`|The target segment size, for each segment, after compaction. The actual sizes of compacted segments might be slightly larger or smaller than this value. Each compaction task may generate more than one output segment, and it will try to keep each output segment close to this configured size. This configuration cannot be used together with `maxRowsPerSegment`.|no (default = 419430400)|
 |`maxRowsPerSegment`|Max number of rows per segment after compaction. This configuration cannot be used together with `targetCompactionSizeBytes`.|no|
-|`maxNumSegmentsToCompact`|Maximum number of segments to compact together per compaction task. Since a time chunk must be processed in its entirety, if a time chunk has a total number of segments greater than this parameter, compaction will not run for that time chunk.|no (default = 150)|
 |`skipOffsetFromLatest`|The offset for searching segments to be compacted. Strongly recommended to set for realtime dataSources. |no (default = "P1D")|
 |`tuningConfig`|Tuning config for compaction tasks. See below [Compaction Task TuningConfig](#compaction-tuningconfig).|no|
 |`taskContext`|[Task context](../ingestion/tasks.html#context) for compaction tasks.|no|
diff --git a/docs/ingestion/data-management.md b/docs/ingestion/data-management.md
index cf22c04386e8..f0cc822ca4c2 100644
--- a/docs/ingestion/data-management.md
+++ b/docs/ingestion/data-management.md
@@ -99,7 +99,7 @@ Compaction tasks merge all segments of the given interval. The syntax is:
     "type": "compact",
     "id": <task_id>,
     "dataSource": <task_datasource>,
-    "interval": <interval to specify segments to be merged>,
+    "ioConfig": <IO config>,
     "dimensions" <custom dimensionsSpec>,
     "segmentGranularity": <segment granularity after compaction>,
     "targetCompactionSizeBytes": <target size of compacted segments>
@@ -113,7 +113,7 @@ Compaction tasks merge all segments of the given interval. The syntax is:
 |`type`|Task type. Should be `compact`|Yes|
 |`id`|Task id|No|
 |`dataSource`|DataSource name to be compacted|Yes|
-|`interval`|Interval of segments to be compacted|Yes|
+|`ioConfig`|ioConfig for compaction task. See [Compaction IOConfig](#compaction-ioconfig) for details.|Yes|
 |`dimensionsSpec`|Custom dimensionsSpec. Compaction task will use this dimensionsSpec if exist instead of generating one. See below for more details.|No|
 |`metricsSpec`|Custom metricsSpec. Compaction task will use this metricsSpec if specified rather than generating one.|No|
 |`segmentGranularity`|If this is set, compactionTask will change the segment granularity for the given interval. See `segmentGranularity` of [`granularitySpec`](index.md#granularityspec) for more details. See the below table for the behavior.|No|
@@ -128,7 +128,13 @@ An example of compaction task is
 {
   "type" : "compact",
   "dataSource" : "wikipedia",
-  "interval" : "2017-01-01/2018-01-01"
+  "ioConfig" : {
+    "type": "compact",
+    "inputSpec": {
+      "type": "interval",
+      "interval": "2017-01-01/2018-01-01"
+    }
+  }
 }
 ```
@@ -158,6 +164,31 @@ See [Roll-up](../ingestion/index.html#rollup) for more details.
 You can check that your segments are rolled up or not by using [Segment Metadata Queries](../querying/segmentmetadataquery.html#analysistypes).

+### Compaction IOConfig
+
+The compaction IOConfig requires specifying `inputSpec` as seen below.
+
+|Field|Description|Required|
+|-----|-----------|--------|
+|`type`|Type of the ioConfig. Should be `compact`|Yes|
+|`inputSpec`|Input specification|Yes|
+
+There are two supported `inputSpec`s for now.
+
+The interval `inputSpec` is:
+
+|Field|Description|Required|
+|-----|-----------|--------|
+|`type`|Type of the inputSpec. Should be `interval`|Yes|
+|`interval`|Interval to compact|Yes|
+
+The segments `inputSpec` is:
+
+|Field|Description|Required|
+|-----|-----------|--------|
+|`type`|Type of the inputSpec. Should be `segments`|Yes|
+|`segments`|A list of segment IDs|Yes|
+
 ## Adding new data
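Reviewer note: the updated doc only shows an `interval` example; for symmetry, a `segments` example could be added to the new section as well. A sketch (the segment ID below is purely illustrative):

```json
{
  "type" : "compact",
  "dataSource" : "wikipedia",
  "ioConfig" : {
    "type": "compact",
    "inputSpec": {
      "type": "segments",
      "segments": [
        "wikipedia_2017-01-01T00:00:00.000Z_2017-01-02T00:00:00.000Z_2019-01-01T00:00:00.000Z"
      ]
    }
  }
}
```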
diff --git a/indexing-hadoop/src/test/java/org/apache/druid/indexer/partitions/HashedPartitionsSpecTest.java b/indexing-hadoop/src/test/java/org/apache/druid/indexer/partitions/HashedPartitionsSpecTest.java
index 961592dbeb69..2195ab2064d0 100644
--- a/indexing-hadoop/src/test/java/org/apache/druid/indexer/partitions/HashedPartitionsSpecTest.java
+++ b/indexing-hadoop/src/test/java/org/apache/druid/indexer/partitions/HashedPartitionsSpecTest.java
@@ -24,11 +24,10 @@
 import com.google.common.collect.ImmutableMultimap;
 import com.google.common.collect.Multimap;
 import org.apache.druid.jackson.DefaultObjectMapper;
+import org.apache.druid.java.util.common.StringUtils;
 import org.hamcrest.CoreMatchers;
 import org.junit.Assert;
-import org.junit.Rule;
 import org.junit.Test;
-import org.junit.rules.ExpectedException;

 import java.util.Collections;
 import java.util.List;
@@ -38,9 +37,6 @@ public class HashedPartitionsSpecTest
 {
   private static final ObjectMapper JSON_MAPPER = new DefaultObjectMapper();

-  @Rule
-  public ExpectedException exception = ExpectedException.none();
-
   @Test
   public void havingTargetRowsPerSegmentOnly()
   {
@@ -130,7 +126,11 @@ public void havingIncompatiblePropertiesIsForbidden()
       Assert.fail(reasonPrefix + " did not throw exception");
     }
     catch (RuntimeException e) {
-      String expectedMessage = "At most one of " + first + " or " + second + " must be present";
+      final String expectedMessage = StringUtils.format(
+          "At most one of [Property{name='%s', value=100}] or [Property{name='%s', value=100}] must be present",
+          first,
+          second
+      );
       Assert.assertThat(
           reasonPrefix + " has wrong failure message",
           e.getMessage(),
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionIOConfig.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionIOConfig.java
new file mode 100644
index 000000000000..e107d2e4a38b
--- /dev/null
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionIOConfig.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.common.task;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import org.apache.druid.segment.indexing.IOConfig;
+
+import java.util.Objects;
+
+/**
+ * {@link IOConfig} for {@link CompactionTask}.
+ *
+ * @see CompactionInputSpec
+ */
+@JsonTypeName("compact")
+public class CompactionIOConfig implements IOConfig
+{
+  private final CompactionInputSpec inputSpec;
+
+  @JsonCreator
+  public CompactionIOConfig(@JsonProperty("inputSpec") CompactionInputSpec inputSpec)
+  {
+    this.inputSpec = inputSpec;
+  }
+
+  @JsonProperty
+  public CompactionInputSpec getInputSpec()
+  {
+    return inputSpec;
+  }
+
+  @Override
+  public boolean equals(Object o)
+  {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    CompactionIOConfig that = (CompactionIOConfig) o;
+    return Objects.equals(inputSpec, that.inputSpec);
+  }
+
+  @Override
+  public int hashCode()
+  {
+    return Objects.hash(inputSpec);
+  }
+
+  @Override
+  public String toString()
+  {
+    return "CompactionIOConfig{" +
+           "inputSpec=" + inputSpec +
+           '}';
+  }
+}
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionInputSpec.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionInputSpec.java
new file mode 100644
index 000000000000..324f21240fc4
--- /dev/null
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionInputSpec.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.common.task;
+
+import com.fasterxml.jackson.annotation.JsonSubTypes;
+import com.fasterxml.jackson.annotation.JsonSubTypes.Type;
+import com.fasterxml.jackson.annotation.JsonTypeInfo;
+import org.apache.druid.timeline.DataSegment;
+import org.joda.time.Interval;
+
+import java.util.List;
+
+/**
+ * Input specification for compaction task.
+ */
+@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type")
+@JsonSubTypes(value = {
+    @Type(name = CompactionIntervalSpec.TYPE, value = CompactionIntervalSpec.class),
+    @Type(name = SpecificSegmentsSpec.TYPE, value = SpecificSegmentsSpec.class)
+})
+public interface CompactionInputSpec
+{
+  /**
+   * Find the umbrella interval containing the specified input.
+   */
+  Interval findInterval(String dataSource);
+
+  /**
+   * Validate the specified input against the most recent published segments.
+   * This method is used to check whether the specified input has gone stale.
+   *
+   * @param latestSegments most recent published segments in the interval returned by {@link #findInterval}
+   */
+  boolean validateSegments(List<DataSegment> latestSegments);
+}
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionIntervalSpec.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionIntervalSpec.java
new file mode 100644
index 000000000000..66304424edda
--- /dev/null
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionIntervalSpec.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.common.task;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.druid.java.util.common.IAE;
+import org.apache.druid.java.util.common.JodaUtils;
+import org.apache.druid.segment.SegmentUtils;
+import org.apache.druid.timeline.DataSegment;
+import org.joda.time.Interval;
+
+import javax.annotation.Nullable;
+import java.util.List;
+import java.util.Objects;
+import java.util.stream.Collectors;
+
+/**
+ * Specifying an interval to compact. A hash of the segment IDs can be optionally provided for segment validation.
+ */
+public class CompactionIntervalSpec implements CompactionInputSpec
+{
+  public static final String TYPE = "interval";
+
+  private final Interval interval;
+  @Nullable
+  private final String sha256OfSortedSegmentIds;
+
+  @JsonCreator
+  public CompactionIntervalSpec(
+      @JsonProperty("interval") Interval interval,
+      @JsonProperty("sha256OfSortedSegmentIds") @Nullable String sha256OfSortedSegmentIds
+  )
+  {
+    if (interval != null && interval.toDurationMillis() == 0) {
+      throw new IAE("Interval[%s] is empty, must specify a nonempty interval", interval);
+    }
+    this.interval = interval;
+    this.sha256OfSortedSegmentIds = sha256OfSortedSegmentIds;
+  }
+
+  @JsonProperty
+  public Interval getInterval()
+  {
+    return interval;
+  }
+
+  @Nullable
+  @JsonProperty
+  public String getSha256OfSortedSegmentIds()
+  {
+    return sha256OfSortedSegmentIds;
+  }
+
+  @Override
+  public Interval findInterval(String dataSource)
+  {
+    return interval;
+  }
+
+  @Override
+  public boolean validateSegments(List<DataSegment> latestSegments)
+  {
+    final Interval segmentsInterval = JodaUtils.umbrellaInterval(
+        latestSegments.stream().map(DataSegment::getInterval).collect(Collectors.toList())
+    );
+    if (interval.overlaps(segmentsInterval)) {
+      if (sha256OfSortedSegmentIds != null) {
+        final String hashOfThem = SegmentUtils.hashIds(latestSegments);
+        return hashOfThem.equals(sha256OfSortedSegmentIds);
+      } else {
+        return true;
+      }
+    } else {
+      return false;
+    }
+  }
+
+  @Override
+  public boolean equals(Object o)
+  {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    CompactionIntervalSpec that = (CompactionIntervalSpec) o;
+    return Objects.equals(interval, that.interval) &&
+           Objects.equals(sha256OfSortedSegmentIds, that.sha256OfSortedSegmentIds);
+  }
+
+  @Override
+  public int hashCode()
+  {
+    return Objects.hash(interval, sha256OfSortedSegmentIds);
+  }
+
+  @Override
+  public String toString()
+  {
+    return "CompactionIntervalSpec{" +
+           "interval=" + interval +
+           ", sha256OfSortedSegmentIds='" + sha256OfSortedSegmentIds + '\'' +
+           '}';
+  }
+}
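Reviewer note: a sketch of the intended workflow for the optional hash, assuming a client that has already fetched the currently published segments; the helper name is hypothetical, not from the patch:

```java
import java.util.ArrayList;
import java.util.List;
import org.apache.druid.indexing.common.task.CompactionIntervalSpec;
import org.apache.druid.segment.SegmentUtils;
import org.apache.druid.timeline.DataSegment;
import org.joda.time.Interval;

class IntervalSpecSketch
{
  // Pins an interval-based compaction to the exact segment set observed at
  // submission time; validateSegments() then fails if any segment in the
  // interval is added, dropped, or replaced before the task runs.
  static CompactionIntervalSpec pinToObservedSegments(Interval interval, List<DataSegment> observed)
  {
    return new CompactionIntervalSpec(interval, SegmentUtils.hashIds(new ArrayList<>(observed)));
  }
}
```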
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java
index 0f0cfbbb4212..4161a35f17f1 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java
@@ -28,6 +28,7 @@
 import com.google.common.base.Preconditions;
 import com.google.common.collect.BiMap;
 import com.google.common.collect.HashBiMap;
+import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
 import org.apache.druid.client.coordinator.CoordinatorClient;
 import org.apache.druid.data.input.impl.DimensionSchema;
@@ -40,6 +41,8 @@
 import org.apache.druid.data.input.impl.NoopInputRowParser;
 import org.apache.druid.data.input.impl.StringDimensionSchema;
 import org.apache.druid.data.input.impl.TimeAndDimsParseSpec;
+import org.apache.druid.indexer.Checks;
+import org.apache.druid.indexer.Property;
 import org.apache.druid.indexer.TaskStatus;
 import org.apache.druid.indexer.partitions.DynamicPartitionsSpec;
 import org.apache.druid.indexer.partitions.HashedPartitionsSpec;
@@ -53,7 +56,6 @@
 import org.apache.druid.indexing.common.task.IndexTask.IndexIngestionSpec;
 import org.apache.druid.indexing.common.task.IndexTask.IndexTuningConfig;
 import org.apache.druid.indexing.firehose.IngestSegmentFirehoseFactory;
-import org.apache.druid.java.util.common.IAE;
 import org.apache.druid.java.util.common.ISE;
 import org.apache.druid.java.util.common.JodaUtils;
 import org.apache.druid.java.util.common.Numbers;
@@ -121,8 +123,7 @@ public class CompactionTask extends AbstractBatchIndexTask
   private static final Logger log = new Logger(CompactionTask.class);
   private static final String TYPE = "compact";

-  private final Interval interval;
-  private final List<DataSegment> segments;
+  private final CompactionIOConfig ioConfig;
   @Nullable
   private final DimensionsSpec dimensionsSpec;
   @Nullable
@@ -176,8 +177,9 @@ public CompactionTask(
       @JsonProperty("id") final String id,
       @JsonProperty("resource") final TaskResource taskResource,
       @JsonProperty("dataSource") final String dataSource,
-      @JsonProperty("interval") @Nullable final Interval interval,
-      @JsonProperty("segments") @Nullable final List<DataSegment> segments,
+      @JsonProperty("interval") @Deprecated @Nullable final Interval interval,
+      @JsonProperty("segments") @Deprecated @Nullable final List<DataSegment> segments,
+      @JsonProperty("ioConfig") @Nullable CompactionIOConfig ioConfig,
       @JsonProperty("dimensions") @Nullable final DimensionsSpec dimensions,
       @JsonProperty("dimensionsSpec") @Nullable final DimensionsSpec dimensionsSpec,
      @JsonProperty("metricsSpec") @Nullable final AggregatorFactory[] metricsSpec,
@@ -196,22 +198,32 @@ public CompactionTask(
   )
   {
     super(getOrMakeId(id, TYPE, dataSource), null, taskResource, dataSource, context);
-    Preconditions.checkArgument(interval != null || segments != null, "interval or segments should be specified");
-    Preconditions.checkArgument(interval == null || segments == null, "one of interval and segments should be null");
-    if (interval != null && interval.toDurationMillis() == 0) {
-      throw new IAE("Interval[%s] is empty, must specify a nonempty interval", interval);
+    Checks.checkOneNotNullOrEmpty(
+        ImmutableList.of(
+            new Property<>("ioConfig", ioConfig),
+            new Property<>("interval", interval),
+            new Property<>("segments", segments)
+        )
+    );
+
+    if (ioConfig != null) {
+      this.ioConfig = ioConfig;
+    } else if (interval != null) {
+      this.ioConfig = new CompactionIOConfig(new CompactionIntervalSpec(interval, null));
+    } else {
+      // We already checked segments is not null or empty above.
+      //noinspection ConstantConditions
+      this.ioConfig = new CompactionIOConfig(SpecificSegmentsSpec.fromSegments(segments));
     }
-    this.interval = interval;
-    this.segments = segments;
     this.dimensionsSpec = dimensionsSpec == null ? dimensions : dimensionsSpec;
     this.metricsSpec = metricsSpec;
     this.segmentGranularity = segmentGranularity;
     this.targetCompactionSizeBytes = targetCompactionSizeBytes;
     this.tuningConfig = tuningConfig;
     this.jsonMapper = jsonMapper;
-    this.segmentProvider = segments == null ? new SegmentProvider(dataSource, interval) : new SegmentProvider(segments);
+    this.segmentProvider = new SegmentProvider(dataSource, this.ioConfig.getInputSpec());
     this.partitionConfigurationManager = new PartitionConfigurationManager(targetCompactionSizeBytes, tuningConfig);
     this.authorizerMapper = authorizerMapper;
     this.chatHandlerProvider = chatHandlerProvider;
@@ -223,15 +235,9 @@ public CompactionTask(
   }

   @JsonProperty
-  public Interval getInterval()
+  public CompactionIOConfig getIoConfig()
   {
-    return interval;
-  }
-
-  @JsonProperty
-  public List<DataSegment> getSegments()
-  {
-    return segments;
+    return ioConfig;
   }

   @JsonProperty
@@ -343,7 +349,7 @@ public TaskStatus runTask(TaskToolbox toolbox) throws Exception
     }

     if (indexTaskSpecs.isEmpty()) {
-      log.warn("Interval[%s] has no segments, nothing to do.", interval);
+      log.warn("Can't find segments from inputSpec[%s], nothing to do.", ioConfig.getInputSpec());
       return TaskStatus.failure(getId());
     } else {
       registerResourceCloserOnAbnormalExit(currentSubTaskHolder);
@@ -758,36 +764,14 @@ private static DimensionSchema createDimensionSchema(
   static class SegmentProvider
   {
     private final String dataSource;
+    private final CompactionInputSpec inputSpec;
     private final Interval interval;
-    @Nullable
-    private final List<DataSegment> segments;

-    SegmentProvider(String dataSource, Interval interval)
+    SegmentProvider(String dataSource, CompactionInputSpec inputSpec)
     {
       this.dataSource = Preconditions.checkNotNull(dataSource);
-      this.interval = Preconditions.checkNotNull(interval);
-      this.segments = null;
-    }
-
-    SegmentProvider(List<DataSegment> segments)
-    {
-      Preconditions.checkArgument(segments != null && !segments.isEmpty());
-      final String dataSource = segments.get(0).getDataSource();
-      Preconditions.checkArgument(
-          segments.stream().allMatch(segment -> segment.getDataSource().equals(dataSource)),
-          "segments should have the same dataSource"
-      );
-      this.dataSource = dataSource;
-      this.segments = segments;
-      this.interval = JodaUtils.umbrellaInterval(
-          segments.stream().map(DataSegment::getInterval).collect(Collectors.toList())
-      );
-    }
-
-    @Nullable
-    List<DataSegment> getSegments()
-    {
-      return segments;
+      this.inputSpec = inputSpec;
+      this.interval = inputSpec.findInterval(dataSource);
     }

     List<DataSegment> checkAndGetSegments(TaskActionClient actionClient) throws IOException
@@ -804,24 +788,11 @@ List<DataSegment> checkAndGetSegments(TaskActionClient actionClient) throws IOEx
           .map(PartitionChunk::getObject)
           .collect(Collectors.toList());

-      if (segments != null) {
-        Collections.sort(latestSegments);
-        Collections.sort(segments);
-
-        if (!latestSegments.equals(segments)) {
-          final List<DataSegment> unknownSegments = segments.stream()
-                                                            .filter(segment -> !latestSegments.contains(segment))
-                                                            .collect(Collectors.toList());
-          final List<DataSegment> missingSegments = latestSegments.stream()
-                                                                  .filter(segment -> !segments.contains(segment))
-                                                                  .collect(Collectors.toList());
-          throw new ISE(
-              "Specified segments in the spec are different from the current used segments. "
-              + "There are unknown segments[%s] and missing segments[%s] in the spec.",
-              unknownSegments,
-              missingSegments
-          );
-        }
+      if (!inputSpec.validateSegments(latestSegments)) {
+        throw new ISE(
+            "Specified segments in the spec are different from the currently used segments. "
+            + "Possibly new segments would have been added or some segments have been unpublished."
+        );
       }
       return latestSegments;
     }
@@ -952,10 +923,7 @@ public static class Builder
     private final RetryPolicyFactory retryPolicyFactory;
     private final AppenderatorsManager appenderatorsManager;

-    @Nullable
-    private Interval interval;
-    @Nullable
-    private List<DataSegment> segments;
+    private CompactionIOConfig ioConfig;
     @Nullable
     private DimensionsSpec dimensionsSpec;
     @Nullable
@@ -994,13 +962,17 @@ public Builder(

     public Builder interval(Interval interval)
     {
-      this.interval = interval;
-      return this;
+      return inputSpec(new CompactionIntervalSpec(interval, null));
     }

     public Builder segments(List<DataSegment> segments)
     {
-      this.segments = segments;
+      return inputSpec(SpecificSegmentsSpec.fromSegments(segments));
+    }
+
+    public Builder inputSpec(CompactionInputSpec inputSpec)
+    {
+      this.ioConfig = new CompactionIOConfig(inputSpec);
       return this;
     }

@@ -1046,8 +1018,9 @@ public CompactionTask build()
           null,
           null,
           dataSource,
-          interval,
-          segments,
+          null,
+          null,
+          ioConfig,
           null,
           dimensionsSpec,
           metricsSpec,
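Reviewer note: the constructor above keeps the deprecated `interval`/`segments` JSON fields working by folding them into the new `ioConfig`. A condensed restatement of that fallback, mirroring the patch rather than adding behavior (the class and method names are hypothetical, and it is assumed to sit in the same package as `CompactionTask` for access to the specs):

```java
import java.util.List;
import javax.annotation.Nullable;
import org.apache.druid.timeline.DataSegment;
import org.joda.time.Interval;

class IoConfigFallbackSketch
{
  static CompactionIOConfig resolve(
      @Nullable CompactionIOConfig ioConfig,
      @Nullable Interval interval,
      @Nullable List<DataSegment> segments
  )
  {
    if (ioConfig != null) {
      // New-style spec wins when present.
      return ioConfig;
    } else if (interval != null) {
      // Legacy "interval" field becomes an interval inputSpec without a hash.
      return new CompactionIOConfig(new CompactionIntervalSpec(interval, null));
    } else {
      // Legacy "segments" field becomes a segments inputSpec.
      return new CompactionIOConfig(SpecificSegmentsSpec.fromSegments(segments));
    }
  }
}
```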
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/SpecificSegmentsSpec.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/SpecificSegmentsSpec.java
new file mode 100644
index 000000000000..71da54bc8241
--- /dev/null
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/SpecificSegmentsSpec.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.common.task;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.base.Preconditions;
+import org.apache.druid.java.util.common.JodaUtils;
+import org.apache.druid.timeline.DataSegment;
+import org.apache.druid.timeline.SegmentId;
+import org.joda.time.Interval;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+import java.util.stream.Collectors;
+
+public class SpecificSegmentsSpec implements CompactionInputSpec
+{
+  public static final String TYPE = "segments";
+
+  private final List<String> segments;
+
+  public static SpecificSegmentsSpec fromSegments(List<DataSegment> segments)
+  {
+    Preconditions.checkArgument(!segments.isEmpty(), "Empty segment list");
+    return new SpecificSegmentsSpec(
+        segments.stream().map(segment -> segment.getId().toString()).collect(Collectors.toList())
+    );
+  }
+
+  @JsonCreator
+  public SpecificSegmentsSpec(@JsonProperty("segments") List<String> segments)
+  {
+    this.segments = segments;
+    // Sort segments to use in validateSegments.
+    Collections.sort(this.segments);
+  }
+
+  @JsonProperty
+  public List<String> getSegments()
+  {
+    return segments;
+  }
+
+  @Override
+  public Interval findInterval(String dataSource)
+  {
+    final List<SegmentId> segmentIds = segments
+        .stream()
+        .map(segment -> SegmentId.tryParse(dataSource, segment))
+        .collect(Collectors.toList());
+    return JodaUtils.umbrellaInterval(
+        segmentIds.stream().map(SegmentId::getInterval).collect(Collectors.toList())
+    );
+  }
+
+  @Override
+  public boolean validateSegments(List<DataSegment> latestSegments)
+  {
+    final List<String> thoseSegments = latestSegments
+        .stream()
+        .map(segment -> segment.getId().toString())
+        .sorted()
+        .collect(Collectors.toList());
+    return this.segments.equals(thoseSegments);
+  }
+
+  @Override
+  public boolean equals(Object o)
+  {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    SpecificSegmentsSpec that = (SpecificSegmentsSpec) o;
+    return Objects.equals(segments, that.segments);
+  }
+
+  @Override
+  public int hashCode()
+  {
+    return Objects.hash(segments);
+  }
+
+  @Override
+  public String toString()
+  {
+    return "SpecificSegmentsSpec{" +
+           "segments=" + segments +
+           '}';
+  }
+}
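Reviewer note: a usage sketch for `SpecificSegmentsSpec`. One caveat visible in the constructor above: it sorts the injected list in place, so a caller passing an immutable list would hit an `UnsupportedOperationException`. The segment IDs below are illustrative only:

```java
import java.util.ArrayList;
import java.util.Arrays;
import org.joda.time.Interval;

class SpecificSegmentsSketch
{
  static Interval umbrellaOfIds()
  {
    // Use a mutable list: the constructor calls Collections.sort(this.segments).
    SpecificSegmentsSpec spec = new SpecificSegmentsSpec(
        new ArrayList<>(Arrays.asList(
            "wikipedia_2019-01-01T00:00:00.000Z_2019-01-02T00:00:00.000Z_v1",
            "wikipedia_2019-01-02T00:00:00.000Z_2019-01-03T00:00:00.000Z_v1"
        ))
    );
    // The umbrella interval is derived from the IDs themselves; they must
    // parse against the task's dataSource.
    return spec.findInterval("wikipedia");
  }
}
```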
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/ClientCompactQuerySerdeTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/ClientCompactQuerySerdeTest.java
new file mode 100644
index 000000000000..508b5c021aab
--- /dev/null
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/ClientCompactQuerySerdeTest.java
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.common.task;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.introspect.AnnotationIntrospectorPair;
+import com.google.common.collect.ImmutableList;
+import org.apache.druid.client.coordinator.CoordinatorClient;
+import org.apache.druid.client.indexing.ClientCompactQuery;
+import org.apache.druid.client.indexing.ClientCompactQueryTuningConfig;
+import org.apache.druid.client.indexing.ClientCompactionIOConfig;
+import org.apache.druid.client.indexing.ClientCompactionIntervalSpec;
+import org.apache.druid.guice.GuiceAnnotationIntrospector;
+import org.apache.druid.guice.GuiceInjectableValues;
+import org.apache.druid.guice.GuiceInjectors;
+import org.apache.druid.indexing.common.SegmentLoaderFactory;
+import org.apache.druid.indexing.common.TestUtils;
+import org.apache.druid.indexing.common.stats.RowIngestionMetersFactory;
+import org.apache.druid.jackson.DefaultObjectMapper;
+import org.apache.druid.java.util.common.Intervals;
+import org.apache.druid.segment.IndexSpec;
+import org.apache.druid.segment.data.BitmapSerde.DefaultBitmapSerdeFactory;
+import org.apache.druid.segment.data.CompressionFactory.LongEncodingStrategy;
+import org.apache.druid.segment.data.CompressionStrategy;
+import org.apache.druid.segment.realtime.appenderator.AppenderatorsManager;
+import org.apache.druid.segment.realtime.firehose.ChatHandlerProvider;
+import org.apache.druid.segment.realtime.firehose.NoopChatHandlerProvider;
+import org.apache.druid.server.security.AuthTestUtils;
+import org.apache.druid.server.security.AuthorizerMapper;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.HashMap;
+
+public class ClientCompactQuerySerdeTest
+{
+  private static final RowIngestionMetersFactory ROW_INGESTION_METERS_FACTORY = new TestUtils()
+      .getRowIngestionMetersFactory();
+  private static final CoordinatorClient COORDINATOR_CLIENT = new CoordinatorClient(null, null);
+  private static final AppenderatorsManager APPENDERATORS_MANAGER = new TestAppenderatorsManager();
+
+  @Test
+  public void testSerde() throws IOException
+  {
+    final ObjectMapper mapper = setupInjectablesInObjectMapper(new DefaultObjectMapper());
+    final ClientCompactQuery query = new ClientCompactQuery(
+        "datasource",
+        new ClientCompactionIOConfig(
+            new ClientCompactionIntervalSpec(
+                Intervals.of("2019/2020"),
+                "testSha256OfSortedSegmentIds"
+            )
+        ),
+        null,
+        new ClientCompactQueryTuningConfig(
+            100,
+            40000,
+            2000L,
+            30000L,
+            new IndexSpec(
+                new DefaultBitmapSerdeFactory(),
+                CompressionStrategy.LZ4,
+                CompressionStrategy.LZF,
+                LongEncodingStrategy.LONGS
+            ),
+            null,
+            1000L
+        ),
+        new HashMap<>()
+    );
+
+    final byte[] json = mapper.writeValueAsBytes(query);
+    final CompactionTask task = (CompactionTask) mapper.readValue(json, Task.class);
+
+    Assert.assertEquals(query.getDataSource(), task.getDataSource());
+    Assert.assertTrue(task.getIoConfig().getInputSpec() instanceof CompactionIntervalSpec);
+    Assert.assertEquals(
+        query.getIoConfig().getInputSpec().getInterval(),
+        ((CompactionIntervalSpec) task.getIoConfig().getInputSpec()).getInterval()
+    );
+    Assert.assertEquals(
+        query.getIoConfig().getInputSpec().getSha256OfSortedSegmentIds(),
+        ((CompactionIntervalSpec) task.getIoConfig().getInputSpec()).getSha256OfSortedSegmentIds()
+    );
+    Assert.assertEquals(query.getTargetCompactionSizeBytes(), task.getTargetCompactionSizeBytes());
+    Assert.assertEquals(
+        query.getTuningConfig().getMaxRowsInMemory().intValue(), task.getTuningConfig().getMaxRowsInMemory()
+    );
+    Assert.assertEquals(
+        query.getTuningConfig().getMaxBytesInMemory().longValue(), task.getTuningConfig().getMaxBytesInMemory()
+    );
+    Assert.assertEquals(
+        query.getTuningConfig().getMaxRowsPerSegment(), task.getTuningConfig().getMaxRowsPerSegment()
+    );
+    Assert.assertEquals(
+        query.getTuningConfig().getMaxTotalRows(), task.getTuningConfig().getMaxTotalRows()
+    );
+    Assert.assertEquals(
+        query.getTuningConfig().getIndexSpec(), task.getTuningConfig().getIndexSpec()
+    );
+    Assert.assertEquals(
+        query.getTuningConfig().getPushTimeout().longValue(), task.getTuningConfig().getPushTimeout()
+    );
+    Assert.assertEquals(query.getContext(), task.getContext());
+  }
+
+  private static ObjectMapper setupInjectablesInObjectMapper(ObjectMapper objectMapper)
+  {
+    final GuiceAnnotationIntrospector guiceIntrospector = new GuiceAnnotationIntrospector();
+    objectMapper.setAnnotationIntrospectors(
+        new AnnotationIntrospectorPair(
+            guiceIntrospector,
+            objectMapper.getSerializationConfig().getAnnotationIntrospector()
+        ),
+        new AnnotationIntrospectorPair(
+            guiceIntrospector,
+            objectMapper.getDeserializationConfig().getAnnotationIntrospector()
+        )
+    );
+    GuiceInjectableValues injectableValues = new GuiceInjectableValues(
+        GuiceInjectors.makeStartupInjectorWithModules(
+            ImmutableList.of(
+                binder -> {
+                  binder.bind(AuthorizerMapper.class).toInstance(AuthTestUtils.TEST_AUTHORIZER_MAPPER);
+                  binder.bind(ChatHandlerProvider.class).toInstance(new NoopChatHandlerProvider());
+                  binder.bind(RowIngestionMetersFactory.class).toInstance(ROW_INGESTION_METERS_FACTORY);
+                  binder.bind(CoordinatorClient.class).toInstance(COORDINATOR_CLIENT);
+                  binder.bind(SegmentLoaderFactory.class).toInstance(new SegmentLoaderFactory(null, objectMapper));
+                  binder.bind(AppenderatorsManager.class).toInstance(APPENDERATORS_MANAGER);
+                }
+            )
+        )
+    );
+    objectMapper.setInjectableValues(injectableValues);
+    return objectMapper;
+  }
+}
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionInputSpecTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionInputSpecTest.java
new file mode 100644
index 000000000000..543428fe6acf
--- /dev/null
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionInputSpecTest.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.common.task;
+
+import com.google.common.collect.ImmutableList;
+import org.apache.druid.java.util.common.Intervals;
+import org.apache.druid.java.util.common.JodaUtils;
+import org.apache.druid.segment.SegmentUtils;
+import org.apache.druid.timeline.DataSegment;
+import org.joda.time.Interval;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+@RunWith(Parameterized.class)
+public class CompactionInputSpecTest
+{
+  private static final String DATASOURCE = "datasource";
+  private static final List<DataSegment> SEGMENTS = prepareSegments();
+  private static Interval INTERVAL = JodaUtils.umbrellaInterval(
+      SEGMENTS.stream().map(DataSegment::getInterval).collect(Collectors.toList())
+  );
+
+  @Parameters
+  public static Iterable<Object[]> constructorFeeder()
+  {
+    return ImmutableList.of(
+        new Object[]{
+            new CompactionIntervalSpec(
+                INTERVAL,
+                SegmentUtils.hashIds(SEGMENTS)
+            )
+        },
+        new Object[]{
+            new SpecificSegmentsSpec(
+                SEGMENTS.stream().map(segment -> segment.getId().toString()).collect(Collectors.toList())
+            )
+        }
+    );
+  }
+
+  private static List<DataSegment> prepareSegments()
+  {
+    return IntStream.range(0, 20)
+                    .mapToObj(i -> newSegment(Intervals.of("2019-01-%02d/2019-01-%02d", i + 1, i + 2)))
+                    .collect(Collectors.toList());
+  }
+
+  private static DataSegment newSegment(Interval interval)
+  {
+    return new DataSegment(
+        DATASOURCE,
+        interval,
+        "version",
+        null,
+        null,
+        null,
+        null,
+        9,
+        10
+    );
+  }
+
+  private final CompactionInputSpec inputSpec;
+
+  public CompactionInputSpecTest(CompactionInputSpec inputSpec)
+  {
+    this.inputSpec = inputSpec;
+  }
+
+  @Test
+  public void testFindInterval()
+  {
+    Assert.assertEquals(INTERVAL, inputSpec.findInterval(DATASOURCE));
+  }
+
+  @Test
+  public void testValidateSegments()
+  {
+    Assert.assertTrue(inputSpec.validateSegments(SEGMENTS));
+  }
+
+  @Test
+  public void testValidateWrongSegments()
+  {
+    final List<DataSegment> someSegmentIsMissing = new ArrayList<>(SEGMENTS);
+    someSegmentIsMissing.remove(0);
+    Assert.assertFalse(inputSpec.validateSegments(someSegmentIsMissing));
+
+    final List<DataSegment> someSegmentIsUnknown = new ArrayList<>(SEGMENTS);
+    someSegmentIsUnknown.add(newSegment(Intervals.of("2018-01-01/2018-01-02")));
+    Assert.assertFalse(inputSpec.validateSegments(someSegmentIsUnknown));
+  }
+}
final RowIngestionMetersFactory ROW_INGESTION_METERS_FACTORY = new TestUtils() + .getRowIngestionMetersFactory(); + private static final Map SEGMENT_MAP = new HashMap<>(); + private static final CoordinatorClient COORDINATOR_CLIENT = new TestCoordinatorClient(SEGMENT_MAP); + private static final AppenderatorsManager APPENDERATORS_MANAGER = new TestAppenderatorsManager(); + private static final ObjectMapper OBJECT_MAPPER = setupInjectablesInObjectMapper(new DefaultObjectMapper()); + private static final RetryPolicyFactory RETRY_POLICY_FACTORY = new RetryPolicyFactory(new RetryPolicyConfig()); + private static Map DIMENSIONS; private static List AGGREGATORS; private static List SEGMENTS; - private static RowIngestionMetersFactory rowIngestionMetersFactory = new TestUtils().getRowIngestionMetersFactory(); - private static Map segmentMap = new HashMap<>(); - private static CoordinatorClient coordinatorClient = new TestCoordinatorClient(segmentMap); - private static AppenderatorsManager appenderatorsManager = new TestAppenderatorsManager(); - private static ObjectMapper objectMapper = setupInjectablesInObjectMapper(new DefaultObjectMapper()); - private static RetryPolicyFactory retryPolicyFactory = new RetryPolicyFactory(new RetryPolicyConfig()); private TaskToolbox toolbox; private SegmentLoaderFactory segmentLoaderFactory; @@ -207,7 +210,7 @@ public static void setupClass() for (int i = 0; i < SEGMENT_INTERVALS.size(); i++) { final Interval segmentInterval = Intervals.of(StringUtils.format("2017-0%d-01/2017-0%d-01", (i + 1), (i + 2))); - segmentMap.put( + SEGMENT_MAP.put( new DataSegment( DATA_SOURCE, segmentInterval, @@ -222,7 +225,7 @@ public static void setupClass() new File("file_" + i) ); } - SEGMENTS = new ArrayList<>(segmentMap.keySet()); + SEGMENTS = new ArrayList<>(SEGMENT_MAP.keySet()); } private static ObjectMapper setupInjectablesInObjectMapper(ObjectMapper objectMapper) @@ -244,10 +247,10 @@ private static ObjectMapper setupInjectablesInObjectMapper(ObjectMapper objectMa binder -> { binder.bind(AuthorizerMapper.class).toInstance(AuthTestUtils.TEST_AUTHORIZER_MAPPER); binder.bind(ChatHandlerProvider.class).toInstance(new NoopChatHandlerProvider()); - binder.bind(RowIngestionMetersFactory.class).toInstance(rowIngestionMetersFactory); - binder.bind(CoordinatorClient.class).toInstance(coordinatorClient); + binder.bind(RowIngestionMetersFactory.class).toInstance(ROW_INGESTION_METERS_FACTORY); + binder.bind(CoordinatorClient.class).toInstance(COORDINATOR_CLIENT); binder.bind(SegmentLoaderFactory.class).toInstance(new SegmentLoaderFactory(null, objectMapper)); - binder.bind(AppenderatorsManager.class).toInstance(appenderatorsManager); + binder.bind(AppenderatorsManager.class).toInstance(APPENDERATORS_MANAGER); } ) ) @@ -312,13 +315,13 @@ private static IndexTuningConfig createTuningConfig() @Before public void setup() { - final IndexIO testIndexIO = new TestIndexIO(objectMapper, segmentMap); + final IndexIO testIndexIO = new TestIndexIO(OBJECT_MAPPER, SEGMENT_MAP); toolbox = new TestTaskToolbox( - new TestTaskActionClient(new ArrayList<>(segmentMap.keySet())), + new TestTaskActionClient(new ArrayList<>(SEGMENT_MAP.keySet())), testIndexIO, - segmentMap + SEGMENT_MAP ); - segmentLoaderFactory = new SegmentLoaderFactory(testIndexIO, objectMapper); + segmentLoaderFactory = new SegmentLoaderFactory(testIndexIO, OBJECT_MAPPER); } @Test @@ -326,23 +329,25 @@ public void testSerdeWithInterval() throws IOException { final Builder builder = new Builder( DATA_SOURCE, - objectMapper, + 
OBJECT_MAPPER, AuthTestUtils.TEST_AUTHORIZER_MAPPER, null, - rowIngestionMetersFactory, - coordinatorClient, + ROW_INGESTION_METERS_FACTORY, + COORDINATOR_CLIENT, segmentLoaderFactory, - retryPolicyFactory, - appenderatorsManager + RETRY_POLICY_FACTORY, + APPENDERATORS_MANAGER ); final CompactionTask task = builder - .interval(COMPACTION_INTERVAL) + .inputSpec( + new CompactionIntervalSpec(COMPACTION_INTERVAL, SegmentUtils.hashIds(SEGMENTS)) + ) .tuningConfig(createTuningConfig()) .context(ImmutableMap.of("testKey", "testContext")) .build(); - final byte[] bytes = objectMapper.writeValueAsBytes(task); - final CompactionTask fromJson = objectMapper.readValue(bytes, CompactionTask.class); + final byte[] bytes = OBJECT_MAPPER.writeValueAsBytes(task); + final CompactionTask fromJson = OBJECT_MAPPER.readValue(bytes, CompactionTask.class); assertEquals(task, fromJson); } @@ -351,14 +356,14 @@ public void testSerdeWithSegments() throws IOException { final Builder builder = new Builder( DATA_SOURCE, - objectMapper, + OBJECT_MAPPER, AuthTestUtils.TEST_AUTHORIZER_MAPPER, null, - rowIngestionMetersFactory, - coordinatorClient, + ROW_INGESTION_METERS_FACTORY, + COORDINATOR_CLIENT, segmentLoaderFactory, - retryPolicyFactory, - appenderatorsManager + RETRY_POLICY_FACTORY, + APPENDERATORS_MANAGER ); final CompactionTask task = builder .segments(SEGMENTS) @@ -366,8 +371,8 @@ public void testSerdeWithSegments() throws IOException .context(ImmutableMap.of("testKey", "testContext")) .build(); - final byte[] bytes = objectMapper.writeValueAsBytes(task); - final CompactionTask fromJson = objectMapper.readValue(bytes, CompactionTask.class); + final byte[] bytes = OBJECT_MAPPER.writeValueAsBytes(task); + final CompactionTask fromJson = OBJECT_MAPPER.readValue(bytes, CompactionTask.class); assertEquals(task, fromJson); } @@ -376,14 +381,14 @@ public void testSerdeWithDimensions() throws IOException { final Builder builder = new Builder( DATA_SOURCE, - objectMapper, + OBJECT_MAPPER, AuthTestUtils.TEST_AUTHORIZER_MAPPER, null, - rowIngestionMetersFactory, - coordinatorClient, + ROW_INGESTION_METERS_FACTORY, + COORDINATOR_CLIENT, segmentLoaderFactory, - retryPolicyFactory, - appenderatorsManager + RETRY_POLICY_FACTORY, + APPENDERATORS_MANAGER ); final CompactionTask task = builder @@ -401,8 +406,8 @@ public void testSerdeWithDimensions() throws IOException .context(ImmutableMap.of("testKey", "testVal")) .build(); - final byte[] bytes = objectMapper.writeValueAsBytes(task); - final CompactionTask fromJson = objectMapper.readValue(bytes, CompactionTask.class); + final byte[] bytes = OBJECT_MAPPER.writeValueAsBytes(task); + final CompactionTask fromJson = OBJECT_MAPPER.readValue(bytes, CompactionTask.class); assertEquals(task, fromJson); } @@ -410,10 +415,9 @@ private static void assertEquals(CompactionTask expected, CompactionTask actual) { Assert.assertEquals(expected.getType(), actual.getType()); Assert.assertEquals(expected.getDataSource(), actual.getDataSource()); - Assert.assertEquals(expected.getInterval(), actual.getInterval()); - Assert.assertEquals(expected.getSegments(), actual.getSegments()); + Assert.assertEquals(expected.getIoConfig(), actual.getIoConfig()); Assert.assertEquals(expected.getDimensionsSpec(), actual.getDimensionsSpec()); - Assert.assertTrue(Arrays.equals(expected.getMetricsSpec(), actual.getMetricsSpec())); + Assert.assertArrayEquals(expected.getMetricsSpec(), actual.getMetricsSpec()); Assert.assertEquals(expected.getTargetCompactionSizeBytes(), actual.getTargetCompactionSizeBytes()); 
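An aside on what these serde tests now round-trip: the input specification lives under an ioConfig field instead of the old top-level interval/segments. A minimal sketch, reusing the test fixtures above; the JSON keys follow the @JsonProperty names in this patch, and the interval value shown is illustrative only:

```java
// Sketch only: build a task against the new inputSpec API and round-trip it.
final CompactionTask task = builder
    .inputSpec(new CompactionIntervalSpec(COMPACTION_INTERVAL, SegmentUtils.hashIds(SEGMENTS)))
    .build();

final String json = OBJECT_MAPPER.writeValueAsString(task);
// The serialized form now carries, roughly:
//   "ioConfig" : {
//     "type" : "compact",
//     "inputSpec" : {
//       "type" : "interval",
//       "interval" : "2017-01-01/2017-07-01",   // illustrative value
//       "sha256OfSortedSegmentIds" : "..."      // hash of the sorted segment ids
//     }
//   }
final CompactionTask fromJson = OBJECT_MAPPER.readValue(json, CompactionTask.class);
// compare field by field, as the tests above do
assertEquals(task, fromJson);
```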
Assert.assertEquals(expected.getTuningConfig(), actual.getTuningConfig()); Assert.assertEquals(expected.getContext(), actual.getContext()); @@ -424,15 +428,15 @@ public void testCreateIngestionSchema() throws IOException, SegmentLoadingExcept { final List ingestionSpecs = CompactionTask.createIngestionSchema( toolbox, - new SegmentProvider(DATA_SOURCE, COMPACTION_INTERVAL), + new SegmentProvider(DATA_SOURCE, new CompactionIntervalSpec(COMPACTION_INTERVAL, null)), new PartitionConfigurationManager(null, TUNING_CONFIG), null, null, null, - objectMapper, - coordinatorClient, + OBJECT_MAPPER, + COORDINATOR_CLIENT, segmentLoaderFactory, - retryPolicyFactory + RETRY_POLICY_FACTORY ); final List expectedDimensionsSpec = getExpectedDimensionsSpecForAutoGeneration(); @@ -484,15 +488,15 @@ public void testCreateIngestionSchemaWithTargetPartitionSize() throws IOExceptio ); final List ingestionSpecs = CompactionTask.createIngestionSchema( toolbox, - new SegmentProvider(DATA_SOURCE, COMPACTION_INTERVAL), + new SegmentProvider(DATA_SOURCE, new CompactionIntervalSpec(COMPACTION_INTERVAL, null)), new PartitionConfigurationManager(null, tuningConfig), null, null, null, - objectMapper, - coordinatorClient, + OBJECT_MAPPER, + COORDINATOR_CLIENT, segmentLoaderFactory, - retryPolicyFactory + RETRY_POLICY_FACTORY ); final List expectedDimensionsSpec = getExpectedDimensionsSpecForAutoGeneration(); @@ -545,15 +549,15 @@ public void testCreateIngestionSchemaWithMaxTotalRows() throws IOException, Segm ); final List ingestionSpecs = CompactionTask.createIngestionSchema( toolbox, - new SegmentProvider(DATA_SOURCE, COMPACTION_INTERVAL), + new SegmentProvider(DATA_SOURCE, new CompactionIntervalSpec(COMPACTION_INTERVAL, null)), new PartitionConfigurationManager(null, tuningConfig), null, null, null, - objectMapper, - coordinatorClient, + OBJECT_MAPPER, + COORDINATOR_CLIENT, segmentLoaderFactory, - retryPolicyFactory + RETRY_POLICY_FACTORY ); final List expectedDimensionsSpec = getExpectedDimensionsSpecForAutoGeneration(); @@ -606,15 +610,15 @@ public void testCreateIngestionSchemaWithNumShards() throws IOException, Segment ); final List ingestionSpecs = CompactionTask.createIngestionSchema( toolbox, - new SegmentProvider(DATA_SOURCE, COMPACTION_INTERVAL), + new SegmentProvider(DATA_SOURCE, new CompactionIntervalSpec(COMPACTION_INTERVAL, null)), new PartitionConfigurationManager(null, tuningConfig), null, null, null, - objectMapper, - coordinatorClient, + OBJECT_MAPPER, + COORDINATOR_CLIENT, segmentLoaderFactory, - retryPolicyFactory + RETRY_POLICY_FACTORY ); final List expectedDimensionsSpec = getExpectedDimensionsSpecForAutoGeneration(); @@ -667,15 +671,15 @@ public void testCreateIngestionSchemaWithCustomDimensionsSpec() throws IOExcepti final List ingestionSpecs = CompactionTask.createIngestionSchema( toolbox, - new SegmentProvider(DATA_SOURCE, COMPACTION_INTERVAL), + new SegmentProvider(DATA_SOURCE, new CompactionIntervalSpec(COMPACTION_INTERVAL, null)), new PartitionConfigurationManager(null, TUNING_CONFIG), customSpec, null, null, - objectMapper, - coordinatorClient, + OBJECT_MAPPER, + COORDINATOR_CLIENT, segmentLoaderFactory, - retryPolicyFactory + RETRY_POLICY_FACTORY ); ingestionSpecs.sort( @@ -708,15 +712,15 @@ public void testCreateIngestionSchemaWithCustomMetricsSpec() throws IOException, final List ingestionSpecs = CompactionTask.createIngestionSchema( toolbox, - new SegmentProvider(DATA_SOURCE, COMPACTION_INTERVAL), + new SegmentProvider(DATA_SOURCE, new CompactionIntervalSpec(COMPACTION_INTERVAL, null)), 
new PartitionConfigurationManager(null, TUNING_CONFIG), null, customMetricsSpec, null, - objectMapper, - coordinatorClient, + OBJECT_MAPPER, + COORDINATOR_CLIENT, segmentLoaderFactory, - retryPolicyFactory + RETRY_POLICY_FACTORY ); final List expectedDimensionsSpec = getExpectedDimensionsSpecForAutoGeneration(); @@ -742,15 +746,15 @@ public void testCreateIngestionSchemaWithCustomSegments() throws IOException, Se { final List ingestionSpecs = CompactionTask.createIngestionSchema( toolbox, - new SegmentProvider(SEGMENTS), + new SegmentProvider(DATA_SOURCE, SpecificSegmentsSpec.fromSegments(SEGMENTS)), new PartitionConfigurationManager(null, TUNING_CONFIG), null, null, null, - objectMapper, - coordinatorClient, + OBJECT_MAPPER, + COORDINATOR_CLIENT, segmentLoaderFactory, - retryPolicyFactory + RETRY_POLICY_FACTORY ); final List expectedDimensionsSpec = getExpectedDimensionsSpecForAutoGeneration(); @@ -782,15 +786,15 @@ public void testCreateIngestionSchemaWithDifferentSegmentSet() throws IOExceptio segments.remove(segments.size() / 2); CompactionTask.createIngestionSchema( toolbox, - new SegmentProvider(segments), + new SegmentProvider(DATA_SOURCE, SpecificSegmentsSpec.fromSegments(segments)), new PartitionConfigurationManager(null, TUNING_CONFIG), null, null, null, - objectMapper, - coordinatorClient, + OBJECT_MAPPER, + COORDINATOR_CLIENT, segmentLoaderFactory, - retryPolicyFactory + RETRY_POLICY_FACTORY ); } @@ -805,15 +809,15 @@ public void testMissingMetadata() throws IOException, SegmentLoadingException final List segments = new ArrayList<>(SEGMENTS); CompactionTask.createIngestionSchema( toolbox, - new SegmentProvider(segments), + new SegmentProvider(DATA_SOURCE, SpecificSegmentsSpec.fromSegments(segments)), new PartitionConfigurationManager(null, TUNING_CONFIG), null, null, null, - objectMapper, - coordinatorClient, + OBJECT_MAPPER, + COORDINATOR_CLIENT, segmentLoaderFactory, - retryPolicyFactory + RETRY_POLICY_FACTORY ); } @@ -825,14 +829,14 @@ public void testEmptyInterval() final Builder builder = new Builder( DATA_SOURCE, - objectMapper, + OBJECT_MAPPER, AuthTestUtils.TEST_AUTHORIZER_MAPPER, null, - rowIngestionMetersFactory, - coordinatorClient, + ROW_INGESTION_METERS_FACTORY, + COORDINATOR_CLIENT, segmentLoaderFactory, - retryPolicyFactory, - appenderatorsManager + RETRY_POLICY_FACTORY, + APPENDERATORS_MANAGER ); final CompactionTask task = builder @@ -874,15 +878,15 @@ public void testTargetPartitionSizeWithPartitionConfig() throws IOException, Seg expectedException.expectMessage("targetCompactionSizeBytes[6] cannot be used with"); final List ingestionSpecs = CompactionTask.createIngestionSchema( toolbox, - new SegmentProvider(DATA_SOURCE, COMPACTION_INTERVAL), + new SegmentProvider(DATA_SOURCE, new CompactionIntervalSpec(COMPACTION_INTERVAL, null)), new PartitionConfigurationManager(6L, tuningConfig), null, null, null, - objectMapper, - coordinatorClient, + OBJECT_MAPPER, + COORDINATOR_CLIENT, segmentLoaderFactory, - retryPolicyFactory + RETRY_POLICY_FACTORY ); } @@ -891,15 +895,15 @@ public void testSegmentGranularity() throws IOException, SegmentLoadingException { final List ingestionSpecs = CompactionTask.createIngestionSchema( toolbox, - new SegmentProvider(DATA_SOURCE, COMPACTION_INTERVAL), + new SegmentProvider(DATA_SOURCE, new CompactionIntervalSpec(COMPACTION_INTERVAL, null)), new PartitionConfigurationManager(null, TUNING_CONFIG), null, null, new PeriodGranularity(Period.months(3), null, null), - objectMapper, - coordinatorClient, + OBJECT_MAPPER, + 
COORDINATOR_CLIENT, segmentLoaderFactory, - retryPolicyFactory + RETRY_POLICY_FACTORY ); final List expectedDimensionsSpec = ImmutableList.of( new DimensionsSpec(getDimensionSchema(new DoubleDimensionSchema("string_to_double"))) @@ -926,15 +930,15 @@ public void testNullSegmentGranularityAnd() throws IOException, SegmentLoadingEx { final List ingestionSpecs = CompactionTask.createIngestionSchema( toolbox, - new SegmentProvider(DATA_SOURCE, COMPACTION_INTERVAL), + new SegmentProvider(DATA_SOURCE, new CompactionIntervalSpec(COMPACTION_INTERVAL, null)), new PartitionConfigurationManager(null, TUNING_CONFIG), null, null, null, - objectMapper, - coordinatorClient, + OBJECT_MAPPER, + COORDINATOR_CLIENT, segmentLoaderFactory, - retryPolicyFactory + RETRY_POLICY_FACTORY ); final List expectedDimensionsSpec = getExpectedDimensionsSpecForAutoGeneration(); @@ -962,7 +966,7 @@ public void testHugeTargetCompactionSize() final Map queryableIndexMap = indexIO.getQueryableIndexMap(); final List> segments = new ArrayList<>(); - for (Entry entry : segmentMap.entrySet()) { + for (Entry entry : SEGMENT_MAP.entrySet()) { final DataSegment segment = entry.getKey(); final File file = entry.getValue(); segments.add(Pair.of(Preconditions.checkNotNull(queryableIndexMap.get(file)), segment)); @@ -1087,7 +1091,7 @@ private void assertIngestionSchema( final DataSchema dataSchema = ingestionSchema.getDataSchema(); Assert.assertEquals(DATA_SOURCE, dataSchema.getDataSource()); - final InputRowParser parser = objectMapper.convertValue(dataSchema.getParser(), InputRowParser.class); + final InputRowParser parser = OBJECT_MAPPER.convertValue(dataSchema.getParser(), InputRowParser.class); Assert.assertTrue(parser instanceof TransformingInputRowParser); Assert.assertTrue(((TransformingInputRowParser) parser).getParser() instanceof NoopInputRowParser); Assert.assertTrue(parser.getParseSpec() instanceof TimeAndDimsParseSpec); @@ -1181,7 +1185,7 @@ private static class TestTaskToolbox extends TaskToolbox null, null, null, - new IndexMergerV9(objectMapper, indexIO, OffHeapMemorySegmentWriteOutMediumFactory.instance()), + new IndexMergerV9(OBJECT_MAPPER, indexIO, OffHeapMemorySegmentWriteOutMediumFactory.instance()), null, null, null, diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/SpecificSegmentsSpecTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/SpecificSegmentsSpecTest.java new file mode 100644 index 000000000000..4fb1558f0ef7 --- /dev/null +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/SpecificSegmentsSpecTest.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.indexing.common.task; + +import org.apache.druid.java.util.common.Intervals; +import org.apache.druid.timeline.DataSegment; +import org.joda.time.Interval; +import org.junit.Assert; +import org.junit.Test; + +import java.util.Collections; +import java.util.List; +import java.util.concurrent.ThreadLocalRandom; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +public class SpecificSegmentsSpecTest +{ + @Test + public void createTest() + { + final List segments = IntStream + .range(0, 20) + .mapToObj(i -> newSegment(Intervals.of("2019-01-%02d/2019-01-%02d", i + 1, i + 2))) + .collect(Collectors.toList()); + final List expectedSegmentIds = segments + .stream() + .map(segment -> segment.getId().toString()) + .collect(Collectors.toList()); + Collections.shuffle(segments, ThreadLocalRandom.current()); + final SpecificSegmentsSpec spec = SpecificSegmentsSpec.fromSegments(segments); + Assert.assertEquals(expectedSegmentIds, spec.getSegments()); + } + + private static DataSegment newSegment(Interval interval) + { + return new DataSegment( + "datasource", + interval, + "version", + null, + null, + null, + null, + 9, + 10 + ); + } +} diff --git a/server/src/main/java/org/apache/druid/client/indexing/ClientCompactQuery.java b/server/src/main/java/org/apache/druid/client/indexing/ClientCompactQuery.java index e29cd60ed272..887d4b86e583 100644 --- a/server/src/main/java/org/apache/druid/client/indexing/ClientCompactQuery.java +++ b/server/src/main/java/org/apache/druid/client/indexing/ClientCompactQuery.java @@ -21,11 +21,8 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; -import org.apache.druid.timeline.DataSegment; -import org.joda.time.Interval; import javax.annotation.Nullable; -import java.util.List; import java.util.Map; import java.util.Objects; @@ -36,8 +33,7 @@ public class ClientCompactQuery implements ClientQuery { private final String dataSource; - private final List segments; - private final Interval interval; + private final ClientCompactionIOConfig ioConfig; @Nullable private final Long targetCompactionSizeBytes; private final ClientCompactQueryTuningConfig tuningConfig; @@ -46,16 +42,14 @@ public class ClientCompactQuery implements ClientQuery @JsonCreator public ClientCompactQuery( @JsonProperty("dataSource") String dataSource, - @JsonProperty("interval") @Nullable final Interval interval, - @JsonProperty("segments") @Nullable final List segments, + @JsonProperty("ioConfig") ClientCompactionIOConfig ioConfig, @JsonProperty("targetCompactionSizeBytes") @Nullable Long targetCompactionSizeBytes, @JsonProperty("tuningConfig") ClientCompactQueryTuningConfig tuningConfig, @JsonProperty("context") Map context ) { this.dataSource = dataSource; - this.segments = segments; - this.interval = interval; + this.ioConfig = ioConfig; this.targetCompactionSizeBytes = targetCompactionSizeBytes; this.tuningConfig = tuningConfig; this.context = context; @@ -76,15 +70,9 @@ public String getDataSource() } @JsonProperty - public List getSegments() + public ClientCompactionIOConfig getIoConfig() { - return segments; - } - - @JsonProperty - public Interval getInterval() - { - return interval; + return ioConfig; } @JsonProperty @@ -117,8 +105,7 @@ public boolean equals(Object o) } ClientCompactQuery that = (ClientCompactQuery) o; return Objects.equals(dataSource, that.dataSource) && - Objects.equals(segments, that.segments) && - Objects.equals(interval, that.interval) && + 
Objects.equals(ioConfig, that.ioConfig) && Objects.equals(targetCompactionSizeBytes, that.targetCompactionSizeBytes) && Objects.equals(tuningConfig, that.tuningConfig) && Objects.equals(context, that.context); @@ -127,23 +114,15 @@ public boolean equals(Object o) @Override public int hashCode() { - return Objects.hash( - dataSource, - segments, - interval, - targetCompactionSizeBytes, - tuningConfig, - context - ); + return Objects.hash(dataSource, ioConfig, targetCompactionSizeBytes, tuningConfig, context); } @Override public String toString() { - return getClass().getSimpleName() + "{" + + return "ClientCompactQuery{" + "dataSource='" + dataSource + '\'' + - ", segments=" + segments + - ", interval=" + interval + + ", ioConfig=" + ioConfig + ", targetCompactionSizeBytes=" + targetCompactionSizeBytes + ", tuningConfig=" + tuningConfig + ", context=" + context + diff --git a/server/src/main/java/org/apache/druid/client/indexing/ClientCompactionIOConfig.java b/server/src/main/java/org/apache/druid/client/indexing/ClientCompactionIOConfig.java new file mode 100644 index 000000000000..2bc66d2f53eb --- /dev/null +++ b/server/src/main/java/org/apache/druid/client/indexing/ClientCompactionIOConfig.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.client.indexing; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; + +import java.util.Objects; + +/** + * IOConfig for {@link ClientCompactQuery}. + * + * Should be synchronized with org.apache.druid.indexing.common.task.CompactionIOConfig. 
+ */ +public class ClientCompactionIOConfig +{ + private static final String TYPE = "compact"; + + private final ClientCompactionIntervalSpec inputSpec; + + @JsonCreator + public ClientCompactionIOConfig(@JsonProperty("inputSpec") ClientCompactionIntervalSpec inputSpec) + { + this.inputSpec = inputSpec; + } + + @JsonProperty + public String getType() + { + return TYPE; + } + + @JsonProperty + public ClientCompactionIntervalSpec getInputSpec() + { + return inputSpec; + } + + @Override + public boolean equals(Object o) + { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + ClientCompactionIOConfig that = (ClientCompactionIOConfig) o; + return Objects.equals(inputSpec, that.inputSpec); + } + + @Override + public int hashCode() + { + return Objects.hash(inputSpec); + } + + @Override + public String toString() + { + return "ClientCompactionIOConfig{" + + "inputSpec=" + inputSpec + + '}'; + } +} diff --git a/server/src/main/java/org/apache/druid/client/indexing/ClientCompactionIntervalSpec.java b/server/src/main/java/org/apache/druid/client/indexing/ClientCompactionIntervalSpec.java new file mode 100644 index 000000000000..ddec63568341 --- /dev/null +++ b/server/src/main/java/org/apache/druid/client/indexing/ClientCompactionIntervalSpec.java @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.client.indexing; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import org.apache.druid.java.util.common.IAE; +import org.apache.druid.java.util.common.JodaUtils; +import org.apache.druid.segment.SegmentUtils; +import org.apache.druid.timeline.DataSegment; +import org.joda.time.Interval; + +import javax.annotation.Nullable; +import java.util.List; +import java.util.Objects; +import java.util.stream.Collectors; + +/** + * InputSpec for {@link ClientCompactionIOConfig}. + * + * Should be synchronized with org.apache.druid.indexing.common.task.CompactionIntervalSpec. 
+ */ +public class ClientCompactionIntervalSpec +{ + private static final String TYPE = "interval"; + + private final Interval interval; + @Nullable + private final String sha256OfSortedSegmentIds; + + public static ClientCompactionIntervalSpec fromSegments(List segments) + { + return new ClientCompactionIntervalSpec( + JodaUtils.umbrellaInterval( + segments.stream().map(DataSegment::getInterval).collect(Collectors.toList()) + ), + SegmentUtils.hashIds(segments) + ); + } + + @JsonCreator + public ClientCompactionIntervalSpec( + @JsonProperty("interval") Interval interval, + @JsonProperty("sha256OfSortedSegmentIds") @Nullable String sha256OfSortedSegmentIds + ) + { + if (interval != null && interval.toDurationMillis() == 0) { + throw new IAE("Interval[%s] is empty, must specify a nonempty interval", interval); + } + this.interval = interval; + this.sha256OfSortedSegmentIds = sha256OfSortedSegmentIds; + } + + @JsonProperty + public String getType() + { + return TYPE; + } + + @JsonProperty + public Interval getInterval() + { + return interval; + } + + @Nullable + @JsonProperty + public String getSha256OfSortedSegmentIds() + { + return sha256OfSortedSegmentIds; + } + + @Override + public boolean equals(Object o) + { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + ClientCompactionIntervalSpec that = (ClientCompactionIntervalSpec) o; + return Objects.equals(interval, that.interval) && + Objects.equals(sha256OfSortedSegmentIds, that.sha256OfSortedSegmentIds); + } + + @Override + public int hashCode() + { + return Objects.hash(interval, sha256OfSortedSegmentIds); + } + + @Override + public String toString() + { + return "ClientCompactionIntervalSpec{" + + "interval=" + interval + + ", sha256OfSortedSegmentIds='" + sha256OfSortedSegmentIds + '\'' + + '}'; + } +} diff --git a/server/src/main/java/org/apache/druid/client/indexing/HttpIndexingServiceClient.java b/server/src/main/java/org/apache/druid/client/indexing/HttpIndexingServiceClient.java index de590a2a85ca..0efb9c97c840 100644 --- a/server/src/main/java/org/apache/druid/client/indexing/HttpIndexingServiceClient.java +++ b/server/src/main/java/org/apache/druid/client/indexing/HttpIndexingServiceClient.java @@ -93,8 +93,7 @@ public String compactSegments( return runTask( new ClientCompactQuery( dataSource, - null, - segments, + new ClientCompactionIOConfig(ClientCompactionIntervalSpec.fromSegments(segments)), targetCompactionSizeBytes, tuningConfig, context diff --git a/server/src/main/java/org/apache/druid/server/coordinator/DataSourceCompactionConfig.java b/server/src/main/java/org/apache/druid/server/coordinator/DataSourceCompactionConfig.java index ebf4da63da2d..9c42a11b073b 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/DataSourceCompactionConfig.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/DataSourceCompactionConfig.java @@ -38,7 +38,6 @@ public class DataSourceCompactionConfig // should be synchronized with Tasks.DEFAULT_MERGE_TASK_PRIORITY private static final int DEFAULT_COMPACTION_TASK_PRIORITY = 25; private static final long DEFAULT_INPUT_SEGMENT_SIZE_BYTES = 400 * 1024 * 1024; - private static final int DEFAULT_NUM_INPUT_SEGMENTS = 150; private static final Period DEFAULT_SKIP_OFFSET_FROM_LATEST = new Period("P1D"); private final String dataSource; @@ -50,7 +49,6 @@ public class DataSourceCompactionConfig // RemoteTaskRunnerConfig.maxZnodeBytes. 
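Putting the client-side pieces together: after the HttpIndexingServiceClient change above, the coordinator submits compaction through the new ioConfig wrapper rather than a raw segment list. A sketch only, assuming dataSource, segments, targetCompactionSizeBytes, tuningConfig, and context are in scope as in compactSegments():

```java
// Sketch of the new submission path: wrap the picked segments in an
// interval-based input spec (umbrella interval + sha256 of sorted ids)
// instead of shipping the segment list itself.
final ClientCompactQuery query = new ClientCompactQuery(
    dataSource,
    new ClientCompactionIOConfig(ClientCompactionIntervalSpec.fromSegments(segments)),
    targetCompactionSizeBytes,
    tuningConfig,
    context
);
// handed to the overlord via runTask(...) exactly as before; only the payload shape changed
```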
@Nullable private final Integer maxRowsPerSegment; - private final int maxNumSegmentsToCompact; private final Period skipOffsetFromLatest; private final UserCompactTuningConfig tuningConfig; private final Map taskContext; @@ -62,7 +60,6 @@ public DataSourceCompactionConfig( @JsonProperty("inputSegmentSizeBytes") @Nullable Long inputSegmentSizeBytes, @JsonProperty("targetCompactionSizeBytes") @Nullable Long targetCompactionSizeBytes, @JsonProperty("maxRowsPerSegment") @Nullable Integer maxRowsPerSegment, - @JsonProperty("maxNumSegmentsToCompact") @Nullable Integer maxNumSegmentsToCompact, @JsonProperty("skipOffsetFromLatest") @Nullable Period skipOffsetFromLatest, @JsonProperty("tuningConfig") @Nullable UserCompactTuningConfig tuningConfig, @JsonProperty("taskContext") @Nullable Map taskContext @@ -81,17 +78,9 @@ public DataSourceCompactionConfig( tuningConfig ); this.maxRowsPerSegment = maxRowsPerSegment; - this.maxNumSegmentsToCompact = maxNumSegmentsToCompact == null - ? DEFAULT_NUM_INPUT_SEGMENTS - : maxNumSegmentsToCompact; this.skipOffsetFromLatest = skipOffsetFromLatest == null ? DEFAULT_SKIP_OFFSET_FROM_LATEST : skipOffsetFromLatest; this.tuningConfig = tuningConfig; this.taskContext = taskContext; - - Preconditions.checkArgument( - this.maxNumSegmentsToCompact > 1, - "numTargetCompactionSegments should be larger than 1" - ); } /** @@ -157,12 +146,6 @@ public long getInputSegmentSizeBytes() return inputSegmentSizeBytes; } - @JsonProperty - public int getMaxNumSegmentsToCompact() - { - return maxNumSegmentsToCompact; - } - @JsonProperty @Nullable public Long getTargetCompactionSizeBytes() @@ -209,7 +192,6 @@ public boolean equals(Object o) DataSourceCompactionConfig that = (DataSourceCompactionConfig) o; return taskPriority == that.taskPriority && inputSegmentSizeBytes == that.inputSegmentSizeBytes && - maxNumSegmentsToCompact == that.maxNumSegmentsToCompact && Objects.equals(dataSource, that.dataSource) && Objects.equals(targetCompactionSizeBytes, that.targetCompactionSizeBytes) && Objects.equals(skipOffsetFromLatest, that.skipOffsetFromLatest) && @@ -225,7 +207,6 @@ public int hashCode() taskPriority, inputSegmentSizeBytes, targetCompactionSizeBytes, - maxNumSegmentsToCompact, skipOffsetFromLatest, tuningConfig, taskContext diff --git a/server/src/main/java/org/apache/druid/server/coordinator/helper/DruidCoordinatorSegmentCompactor.java b/server/src/main/java/org/apache/druid/server/coordinator/helper/DruidCoordinatorSegmentCompactor.java index f2a1a9fa51fd..04ae6576a515 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/helper/DruidCoordinatorSegmentCompactor.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/helper/DruidCoordinatorSegmentCompactor.java @@ -28,8 +28,6 @@ import org.apache.druid.client.indexing.TaskPayloadResponse; import org.apache.druid.indexer.TaskStatusPlus; import org.apache.druid.java.util.common.ISE; -import org.apache.druid.java.util.common.JodaUtils; -import org.apache.druid.java.util.common.guava.Comparators; import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.server.coordinator.CoordinatorCompactionConfig; import org.apache.druid.server.coordinator.CoordinatorStats; @@ -39,7 +37,6 @@ import org.apache.druid.timeline.VersionedIntervalTimeline; import org.joda.time.Interval; -import javax.annotation.Nullable; import java.util.ArrayList; import java.util.HashMap; import java.util.List; @@ -94,22 +91,7 @@ public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params) } if 
(COMPACT_TASK_TYPE.equals(response.getPayload().getType())) { final ClientCompactQuery compactQuery = (ClientCompactQuery) response.getPayload(); - final Interval interval; - - if (compactQuery.getSegments() != null) { - interval = JodaUtils.umbrellaInterval( - compactQuery.getSegments() - .stream() - .map(DataSegment::getInterval) - .sorted(Comparators.intervalsByStartThenEnd()) - .collect(Collectors.toList()) - ); - } else if (compactQuery.getInterval() != null) { - interval = compactQuery.getInterval(); - } else { - throw new ISE("task[%s] has neither 'segments' nor 'interval'", status.getId()); - } - + final Interval interval = compactQuery.getIoConfig().getInputSpec().getInterval(); compactTaskIntervals.computeIfAbsent(status.getDataSource(), k -> new ArrayList<>()).add(interval); } else { throw new ISE("WTH? task[%s] is not a compactTask?", status.getId()); @@ -218,7 +200,6 @@ private CoordinatorStats makeStats(int numCompactionTasks, CompactionSegmentIter return stats; } - @Nullable public long getRemainingSegmentSizeBytes(String dataSource) { return remainingSegmentSizeBytes.getLong(dataSource); diff --git a/server/src/main/java/org/apache/druid/server/coordinator/helper/NewestSegmentFirstIterator.java b/server/src/main/java/org/apache/druid/server/coordinator/helper/NewestSegmentFirstIterator.java index e0eecf18ea90..06a5d6a60dba 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/helper/NewestSegmentFirstIterator.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/helper/NewestSegmentFirstIterator.java @@ -247,19 +247,17 @@ private static SegmentsToCompact findSegmentsToCompact( { final long inputSegmentSize = config.getInputSegmentSizeBytes(); final @Nullable Long targetCompactionSizeBytes = config.getTargetCompactionSizeBytes(); - final int maxNumSegmentsToCompact = config.getMaxNumSegmentsToCompact(); // Finds segments to compact together while iterating timeline from latest to oldest while (compactibleTimelineObjectHolderCursor.hasNext()) { final SegmentsToCompact candidates = new SegmentsToCompact(compactibleTimelineObjectHolderCursor.next()); final boolean isCompactibleSize = candidates.getTotalSize() <= inputSegmentSize; - final boolean isCompactibleNum = candidates.getNumSegments() <= maxNumSegmentsToCompact; final boolean needsCompaction = SegmentCompactorUtil.needsCompaction( targetCompactionSizeBytes, candidates.segments ); - if (isCompactibleSize && isCompactibleNum && needsCompaction) { + if (isCompactibleSize && needsCompaction) { return candidates; } else { if (!isCompactibleSize) { @@ -272,18 +270,6 @@ private static SegmentsToCompact findSegmentsToCompact( inputSegmentSize ); } - if (!isCompactibleNum) { - log.warn( - "Number of segments[%d] for datasource[%s] and interval[%s] is larger than " - + "maxNumSegmentsToCompact[%d]. If you see lots of shards are being skipped due to too many " - + "segments, consider increasing 'numTargetCompactionSegments' and " - + "'druid.indexer.runner.maxZnodeBytes'. 
Continue to the next interval.", - candidates.getNumSegments(), - candidates.segments.get(0).getDataSource(), - candidates.segments.get(0).getInterval(), - maxNumSegmentsToCompact - ); - } if (!needsCompaction) { log.warn( "Size of most of segments[%s] is larger than targetCompactionSizeBytes[%s] " diff --git a/server/src/test/java/org/apache/druid/server/coordinator/DataSourceCompactionConfigTest.java b/server/src/test/java/org/apache/druid/server/coordinator/DataSourceCompactionConfigTest.java index 45b3b5a82652..3081a0bffd0e 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/DataSourceCompactionConfigTest.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/DataSourceCompactionConfigTest.java @@ -51,7 +51,6 @@ public void testSerdeBasic() throws IOException 500L, 100L, null, - 20, new Period(3600), null, ImmutableMap.of("key", "val") @@ -64,7 +63,6 @@ public void testSerdeBasic() throws IOException Assert.assertEquals(config.getInputSegmentSizeBytes(), fromJson.getInputSegmentSizeBytes()); Assert.assertEquals(config.getTargetCompactionSizeBytes(), fromJson.getTargetCompactionSizeBytes()); Assert.assertEquals(config.getMaxRowsPerSegment(), fromJson.getMaxRowsPerSegment()); - Assert.assertEquals(config.getMaxNumSegmentsToCompact(), fromJson.getMaxNumSegmentsToCompact()); Assert.assertEquals(config.getSkipOffsetFromLatest(), fromJson.getSkipOffsetFromLatest()); Assert.assertEquals(config.getTuningConfig(), fromJson.getTuningConfig()); Assert.assertEquals(config.getTaskContext(), fromJson.getTaskContext()); @@ -79,7 +77,6 @@ public void testSerdeWithMaxRowsPerSegment() throws IOException 500L, null, 30, - 20, new Period(3600), null, ImmutableMap.of("key", "val") @@ -92,7 +89,6 @@ public void testSerdeWithMaxRowsPerSegment() throws IOException Assert.assertEquals(config.getInputSegmentSizeBytes(), fromJson.getInputSegmentSizeBytes()); Assert.assertNull(fromJson.getTargetCompactionSizeBytes()); Assert.assertEquals(config.getMaxRowsPerSegment(), fromJson.getMaxRowsPerSegment()); - Assert.assertEquals(config.getMaxNumSegmentsToCompact(), fromJson.getMaxNumSegmentsToCompact()); Assert.assertEquals(config.getSkipOffsetFromLatest(), fromJson.getSkipOffsetFromLatest()); Assert.assertEquals(config.getTuningConfig(), fromJson.getTuningConfig()); Assert.assertEquals(config.getTaskContext(), fromJson.getTaskContext()); @@ -118,7 +114,6 @@ public void testSerdeWithMaxTotalRows() throws IOException 500L, null, null, - 20, new Period(3600), new UserCompactTuningConfig( null, @@ -138,7 +133,6 @@ public void testSerdeWithMaxTotalRows() throws IOException Assert.assertEquals(config.getInputSegmentSizeBytes(), fromJson.getInputSegmentSizeBytes()); Assert.assertNull(fromJson.getTargetCompactionSizeBytes()); Assert.assertEquals(config.getMaxRowsPerSegment(), fromJson.getMaxRowsPerSegment()); - Assert.assertEquals(config.getMaxNumSegmentsToCompact(), fromJson.getMaxNumSegmentsToCompact()); Assert.assertEquals(config.getSkipOffsetFromLatest(), fromJson.getSkipOffsetFromLatest()); Assert.assertEquals(config.getTuningConfig(), fromJson.getTuningConfig()); Assert.assertEquals(config.getTaskContext(), fromJson.getTaskContext()); @@ -157,7 +151,6 @@ public void testSerdeTargetCompactionSizeBytesWithMaxRowsPerSegment() 500L, 10000L, 1000, - 20, new Period(3600), null, ImmutableMap.of("key", "val") @@ -177,7 +170,6 @@ public void testSerdeTargetCompactionSizeBytesWithMaxTotalRows() 500L, 10000L, null, - 20, new Period(3600), new UserCompactTuningConfig( null, @@ -200,7 +192,6 @@ public 
void testSerdeMaxTotalRowsWithMaxRowsPerSegment() throws IOException 500L, null, 10000, - 20, new Period(3600), new UserCompactTuningConfig( null, @@ -221,7 +212,6 @@ public void testSerdeMaxTotalRowsWithMaxRowsPerSegment() throws IOException Assert.assertEquals(config.getInputSegmentSizeBytes(), fromJson.getInputSegmentSizeBytes()); Assert.assertNull(fromJson.getTargetCompactionSizeBytes()); Assert.assertEquals(config.getMaxRowsPerSegment(), fromJson.getMaxRowsPerSegment()); - Assert.assertEquals(config.getMaxNumSegmentsToCompact(), fromJson.getMaxNumSegmentsToCompact()); Assert.assertEquals(config.getSkipOffsetFromLatest(), fromJson.getSkipOffsetFromLatest()); Assert.assertEquals(config.getTuningConfig(), fromJson.getTuningConfig()); Assert.assertEquals(config.getTaskContext(), fromJson.getTaskContext()); diff --git a/server/src/test/java/org/apache/druid/server/coordinator/helper/DruidCoordinatorSegmentCompactorTest.java b/server/src/test/java/org/apache/druid/server/coordinator/helper/DruidCoordinatorSegmentCompactorTest.java index fcc1053b7de2..36e6a6957450 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/helper/DruidCoordinatorSegmentCompactorTest.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/helper/DruidCoordinatorSegmentCompactorTest.java @@ -377,7 +377,6 @@ private static List createCompactionConfigs() 50L, 20L, null, - null, new Period("PT1H"), // smaller than segment interval null, null diff --git a/server/src/test/java/org/apache/druid/server/coordinator/helper/NewestSegmentFirstPolicyTest.java b/server/src/test/java/org/apache/druid/server/coordinator/helper/NewestSegmentFirstPolicyTest.java index 4ad8b0da8609..5e3e8b96e7d6 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/helper/NewestSegmentFirstPolicyTest.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/helper/NewestSegmentFirstPolicyTest.java @@ -56,7 +56,7 @@ public void testLargeOffsetAndSmallSegmentInterval() { final Period segmentPeriod = new Period("PT1H"); final CompactionSegmentIterator iterator = policy.reset( - ImmutableMap.of(DATA_SOURCE, createCompactionConfig(10000, 100, new Period("P2D"))), + ImmutableMap.of(DATA_SOURCE, createCompactionConfig(10000, new Period("P2D"))), ImmutableMap.of( DATA_SOURCE, createTimeline( @@ -81,7 +81,7 @@ public void testSmallOffsetAndLargeSegmentInterval() { final Period segmentPeriod = new Period("PT1H"); final CompactionSegmentIterator iterator = policy.reset( - ImmutableMap.of(DATA_SOURCE, createCompactionConfig(10000, 100, new Period("PT1M"))), + ImmutableMap.of(DATA_SOURCE, createCompactionConfig(10000, new Period("PT1M"))), ImmutableMap.of( DATA_SOURCE, createTimeline( @@ -114,41 +114,7 @@ public void testLargeGapInData() { final Period segmentPeriod = new Period("PT1H"); final CompactionSegmentIterator iterator = policy.reset( - ImmutableMap.of(DATA_SOURCE, createCompactionConfig(10000, 100, new Period("PT1H1M"))), - ImmutableMap.of( - DATA_SOURCE, - createTimeline( - new SegmentGenerateSpec(Intervals.of("2017-11-16T20:00:00/2017-11-17T04:00:00"), segmentPeriod), - // larger gap than SegmentCompactorUtil.LOOKUP_PERIOD (1 day) - new SegmentGenerateSpec(Intervals.of("2017-11-14T00:00:00/2017-11-15T07:00:00"), segmentPeriod) - ) - ), - Collections.emptyMap() - ); - - assertCompactSegmentIntervals( - iterator, - segmentPeriod, - Intervals.of("2017-11-16T20:00:00/2017-11-16T21:00:00"), - Intervals.of("2017-11-17T01:00:00/2017-11-17T02:00:00"), - false - ); - - assertCompactSegmentIntervals( - 
iterator, - segmentPeriod, - Intervals.of("2017-11-14T00:00:00/2017-11-14T01:00:00"), - Intervals.of("2017-11-15T06:00:00/2017-11-15T07:00:00"), - true - ); - } - - @Test - public void testSmallNumTargetCompactionSegments() - { - final Period segmentPeriod = new Period("PT1H"); - final CompactionSegmentIterator iterator = policy.reset( - ImmutableMap.of(DATA_SOURCE, createCompactionConfig(10000, 5, new Period("PT1H1M"))), + ImmutableMap.of(DATA_SOURCE, createCompactionConfig(10000, new Period("PT1H1M"))), ImmutableMap.of( DATA_SOURCE, createTimeline( @@ -181,7 +147,7 @@ public void testSmallNumTargetCompactionSegments() public void testHugeShard() { final CompactionSegmentIterator iterator = policy.reset( - ImmutableMap.of(DATA_SOURCE, createCompactionConfig(10000, 100, new Period("P1D"))), + ImmutableMap.of(DATA_SOURCE, createCompactionConfig(10000, new Period("P1D"))), ImmutableMap.of( DATA_SOURCE, createTimeline( @@ -231,7 +197,7 @@ public void testHugeShard() public void testManySegmentsPerShard() { final CompactionSegmentIterator iterator = policy.reset( - ImmutableMap.of(DATA_SOURCE, createCompactionConfig(800000, 100, new Period("P1D"))), + ImmutableMap.of(DATA_SOURCE, createCompactionConfig(800000, new Period("P1D"))), ImmutableMap.of( DATA_SOURCE, createTimeline( @@ -283,70 +249,6 @@ public void testManySegmentsPerShard() Assert.assertEquals(Intervals.of("2017-12-03T11:00:00/2017-12-03T12:00:00"), lastInterval); } - @Test - public void testManySegmentsPerShard2() - { - final CompactionSegmentIterator iterator = policy.reset( - ImmutableMap.of(DATA_SOURCE, createCompactionConfig(800000, 100, new Period("P1D"))), - ImmutableMap.of( - DATA_SOURCE, - createTimeline( - new SegmentGenerateSpec( - Intervals.of("2017-12-04T11:00:00/2017-12-05T05:00:00"), - new Period("PT1H"), - 200, - 150 - ), - new SegmentGenerateSpec( - Intervals.of("2017-12-04T06:00:00/2017-12-04T11:00:00"), - new Period("PT1H"), - 375, - 80 - ), - new SegmentGenerateSpec( - Intervals.of("2017-12-03T18:00:00/2017-12-04T06:00:00"), - new Period("PT12H"), - 257000, - 1 - ), - new SegmentGenerateSpec( - Intervals.of("2017-12-03T11:00:00/2017-12-03T18:00:00"), - new Period("PT1H"), - 200, - 150 - ), - new SegmentGenerateSpec( - Intervals.of("2017-12-02T19:00:00/2017-12-03T11:00:00"), - new Period("PT16H"), - 257000, - 1 - ), - new SegmentGenerateSpec( - Intervals.of("2017-12-02T11:00:00/2017-12-02T19:00:00"), - new Period("PT1H"), - 200, - 150 - ), - new SegmentGenerateSpec( - Intervals.of("2017-12-01T18:00:00/2017-12-02T11:00:00"), - new Period("PT17H"), - 257000, - 1 - ), - new SegmentGenerateSpec( - Intervals.of("2017-12-01T09:00:00/2017-12-01T18:00:00"), - new Period("PT1H"), - 200, - 150 - ) - ) - ), - Collections.emptyMap() - ); - - Assert.assertFalse(iterator.hasNext()); - } - @Test public void testSkipUnknownDataSource() { @@ -355,9 +257,9 @@ public void testSkipUnknownDataSource() final CompactionSegmentIterator iterator = policy.reset( ImmutableMap.of( unknownDataSource, - createCompactionConfig(10000, 100, new Period("P2D")), + createCompactionConfig(10000, new Period("P2D")), DATA_SOURCE, - createCompactionConfig(10000, 100, new Period("P2D")) + createCompactionConfig(10000, new Period("P2D")) ), ImmutableMap.of( DATA_SOURCE, @@ -382,7 +284,7 @@ public void testSkipUnknownDataSource() public void testIgnoreSingleSegmentToCompact() { final CompactionSegmentIterator iterator = policy.reset( - ImmutableMap.of(DATA_SOURCE, createCompactionConfig(800000, 100, new Period("P1D"))), + ImmutableMap.of(DATA_SOURCE, 
createCompactionConfig(800000, new Period("P1D"))), ImmutableMap.of( DATA_SOURCE, createTimeline( @@ -431,7 +333,7 @@ public void testClearSegmentsToCompactWhenSkippingSegments() ) ); final CompactionSegmentIterator iterator = policy.reset( - ImmutableMap.of(DATA_SOURCE, createCompactionConfig(maxSizeOfSegmentsToCompact, 100, new Period("P0D"))), + ImmutableMap.of(DATA_SOURCE, createCompactionConfig(maxSizeOfSegmentsToCompact, new Period("P0D"))), ImmutableMap.of(DATA_SOURCE, timeline), Collections.emptyMap() ); @@ -461,7 +363,7 @@ public void testIfFirstSegmentIsInSkipOffset() ); final CompactionSegmentIterator iterator = policy.reset( - ImmutableMap.of(DATA_SOURCE, createCompactionConfig(40000, 100, new Period("P1D"))), + ImmutableMap.of(DATA_SOURCE, createCompactionConfig(40000, new Period("P1D"))), ImmutableMap.of(DATA_SOURCE, timeline), Collections.emptyMap() ); @@ -482,7 +384,7 @@ public void testIfFirstSegmentOverlapsSkipOffset() ); final CompactionSegmentIterator iterator = policy.reset( - ImmutableMap.of(DATA_SOURCE, createCompactionConfig(40000, 100, new Period("P1D"))), + ImmutableMap.of(DATA_SOURCE, createCompactionConfig(40000, new Period("P1D"))), ImmutableMap.of(DATA_SOURCE, timeline), Collections.emptyMap() ); @@ -495,7 +397,7 @@ public void testWithSkipIntervals() { final Period segmentPeriod = new Period("PT1H"); final CompactionSegmentIterator iterator = policy.reset( - ImmutableMap.of(DATA_SOURCE, createCompactionConfig(10000, 100, new Period("P1D"))), + ImmutableMap.of(DATA_SOURCE, createCompactionConfig(10000, new Period("P1D"))), ImmutableMap.of( DATA_SOURCE, createTimeline( @@ -535,7 +437,7 @@ public void testHoleInSearchInterval() { final Period segmentPeriod = new Period("PT1H"); final CompactionSegmentIterator iterator = policy.reset( - ImmutableMap.of(DATA_SOURCE, createCompactionConfig(10000, 100, new Period("PT1H"))), + ImmutableMap.of(DATA_SOURCE, createCompactionConfig(10000, new Period("PT1H"))), ImmutableMap.of( DATA_SOURCE, createTimeline( @@ -668,7 +570,6 @@ private static VersionedIntervalTimeline createTimeline( private DataSourceCompactionConfig createCompactionConfig( long targetCompactionSizeBytes, - int numTargetCompactionSegments, Period skipOffsetFromLatest ) { @@ -678,7 +579,6 @@ private DataSourceCompactionConfig createCompactionConfig( targetCompactionSizeBytes, targetCompactionSizeBytes, null, - numTargetCompactionSegments, skipOffsetFromLatest, null, null diff --git a/web-console/src/dialogs/compaction-dialog/__snapshots__/compaction-dialog.spec.tsx.snap b/web-console/src/dialogs/compaction-dialog/__snapshots__/compaction-dialog.spec.tsx.snap index fd23131e9b12..dd96dc149d2d 100644 --- a/web-console/src/dialogs/compaction-dialog/__snapshots__/compaction-dialog.spec.tsx.snap +++ b/web-console/src/dialogs/compaction-dialog/__snapshots__/compaction-dialog.spec.tsx.snap @@ -167,119 +167,6 @@ exports[`compaction dialog matches snapshot 1`] = ` -
-  [113 removed lines of web-console snapshot markup for the maxNumSegmentsToCompact form group; the rendered HTML did not survive extraction and is elided here]
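The console change below drops the matching form field. On the server side, the net effect for operators is one fewer knob: a DataSourceCompactionConfig is now built without maxNumSegmentsToCompact. A minimal sketch of the new constructor shape, with values mirroring testSerdeBasic above (the two leading arguments are assumed for illustration):

```java
// Post-patch constructor: the maxNumSegmentsToCompact argument is gone.
final DataSourceCompactionConfig config = new DataSourceCompactionConfig(
    "dataSource",                    // assumed for illustration
    null,                            // taskPriority; defaults to 25
    500L,                            // inputSegmentSizeBytes
    100L,                            // targetCompactionSizeBytes
    null,                            // maxRowsPerSegment
    new Period(3600),                // skipOffsetFromLatest (millis)
    null,                            // tuningConfig
    ImmutableMap.of("key", "val")    // taskContext
);
```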
diff --git a/web-console/src/dialogs/compaction-dialog/compaction-dialog.tsx b/web-console/src/dialogs/compaction-dialog/compaction-dialog.tsx
index 5f4e824a6190..2d2fc50c5633 100644
--- a/web-console/src/dialogs/compaction-dialog/compaction-dialog.tsx
+++ b/web-console/src/dialogs/compaction-dialog/compaction-dialog.tsx
@@ -96,18 +96,6 @@ export class CompactionDialog extends React.PureComponent<
         </p>
       ),
     },
-    {
-      name: 'maxNumSegmentsToCompact',
-      type: 'number',
-      defaultValue: 150,
-      info: (
-        <p>
-          Maximum number of segments to compact together per compaction task. Since a time
-          chunk must be processed in its entirety, if a time chunk has a total number of
-          segments greater than this parameter, compaction will not run for that time chunk.
-        </p>
-      ),
-    },
     {
       name: 'skipOffsetFromLatest',
       type: 'string',
diff --git a/website/.spelling b/website/.spelling
index 0b875ed486ea..ee301c7d1b1e 100644
--- a/website/.spelling
+++ b/website/.spelling
@@ -803,6 +803,7 @@ listDelimiter
 timestampSpec
 - ../docs/ingestion/data-management.md
 1GB
+IOConfig
 compactionTask
 compactionTasks
 ingestSegmentFirehose
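Finally, since both the client and the task side now key off ClientCompactionIntervalSpec.fromSegments, here is a small sketch of its observable behavior, following the implementation added above; the segments themselves are hypothetical:

```java
// Two adjacent daily segments; all values are hypothetical.
final DataSegment a = new DataSegment(
    "datasource", Intervals.of("2019-01-01/2019-01-02"), "version",
    null, null, null, null, 9, 10
);
final DataSegment b = new DataSegment(
    "datasource", Intervals.of("2019-01-02/2019-01-03"), "version",
    null, null, null, null, 9, 10
);

final ClientCompactionIntervalSpec spec =
    ClientCompactionIntervalSpec.fromSegments(ImmutableList.of(a, b));

// The spec spans the umbrella interval of the inputs:
//   spec.getInterval() -> 2019-01-01/2019-01-03
// and pins the exact segment set, so the compaction task can detect a changed
// segment set before it runs:
//   spec.getSha256OfSortedSegmentIds() equals SegmentUtils.hashIds(ImmutableList.of(a, b))
```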