From 6e507b1f79118fc8700b65ba9260bb608620d493 Mon Sep 17 00:00:00 2001
From: Jihoon Son
Date: Mon, 23 Sep 2019 12:40:24 +0900
Subject: [PATCH 1/7] IOConfig for compaction task
---
.../NewestSegmentFirstPolicyBenchmark.java | 1 -
.../apache/druid/segment/SegmentUtils.java | 22 +++
.../common/task/CompactionIOConfig.java | 72 ++++++++++
.../common/task/CompactionInputSpec.java | 40 ++++++
.../common/task/CompactionIntervalSpec.java | 121 +++++++++++++++++
.../indexing/common/task/CompactionTask.java | 112 ++++++----------
.../common/task/SpecificSegmentsSpec.java | 111 +++++++++++++++
.../common/task/CompactionTaskTest.java | 34 ++---
.../client/indexing/ClientCompactQuery.java | 39 ++----
.../indexing/ClientCompactionIOConfig.java | 71 ++++++++++
.../indexing/ClientCompactionInputSpec.java | 34 +++++
.../ClientCompactionIntervalSpec.java | 114 ++++++++++++++++
.../indexing/HttpIndexingServiceClient.java | 3 +-
.../druid/client/indexing/IOConfig.java | 27 ++++
.../DataSourceCompactionConfig.java | 19 ---
.../DruidCoordinatorSegmentCompactor.java | 21 +--
.../helper/NewestSegmentFirstIterator.java | 16 +--
.../DataSourceCompactionConfigTest.java | 10 --
.../DruidCoordinatorSegmentCompactorTest.java | 1 -
.../helper/NewestSegmentFirstPolicyTest.java | 126 ++----------------
20 files changed, 693 insertions(+), 301 deletions(-)
create mode 100644 indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionIOConfig.java
create mode 100644 indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionInputSpec.java
create mode 100644 indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionIntervalSpec.java
create mode 100644 indexing-service/src/main/java/org/apache/druid/indexing/common/task/SpecificSegmentsSpec.java
create mode 100644 server/src/main/java/org/apache/druid/client/indexing/ClientCompactionIOConfig.java
create mode 100644 server/src/main/java/org/apache/druid/client/indexing/ClientCompactionInputSpec.java
create mode 100644 server/src/main/java/org/apache/druid/client/indexing/ClientCompactionIntervalSpec.java
create mode 100644 server/src/main/java/org/apache/druid/client/indexing/IOConfig.java
diff --git a/benchmarks/src/main/java/org/apache/druid/server/coordinator/NewestSegmentFirstPolicyBenchmark.java b/benchmarks/src/main/java/org/apache/druid/server/coordinator/NewestSegmentFirstPolicyBenchmark.java
index dfef1283a2a3..40fa0c31a7b5 100644
--- a/benchmarks/src/main/java/org/apache/druid/server/coordinator/NewestSegmentFirstPolicyBenchmark.java
+++ b/benchmarks/src/main/java/org/apache/druid/server/coordinator/NewestSegmentFirstPolicyBenchmark.java
@@ -99,7 +99,6 @@ public void setup()
null,
null,
null,
- null,
null
)
);
diff --git a/core/src/main/java/org/apache/druid/segment/SegmentUtils.java b/core/src/main/java/org/apache/druid/segment/SegmentUtils.java
index 1f610651b8ac..4dff10c7dd0e 100644
--- a/core/src/main/java/org/apache/druid/segment/SegmentUtils.java
+++ b/core/src/main/java/org/apache/druid/segment/SegmentUtils.java
@@ -19,15 +19,23 @@
package org.apache.druid.segment;
+import com.google.common.hash.HashFunction;
+import com.google.common.hash.Hasher;
+import com.google.common.hash.Hashing;
import com.google.common.io.Files;
import com.google.common.primitives.Ints;
import org.apache.druid.guice.annotations.PublicApi;
import org.apache.druid.java.util.common.IOE;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.timeline.DataSegment;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
/**
* Utility methods useful for implementing deep storage extensions.
@@ -35,6 +43,16 @@
@PublicApi
public class SegmentUtils
{
+ private static final HashFunction HASH_FUNCTION = Hashing.sha256();
+
+ public static String hashIds(List segments)
+ {
+ Collections.sort(segments);
+ final Hasher hasher = HASH_FUNCTION.newHasher();
+ segments.forEach(segment -> hasher.putString(segment.getId().toString(), StandardCharsets.UTF_8));
+ return StringUtils.fromUtf8(hasher.hash().asBytes());
+ }
+
public static int getVersionFromDir(File inDir) throws IOException
{
File versionFile = new File(inDir, "version.bin");
@@ -53,4 +71,8 @@ public static int getVersionFromDir(File inDir) throws IOException
throw new IOE("Invalid segment dir [%s]. Can't find either of version.bin or index.drd.", inDir);
}
+
+ private SegmentUtils()
+ {
+ }
}
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionIOConfig.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionIOConfig.java
new file mode 100644
index 000000000000..075b95deaf13
--- /dev/null
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionIOConfig.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.common.task;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import org.apache.druid.segment.indexing.IOConfig;
+
+import java.util.Objects;
+
+@JsonTypeName("compact")
+public class CompactionIOConfig implements IOConfig
+{
+ private final CompactionInputSpec inputSpec;
+
+ @JsonCreator
+ public CompactionIOConfig(@JsonProperty("inputSpec") CompactionInputSpec inputSpec)
+ {
+ this.inputSpec = inputSpec;
+ }
+
+ @JsonProperty
+ public CompactionInputSpec getInputSpec()
+ {
+ return inputSpec;
+ }
+
+ @Override
+ public boolean equals(Object o)
+ {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ CompactionIOConfig that = (CompactionIOConfig) o;
+ return Objects.equals(inputSpec, that.inputSpec);
+ }
+
+ @Override
+ public int hashCode()
+ {
+ return Objects.hash(inputSpec);
+ }
+
+ @Override
+ public String toString()
+ {
+ return "CompactionIOConfig{" +
+ "inputSpec=" + inputSpec +
+ '}';
+ }
+}
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionInputSpec.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionInputSpec.java
new file mode 100644
index 000000000000..7f0ed013b957
--- /dev/null
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionInputSpec.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.common.task;
+
+import com.fasterxml.jackson.annotation.JsonSubTypes;
+import com.fasterxml.jackson.annotation.JsonSubTypes.Type;
+import com.fasterxml.jackson.annotation.JsonTypeInfo;
+import org.apache.druid.timeline.DataSegment;
+import org.joda.time.Interval;
+
+import java.util.List;
+
+@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type")
+@JsonSubTypes(value = {
+ @Type(name = CompactionIntervalSpec.TYPE, value = CompactionIntervalSpec.class),
+ @Type(name = SpecificSegmentsSpec.TYPE, value = SpecificSegmentsSpec.class)
+})
+public interface CompactionInputSpec
+{
+ Interval findInterval(String dataSource);
+
+ boolean validateSegments(List segments);
+}
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionIntervalSpec.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionIntervalSpec.java
new file mode 100644
index 000000000000..09d49ae1c192
--- /dev/null
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionIntervalSpec.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.common.task;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.druid.java.util.common.IAE;
+import org.apache.druid.java.util.common.JodaUtils;
+import org.apache.druid.segment.SegmentUtils;
+import org.apache.druid.timeline.DataSegment;
+import org.joda.time.Interval;
+
+import javax.annotation.Nullable;
+import java.util.List;
+import java.util.Objects;
+import java.util.stream.Collectors;
+
+public class CompactionIntervalSpec implements CompactionInputSpec
+{
+ public static final String TYPE = "interval";
+
+ private final Interval interval;
+ @Nullable
+ private final String sha256OfSortedSegmentIds;
+
+ @JsonCreator
+ public CompactionIntervalSpec(
+ @JsonProperty("interval") Interval interval,
+ @JsonProperty("sha256OfSortedSegmentIds") @Nullable String sha256OfSortedSegmentIds
+ )
+ {
+ if (interval != null && interval.toDurationMillis() == 0) {
+ throw new IAE("Interval[%s] is empty, must specify a nonempty interval", interval);
+ }
+ this.interval = interval;
+ this.sha256OfSortedSegmentIds = sha256OfSortedSegmentIds;
+ }
+
+ @JsonProperty
+ public Interval getInterval()
+ {
+ return interval;
+ }
+
+ @Nullable
+ @JsonProperty
+ public String getSha256OfSortedSegmentIds()
+ {
+ return sha256OfSortedSegmentIds;
+ }
+
+ @Override
+ public Interval findInterval(String dataSource)
+ {
+ return interval;
+ }
+
+ @Override
+ public boolean validateSegments(List segments)
+ {
+ final Interval segmentsInterval = JodaUtils.umbrellaInterval(
+ segments.stream().map(DataSegment::getInterval).collect(Collectors.toList())
+ );
+ if (interval.overlaps(segmentsInterval)) {
+ if (sha256OfSortedSegmentIds != null) {
+ final String hashOfThem = SegmentUtils.hashIds(segments);
+ return hashOfThem.equals(sha256OfSortedSegmentIds);
+ } else {
+ return true;
+ }
+ } else {
+ return false;
+ }
+ }
+
+ @Override
+ public boolean equals(Object o)
+ {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ CompactionIntervalSpec that = (CompactionIntervalSpec) o;
+ return Objects.equals(interval, that.interval) &&
+ Objects.equals(sha256OfSortedSegmentIds, that.sha256OfSortedSegmentIds);
+ }
+
+ @Override
+ public int hashCode()
+ {
+ return Objects.hash(interval, sha256OfSortedSegmentIds);
+ }
+
+ @Override
+ public String toString()
+ {
+ return "CompactionIntervalSpec{" +
+ "interval=" + interval +
+ ", sha256OfSegmentIds='" + sha256OfSortedSegmentIds + '\'' +
+ '}';
+ }
+}
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java
index 0f0cfbbb4212..3861e73cdcb9 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java
@@ -121,8 +121,7 @@ public class CompactionTask extends AbstractBatchIndexTask
private static final Logger log = new Logger(CompactionTask.class);
private static final String TYPE = "compact";
- private final Interval interval;
- private final List segments;
+ private final CompactionIOConfig ioConfig;
@Nullable
private final DimensionsSpec dimensionsSpec;
@Nullable
@@ -176,8 +175,9 @@ public CompactionTask(
@JsonProperty("id") final String id,
@JsonProperty("resource") final TaskResource taskResource,
@JsonProperty("dataSource") final String dataSource,
- @JsonProperty("interval") @Nullable final Interval interval,
- @JsonProperty("segments") @Nullable final List segments,
+ @JsonProperty("interval") @Deprecated @Nullable final Interval interval,
+ @JsonProperty("segments") @Deprecated @Nullable final List segments,
+ @JsonProperty("ioConfig") @Nullable CompactionIOConfig ioConfig,
@JsonProperty("dimensions") @Nullable final DimensionsSpec dimensions,
@JsonProperty("dimensionsSpec") @Nullable final DimensionsSpec dimensionsSpec,
@JsonProperty("metricsSpec") @Nullable final AggregatorFactory[] metricsSpec,
@@ -196,22 +196,25 @@ public CompactionTask(
)
{
super(getOrMakeId(id, TYPE, dataSource), null, taskResource, dataSource, context);
- Preconditions.checkArgument(interval != null || segments != null, "interval or segments should be specified");
- Preconditions.checkArgument(interval == null || segments == null, "one of interval and segments should be null");
- if (interval != null && interval.toDurationMillis() == 0) {
- throw new IAE("Interval[%s] is empty, must specify a nonempty interval", interval);
+ if (ioConfig != null) {
+ this.ioConfig = ioConfig;
+ } else {
+ if (interval != null) {
+ this.ioConfig = new CompactionIOConfig(new CompactionIntervalSpec(interval, null));
+ } else if (segments != null && !segments.isEmpty()) {
+ this.ioConfig = new CompactionIOConfig(SpecificSegmentsSpec.fromSegments(segments));
+ } else {
+ throw new IAE("Missing ioConfig");
+ }
}
-
- this.interval = interval;
- this.segments = segments;
this.dimensionsSpec = dimensionsSpec == null ? dimensions : dimensionsSpec;
this.metricsSpec = metricsSpec;
this.segmentGranularity = segmentGranularity;
this.targetCompactionSizeBytes = targetCompactionSizeBytes;
this.tuningConfig = tuningConfig;
this.jsonMapper = jsonMapper;
- this.segmentProvider = segments == null ? new SegmentProvider(dataSource, interval) : new SegmentProvider(segments);
+ this.segmentProvider = new SegmentProvider(dataSource, this.ioConfig.getInputSpec());
this.partitionConfigurationManager = new PartitionConfigurationManager(targetCompactionSizeBytes, tuningConfig);
this.authorizerMapper = authorizerMapper;
this.chatHandlerProvider = chatHandlerProvider;
@@ -223,15 +226,9 @@ public CompactionTask(
}
@JsonProperty
- public Interval getInterval()
+ public CompactionIOConfig getIoConfig()
{
- return interval;
- }
-
- @JsonProperty
- public List getSegments()
- {
- return segments;
+ return ioConfig;
}
@JsonProperty
@@ -343,7 +340,7 @@ public TaskStatus runTask(TaskToolbox toolbox) throws Exception
}
if (indexTaskSpecs.isEmpty()) {
- log.warn("Interval[%s] has no segments, nothing to do.", interval);
+ log.warn("Can't find segments from inputSpec[%s], nothing to do.", ioConfig.getInputSpec());
return TaskStatus.failure(getId());
} else {
registerResourceCloserOnAbnormalExit(currentSubTaskHolder);
@@ -758,36 +755,14 @@ private static DimensionSchema createDimensionSchema(
static class SegmentProvider
{
private final String dataSource;
+ private final CompactionInputSpec inputSpec;
private final Interval interval;
- @Nullable
- private final List segments;
- SegmentProvider(String dataSource, Interval interval)
+ SegmentProvider(String dataSource, CompactionInputSpec inputSpec)
{
this.dataSource = Preconditions.checkNotNull(dataSource);
- this.interval = Preconditions.checkNotNull(interval);
- this.segments = null;
- }
-
- SegmentProvider(List segments)
- {
- Preconditions.checkArgument(segments != null && !segments.isEmpty());
- final String dataSource = segments.get(0).getDataSource();
- Preconditions.checkArgument(
- segments.stream().allMatch(segment -> segment.getDataSource().equals(dataSource)),
- "segments should have the same dataSource"
- );
- this.dataSource = dataSource;
- this.segments = segments;
- this.interval = JodaUtils.umbrellaInterval(
- segments.stream().map(DataSegment::getInterval).collect(Collectors.toList())
- );
- }
-
- @Nullable
- List getSegments()
- {
- return segments;
+ this.inputSpec = inputSpec;
+ this.interval = inputSpec.findInterval(dataSource);
}
List checkAndGetSegments(TaskActionClient actionClient) throws IOException
@@ -804,24 +779,11 @@ List checkAndGetSegments(TaskActionClient actionClient) throws IOEx
.map(PartitionChunk::getObject)
.collect(Collectors.toList());
- if (segments != null) {
- Collections.sort(latestSegments);
- Collections.sort(segments);
-
- if (!latestSegments.equals(segments)) {
- final List unknownSegments = segments.stream()
- .filter(segment -> !latestSegments.contains(segment))
- .collect(Collectors.toList());
- final List missingSegments = latestSegments.stream()
- .filter(segment -> !segments.contains(segment))
- .collect(Collectors.toList());
- throw new ISE(
- "Specified segments in the spec are different from the current used segments. "
- + "There are unknown segments[%s] and missing segments[%s] in the spec.",
- unknownSegments,
- missingSegments
- );
- }
+ if (!inputSpec.validateSegments(latestSegments)) {
+ throw new ISE(
+ "Specified segments in the spec are different from the current used segments. "
+ + "Possibly new segments would have been added or some segments have been unpublished."
+ );
}
return latestSegments;
}
@@ -952,10 +914,7 @@ public static class Builder
private final RetryPolicyFactory retryPolicyFactory;
private final AppenderatorsManager appenderatorsManager;
- @Nullable
- private Interval interval;
- @Nullable
- private List segments;
+ private CompactionIOConfig ioConfig;
@Nullable
private DimensionsSpec dimensionsSpec;
@Nullable
@@ -994,13 +953,17 @@ public Builder(
public Builder interval(Interval interval)
{
- this.interval = interval;
- return this;
+ return inputSpec(new CompactionIntervalSpec(interval, null));
}
public Builder segments(List segments)
{
- this.segments = segments;
+ return inputSpec(SpecificSegmentsSpec.fromSegments(segments));
+ }
+
+ public Builder inputSpec(CompactionInputSpec inputSpec)
+ {
+ this.ioConfig = new CompactionIOConfig(inputSpec);
return this;
}
@@ -1046,8 +1009,9 @@ public CompactionTask build()
null,
null,
dataSource,
- interval,
- segments,
+ null,
+ null,
+ ioConfig,
null,
dimensionsSpec,
metricsSpec,
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/SpecificSegmentsSpec.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/SpecificSegmentsSpec.java
new file mode 100644
index 000000000000..b424ea8bb18b
--- /dev/null
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/SpecificSegmentsSpec.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.common.task;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.base.Preconditions;
+import org.apache.druid.java.util.common.JodaUtils;
+import org.apache.druid.timeline.DataSegment;
+import org.apache.druid.timeline.SegmentId;
+import org.joda.time.Interval;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+import java.util.stream.Collectors;
+
+public class SpecificSegmentsSpec implements CompactionInputSpec
+{
+ public static final String TYPE = "segments";
+
+ private final List segments;
+
+ public static SpecificSegmentsSpec fromSegments(List segments)
+ {
+ Preconditions.checkArgument(!segments.isEmpty(), "Empty segment list");
+ return new SpecificSegmentsSpec(
+ segments.stream().map(segment -> segment.getId().toString()).collect(Collectors.toList())
+ );
+ }
+
+ @JsonCreator
+ public SpecificSegmentsSpec(@JsonProperty("segments") List segments)
+ {
+ this.segments = segments;
+ }
+
+ @JsonProperty
+ public List getSegments()
+ {
+ return segments;
+ }
+
+ @Override
+ public Interval findInterval(String dataSource)
+ {
+ final List segmentIds = segments
+ .stream()
+ .map(segment -> SegmentId.tryParse(dataSource, segment))
+ .collect(Collectors.toList());
+ return JodaUtils.umbrellaInterval(
+ segmentIds.stream().map(SegmentId::getInterval).collect(Collectors.toList())
+ );
+ }
+
+ @Override
+ public boolean validateSegments(List segments)
+ {
+ final List thoseSegments = segments
+ .stream()
+ .map(segment -> segment.getId().toString())
+ .collect(Collectors.toList());
+ Collections.sort(this.segments);
+ Collections.sort(thoseSegments);
+ return this.segments.equals(thoseSegments);
+ }
+
+ @Override
+ public boolean equals(Object o)
+ {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ SpecificSegmentsSpec that = (SpecificSegmentsSpec) o;
+ return Objects.equals(segments, that.segments);
+ }
+
+ @Override
+ public int hashCode()
+ {
+ return Objects.hash(segments);
+ }
+
+ @Override
+ public String toString()
+ {
+ return "SpecificSegmentsSpec{" +
+ "segments=" + segments +
+ '}';
+ }
+}
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java
index 5d6ec71f49d9..c58d327109d9 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java
@@ -81,6 +81,7 @@
import org.apache.druid.segment.IndexSpec;
import org.apache.druid.segment.Metadata;
import org.apache.druid.segment.QueryableIndex;
+import org.apache.druid.segment.SegmentUtils;
import org.apache.druid.segment.SimpleQueryableIndex;
import org.apache.druid.segment.column.BaseColumn;
import org.apache.druid.segment.column.BitmapIndex;
@@ -336,7 +337,9 @@ public void testSerdeWithInterval() throws IOException
appenderatorsManager
);
final CompactionTask task = builder
- .interval(COMPACTION_INTERVAL)
+ .inputSpec(
+ new CompactionIntervalSpec(COMPACTION_INTERVAL, SegmentUtils.hashIds(SEGMENTS))
+ )
.tuningConfig(createTuningConfig())
.context(ImmutableMap.of("testKey", "testContext"))
.build();
@@ -410,10 +413,9 @@ private static void assertEquals(CompactionTask expected, CompactionTask actual)
{
Assert.assertEquals(expected.getType(), actual.getType());
Assert.assertEquals(expected.getDataSource(), actual.getDataSource());
- Assert.assertEquals(expected.getInterval(), actual.getInterval());
- Assert.assertEquals(expected.getSegments(), actual.getSegments());
+ Assert.assertEquals(expected.getIoConfig(), actual.getIoConfig());
Assert.assertEquals(expected.getDimensionsSpec(), actual.getDimensionsSpec());
- Assert.assertTrue(Arrays.equals(expected.getMetricsSpec(), actual.getMetricsSpec()));
+ Assert.assertArrayEquals(expected.getMetricsSpec(), actual.getMetricsSpec());
Assert.assertEquals(expected.getTargetCompactionSizeBytes(), actual.getTargetCompactionSizeBytes());
Assert.assertEquals(expected.getTuningConfig(), actual.getTuningConfig());
Assert.assertEquals(expected.getContext(), actual.getContext());
@@ -424,7 +426,7 @@ public void testCreateIngestionSchema() throws IOException, SegmentLoadingExcept
{
final List ingestionSpecs = CompactionTask.createIngestionSchema(
toolbox,
- new SegmentProvider(DATA_SOURCE, COMPACTION_INTERVAL),
+ new SegmentProvider(DATA_SOURCE, new CompactionIntervalSpec(COMPACTION_INTERVAL, null)),
new PartitionConfigurationManager(null, TUNING_CONFIG),
null,
null,
@@ -484,7 +486,7 @@ public void testCreateIngestionSchemaWithTargetPartitionSize() throws IOExceptio
);
final List ingestionSpecs = CompactionTask.createIngestionSchema(
toolbox,
- new SegmentProvider(DATA_SOURCE, COMPACTION_INTERVAL),
+ new SegmentProvider(DATA_SOURCE, new CompactionIntervalSpec(COMPACTION_INTERVAL, null)),
new PartitionConfigurationManager(null, tuningConfig),
null,
null,
@@ -545,7 +547,7 @@ public void testCreateIngestionSchemaWithMaxTotalRows() throws IOException, Segm
);
final List ingestionSpecs = CompactionTask.createIngestionSchema(
toolbox,
- new SegmentProvider(DATA_SOURCE, COMPACTION_INTERVAL),
+ new SegmentProvider(DATA_SOURCE, new CompactionIntervalSpec(COMPACTION_INTERVAL, null)),
new PartitionConfigurationManager(null, tuningConfig),
null,
null,
@@ -606,7 +608,7 @@ public void testCreateIngestionSchemaWithNumShards() throws IOException, Segment
);
final List ingestionSpecs = CompactionTask.createIngestionSchema(
toolbox,
- new SegmentProvider(DATA_SOURCE, COMPACTION_INTERVAL),
+ new SegmentProvider(DATA_SOURCE, new CompactionIntervalSpec(COMPACTION_INTERVAL, null)),
new PartitionConfigurationManager(null, tuningConfig),
null,
null,
@@ -667,7 +669,7 @@ public void testCreateIngestionSchemaWithCustomDimensionsSpec() throws IOExcepti
final List ingestionSpecs = CompactionTask.createIngestionSchema(
toolbox,
- new SegmentProvider(DATA_SOURCE, COMPACTION_INTERVAL),
+ new SegmentProvider(DATA_SOURCE, new CompactionIntervalSpec(COMPACTION_INTERVAL, null)),
new PartitionConfigurationManager(null, TUNING_CONFIG),
customSpec,
null,
@@ -708,7 +710,7 @@ public void testCreateIngestionSchemaWithCustomMetricsSpec() throws IOException,
final List ingestionSpecs = CompactionTask.createIngestionSchema(
toolbox,
- new SegmentProvider(DATA_SOURCE, COMPACTION_INTERVAL),
+ new SegmentProvider(DATA_SOURCE, new CompactionIntervalSpec(COMPACTION_INTERVAL, null)),
new PartitionConfigurationManager(null, TUNING_CONFIG),
null,
customMetricsSpec,
@@ -742,7 +744,7 @@ public void testCreateIngestionSchemaWithCustomSegments() throws IOException, Se
{
final List ingestionSpecs = CompactionTask.createIngestionSchema(
toolbox,
- new SegmentProvider(SEGMENTS),
+ new SegmentProvider(DATA_SOURCE, SpecificSegmentsSpec.fromSegments(SEGMENTS)),
new PartitionConfigurationManager(null, TUNING_CONFIG),
null,
null,
@@ -782,7 +784,7 @@ public void testCreateIngestionSchemaWithDifferentSegmentSet() throws IOExceptio
segments.remove(segments.size() / 2);
CompactionTask.createIngestionSchema(
toolbox,
- new SegmentProvider(segments),
+ new SegmentProvider(DATA_SOURCE, SpecificSegmentsSpec.fromSegments(segments)),
new PartitionConfigurationManager(null, TUNING_CONFIG),
null,
null,
@@ -805,7 +807,7 @@ public void testMissingMetadata() throws IOException, SegmentLoadingException
final List segments = new ArrayList<>(SEGMENTS);
CompactionTask.createIngestionSchema(
toolbox,
- new SegmentProvider(segments),
+ new SegmentProvider(DATA_SOURCE, SpecificSegmentsSpec.fromSegments(segments)),
new PartitionConfigurationManager(null, TUNING_CONFIG),
null,
null,
@@ -874,7 +876,7 @@ public void testTargetPartitionSizeWithPartitionConfig() throws IOException, Seg
expectedException.expectMessage("targetCompactionSizeBytes[6] cannot be used with");
final List ingestionSpecs = CompactionTask.createIngestionSchema(
toolbox,
- new SegmentProvider(DATA_SOURCE, COMPACTION_INTERVAL),
+ new SegmentProvider(DATA_SOURCE, new CompactionIntervalSpec(COMPACTION_INTERVAL, null)),
new PartitionConfigurationManager(6L, tuningConfig),
null,
null,
@@ -891,7 +893,7 @@ public void testSegmentGranularity() throws IOException, SegmentLoadingException
{
final List ingestionSpecs = CompactionTask.createIngestionSchema(
toolbox,
- new SegmentProvider(DATA_SOURCE, COMPACTION_INTERVAL),
+ new SegmentProvider(DATA_SOURCE, new CompactionIntervalSpec(COMPACTION_INTERVAL, null)),
new PartitionConfigurationManager(null, TUNING_CONFIG),
null,
null,
@@ -926,7 +928,7 @@ public void testNullSegmentGranularityAnd() throws IOException, SegmentLoadingEx
{
final List ingestionSpecs = CompactionTask.createIngestionSchema(
toolbox,
- new SegmentProvider(DATA_SOURCE, COMPACTION_INTERVAL),
+ new SegmentProvider(DATA_SOURCE, new CompactionIntervalSpec(COMPACTION_INTERVAL, null)),
new PartitionConfigurationManager(null, TUNING_CONFIG),
null,
null,
diff --git a/server/src/main/java/org/apache/druid/client/indexing/ClientCompactQuery.java b/server/src/main/java/org/apache/druid/client/indexing/ClientCompactQuery.java
index e29cd60ed272..887d4b86e583 100644
--- a/server/src/main/java/org/apache/druid/client/indexing/ClientCompactQuery.java
+++ b/server/src/main/java/org/apache/druid/client/indexing/ClientCompactQuery.java
@@ -21,11 +21,8 @@
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
-import org.apache.druid.timeline.DataSegment;
-import org.joda.time.Interval;
import javax.annotation.Nullable;
-import java.util.List;
import java.util.Map;
import java.util.Objects;
@@ -36,8 +33,7 @@
public class ClientCompactQuery implements ClientQuery
{
private final String dataSource;
- private final List segments;
- private final Interval interval;
+ private final ClientCompactionIOConfig ioConfig;
@Nullable
private final Long targetCompactionSizeBytes;
private final ClientCompactQueryTuningConfig tuningConfig;
@@ -46,16 +42,14 @@ public class ClientCompactQuery implements ClientQuery
@JsonCreator
public ClientCompactQuery(
@JsonProperty("dataSource") String dataSource,
- @JsonProperty("interval") @Nullable final Interval interval,
- @JsonProperty("segments") @Nullable final List segments,
+ @JsonProperty("ioConfig") ClientCompactionIOConfig ioConfig,
@JsonProperty("targetCompactionSizeBytes") @Nullable Long targetCompactionSizeBytes,
@JsonProperty("tuningConfig") ClientCompactQueryTuningConfig tuningConfig,
@JsonProperty("context") Map context
)
{
this.dataSource = dataSource;
- this.segments = segments;
- this.interval = interval;
+ this.ioConfig = ioConfig;
this.targetCompactionSizeBytes = targetCompactionSizeBytes;
this.tuningConfig = tuningConfig;
this.context = context;
@@ -76,15 +70,9 @@ public String getDataSource()
}
@JsonProperty
- public List getSegments()
+ public ClientCompactionIOConfig getIoConfig()
{
- return segments;
- }
-
- @JsonProperty
- public Interval getInterval()
- {
- return interval;
+ return ioConfig;
}
@JsonProperty
@@ -117,8 +105,7 @@ public boolean equals(Object o)
}
ClientCompactQuery that = (ClientCompactQuery) o;
return Objects.equals(dataSource, that.dataSource) &&
- Objects.equals(segments, that.segments) &&
- Objects.equals(interval, that.interval) &&
+ Objects.equals(ioConfig, that.ioConfig) &&
Objects.equals(targetCompactionSizeBytes, that.targetCompactionSizeBytes) &&
Objects.equals(tuningConfig, that.tuningConfig) &&
Objects.equals(context, that.context);
@@ -127,23 +114,15 @@ public boolean equals(Object o)
@Override
public int hashCode()
{
- return Objects.hash(
- dataSource,
- segments,
- interval,
- targetCompactionSizeBytes,
- tuningConfig,
- context
- );
+ return Objects.hash(dataSource, ioConfig, targetCompactionSizeBytes, tuningConfig, context);
}
@Override
public String toString()
{
- return getClass().getSimpleName() + "{" +
+ return "ClientCompactQuery{" +
"dataSource='" + dataSource + '\'' +
- ", segments=" + segments +
- ", interval=" + interval +
+ ", ioConfig=" + ioConfig +
", targetCompactionSizeBytes=" + targetCompactionSizeBytes +
", tuningConfig=" + tuningConfig +
", context=" + context +
diff --git a/server/src/main/java/org/apache/druid/client/indexing/ClientCompactionIOConfig.java b/server/src/main/java/org/apache/druid/client/indexing/ClientCompactionIOConfig.java
new file mode 100644
index 000000000000..c23c884826a0
--- /dev/null
+++ b/server/src/main/java/org/apache/druid/client/indexing/ClientCompactionIOConfig.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.client.indexing;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+
+import java.util.Objects;
+
+@JsonTypeName("compact")
+public class ClientCompactionIOConfig implements IOConfig
+{
+ private final ClientCompactionInputSpec inputSpec;
+
+ @JsonCreator
+ public ClientCompactionIOConfig(@JsonProperty("inputSpec") ClientCompactionInputSpec inputSpec)
+ {
+ this.inputSpec = inputSpec;
+ }
+
+ @JsonProperty
+ public ClientCompactionInputSpec getInputSpec()
+ {
+ return inputSpec;
+ }
+
+ @Override
+ public boolean equals(Object o)
+ {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ ClientCompactionIOConfig that = (ClientCompactionIOConfig) o;
+ return Objects.equals(inputSpec, that.inputSpec);
+ }
+
+ @Override
+ public int hashCode()
+ {
+ return Objects.hash(inputSpec);
+ }
+
+ @Override
+ public String toString()
+ {
+ return "ClientCompactionIOConfig{" +
+ "inputSpec=" + inputSpec +
+ '}';
+ }
+}
diff --git a/server/src/main/java/org/apache/druid/client/indexing/ClientCompactionInputSpec.java b/server/src/main/java/org/apache/druid/client/indexing/ClientCompactionInputSpec.java
new file mode 100644
index 000000000000..3294aaa42342
--- /dev/null
+++ b/server/src/main/java/org/apache/druid/client/indexing/ClientCompactionInputSpec.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.client.indexing;
+
+import com.fasterxml.jackson.annotation.JsonSubTypes;
+import com.fasterxml.jackson.annotation.JsonSubTypes.Type;
+import com.fasterxml.jackson.annotation.JsonTypeInfo;
+import org.joda.time.Interval;
+
+@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type")
+@JsonSubTypes(value = {
+ @Type(name = ClientCompactionIntervalSpec.TYPE, value = ClientCompactionIntervalSpec.class)
+})
+public interface ClientCompactionInputSpec
+{
+ Interval findInterval(String dataSource);
+}
diff --git a/server/src/main/java/org/apache/druid/client/indexing/ClientCompactionIntervalSpec.java b/server/src/main/java/org/apache/druid/client/indexing/ClientCompactionIntervalSpec.java
new file mode 100644
index 000000000000..853ac4312815
--- /dev/null
+++ b/server/src/main/java/org/apache/druid/client/indexing/ClientCompactionIntervalSpec.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.client.indexing;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.druid.java.util.common.IAE;
+import org.apache.druid.java.util.common.JodaUtils;
+import org.apache.druid.segment.SegmentUtils;
+import org.apache.druid.timeline.DataSegment;
+import org.joda.time.Interval;
+
+import javax.annotation.Nullable;
+import java.util.List;
+import java.util.Objects;
+import java.util.stream.Collectors;
+
+public class ClientCompactionIntervalSpec implements ClientCompactionInputSpec
+{
+ public static final String TYPE = "interval";
+
+
+ private final Interval interval;
+ @Nullable
+ private final String sha256OfSortedSegmentIds;
+
+ public static ClientCompactionIntervalSpec fromSegments(List<DataSegment> segments)
+ {
+ return new ClientCompactionIntervalSpec(
+ JodaUtils.umbrellaInterval(
+ segments.stream().map(DataSegment::getInterval).collect(Collectors.toList())
+ ),
+ SegmentUtils.hashIds(segments)
+ );
+ }
+
+ @JsonCreator
+ public ClientCompactionIntervalSpec(
+ @JsonProperty("interval") Interval interval,
+ @JsonProperty("sha256OfSortedSegmentIds") @Nullable String sha256OfSortedSegmentIds
+ )
+ {
+ if (interval != null && interval.toDurationMillis() == 0) {
+ throw new IAE("Interval[%s] is empty, must specify a nonempty interval", interval);
+ }
+ this.interval = interval;
+ this.sha256OfSortedSegmentIds = sha256OfSortedSegmentIds;
+ }
+
+ @JsonProperty
+ public Interval getInterval()
+ {
+ return interval;
+ }
+
+ @Nullable
+ @JsonProperty
+ public String getSha256OfSortedSegmentIds()
+ {
+ return sha256OfSortedSegmentIds;
+ }
+
+ @Override
+ public Interval findInterval(String dataSource)
+ {
+ return interval;
+ }
+
+ @Override
+ public boolean equals(Object o)
+ {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ ClientCompactionIntervalSpec that = (ClientCompactionIntervalSpec) o;
+ return Objects.equals(interval, that.interval) &&
+ Objects.equals(sha256OfSortedSegmentIds, that.sha256OfSortedSegmentIds);
+ }
+
+ @Override
+ public int hashCode()
+ {
+ return Objects.hash(interval, sha256OfSortedSegmentIds);
+ }
+
+ @Override
+ public String toString()
+ {
+ return "ClientCompactionIntervalSpec{" +
+ "interval=" + interval +
+ ", sha256OfSortedSegmentIds='" + sha256OfSortedSegmentIds + '\'' +
+ '}';
+ }
+}
diff --git a/server/src/main/java/org/apache/druid/client/indexing/HttpIndexingServiceClient.java b/server/src/main/java/org/apache/druid/client/indexing/HttpIndexingServiceClient.java
index de590a2a85ca..0efb9c97c840 100644
--- a/server/src/main/java/org/apache/druid/client/indexing/HttpIndexingServiceClient.java
+++ b/server/src/main/java/org/apache/druid/client/indexing/HttpIndexingServiceClient.java
@@ -93,8 +93,7 @@ public String compactSegments(
return runTask(
new ClientCompactQuery(
dataSource,
- null,
- segments,
+ new ClientCompactionIOConfig(ClientCompactionIntervalSpec.fromSegments(segments)),
targetCompactionSizeBytes,
tuningConfig,
context
diff --git a/server/src/main/java/org/apache/druid/client/indexing/IOConfig.java b/server/src/main/java/org/apache/druid/client/indexing/IOConfig.java
new file mode 100644
index 000000000000..5a17056ed84b
--- /dev/null
+++ b/server/src/main/java/org/apache/druid/client/indexing/IOConfig.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.client.indexing;
+
+import com.fasterxml.jackson.annotation.JsonTypeInfo;
+
+@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type")
+public interface IOConfig
+{
+}
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/DataSourceCompactionConfig.java b/server/src/main/java/org/apache/druid/server/coordinator/DataSourceCompactionConfig.java
index ebf4da63da2d..9c42a11b073b 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/DataSourceCompactionConfig.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/DataSourceCompactionConfig.java
@@ -38,7 +38,6 @@ public class DataSourceCompactionConfig
// should be synchronized with Tasks.DEFAULT_MERGE_TASK_PRIORITY
private static final int DEFAULT_COMPACTION_TASK_PRIORITY = 25;
private static final long DEFAULT_INPUT_SEGMENT_SIZE_BYTES = 400 * 1024 * 1024;
- private static final int DEFAULT_NUM_INPUT_SEGMENTS = 150;
private static final Period DEFAULT_SKIP_OFFSET_FROM_LATEST = new Period("P1D");
private final String dataSource;
@@ -50,7 +49,6 @@ public class DataSourceCompactionConfig
// RemoteTaskRunnerConfig.maxZnodeBytes.
@Nullable
private final Integer maxRowsPerSegment;
- private final int maxNumSegmentsToCompact;
private final Period skipOffsetFromLatest;
private final UserCompactTuningConfig tuningConfig;
private final Map taskContext;
@@ -62,7 +60,6 @@ public DataSourceCompactionConfig(
@JsonProperty("inputSegmentSizeBytes") @Nullable Long inputSegmentSizeBytes,
@JsonProperty("targetCompactionSizeBytes") @Nullable Long targetCompactionSizeBytes,
@JsonProperty("maxRowsPerSegment") @Nullable Integer maxRowsPerSegment,
- @JsonProperty("maxNumSegmentsToCompact") @Nullable Integer maxNumSegmentsToCompact,
@JsonProperty("skipOffsetFromLatest") @Nullable Period skipOffsetFromLatest,
@JsonProperty("tuningConfig") @Nullable UserCompactTuningConfig tuningConfig,
@JsonProperty("taskContext") @Nullable Map taskContext
@@ -81,17 +78,9 @@ public DataSourceCompactionConfig(
tuningConfig
);
this.maxRowsPerSegment = maxRowsPerSegment;
- this.maxNumSegmentsToCompact = maxNumSegmentsToCompact == null
- ? DEFAULT_NUM_INPUT_SEGMENTS
- : maxNumSegmentsToCompact;
this.skipOffsetFromLatest = skipOffsetFromLatest == null ? DEFAULT_SKIP_OFFSET_FROM_LATEST : skipOffsetFromLatest;
this.tuningConfig = tuningConfig;
this.taskContext = taskContext;
-
- Preconditions.checkArgument(
- this.maxNumSegmentsToCompact > 1,
- "numTargetCompactionSegments should be larger than 1"
- );
}
/**
@@ -157,12 +146,6 @@ public long getInputSegmentSizeBytes()
return inputSegmentSizeBytes;
}
- @JsonProperty
- public int getMaxNumSegmentsToCompact()
- {
- return maxNumSegmentsToCompact;
- }
-
@JsonProperty
@Nullable
public Long getTargetCompactionSizeBytes()
@@ -209,7 +192,6 @@ public boolean equals(Object o)
DataSourceCompactionConfig that = (DataSourceCompactionConfig) o;
return taskPriority == that.taskPriority &&
inputSegmentSizeBytes == that.inputSegmentSizeBytes &&
- maxNumSegmentsToCompact == that.maxNumSegmentsToCompact &&
Objects.equals(dataSource, that.dataSource) &&
Objects.equals(targetCompactionSizeBytes, that.targetCompactionSizeBytes) &&
Objects.equals(skipOffsetFromLatest, that.skipOffsetFromLatest) &&
@@ -225,7 +207,6 @@ public int hashCode()
taskPriority,
inputSegmentSizeBytes,
targetCompactionSizeBytes,
- maxNumSegmentsToCompact,
skipOffsetFromLatest,
tuningConfig,
taskContext
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/helper/DruidCoordinatorSegmentCompactor.java b/server/src/main/java/org/apache/druid/server/coordinator/helper/DruidCoordinatorSegmentCompactor.java
index f2a1a9fa51fd..5537295091df 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/helper/DruidCoordinatorSegmentCompactor.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/helper/DruidCoordinatorSegmentCompactor.java
@@ -28,8 +28,6 @@
import org.apache.druid.client.indexing.TaskPayloadResponse;
import org.apache.druid.indexer.TaskStatusPlus;
import org.apache.druid.java.util.common.ISE;
-import org.apache.druid.java.util.common.JodaUtils;
-import org.apache.druid.java.util.common.guava.Comparators;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.server.coordinator.CoordinatorCompactionConfig;
import org.apache.druid.server.coordinator.CoordinatorStats;
@@ -39,7 +37,6 @@
import org.apache.druid.timeline.VersionedIntervalTimeline;
import org.joda.time.Interval;
-import javax.annotation.Nullable;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
@@ -94,22 +91,7 @@ public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params)
}
if (COMPACT_TASK_TYPE.equals(response.getPayload().getType())) {
final ClientCompactQuery compactQuery = (ClientCompactQuery) response.getPayload();
- final Interval interval;
-
- if (compactQuery.getSegments() != null) {
- interval = JodaUtils.umbrellaInterval(
- compactQuery.getSegments()
- .stream()
- .map(DataSegment::getInterval)
- .sorted(Comparators.intervalsByStartThenEnd())
- .collect(Collectors.toList())
- );
- } else if (compactQuery.getInterval() != null) {
- interval = compactQuery.getInterval();
- } else {
- throw new ISE("task[%s] has neither 'segments' nor 'interval'", status.getId());
- }
-
+ final Interval interval = compactQuery.getIoConfig().getInputSpec().findInterval(status.getDataSource());
compactTaskIntervals.computeIfAbsent(status.getDataSource(), k -> new ArrayList<>()).add(interval);
} else {
throw new ISE("WTH? task[%s] is not a compactTask?", status.getId());
@@ -218,7 +200,6 @@ private CoordinatorStats makeStats(int numCompactionTasks, CompactionSegmentIter
return stats;
}
- @Nullable
public long getRemainingSegmentSizeBytes(String dataSource)
{
return remainingSegmentSizeBytes.getLong(dataSource);
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/helper/NewestSegmentFirstIterator.java b/server/src/main/java/org/apache/druid/server/coordinator/helper/NewestSegmentFirstIterator.java
index e0eecf18ea90..06a5d6a60dba 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/helper/NewestSegmentFirstIterator.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/helper/NewestSegmentFirstIterator.java
@@ -247,19 +247,17 @@ private static SegmentsToCompact findSegmentsToCompact(
{
final long inputSegmentSize = config.getInputSegmentSizeBytes();
final @Nullable Long targetCompactionSizeBytes = config.getTargetCompactionSizeBytes();
- final int maxNumSegmentsToCompact = config.getMaxNumSegmentsToCompact();
// Finds segments to compact together while iterating timeline from latest to oldest
while (compactibleTimelineObjectHolderCursor.hasNext()) {
final SegmentsToCompact candidates = new SegmentsToCompact(compactibleTimelineObjectHolderCursor.next());
final boolean isCompactibleSize = candidates.getTotalSize() <= inputSegmentSize;
- final boolean isCompactibleNum = candidates.getNumSegments() <= maxNumSegmentsToCompact;
final boolean needsCompaction = SegmentCompactorUtil.needsCompaction(
targetCompactionSizeBytes,
candidates.segments
);
- if (isCompactibleSize && isCompactibleNum && needsCompaction) {
+ if (isCompactibleSize && needsCompaction) {
return candidates;
} else {
if (!isCompactibleSize) {
@@ -272,18 +270,6 @@ private static SegmentsToCompact findSegmentsToCompact(
inputSegmentSize
);
}
- if (!isCompactibleNum) {
- log.warn(
- "Number of segments[%d] for datasource[%s] and interval[%s] is larger than "
- + "maxNumSegmentsToCompact[%d]. If you see lots of shards are being skipped due to too many "
- + "segments, consider increasing 'numTargetCompactionSegments' and "
- + "'druid.indexer.runner.maxZnodeBytes'. Continue to the next interval.",
- candidates.getNumSegments(),
- candidates.segments.get(0).getDataSource(),
- candidates.segments.get(0).getInterval(),
- maxNumSegmentsToCompact
- );
- }
if (!needsCompaction) {
log.warn(
"Size of most of segments[%s] is larger than targetCompactionSizeBytes[%s] "
diff --git a/server/src/test/java/org/apache/druid/server/coordinator/DataSourceCompactionConfigTest.java b/server/src/test/java/org/apache/druid/server/coordinator/DataSourceCompactionConfigTest.java
index 45b3b5a82652..3081a0bffd0e 100644
--- a/server/src/test/java/org/apache/druid/server/coordinator/DataSourceCompactionConfigTest.java
+++ b/server/src/test/java/org/apache/druid/server/coordinator/DataSourceCompactionConfigTest.java
@@ -51,7 +51,6 @@ public void testSerdeBasic() throws IOException
500L,
100L,
null,
- 20,
new Period(3600),
null,
ImmutableMap.of("key", "val")
@@ -64,7 +63,6 @@ public void testSerdeBasic() throws IOException
Assert.assertEquals(config.getInputSegmentSizeBytes(), fromJson.getInputSegmentSizeBytes());
Assert.assertEquals(config.getTargetCompactionSizeBytes(), fromJson.getTargetCompactionSizeBytes());
Assert.assertEquals(config.getMaxRowsPerSegment(), fromJson.getMaxRowsPerSegment());
- Assert.assertEquals(config.getMaxNumSegmentsToCompact(), fromJson.getMaxNumSegmentsToCompact());
Assert.assertEquals(config.getSkipOffsetFromLatest(), fromJson.getSkipOffsetFromLatest());
Assert.assertEquals(config.getTuningConfig(), fromJson.getTuningConfig());
Assert.assertEquals(config.getTaskContext(), fromJson.getTaskContext());
@@ -79,7 +77,6 @@ public void testSerdeWithMaxRowsPerSegment() throws IOException
500L,
null,
30,
- 20,
new Period(3600),
null,
ImmutableMap.of("key", "val")
@@ -92,7 +89,6 @@ public void testSerdeWithMaxRowsPerSegment() throws IOException
Assert.assertEquals(config.getInputSegmentSizeBytes(), fromJson.getInputSegmentSizeBytes());
Assert.assertNull(fromJson.getTargetCompactionSizeBytes());
Assert.assertEquals(config.getMaxRowsPerSegment(), fromJson.getMaxRowsPerSegment());
- Assert.assertEquals(config.getMaxNumSegmentsToCompact(), fromJson.getMaxNumSegmentsToCompact());
Assert.assertEquals(config.getSkipOffsetFromLatest(), fromJson.getSkipOffsetFromLatest());
Assert.assertEquals(config.getTuningConfig(), fromJson.getTuningConfig());
Assert.assertEquals(config.getTaskContext(), fromJson.getTaskContext());
@@ -118,7 +114,6 @@ public void testSerdeWithMaxTotalRows() throws IOException
500L,
null,
null,
- 20,
new Period(3600),
new UserCompactTuningConfig(
null,
@@ -138,7 +133,6 @@ public void testSerdeWithMaxTotalRows() throws IOException
Assert.assertEquals(config.getInputSegmentSizeBytes(), fromJson.getInputSegmentSizeBytes());
Assert.assertNull(fromJson.getTargetCompactionSizeBytes());
Assert.assertEquals(config.getMaxRowsPerSegment(), fromJson.getMaxRowsPerSegment());
- Assert.assertEquals(config.getMaxNumSegmentsToCompact(), fromJson.getMaxNumSegmentsToCompact());
Assert.assertEquals(config.getSkipOffsetFromLatest(), fromJson.getSkipOffsetFromLatest());
Assert.assertEquals(config.getTuningConfig(), fromJson.getTuningConfig());
Assert.assertEquals(config.getTaskContext(), fromJson.getTaskContext());
@@ -157,7 +151,6 @@ public void testSerdeTargetCompactionSizeBytesWithMaxRowsPerSegment()
500L,
10000L,
1000,
- 20,
new Period(3600),
null,
ImmutableMap.of("key", "val")
@@ -177,7 +170,6 @@ public void testSerdeTargetCompactionSizeBytesWithMaxTotalRows()
500L,
10000L,
null,
- 20,
new Period(3600),
new UserCompactTuningConfig(
null,
@@ -200,7 +192,6 @@ public void testSerdeMaxTotalRowsWithMaxRowsPerSegment() throws IOException
500L,
null,
10000,
- 20,
new Period(3600),
new UserCompactTuningConfig(
null,
@@ -221,7 +212,6 @@ public void testSerdeMaxTotalRowsWithMaxRowsPerSegment() throws IOException
Assert.assertEquals(config.getInputSegmentSizeBytes(), fromJson.getInputSegmentSizeBytes());
Assert.assertNull(fromJson.getTargetCompactionSizeBytes());
Assert.assertEquals(config.getMaxRowsPerSegment(), fromJson.getMaxRowsPerSegment());
- Assert.assertEquals(config.getMaxNumSegmentsToCompact(), fromJson.getMaxNumSegmentsToCompact());
Assert.assertEquals(config.getSkipOffsetFromLatest(), fromJson.getSkipOffsetFromLatest());
Assert.assertEquals(config.getTuningConfig(), fromJson.getTuningConfig());
Assert.assertEquals(config.getTaskContext(), fromJson.getTaskContext());
diff --git a/server/src/test/java/org/apache/druid/server/coordinator/helper/DruidCoordinatorSegmentCompactorTest.java b/server/src/test/java/org/apache/druid/server/coordinator/helper/DruidCoordinatorSegmentCompactorTest.java
index fcc1053b7de2..36e6a6957450 100644
--- a/server/src/test/java/org/apache/druid/server/coordinator/helper/DruidCoordinatorSegmentCompactorTest.java
+++ b/server/src/test/java/org/apache/druid/server/coordinator/helper/DruidCoordinatorSegmentCompactorTest.java
@@ -377,7 +377,6 @@ private static List createCompactionConfigs()
50L,
20L,
null,
- null,
new Period("PT1H"), // smaller than segment interval
null,
null
diff --git a/server/src/test/java/org/apache/druid/server/coordinator/helper/NewestSegmentFirstPolicyTest.java b/server/src/test/java/org/apache/druid/server/coordinator/helper/NewestSegmentFirstPolicyTest.java
index 4ad8b0da8609..5e3e8b96e7d6 100644
--- a/server/src/test/java/org/apache/druid/server/coordinator/helper/NewestSegmentFirstPolicyTest.java
+++ b/server/src/test/java/org/apache/druid/server/coordinator/helper/NewestSegmentFirstPolicyTest.java
@@ -56,7 +56,7 @@ public void testLargeOffsetAndSmallSegmentInterval()
{
final Period segmentPeriod = new Period("PT1H");
final CompactionSegmentIterator iterator = policy.reset(
- ImmutableMap.of(DATA_SOURCE, createCompactionConfig(10000, 100, new Period("P2D"))),
+ ImmutableMap.of(DATA_SOURCE, createCompactionConfig(10000, new Period("P2D"))),
ImmutableMap.of(
DATA_SOURCE,
createTimeline(
@@ -81,7 +81,7 @@ public void testSmallOffsetAndLargeSegmentInterval()
{
final Period segmentPeriod = new Period("PT1H");
final CompactionSegmentIterator iterator = policy.reset(
- ImmutableMap.of(DATA_SOURCE, createCompactionConfig(10000, 100, new Period("PT1M"))),
+ ImmutableMap.of(DATA_SOURCE, createCompactionConfig(10000, new Period("PT1M"))),
ImmutableMap.of(
DATA_SOURCE,
createTimeline(
@@ -114,41 +114,7 @@ public void testLargeGapInData()
{
final Period segmentPeriod = new Period("PT1H");
final CompactionSegmentIterator iterator = policy.reset(
- ImmutableMap.of(DATA_SOURCE, createCompactionConfig(10000, 100, new Period("PT1H1M"))),
- ImmutableMap.of(
- DATA_SOURCE,
- createTimeline(
- new SegmentGenerateSpec(Intervals.of("2017-11-16T20:00:00/2017-11-17T04:00:00"), segmentPeriod),
- // larger gap than SegmentCompactorUtil.LOOKUP_PERIOD (1 day)
- new SegmentGenerateSpec(Intervals.of("2017-11-14T00:00:00/2017-11-15T07:00:00"), segmentPeriod)
- )
- ),
- Collections.emptyMap()
- );
-
- assertCompactSegmentIntervals(
- iterator,
- segmentPeriod,
- Intervals.of("2017-11-16T20:00:00/2017-11-16T21:00:00"),
- Intervals.of("2017-11-17T01:00:00/2017-11-17T02:00:00"),
- false
- );
-
- assertCompactSegmentIntervals(
- iterator,
- segmentPeriod,
- Intervals.of("2017-11-14T00:00:00/2017-11-14T01:00:00"),
- Intervals.of("2017-11-15T06:00:00/2017-11-15T07:00:00"),
- true
- );
- }
-
- @Test
- public void testSmallNumTargetCompactionSegments()
- {
- final Period segmentPeriod = new Period("PT1H");
- final CompactionSegmentIterator iterator = policy.reset(
- ImmutableMap.of(DATA_SOURCE, createCompactionConfig(10000, 5, new Period("PT1H1M"))),
+ ImmutableMap.of(DATA_SOURCE, createCompactionConfig(10000, new Period("PT1H1M"))),
ImmutableMap.of(
DATA_SOURCE,
createTimeline(
@@ -181,7 +147,7 @@ public void testSmallNumTargetCompactionSegments()
public void testHugeShard()
{
final CompactionSegmentIterator iterator = policy.reset(
- ImmutableMap.of(DATA_SOURCE, createCompactionConfig(10000, 100, new Period("P1D"))),
+ ImmutableMap.of(DATA_SOURCE, createCompactionConfig(10000, new Period("P1D"))),
ImmutableMap.of(
DATA_SOURCE,
createTimeline(
@@ -231,7 +197,7 @@ public void testHugeShard()
public void testManySegmentsPerShard()
{
final CompactionSegmentIterator iterator = policy.reset(
- ImmutableMap.of(DATA_SOURCE, createCompactionConfig(800000, 100, new Period("P1D"))),
+ ImmutableMap.of(DATA_SOURCE, createCompactionConfig(800000, new Period("P1D"))),
ImmutableMap.of(
DATA_SOURCE,
createTimeline(
@@ -283,70 +249,6 @@ public void testManySegmentsPerShard()
Assert.assertEquals(Intervals.of("2017-12-03T11:00:00/2017-12-03T12:00:00"), lastInterval);
}
- @Test
- public void testManySegmentsPerShard2()
- {
- final CompactionSegmentIterator iterator = policy.reset(
- ImmutableMap.of(DATA_SOURCE, createCompactionConfig(800000, 100, new Period("P1D"))),
- ImmutableMap.of(
- DATA_SOURCE,
- createTimeline(
- new SegmentGenerateSpec(
- Intervals.of("2017-12-04T11:00:00/2017-12-05T05:00:00"),
- new Period("PT1H"),
- 200,
- 150
- ),
- new SegmentGenerateSpec(
- Intervals.of("2017-12-04T06:00:00/2017-12-04T11:00:00"),
- new Period("PT1H"),
- 375,
- 80
- ),
- new SegmentGenerateSpec(
- Intervals.of("2017-12-03T18:00:00/2017-12-04T06:00:00"),
- new Period("PT12H"),
- 257000,
- 1
- ),
- new SegmentGenerateSpec(
- Intervals.of("2017-12-03T11:00:00/2017-12-03T18:00:00"),
- new Period("PT1H"),
- 200,
- 150
- ),
- new SegmentGenerateSpec(
- Intervals.of("2017-12-02T19:00:00/2017-12-03T11:00:00"),
- new Period("PT16H"),
- 257000,
- 1
- ),
- new SegmentGenerateSpec(
- Intervals.of("2017-12-02T11:00:00/2017-12-02T19:00:00"),
- new Period("PT1H"),
- 200,
- 150
- ),
- new SegmentGenerateSpec(
- Intervals.of("2017-12-01T18:00:00/2017-12-02T11:00:00"),
- new Period("PT17H"),
- 257000,
- 1
- ),
- new SegmentGenerateSpec(
- Intervals.of("2017-12-01T09:00:00/2017-12-01T18:00:00"),
- new Period("PT1H"),
- 200,
- 150
- )
- )
- ),
- Collections.emptyMap()
- );
-
- Assert.assertFalse(iterator.hasNext());
- }
-
@Test
public void testSkipUnknownDataSource()
{
@@ -355,9 +257,9 @@ public void testSkipUnknownDataSource()
final CompactionSegmentIterator iterator = policy.reset(
ImmutableMap.of(
unknownDataSource,
- createCompactionConfig(10000, 100, new Period("P2D")),
+ createCompactionConfig(10000, new Period("P2D")),
DATA_SOURCE,
- createCompactionConfig(10000, 100, new Period("P2D"))
+ createCompactionConfig(10000, new Period("P2D"))
),
ImmutableMap.of(
DATA_SOURCE,
@@ -382,7 +284,7 @@ public void testSkipUnknownDataSource()
public void testIgnoreSingleSegmentToCompact()
{
final CompactionSegmentIterator iterator = policy.reset(
- ImmutableMap.of(DATA_SOURCE, createCompactionConfig(800000, 100, new Period("P1D"))),
+ ImmutableMap.of(DATA_SOURCE, createCompactionConfig(800000, new Period("P1D"))),
ImmutableMap.of(
DATA_SOURCE,
createTimeline(
@@ -431,7 +333,7 @@ public void testClearSegmentsToCompactWhenSkippingSegments()
)
);
final CompactionSegmentIterator iterator = policy.reset(
- ImmutableMap.of(DATA_SOURCE, createCompactionConfig(maxSizeOfSegmentsToCompact, 100, new Period("P0D"))),
+ ImmutableMap.of(DATA_SOURCE, createCompactionConfig(maxSizeOfSegmentsToCompact, new Period("P0D"))),
ImmutableMap.of(DATA_SOURCE, timeline),
Collections.emptyMap()
);
@@ -461,7 +363,7 @@ public void testIfFirstSegmentIsInSkipOffset()
);
final CompactionSegmentIterator iterator = policy.reset(
- ImmutableMap.of(DATA_SOURCE, createCompactionConfig(40000, 100, new Period("P1D"))),
+ ImmutableMap.of(DATA_SOURCE, createCompactionConfig(40000, new Period("P1D"))),
ImmutableMap.of(DATA_SOURCE, timeline),
Collections.emptyMap()
);
@@ -482,7 +384,7 @@ public void testIfFirstSegmentOverlapsSkipOffset()
);
final CompactionSegmentIterator iterator = policy.reset(
- ImmutableMap.of(DATA_SOURCE, createCompactionConfig(40000, 100, new Period("P1D"))),
+ ImmutableMap.of(DATA_SOURCE, createCompactionConfig(40000, new Period("P1D"))),
ImmutableMap.of(DATA_SOURCE, timeline),
Collections.emptyMap()
);
@@ -495,7 +397,7 @@ public void testWithSkipIntervals()
{
final Period segmentPeriod = new Period("PT1H");
final CompactionSegmentIterator iterator = policy.reset(
- ImmutableMap.of(DATA_SOURCE, createCompactionConfig(10000, 100, new Period("P1D"))),
+ ImmutableMap.of(DATA_SOURCE, createCompactionConfig(10000, new Period("P1D"))),
ImmutableMap.of(
DATA_SOURCE,
createTimeline(
@@ -535,7 +437,7 @@ public void testHoleInSearchInterval()
{
final Period segmentPeriod = new Period("PT1H");
final CompactionSegmentIterator iterator = policy.reset(
- ImmutableMap.of(DATA_SOURCE, createCompactionConfig(10000, 100, new Period("PT1H"))),
+ ImmutableMap.of(DATA_SOURCE, createCompactionConfig(10000, new Period("PT1H"))),
ImmutableMap.of(
DATA_SOURCE,
createTimeline(
@@ -668,7 +570,6 @@ private static VersionedIntervalTimeline createTimeline(
private DataSourceCompactionConfig createCompactionConfig(
long targetCompactionSizeBytes,
- int numTargetCompactionSegments,
Period skipOffsetFromLatest
)
{
@@ -678,7 +579,6 @@ private DataSourceCompactionConfig createCompactionConfig(
targetCompactionSizeBytes,
targetCompactionSizeBytes,
null,
- numTargetCompactionSegments,
skipOffsetFromLatest,
null,
null
From 07f8fe0b1b1f76e8df7580924f210f2e91270d3d Mon Sep 17 00:00:00 2001
From: Jihoon Son
Date: Mon, 23 Sep 2019 15:48:35 +0900
Subject: [PATCH 2/7] add javadoc, doc, unit test
---
.../apache/druid/segment/SegmentUtils.java | 3 +
docs/configuration/index.md | 1 -
docs/ingestion/data-management.md | 37 +++-
.../common/task/CompactionInputSpec.java | 14 +-
.../common/task/CompactionIntervalSpec.java | 9 +-
.../common/task/SpecificSegmentsSpec.java | 4 +-
.../task/ClientCompactQuerySerdeTest.java | 155 +++++++++++++++++
.../common/task/CompactionTaskTest.java | 162 +++++++++---------
.../indexing/ClientCompactionIOConfig.java | 18 +-
.../indexing/ClientCompactionInputSpec.java | 34 ----
.../ClientCompactionIntervalSpec.java | 17 +-
.../druid/client/indexing/IOConfig.java | 27 ---
.../DruidCoordinatorSegmentCompactor.java | 2 +-
.../compaction-dialog/compaction-dialog.tsx | 12 --
14 files changed, 316 insertions(+), 179 deletions(-)
create mode 100644 indexing-service/src/test/java/org/apache/druid/indexing/common/task/ClientCompactQuerySerdeTest.java
delete mode 100644 server/src/main/java/org/apache/druid/client/indexing/ClientCompactionInputSpec.java
delete mode 100644 server/src/main/java/org/apache/druid/client/indexing/IOConfig.java
diff --git a/core/src/main/java/org/apache/druid/segment/SegmentUtils.java b/core/src/main/java/org/apache/druid/segment/SegmentUtils.java
index 4dff10c7dd0e..10d673002811 100644
--- a/core/src/main/java/org/apache/druid/segment/SegmentUtils.java
+++ b/core/src/main/java/org/apache/druid/segment/SegmentUtils.java
@@ -45,6 +45,9 @@ public class SegmentUtils
{
private static final HashFunction HASH_FUNCTION = Hashing.sha256();
+ /**
+ * Hashes the IDs of the given segments using the SHA-256 algorithm.
+ */
public static String hashIds(List segments)
{
Collections.sort(segments);
diff --git a/docs/configuration/index.md b/docs/configuration/index.md
index f44d5aef9ac7..21bd60e927a5 100644
--- a/docs/configuration/index.md
+++ b/docs/configuration/index.md
@@ -788,7 +788,6 @@ A description of the compaction config is:
|`inputSegmentSizeBytes`|Maximum number of total segment bytes processed per compaction task. Since a time chunk must be processed in its entirety, if the segments for a particular time chunk have a total size in bytes greater than this parameter, compaction will not run for that time chunk. Because each compaction task runs with a single thread, setting this value too far above 1–2GB will result in compaction tasks taking an excessive amount of time.|no (default = 419430400)|
|`targetCompactionSizeBytes`|The target segment size, for each segment, after compaction. The actual sizes of compacted segments might be slightly larger or smaller than this value. Each compaction task may generate more than one output segment, and it will try to keep each output segment close to this configured size. This configuration cannot be used together with `maxRowsPerSegment`.|no (default = 419430400)|
|`maxRowsPerSegment`|Max number of rows per segment after compaction. This configuration cannot be used together with `targetCompactionSizeBytes`.|no|
-|`maxNumSegmentsToCompact`|Maximum number of segments to compact together per compaction task. Since a time chunk must be processed in its entirety, if a time chunk has a total number of segments greater than this parameter, compaction will not run for that time chunk.|no (default = 150)|
|`skipOffsetFromLatest`|The offset for searching segments to be compacted. Strongly recommended to set for realtime dataSources. |no (default = "P1D")|
|`tuningConfig`|Tuning config for compaction tasks. See below [Compaction Task TuningConfig](#compaction-tuningconfig).|no|
|`taskContext`|[Task context](../ingestion/tasks.html#context) for compaction tasks.|no|
diff --git a/docs/ingestion/data-management.md b/docs/ingestion/data-management.md
index cf22c04386e8..29b5815b71b7 100644
--- a/docs/ingestion/data-management.md
+++ b/docs/ingestion/data-management.md
@@ -99,7 +99,7 @@ Compaction tasks merge all segments of the given interval. The syntax is:
"type": "compact",
"id": ,
"dataSource": ,
- "interval": ,
+ "ioConfig": ,
"dimensions" ,
"segmentGranularity": ,
"targetCompactionSizeBytes":
@@ -113,7 +113,7 @@ Compaction tasks merge all segments of the given interval. The syntax is:
|`type`|Task type. Should be `compact`|Yes|
|`id`|Task id|No|
|`dataSource`|DataSource name to be compacted|Yes|
-|`interval`|Interval of segments to be compacted|Yes|
+|`ioConfig`|ioConfig for compaction task. See [Compaction IOConfig](#compaction-ioconfig) for details.|Yes|
|`dimensionsSpec`|Custom dimensionsSpec. Compaction task will use this dimensionsSpec if exist instead of generating one. See below for more details.|No|
|`metricsSpec`|Custom metricsSpec. Compaction task will use this metricsSpec if specified rather than generating one.|No|
|`segmentGranularity`|If this is set, compactionTask will change the segment granularity for the given interval. See `segmentGranularity` of [`granularitySpec`](index.md#granularityspec) for more details. See the below table for the behavior.|No|
@@ -128,7 +128,13 @@ An example of compaction task is
{
"type" : "compact",
"dataSource" : "wikipedia",
- "interval" : "2017-01-01/2018-01-01"
+ "ioConfig" : {
+ "type": "compact",
+ "inputSpec": {
+ "type": "interval",
+ "interval": "2017-01-01/2018-01-01"
+ }
+ }
}
```
@@ -158,6 +164,31 @@ See [Roll-up](../ingestion/index.html#rollup) for more details.
You can check that your segments are rolled up or not by using [Segment Metadata Queries](../querying/segmentmetadataquery.html#analysistypes).
+### Compaction IOConfig
+
+The compaction IOConfig requires specifying `inputSpec` as shown below.
+
+|Field|Description|Required|
+|-----|-----------|--------|
+|`type`|ioConfig type. Should be `compact`|Yes|
+|`inputSpec`|Input specification|Yes|
+
+There are two supported `inputSpec`s for now.
+
+The interval `inputSpec` is:
+
+|Field|Description|Required|
+|-----|-----------|--------|
+|`type`|inputSpec type. Should be `interval`|Yes|
+|`interval`|Interval to compact|Yes|
+
+The segments `inputSpec` is:
+
+|Field|Description|Required|
+|-----|-----------|--------|
+|`type`|inputSpec type. Should be `segments`|Yes|
+|`segments`|A list of segment IDs|Yes|
+
## Adding new data
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionInputSpec.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionInputSpec.java
index 7f0ed013b957..324f21240fc4 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionInputSpec.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionInputSpec.java
@@ -27,6 +27,9 @@
import java.util.List;
+/**
+ * Input specification for compaction task.
+ */
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type")
@JsonSubTypes(value = {
@Type(name = CompactionIntervalSpec.TYPE, value = CompactionIntervalSpec.class),
@@ -34,7 +37,16 @@
})
public interface CompactionInputSpec
{
+ /**
+ * Find the umbrella interval containing the specified input.
+ */
Interval findInterval(String dataSource);
- boolean validateSegments(List segments);
+ /**
+ * Validate the specified input against the most recent published segments.
+ * This method is used to check whether the specified input has gone stale.
+ *
+ * @param latestSegments most recent published segments in the interval returned by {@link #findInterval}
+ */
+ boolean validateSegments(List latestSegments);
}
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionIntervalSpec.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionIntervalSpec.java
index 09d49ae1c192..66304424edda 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionIntervalSpec.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionIntervalSpec.java
@@ -32,6 +32,9 @@
import java.util.Objects;
import java.util.stream.Collectors;
+/**
+ * Specifies an interval to compact. A hash of the segment IDs can optionally be provided for segment validation.
+ */
public class CompactionIntervalSpec implements CompactionInputSpec
{
public static final String TYPE = "interval";
@@ -73,14 +76,14 @@ public Interval findInterval(String dataSource)
}
@Override
- public boolean validateSegments(List segments)
+ public boolean validateSegments(List latestSegments)
{
final Interval segmentsInterval = JodaUtils.umbrellaInterval(
- segments.stream().map(DataSegment::getInterval).collect(Collectors.toList())
+ latestSegments.stream().map(DataSegment::getInterval).collect(Collectors.toList())
);
if (interval.overlaps(segmentsInterval)) {
if (sha256OfSortedSegmentIds != null) {
- final String hashOfThem = SegmentUtils.hashIds(segments);
+ final String hashOfThem = SegmentUtils.hashIds(latestSegments);
return hashOfThem.equals(sha256OfSortedSegmentIds);
} else {
return true;
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/SpecificSegmentsSpec.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/SpecificSegmentsSpec.java
index b424ea8bb18b..680372551c2d 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/SpecificSegmentsSpec.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/SpecificSegmentsSpec.java
@@ -71,9 +71,9 @@ public Interval findInterval(String dataSource)
}
@Override
- public boolean validateSegments(List segments)
+ public boolean validateSegments(List latestSegments)
{
- final List thoseSegments = segments
+ final List thoseSegments = latestSegments
.stream()
.map(segment -> segment.getId().toString())
.collect(Collectors.toList());
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/ClientCompactQuerySerdeTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/ClientCompactQuerySerdeTest.java
new file mode 100644
index 000000000000..d85847060901
--- /dev/null
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/ClientCompactQuerySerdeTest.java
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.common.task;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.introspect.AnnotationIntrospectorPair;
+import com.google.common.collect.ImmutableList;
+import org.apache.druid.client.coordinator.CoordinatorClient;
+import org.apache.druid.client.indexing.ClientCompactQuery;
+import org.apache.druid.client.indexing.ClientCompactQueryTuningConfig;
+import org.apache.druid.client.indexing.ClientCompactionIOConfig;
+import org.apache.druid.client.indexing.ClientCompactionIntervalSpec;
+import org.apache.druid.guice.GuiceAnnotationIntrospector;
+import org.apache.druid.guice.GuiceInjectableValues;
+import org.apache.druid.guice.GuiceInjectors;
+import org.apache.druid.indexing.common.SegmentLoaderFactory;
+import org.apache.druid.indexing.common.TestUtils;
+import org.apache.druid.indexing.common.stats.RowIngestionMetersFactory;
+import org.apache.druid.jackson.DefaultObjectMapper;
+import org.apache.druid.java.util.common.Intervals;
+import org.apache.druid.segment.IndexSpec;
+import org.apache.druid.segment.data.BitmapSerde.DefaultBitmapSerdeFactory;
+import org.apache.druid.segment.data.CompressionFactory.LongEncodingStrategy;
+import org.apache.druid.segment.data.CompressionStrategy;
+import org.apache.druid.segment.realtime.appenderator.AppenderatorsManager;
+import org.apache.druid.segment.realtime.firehose.ChatHandlerProvider;
+import org.apache.druid.segment.realtime.firehose.NoopChatHandlerProvider;
+import org.apache.druid.server.security.AuthTestUtils;
+import org.apache.druid.server.security.AuthorizerMapper;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.HashMap;
+
+public class ClientCompactQuerySerdeTest
+{
+ private static final RowIngestionMetersFactory ROW_INGESTION_METERS_FACTORY = new TestUtils()
+ .getRowIngestionMetersFactory();
+ private static final CoordinatorClient COORDINATOR_CLIENT = new CoordinatorClient(null, null);
+ private static final AppenderatorsManager APPENDERATORS_MANAGER = new TestAppenderatorsManager();
+
+ @Test
+ public void testSerde() throws IOException
+ {
+ final ObjectMapper mapper = setupInjectablesInObjectMapper(new DefaultObjectMapper());
+ final ClientCompactQuery query = new ClientCompactQuery(
+ "datasource",
+ new ClientCompactionIOConfig(
+ new ClientCompactionIntervalSpec(
+ Intervals.of("2019/2020"),
+ "testSha256OfSoredSegmentIds"
+ )
+ ),
+ null,
+ new ClientCompactQueryTuningConfig(
+ 100,
+ 40000,
+ 2000L,
+ 30000L,
+ new IndexSpec(
+ new DefaultBitmapSerdeFactory(),
+ CompressionStrategy.LZ4,
+ CompressionStrategy.LZF,
+ LongEncodingStrategy.LONGS
+ ),
+ null,
+ 1000L
+ ),
+ new HashMap<>()
+ );
+
+ final byte[] json = mapper.writeValueAsBytes(query);
+ final CompactionTask task = (CompactionTask) mapper.readValue(json, Task.class);
+
+ Assert.assertEquals(query.getDataSource(), task.getDataSource());
+ Assert.assertTrue(task.getIoConfig().getInputSpec() instanceof CompactionIntervalSpec);
+ Assert.assertEquals(
+ query.getIoConfig().getInputSpec().getInterval(),
+ ((CompactionIntervalSpec) task.getIoConfig().getInputSpec()).getInterval()
+ );
+ Assert.assertEquals(
+ query.getIoConfig().getInputSpec().getSha256OfSortedSegmentIds(),
+ ((CompactionIntervalSpec) task.getIoConfig().getInputSpec()).getSha256OfSortedSegmentIds()
+ );
+ Assert.assertEquals(query.getTargetCompactionSizeBytes(), task.getTargetCompactionSizeBytes());
+ Assert.assertEquals(
+ query.getTuningConfig().getMaxRowsInMemory().intValue(), task.getTuningConfig().getMaxRowsInMemory()
+ );
+ Assert.assertEquals(
+ query.getTuningConfig().getMaxBytesInMemory().longValue(), task.getTuningConfig().getMaxBytesInMemory()
+ );
+ Assert.assertEquals(
+ query.getTuningConfig().getMaxRowsPerSegment(), task.getTuningConfig().getMaxRowsPerSegment()
+ );
+ Assert.assertEquals(
+ query.getTuningConfig().getMaxTotalRows(), task.getTuningConfig().getMaxTotalRows()
+ );
+ Assert.assertEquals(
+ query.getTuningConfig().getIndexSpec(), task.getTuningConfig().getIndexSpec()
+ );
+ Assert.assertEquals(
+ query.getTuningConfig().getPushTimeout().longValue(), task.getTuningConfig().getPushTimeout()
+ );
+ Assert.assertEquals(query.getContext(), task.getContext());
+ }
+
+ private static ObjectMapper setupInjectablesInObjectMapper(ObjectMapper objectMapper)
+ {
+ final GuiceAnnotationIntrospector guiceIntrospector = new GuiceAnnotationIntrospector();
+ objectMapper.setAnnotationIntrospectors(
+ new AnnotationIntrospectorPair(
+ guiceIntrospector,
+ objectMapper.getSerializationConfig().getAnnotationIntrospector()
+ ),
+ new AnnotationIntrospectorPair(
+ guiceIntrospector,
+ objectMapper.getDeserializationConfig().getAnnotationIntrospector()
+ )
+ );
+ GuiceInjectableValues injectableValues = new GuiceInjectableValues(
+ GuiceInjectors.makeStartupInjectorWithModules(
+ ImmutableList.of(
+ binder -> {
+ binder.bind(AuthorizerMapper.class).toInstance(AuthTestUtils.TEST_AUTHORIZER_MAPPER);
+ binder.bind(ChatHandlerProvider.class).toInstance(new NoopChatHandlerProvider());
+ binder.bind(RowIngestionMetersFactory.class).toInstance(ROW_INGESTION_METERS_FACTORY);
+ binder.bind(CoordinatorClient.class).toInstance(COORDINATOR_CLIENT);
+ binder.bind(SegmentLoaderFactory.class).toInstance(new SegmentLoaderFactory(null, objectMapper));
+ binder.bind(AppenderatorsManager.class).toInstance(APPENDERATORS_MANAGER);
+ }
+ )
+ )
+ );
+ objectMapper.setInjectableValues(injectableValues);
+ return objectMapper;
+ }
+}
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java
index c58d327109d9..fd201d933c67 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java
@@ -151,15 +151,17 @@ public class CompactionTaskTest
private static final Map MIXED_TYPE_COLUMN_MAP = new HashMap<>();
private static final IndexTuningConfig TUNING_CONFIG = createTuningConfig();
+ private static final RowIngestionMetersFactory ROW_INGESTION_METERS_FACTORY = new TestUtils()
+ .getRowIngestionMetersFactory();
+ private static final Map SEGMENT_MAP = new HashMap<>();
+ private static final CoordinatorClient COORDINATOR_CLIENT = new TestCoordinatorClient(SEGMENT_MAP);
+ private static final AppenderatorsManager APPENDERATORS_MANAGER = new TestAppenderatorsManager();
+ private static final ObjectMapper OBJECT_MAPPER = setupInjectablesInObjectMapper(new DefaultObjectMapper());
+ private static final RetryPolicyFactory RETRY_POLICY_FACTORY = new RetryPolicyFactory(new RetryPolicyConfig());
+
private static Map DIMENSIONS;
private static List AGGREGATORS;
private static List SEGMENTS;
- private static RowIngestionMetersFactory rowIngestionMetersFactory = new TestUtils().getRowIngestionMetersFactory();
- private static Map segmentMap = new HashMap<>();
- private static CoordinatorClient coordinatorClient = new TestCoordinatorClient(segmentMap);
- private static AppenderatorsManager appenderatorsManager = new TestAppenderatorsManager();
- private static ObjectMapper objectMapper = setupInjectablesInObjectMapper(new DefaultObjectMapper());
- private static RetryPolicyFactory retryPolicyFactory = new RetryPolicyFactory(new RetryPolicyConfig());
private TaskToolbox toolbox;
private SegmentLoaderFactory segmentLoaderFactory;
@@ -208,7 +210,7 @@ public static void setupClass()
for (int i = 0; i < SEGMENT_INTERVALS.size(); i++) {
final Interval segmentInterval = Intervals.of(StringUtils.format("2017-0%d-01/2017-0%d-01", (i + 1), (i + 2)));
- segmentMap.put(
+ SEGMENT_MAP.put(
new DataSegment(
DATA_SOURCE,
segmentInterval,
@@ -223,7 +225,7 @@ public static void setupClass()
new File("file_" + i)
);
}
- SEGMENTS = new ArrayList<>(segmentMap.keySet());
+ SEGMENTS = new ArrayList<>(SEGMENT_MAP.keySet());
}
private static ObjectMapper setupInjectablesInObjectMapper(ObjectMapper objectMapper)
@@ -245,10 +247,10 @@ private static ObjectMapper setupInjectablesInObjectMapper(ObjectMapper objectMa
binder -> {
binder.bind(AuthorizerMapper.class).toInstance(AuthTestUtils.TEST_AUTHORIZER_MAPPER);
binder.bind(ChatHandlerProvider.class).toInstance(new NoopChatHandlerProvider());
- binder.bind(RowIngestionMetersFactory.class).toInstance(rowIngestionMetersFactory);
- binder.bind(CoordinatorClient.class).toInstance(coordinatorClient);
+ binder.bind(RowIngestionMetersFactory.class).toInstance(ROW_INGESTION_METERS_FACTORY);
+ binder.bind(CoordinatorClient.class).toInstance(COORDINATOR_CLIENT);
binder.bind(SegmentLoaderFactory.class).toInstance(new SegmentLoaderFactory(null, objectMapper));
- binder.bind(AppenderatorsManager.class).toInstance(appenderatorsManager);
+ binder.bind(AppenderatorsManager.class).toInstance(APPENDERATORS_MANAGER);
}
)
)
@@ -313,13 +315,13 @@ private static IndexTuningConfig createTuningConfig()
@Before
public void setup()
{
- final IndexIO testIndexIO = new TestIndexIO(objectMapper, segmentMap);
+ final IndexIO testIndexIO = new TestIndexIO(OBJECT_MAPPER, SEGMENT_MAP);
toolbox = new TestTaskToolbox(
- new TestTaskActionClient(new ArrayList<>(segmentMap.keySet())),
+ new TestTaskActionClient(new ArrayList<>(SEGMENT_MAP.keySet())),
testIndexIO,
- segmentMap
+ SEGMENT_MAP
);
- segmentLoaderFactory = new SegmentLoaderFactory(testIndexIO, objectMapper);
+ segmentLoaderFactory = new SegmentLoaderFactory(testIndexIO, OBJECT_MAPPER);
}
@Test
@@ -327,14 +329,14 @@ public void testSerdeWithInterval() throws IOException
{
final Builder builder = new Builder(
DATA_SOURCE,
- objectMapper,
+ OBJECT_MAPPER,
AuthTestUtils.TEST_AUTHORIZER_MAPPER,
null,
- rowIngestionMetersFactory,
- coordinatorClient,
+ ROW_INGESTION_METERS_FACTORY,
+ COORDINATOR_CLIENT,
segmentLoaderFactory,
- retryPolicyFactory,
- appenderatorsManager
+ RETRY_POLICY_FACTORY,
+ APPENDERATORS_MANAGER
);
final CompactionTask task = builder
.inputSpec(
@@ -344,8 +346,8 @@ public void testSerdeWithInterval() throws IOException
.context(ImmutableMap.of("testKey", "testContext"))
.build();
- final byte[] bytes = objectMapper.writeValueAsBytes(task);
- final CompactionTask fromJson = objectMapper.readValue(bytes, CompactionTask.class);
+ final byte[] bytes = OBJECT_MAPPER.writeValueAsBytes(task);
+ final CompactionTask fromJson = OBJECT_MAPPER.readValue(bytes, CompactionTask.class);
assertEquals(task, fromJson);
}
@@ -354,14 +356,14 @@ public void testSerdeWithSegments() throws IOException
{
final Builder builder = new Builder(
DATA_SOURCE,
- objectMapper,
+ OBJECT_MAPPER,
AuthTestUtils.TEST_AUTHORIZER_MAPPER,
null,
- rowIngestionMetersFactory,
- coordinatorClient,
+ ROW_INGESTION_METERS_FACTORY,
+ COORDINATOR_CLIENT,
segmentLoaderFactory,
- retryPolicyFactory,
- appenderatorsManager
+ RETRY_POLICY_FACTORY,
+ APPENDERATORS_MANAGER
);
final CompactionTask task = builder
.segments(SEGMENTS)
@@ -369,8 +371,8 @@ public void testSerdeWithSegments() throws IOException
.context(ImmutableMap.of("testKey", "testContext"))
.build();
- final byte[] bytes = objectMapper.writeValueAsBytes(task);
- final CompactionTask fromJson = objectMapper.readValue(bytes, CompactionTask.class);
+ final byte[] bytes = OBJECT_MAPPER.writeValueAsBytes(task);
+ final CompactionTask fromJson = OBJECT_MAPPER.readValue(bytes, CompactionTask.class);
assertEquals(task, fromJson);
}
@@ -379,14 +381,14 @@ public void testSerdeWithDimensions() throws IOException
{
final Builder builder = new Builder(
DATA_SOURCE,
- objectMapper,
+ OBJECT_MAPPER,
AuthTestUtils.TEST_AUTHORIZER_MAPPER,
null,
- rowIngestionMetersFactory,
- coordinatorClient,
+ ROW_INGESTION_METERS_FACTORY,
+ COORDINATOR_CLIENT,
segmentLoaderFactory,
- retryPolicyFactory,
- appenderatorsManager
+ RETRY_POLICY_FACTORY,
+ APPENDERATORS_MANAGER
);
final CompactionTask task = builder
@@ -404,8 +406,8 @@ public void testSerdeWithDimensions() throws IOException
.context(ImmutableMap.of("testKey", "testVal"))
.build();
- final byte[] bytes = objectMapper.writeValueAsBytes(task);
- final CompactionTask fromJson = objectMapper.readValue(bytes, CompactionTask.class);
+ final byte[] bytes = OBJECT_MAPPER.writeValueAsBytes(task);
+ final CompactionTask fromJson = OBJECT_MAPPER.readValue(bytes, CompactionTask.class);
assertEquals(task, fromJson);
}
@@ -431,10 +433,10 @@ public void testCreateIngestionSchema() throws IOException, SegmentLoadingExcept
null,
null,
null,
- objectMapper,
- coordinatorClient,
+ OBJECT_MAPPER,
+ COORDINATOR_CLIENT,
segmentLoaderFactory,
- retryPolicyFactory
+ RETRY_POLICY_FACTORY
);
final List expectedDimensionsSpec = getExpectedDimensionsSpecForAutoGeneration();
@@ -491,10 +493,10 @@ public void testCreateIngestionSchemaWithTargetPartitionSize() throws IOExceptio
null,
null,
null,
- objectMapper,
- coordinatorClient,
+ OBJECT_MAPPER,
+ COORDINATOR_CLIENT,
segmentLoaderFactory,
- retryPolicyFactory
+ RETRY_POLICY_FACTORY
);
final List expectedDimensionsSpec = getExpectedDimensionsSpecForAutoGeneration();
@@ -552,10 +554,10 @@ public void testCreateIngestionSchemaWithMaxTotalRows() throws IOException, Segm
null,
null,
null,
- objectMapper,
- coordinatorClient,
+ OBJECT_MAPPER,
+ COORDINATOR_CLIENT,
segmentLoaderFactory,
- retryPolicyFactory
+ RETRY_POLICY_FACTORY
);
final List expectedDimensionsSpec = getExpectedDimensionsSpecForAutoGeneration();
@@ -613,10 +615,10 @@ public void testCreateIngestionSchemaWithNumShards() throws IOException, Segment
null,
null,
null,
- objectMapper,
- coordinatorClient,
+ OBJECT_MAPPER,
+ COORDINATOR_CLIENT,
segmentLoaderFactory,
- retryPolicyFactory
+ RETRY_POLICY_FACTORY
);
final List expectedDimensionsSpec = getExpectedDimensionsSpecForAutoGeneration();
@@ -674,10 +676,10 @@ public void testCreateIngestionSchemaWithCustomDimensionsSpec() throws IOExcepti
customSpec,
null,
null,
- objectMapper,
- coordinatorClient,
+ OBJECT_MAPPER,
+ COORDINATOR_CLIENT,
segmentLoaderFactory,
- retryPolicyFactory
+ RETRY_POLICY_FACTORY
);
ingestionSpecs.sort(
@@ -715,10 +717,10 @@ public void testCreateIngestionSchemaWithCustomMetricsSpec() throws IOException,
null,
customMetricsSpec,
null,
- objectMapper,
- coordinatorClient,
+ OBJECT_MAPPER,
+ COORDINATOR_CLIENT,
segmentLoaderFactory,
- retryPolicyFactory
+ RETRY_POLICY_FACTORY
);
final List expectedDimensionsSpec = getExpectedDimensionsSpecForAutoGeneration();
@@ -749,10 +751,10 @@ public void testCreateIngestionSchemaWithCustomSegments() throws IOException, Se
null,
null,
null,
- objectMapper,
- coordinatorClient,
+ OBJECT_MAPPER,
+ COORDINATOR_CLIENT,
segmentLoaderFactory,
- retryPolicyFactory
+ RETRY_POLICY_FACTORY
);
final List expectedDimensionsSpec = getExpectedDimensionsSpecForAutoGeneration();
@@ -789,10 +791,10 @@ public void testCreateIngestionSchemaWithDifferentSegmentSet() throws IOExceptio
null,
null,
null,
- objectMapper,
- coordinatorClient,
+ OBJECT_MAPPER,
+ COORDINATOR_CLIENT,
segmentLoaderFactory,
- retryPolicyFactory
+ RETRY_POLICY_FACTORY
);
}
@@ -812,10 +814,10 @@ public void testMissingMetadata() throws IOException, SegmentLoadingException
null,
null,
null,
- objectMapper,
- coordinatorClient,
+ OBJECT_MAPPER,
+ COORDINATOR_CLIENT,
segmentLoaderFactory,
- retryPolicyFactory
+ RETRY_POLICY_FACTORY
);
}
@@ -827,14 +829,14 @@ public void testEmptyInterval()
final Builder builder = new Builder(
DATA_SOURCE,
- objectMapper,
+ OBJECT_MAPPER,
AuthTestUtils.TEST_AUTHORIZER_MAPPER,
null,
- rowIngestionMetersFactory,
- coordinatorClient,
+ ROW_INGESTION_METERS_FACTORY,
+ COORDINATOR_CLIENT,
segmentLoaderFactory,
- retryPolicyFactory,
- appenderatorsManager
+ RETRY_POLICY_FACTORY,
+ APPENDERATORS_MANAGER
);
final CompactionTask task = builder
@@ -881,10 +883,10 @@ public void testTargetPartitionSizeWithPartitionConfig() throws IOException, Seg
null,
null,
null,
- objectMapper,
- coordinatorClient,
+ OBJECT_MAPPER,
+ COORDINATOR_CLIENT,
segmentLoaderFactory,
- retryPolicyFactory
+ RETRY_POLICY_FACTORY
);
}
@@ -898,10 +900,10 @@ public void testSegmentGranularity() throws IOException, SegmentLoadingException
null,
null,
new PeriodGranularity(Period.months(3), null, null),
- objectMapper,
- coordinatorClient,
+ OBJECT_MAPPER,
+ COORDINATOR_CLIENT,
segmentLoaderFactory,
- retryPolicyFactory
+ RETRY_POLICY_FACTORY
);
final List expectedDimensionsSpec = ImmutableList.of(
new DimensionsSpec(getDimensionSchema(new DoubleDimensionSchema("string_to_double")))
@@ -933,10 +935,10 @@ public void testNullSegmentGranularityAnd() throws IOException, SegmentLoadingEx
null,
null,
null,
- objectMapper,
- coordinatorClient,
+ OBJECT_MAPPER,
+ COORDINATOR_CLIENT,
segmentLoaderFactory,
- retryPolicyFactory
+ RETRY_POLICY_FACTORY
);
final List expectedDimensionsSpec = getExpectedDimensionsSpecForAutoGeneration();
@@ -964,7 +966,7 @@ public void testHugeTargetCompactionSize()
final Map queryableIndexMap = indexIO.getQueryableIndexMap();
final List> segments = new ArrayList<>();
- for (Entry entry : segmentMap.entrySet()) {
+ for (Entry entry : SEGMENT_MAP.entrySet()) {
final DataSegment segment = entry.getKey();
final File file = entry.getValue();
segments.add(Pair.of(Preconditions.checkNotNull(queryableIndexMap.get(file)), segment));
@@ -1089,7 +1091,7 @@ private void assertIngestionSchema(
final DataSchema dataSchema = ingestionSchema.getDataSchema();
Assert.assertEquals(DATA_SOURCE, dataSchema.getDataSource());
- final InputRowParser parser = objectMapper.convertValue(dataSchema.getParser(), InputRowParser.class);
+ final InputRowParser parser = OBJECT_MAPPER.convertValue(dataSchema.getParser(), InputRowParser.class);
Assert.assertTrue(parser instanceof TransformingInputRowParser);
Assert.assertTrue(((TransformingInputRowParser) parser).getParser() instanceof NoopInputRowParser);
Assert.assertTrue(parser.getParseSpec() instanceof TimeAndDimsParseSpec);
@@ -1183,7 +1185,7 @@ private static class TestTaskToolbox extends TaskToolbox
null,
null,
null,
- new IndexMergerV9(objectMapper, indexIO, OffHeapMemorySegmentWriteOutMediumFactory.instance()),
+ new IndexMergerV9(OBJECT_MAPPER, indexIO, OffHeapMemorySegmentWriteOutMediumFactory.instance()),
null,
null,
null,
diff --git a/server/src/main/java/org/apache/druid/client/indexing/ClientCompactionIOConfig.java b/server/src/main/java/org/apache/druid/client/indexing/ClientCompactionIOConfig.java
index c23c884826a0..0f42128c0fbf 100644
--- a/server/src/main/java/org/apache/druid/client/indexing/ClientCompactionIOConfig.java
+++ b/server/src/main/java/org/apache/druid/client/indexing/ClientCompactionIOConfig.java
@@ -21,23 +21,29 @@
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
-import com.fasterxml.jackson.annotation.JsonTypeName;
import java.util.Objects;
-@JsonTypeName("compact")
-public class ClientCompactionIOConfig implements IOConfig
+public class ClientCompactionIOConfig
{
- private final ClientCompactionInputSpec inputSpec;
+ private static final String TYPE = "compact";
+
+ private final ClientCompactionIntervalSpec inputSpec;
@JsonCreator
- public ClientCompactionIOConfig(@JsonProperty("inputSpec") ClientCompactionInputSpec inputSpec)
+ public ClientCompactionIOConfig(@JsonProperty("inputSpec") ClientCompactionIntervalSpec inputSpec)
{
this.inputSpec = inputSpec;
}
@JsonProperty
- public ClientCompactionInputSpec getInputSpec()
+ public String getType()
+ {
+ return TYPE;
+ }
+
+ @JsonProperty
+ public ClientCompactionIntervalSpec getInputSpec()
{
return inputSpec;
}
diff --git a/server/src/main/java/org/apache/druid/client/indexing/ClientCompactionInputSpec.java b/server/src/main/java/org/apache/druid/client/indexing/ClientCompactionInputSpec.java
deleted file mode 100644
index 3294aaa42342..000000000000
--- a/server/src/main/java/org/apache/druid/client/indexing/ClientCompactionInputSpec.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.druid.client.indexing;
-
-import com.fasterxml.jackson.annotation.JsonSubTypes;
-import com.fasterxml.jackson.annotation.JsonSubTypes.Type;
-import com.fasterxml.jackson.annotation.JsonTypeInfo;
-import org.joda.time.Interval;
-
-@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type")
-@JsonSubTypes(value = {
- @Type(name = ClientCompactionIntervalSpec.TYPE, value = ClientCompactionIntervalSpec.class)
-})
-public interface ClientCompactionInputSpec
-{
- Interval findInterval(String dataSource);
-}
diff --git a/server/src/main/java/org/apache/druid/client/indexing/ClientCompactionIntervalSpec.java b/server/src/main/java/org/apache/druid/client/indexing/ClientCompactionIntervalSpec.java
index 853ac4312815..427c146c5bc5 100644
--- a/server/src/main/java/org/apache/druid/client/indexing/ClientCompactionIntervalSpec.java
+++ b/server/src/main/java/org/apache/druid/client/indexing/ClientCompactionIntervalSpec.java
@@ -32,10 +32,9 @@
import java.util.Objects;
import java.util.stream.Collectors;
-public class ClientCompactionIntervalSpec implements ClientCompactionInputSpec
+public class ClientCompactionIntervalSpec
{
- public static final String TYPE = "interval";
-
+ private static final String TYPE = "interval";
private final Interval interval;
@Nullable
@@ -64,6 +63,12 @@ public ClientCompactionIntervalSpec(
this.sha256OfSortedSegmentIds = sha256OfSortedSegmentIds;
}
+ @JsonProperty
+ public String getType()
+ {
+ return TYPE;
+ }
+
@JsonProperty
public Interval getInterval()
{
@@ -77,12 +82,6 @@ public String getSha256OfSortedSegmentIds()
return sha256OfSortedSegmentIds;
}
- @Override
- public Interval findInterval(String dataSource)
- {
- return interval;
- }
-
@Override
public boolean equals(Object o)
{
diff --git a/server/src/main/java/org/apache/druid/client/indexing/IOConfig.java b/server/src/main/java/org/apache/druid/client/indexing/IOConfig.java
deleted file mode 100644
index 5a17056ed84b..000000000000
--- a/server/src/main/java/org/apache/druid/client/indexing/IOConfig.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.druid.client.indexing;
-
-import com.fasterxml.jackson.annotation.JsonTypeInfo;
-
-@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type")
-public interface IOConfig
-{
-}
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/helper/DruidCoordinatorSegmentCompactor.java b/server/src/main/java/org/apache/druid/server/coordinator/helper/DruidCoordinatorSegmentCompactor.java
index 5537295091df..04ae6576a515 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/helper/DruidCoordinatorSegmentCompactor.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/helper/DruidCoordinatorSegmentCompactor.java
@@ -91,7 +91,7 @@ public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params)
}
if (COMPACT_TASK_TYPE.equals(response.getPayload().getType())) {
final ClientCompactQuery compactQuery = (ClientCompactQuery) response.getPayload();
- final Interval interval = compactQuery.getIoConfig().getInputSpec().findInterval(status.getDataSource());
+ final Interval interval = compactQuery.getIoConfig().getInputSpec().getInterval();
compactTaskIntervals.computeIfAbsent(status.getDataSource(), k -> new ArrayList<>()).add(interval);
} else {
throw new ISE("WTH? task[%s] is not a compactTask?", status.getId());
diff --git a/web-console/src/dialogs/compaction-dialog/compaction-dialog.tsx b/web-console/src/dialogs/compaction-dialog/compaction-dialog.tsx
index 5f4e824a6190..2d2fc50c5633 100644
--- a/web-console/src/dialogs/compaction-dialog/compaction-dialog.tsx
+++ b/web-console/src/dialogs/compaction-dialog/compaction-dialog.tsx
@@ -96,18 +96,6 @@ export class CompactionDialog extends React.PureComponent<
),
},
- {
- name: 'maxNumSegmentsToCompact',
- type: 'number',
- defaultValue: 150,
- info: (
-
- Maximum number of segments to compact together per compaction task. Since a time
- chunk must be processed in its entirety, if a time chunk has a total number of
- segments greater than this parameter, compaction will not run for that time chunk.
-
- ),
- },
{
name: 'skipOffsetFromLatest',
type: 'string',
From bbfdd1dc1383c54bcbbe4f1462f26753815d2592 Mon Sep 17 00:00:00 2001
From: Jihoon Son
Date: Mon, 23 Sep 2019 20:41:42 +0900
Subject: [PATCH 3/7] fix webconsole test
---
.../compaction-dialog.spec.tsx.snap | 113 ------------------
1 file changed, 113 deletions(-)
diff --git a/web-console/src/dialogs/compaction-dialog/__snapshots__/compaction-dialog.spec.tsx.snap b/web-console/src/dialogs/compaction-dialog/__snapshots__/compaction-dialog.spec.tsx.snap
index fd23131e9b12..dd96dc149d2d 100644
--- a/web-console/src/dialogs/compaction-dialog/__snapshots__/compaction-dialog.spec.tsx.snap
+++ b/web-console/src/dialogs/compaction-dialog/__snapshots__/compaction-dialog.spec.tsx.snap
@@ -167,119 +167,6 @@ exports[`compaction dialog matches snapshot 1`] = `
-