diff --git a/.idea/inspectionProfiles/Druid.xml b/.idea/inspectionProfiles/Druid.xml
index 0889de4cc67a..88d0e34a54c7 100644
--- a/.idea/inspectionProfiles/Druid.xml
+++ b/.idea/inspectionProfiles/Druid.xml
@@ -39,9 +39,13 @@
+
+
+
+
@@ -59,6 +63,7 @@
+
-
@@ -82,7 +86,6 @@
-
@@ -111,6 +114,24 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
@@ -154,6 +175,7 @@
+
@@ -177,105 +199,6 @@
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
@@ -301,11 +224,46 @@
-
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
-
-
+
@@ -313,22 +271,23 @@
-
-
-
-
-
-
-
+
+
+
+
+
+
+
+
-
-
+
+
@@ -338,16 +297,10 @@
-
-
-
-
+
-
-
-
-
+
@@ -360,10 +313,16 @@
-
+
+
+
+
+
+
+
+
+
-
-
@@ -373,40 +332,103 @@
-
+
-
-
+
+
-
+
-
-
+
-
-
-
-
+
+
+
+
-
-
-
-
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
-
-
-
-
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
@@ -427,6 +449,7 @@
+
@@ -481,8 +504,8 @@
-
-
+
+
@@ -493,6 +516,7 @@
+
@@ -504,6 +528,7 @@
+
\ No newline at end of file
diff --git a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/compact/CompactionTaskTest.java b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/compact/CompactionTaskTest.java
index 84ee947c8467..ab9be6408032 100644
--- a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/compact/CompactionTaskTest.java
+++ b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/compact/CompactionTaskTest.java
@@ -87,7 +87,7 @@ public class CompactionTaskTest extends CompactionTestBase
() -> TaskBuilder
.ofTypeCompact()
.context("storeCompactionState", true)
- .ioConfig(new CompactionIntervalSpec(Intervals.of("2013-08-31/2013-09-02"), null), false);
+ .ioConfig(new CompactionIntervalSpec(Intervals.of("2013-08-31/2013-09-02"), null, null), false);
private static final Supplier PARALLEL_COMPACTION_TASK =
() -> COMPACTION_TASK.get().tuningConfig(
t -> t.withPartitionsSpec(new HashedPartitionsSpec(null, null, null))
@@ -98,7 +98,7 @@ public class CompactionTaskTest extends CompactionTestBase
() -> TaskBuilder
.ofTypeCompact()
.context("storeCompactionState", true)
- .ioConfig(new CompactionIntervalSpec(Intervals.of("2013-08-31/2013-09-02"), null), true);
+ .ioConfig(new CompactionIntervalSpec(Intervals.of("2013-08-31/2013-09-02"), null, null), true);
private static final Supplier INDEX_TASK_WITH_TIMESTAMP =
() -> MoreResources.Task.INDEX_TASK_WITH_AGGREGATORS.get().dimensions(
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionInputSpec.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionInputSpec.java
index d73a0ffe39a0..50b5f06a1cc0 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionInputSpec.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionInputSpec.java
@@ -51,4 +51,9 @@ public interface CompactionInputSpec
* @param latestSegments most recent published segments in the interval returned by {@link #findInterval}
*/
boolean validateSegments(LockGranularity lockGranularityInUse, List latestSegments);
+
+ /**
+ * Return the datasource to be used as input to the compaction task.
+ */
+ String getDataSource();
}
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionIntervalSpec.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionIntervalSpec.java
index 40a4d775e031..1493e08facea 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionIntervalSpec.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionIntervalSpec.java
@@ -43,11 +43,14 @@ public class CompactionIntervalSpec implements CompactionInputSpec
private final Interval interval;
@Nullable
private final String sha256OfSortedSegmentIds;
+ @Nullable
+ private final String dataSource;
@JsonCreator
public CompactionIntervalSpec(
@JsonProperty("interval") Interval interval,
- @JsonProperty("sha256OfSortedSegmentIds") @Nullable String sha256OfSortedSegmentIds
+ @JsonProperty("sha256OfSortedSegmentIds") @Nullable String sha256OfSortedSegmentIds,
+ @JsonProperty("dataSource") @Nullable String dataSource
)
{
if (interval != null && interval.toDurationMillis() == 0) {
@@ -55,6 +58,7 @@ public CompactionIntervalSpec(
}
this.interval = interval;
this.sha256OfSortedSegmentIds = sha256OfSortedSegmentIds;
+ this.dataSource = dataSource;
}
@JsonProperty
@@ -70,6 +74,14 @@ public String getSha256OfSortedSegmentIds()
return sha256OfSortedSegmentIds;
}
+ @Override
+ @Nullable
+ @JsonProperty
+ public String getDataSource()
+ {
+ return dataSource;
+ }
+
@Override
public Interval findInterval(String dataSource)
{
@@ -97,21 +109,19 @@ public boolean validateSegments(LockGranularity lockGranularityInUse, List intervalDataSchemas = createDataSchemasForIntervals(
+ getDataSource(),
toolbox,
getTaskLockHelper().getLockGranularityToUse(),
segmentProvider,
@@ -548,6 +552,7 @@ public TaskStatus runTask(TaskToolbox toolbox) throws Exception
*/
@VisibleForTesting
static Map createDataSchemasForIntervals(
+ final String dataSource,
final TaskToolbox toolbox,
final LockGranularity lockGranularityInUse,
final SegmentProvider segmentProvider,
@@ -613,7 +618,7 @@ static Map createDataSchemasForIntervals(
final DataSchema dataSchema = createDataSchema(
toolbox.getEmitter(),
metricBuilder,
- segmentProvider.dataSource,
+ dataSource,
interval,
lazyFetchSegments(segmentsToCompact, toolbox.getSegmentCacheManager()),
dimensionsSpec,
@@ -633,7 +638,7 @@ static Map createDataSchemasForIntervals(
final DataSchema dataSchema = createDataSchema(
toolbox.getEmitter(),
metricBuilder,
- segmentProvider.dataSource,
+ dataSource,
JodaUtils.umbrellaInterval(
Iterables.transform(
timelineSegments,
@@ -1275,7 +1280,7 @@ public Builder(
public Builder interval(Interval interval)
{
- return inputSpec(new CompactionIntervalSpec(interval, null));
+ return inputSpec(new CompactionIntervalSpec(interval, null, null));
}
public Builder segments(List segments)
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/SpecificSegmentsSpec.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/SpecificSegmentsSpec.java
index 7b26c22a4a54..b5b77e9a8f6d 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/SpecificSegmentsSpec.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/SpecificSegmentsSpec.java
@@ -28,6 +28,7 @@
import org.apache.druid.timeline.SegmentId;
import org.joda.time.Interval;
+import javax.annotation.Nullable;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
@@ -39,18 +40,26 @@ public class SpecificSegmentsSpec implements CompactionInputSpec
private final List segments;
+ @Nullable
+ private final String dataSource;
+
public static SpecificSegmentsSpec fromSegments(List segments)
{
Preconditions.checkArgument(!segments.isEmpty(), "Empty segment list");
return new SpecificSegmentsSpec(
- segments.stream().map(segment -> segment.getId().toString()).collect(Collectors.toList())
+ segments.stream().map(segment -> segment.getId().toString()).collect(Collectors.toList()),
+ null
);
}
@JsonCreator
- public SpecificSegmentsSpec(@JsonProperty("segments") List segments)
+ public SpecificSegmentsSpec(
+ @JsonProperty("segments") List segments,
+ @JsonProperty("dataSource") @Nullable String dataSource
+ )
{
this.segments = segments;
+ this.dataSource = dataSource;
// Sort segments to use in validateSegments.
Collections.sort(this.segments);
}
@@ -61,6 +70,14 @@ public List getSegments()
return segments;
}
+ @Override
+ @Nullable
+ @JsonProperty
+ public String getDataSource()
+ {
+ return dataSource;
+ }
+
@Override
public Interval findInterval(String dataSource)
{
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/ClientCompactionTaskQuerySerdeTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/ClientCompactionTaskQuerySerdeTest.java
index 1c04c7b5bd2b..ecbee4ce9b96 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/ClientCompactionTaskQuerySerdeTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/ClientCompactionTaskQuerySerdeTest.java
@@ -341,7 +341,7 @@ private CompactionTask createCompactionTask(CompactionTransformSpec transformSpe
"datasource",
new SegmentCacheManagerFactory(TestIndex.INDEX_IO, MAPPER)
)
- .inputSpec(new CompactionIntervalSpec(Intervals.of("2019/2020"), "testSha256OfSortedSegmentIds"), true)
+ .inputSpec(new CompactionIntervalSpec(Intervals.of("2019/2020"), "testSha256OfSortedSegmentIds", null), true)
.tuningConfig(
TuningConfigBuilder
.forParallelIndexTask()
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionInputSpecTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionInputSpecTest.java
index 8c0db006bbfb..8814cf9af087 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionInputSpecTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionInputSpecTest.java
@@ -53,12 +53,14 @@ public static Iterable