Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
313 changes: 169 additions & 144 deletions .idea/inspectionProfiles/Druid.xml
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's avoid committing changes to this file if possible.

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ public class CompactionTaskTest extends CompactionTestBase
() -> TaskBuilder
.ofTypeCompact()
.context("storeCompactionState", true)
.ioConfig(new CompactionIntervalSpec(Intervals.of("2013-08-31/2013-09-02"), null), false);
.ioConfig(new CompactionIntervalSpec(Intervals.of("2013-08-31/2013-09-02"), null, null), false);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's add tests where the new `dataSource` argument isn't null.

private static final Supplier<TaskBuilder.Compact> PARALLEL_COMPACTION_TASK =
() -> COMPACTION_TASK.get().tuningConfig(
t -> t.withPartitionsSpec(new HashedPartitionsSpec(null, null, null))
Expand All @@ -98,7 +98,7 @@ public class CompactionTaskTest extends CompactionTestBase
() -> TaskBuilder
.ofTypeCompact()
.context("storeCompactionState", true)
.ioConfig(new CompactionIntervalSpec(Intervals.of("2013-08-31/2013-09-02"), null), true);
.ioConfig(new CompactionIntervalSpec(Intervals.of("2013-08-31/2013-09-02"), null, null), true);

private static final Supplier<TaskBuilder.Index> INDEX_TASK_WITH_TIMESTAMP =
() -> MoreResources.Task.INDEX_TASK_WITH_AGGREGATORS.get().dimensions(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,4 +51,9 @@ public interface CompactionInputSpec
* @param latestSegments most recent published segments in the interval returned by {@link #findInterval}
*/
boolean validateSegments(LockGranularity lockGranularityInUse, List<DataSegment> latestSegments);

/**
* Return the datasource to be used as input to the compaction task.
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
* Return the datasource to be used as input to the compaction task.
* Returns the name of the datasource to ingest the compacted segments to.

*/
String getDataSource();
}
Original file line number Diff line number Diff line change
Expand Up @@ -43,18 +43,22 @@ public class CompactionIntervalSpec implements CompactionInputSpec
private final Interval interval;
@Nullable
private final String sha256OfSortedSegmentIds;
@Nullable
private final String dataSource;

@JsonCreator
public CompactionIntervalSpec(
@JsonProperty("interval") Interval interval,
@JsonProperty("sha256OfSortedSegmentIds") @Nullable String sha256OfSortedSegmentIds
@JsonProperty("sha256OfSortedSegmentIds") @Nullable String sha256OfSortedSegmentIds,
@JsonProperty("dataSource") @Nullable String dataSource
)
{
if (interval != null && interval.toDurationMillis() == 0) {
throw new IAE("Interval[%s] is empty, must specify a nonempty interval", interval);
}
this.interval = interval;
this.sha256OfSortedSegmentIds = sha256OfSortedSegmentIds;
this.dataSource = dataSource;
}

@JsonProperty
Expand All @@ -70,6 +74,14 @@ public String getSha256OfSortedSegmentIds()
return sha256OfSortedSegmentIds;
}

/**
 * Returns the datasource given to this spec through the {@code dataSource} JSON property,
 * or {@code null} when none was specified.
 */
@Override
@Nullable
@JsonProperty
public String getDataSource()
{
return dataSource;
}

@Override
public Interval findInterval(String dataSource)
{
Expand Down Expand Up @@ -97,29 +109,28 @@ public boolean validateSegments(LockGranularity lockGranularityInUse, List<DataS
@Override
public boolean equals(Object o)
{
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
CompactionIntervalSpec that = (CompactionIntervalSpec) o;
return Objects.equals(interval, that.interval) &&
Objects.equals(sha256OfSortedSegmentIds, that.sha256OfSortedSegmentIds);
Objects.equals(sha256OfSortedSegmentIds, that.sha256OfSortedSegmentIds) &&
Objects.equals(dataSource, that.dataSource);
}

@Override
public int hashCode()
{
return Objects.hash(interval, sha256OfSortedSegmentIds);
return Objects.hash(interval, sha256OfSortedSegmentIds, dataSource);
}

@Override
public String toString()
{
return "CompactionIntervalSpec{" +
"interval=" + interval +
", sha256OfSegmentIds='" + sha256OfSortedSegmentIds + '\'' +
", sha256OfSortedSegmentIds='" + sha256OfSortedSegmentIds + '\'' +
", dataSource='" + dataSource + '\'' +
'}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,7 @@ public CompactionTask(
if (ioConfig != null) {
this.ioConfig = ioConfig;
} else if (interval != null) {
this.ioConfig = new CompactionIOConfig(new CompactionIntervalSpec(interval, null), false, null);
this.ioConfig = new CompactionIOConfig(new CompactionIntervalSpec(interval, null, null), false, null);
} else {
// We already checked segments is not null or empty above.
//noinspection ConstantConditions
Expand Down Expand Up @@ -242,7 +242,10 @@ public CompactionTask(
}
this.projections = projections;
this.tuningConfig = tuningConfig != null ? getTuningConfig(tuningConfig) : null;
this.segmentProvider = new SegmentProvider(dataSource, this.ioConfig.getInputSpec());
this.segmentProvider = new SegmentProvider(
this.ioConfig.getInputSpec().getDataSource() == null ? dataSource : this.ioConfig.getInputSpec().getDataSource(),
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
this.ioConfig.getInputSpec().getDataSource() == null ? dataSource : this.ioConfig.getInputSpec().getDataSource(),
Configs.getOrDefault(this.ioConfig.getInputSpec().getDataSource(), dataSource)

this.ioConfig.getInputSpec()
);
// Note: The default compactionRunnerType used here should match the default runner used in CompactSegments#run
// when no runner is detected in the returned compactionTaskQuery.
this.compactionRunner = compactionRunner == null
Expand Down Expand Up @@ -517,6 +520,7 @@ public TaskStatus runTask(TaskToolbox toolbox) throws Exception
emitMetric(toolbox.getEmitter(), "ingest/count", 1);

final Map<Interval, DataSchema> intervalDataSchemas = createDataSchemasForIntervals(
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Given that we are now potentially ingesting into a new datasource, have we checked that all the associated ingestion steps (e.g. lock acquisition, segment allocation) still work?

getDataSource(),
toolbox,
getTaskLockHelper().getLockGranularityToUse(),
segmentProvider,
Expand Down Expand Up @@ -548,6 +552,7 @@ public TaskStatus runTask(TaskToolbox toolbox) throws Exception
*/
@VisibleForTesting
static Map<Interval, DataSchema> createDataSchemasForIntervals(
final String dataSource,
final TaskToolbox toolbox,
final LockGranularity lockGranularityInUse,
final SegmentProvider segmentProvider,
Expand Down Expand Up @@ -613,7 +618,7 @@ static Map<Interval, DataSchema> createDataSchemasForIntervals(
final DataSchema dataSchema = createDataSchema(
toolbox.getEmitter(),
metricBuilder,
segmentProvider.dataSource,
dataSource,
interval,
lazyFetchSegments(segmentsToCompact, toolbox.getSegmentCacheManager()),
dimensionsSpec,
Expand All @@ -633,7 +638,7 @@ static Map<Interval, DataSchema> createDataSchemasForIntervals(
final DataSchema dataSchema = createDataSchema(
toolbox.getEmitter(),
metricBuilder,
segmentProvider.dataSource,
dataSource,
JodaUtils.umbrellaInterval(
Iterables.transform(
timelineSegments,
Expand Down Expand Up @@ -1275,7 +1280,7 @@ public Builder(

public Builder interval(Interval interval)
{
return inputSpec(new CompactionIntervalSpec(interval, null));
return inputSpec(new CompactionIntervalSpec(interval, null, null));
}

public Builder segments(List<DataSegment> segments)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.druid.timeline.SegmentId;
import org.joda.time.Interval;

import javax.annotation.Nullable;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
Expand All @@ -39,18 +40,26 @@ public class SpecificSegmentsSpec implements CompactionInputSpec

private final List<String> segments;

@Nullable
private final String dataSource;

public static SpecificSegmentsSpec fromSegments(List<DataSegment> segments)
{
Preconditions.checkArgument(!segments.isEmpty(), "Empty segment list");
return new SpecificSegmentsSpec(
segments.stream().map(segment -> segment.getId().toString()).collect(Collectors.toList())
segments.stream().map(segment -> segment.getId().toString()).collect(Collectors.toList()),
null
);
}

@JsonCreator
public SpecificSegmentsSpec(@JsonProperty("segments") List<String> segments)
public SpecificSegmentsSpec(
@JsonProperty("segments") List<String> segments,
@JsonProperty("dataSource") @Nullable String dataSource
)
{
this.segments = segments;
this.dataSource = dataSource;
// Sort segments to use in validateSegments.
Collections.sort(this.segments);
}
Expand All @@ -61,6 +70,14 @@ public List<String> getSegments()
return segments;
}

@Override
@Nullable
@JsonProperty
public String getDataSource()
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: let's add some javadoc to this

{
return dataSource;
}

@Override
public Interval findInterval(String dataSource)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -341,7 +341,7 @@ private CompactionTask createCompactionTask(CompactionTransformSpec transformSpe
"datasource",
new SegmentCacheManagerFactory(TestIndex.INDEX_IO, MAPPER)
)
.inputSpec(new CompactionIntervalSpec(Intervals.of("2019/2020"), "testSha256OfSortedSegmentIds"), true)
.inputSpec(new CompactionIntervalSpec(Intervals.of("2019/2020"), "testSha256OfSortedSegmentIds", null), true)
.tuningConfig(
TuningConfigBuilder
.forParallelIndexTask()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,12 +53,14 @@ public static Iterable<Object[]> constructorFeeder()
new Object[]{
new CompactionIntervalSpec(
INTERVAL,
SegmentUtils.hashIds(SEGMENTS)
SegmentUtils.hashIds(SEGMENTS),
null
)
},
new Object[]{
new SpecificSegmentsSpec(
SEGMENTS.stream().map(segment -> segment.getId().toString()).collect(Collectors.toList())
SEGMENTS.stream().map(segment -> segment.getId().toString()).collect(Collectors.toList()),
null
)
}
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ public void testRunParallelWithDynamicPartitioningMatchCompactionState()
getSegmentCacheManagerFactory()
);
final CompactionTask compactionTask = builder
.inputSpec(new CompactionIntervalSpec(INTERVAL_TO_INDEX, null))
.inputSpec(new CompactionIntervalSpec(INTERVAL_TO_INDEX, null, null))
.tuningConfig(AbstractParallelIndexSupervisorTaskTest.DEFAULT_TUNING_CONFIG_FOR_PARALLEL_INDEXING)
.build();

Expand Down Expand Up @@ -230,7 +230,7 @@ public void testRunParallelWithHashPartitioningMatchCompactionState() throws Exc
getSegmentCacheManagerFactory()
);
final CompactionTask compactionTask = builder
.inputSpec(new CompactionIntervalSpec(INTERVAL_TO_INDEX, null))
.inputSpec(new CompactionIntervalSpec(INTERVAL_TO_INDEX, null, null))
.tuningConfig(newTuningConfig(new HashedPartitionsSpec(null, 3, null), 2, true))
.build();

Expand Down Expand Up @@ -293,7 +293,7 @@ public void testRunParallelWithRangePartitioning() throws Exception
getSegmentCacheManagerFactory()
);
final CompactionTask compactionTask = builder
.inputSpec(new CompactionIntervalSpec(INTERVAL_TO_INDEX, null))
.inputSpec(new CompactionIntervalSpec(INTERVAL_TO_INDEX, null, null))
.tuningConfig(newTuningConfig(new SingleDimensionPartitionsSpec(7, null, "dim", false), 2, true))
.build();

Expand Down Expand Up @@ -342,7 +342,7 @@ public void testRunParallelWithRangePartitioningAndNoUpfrontSegmentFetching() th
);

final CompactionTask compactionTask = builder
.inputSpec(new CompactionIntervalSpec(INTERVAL_TO_INDEX, null))
.inputSpec(new CompactionIntervalSpec(INTERVAL_TO_INDEX, null, null))
.tuningConfig(newTuningConfig(new SingleDimensionPartitionsSpec(7, null, "dim", false), 2, true))
.dimensionsSpec(new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("ts", "dim"))))
.metricsSpec(new AggregatorFactory[]{new LongSumAggregatorFactory("val", "val")})
Expand Down Expand Up @@ -396,7 +396,7 @@ public void testRunParallelWithMultiDimensionRangePartitioning() throws Exceptio
getSegmentCacheManagerFactory()
);
final CompactionTask compactionTask = builder
.inputSpec(new CompactionIntervalSpec(INTERVAL_TO_INDEX, null))
.inputSpec(new CompactionIntervalSpec(INTERVAL_TO_INDEX, null, null))
.tuningConfig(newTuningConfig(
new DimensionRangePartitionsSpec(7, null, Arrays.asList("dim1", "dim2"), false),
2,
Expand Down Expand Up @@ -447,7 +447,7 @@ public void testRunParallelWithRangePartitioningWithSingleTask() throws Exceptio
getSegmentCacheManagerFactory()
);
final CompactionTask compactionTask = builder
.inputSpec(new CompactionIntervalSpec(INTERVAL_TO_INDEX, null))
.inputSpec(new CompactionIntervalSpec(INTERVAL_TO_INDEX, null, null))
.tuningConfig(newTuningConfig(new SingleDimensionPartitionsSpec(7, null, "dim", false), 1, true))
.build();

Expand Down Expand Up @@ -495,7 +495,7 @@ public void testRunParallelWithMultiDimensionRangePartitioningWithSingleTask() t
getSegmentCacheManagerFactory()
);
final CompactionTask compactionTask = builder
.inputSpec(new CompactionIntervalSpec(INTERVAL_TO_INDEX, null))
.inputSpec(new CompactionIntervalSpec(INTERVAL_TO_INDEX, null, null))
.tuningConfig(newTuningConfig(
new DimensionRangePartitionsSpec(7, null, Arrays.asList("dim1", "dim2"), false),
1,
Expand Down Expand Up @@ -543,7 +543,7 @@ public void testRunCompactionStateNotStoreIfContextSetToFalse()
getSegmentCacheManagerFactory()
);
final CompactionTask compactionTask = builder
.inputSpec(new CompactionIntervalSpec(INTERVAL_TO_INDEX, null))
.inputSpec(new CompactionIntervalSpec(INTERVAL_TO_INDEX, null, null))
.tuningConfig(AbstractParallelIndexSupervisorTaskTest.DEFAULT_TUNING_CONFIG_FOR_PARALLEL_INDEXING)
.context(ImmutableMap.of(Tasks.STORE_COMPACTION_STATE_KEY, false))
.build();
Expand Down Expand Up @@ -573,7 +573,7 @@ public void testRunCompactionWithFilterShouldStoreInState() throws Exception
getSegmentCacheManagerFactory()
);
final CompactionTask compactionTask = builder
.inputSpec(new CompactionIntervalSpec(INTERVAL_TO_INDEX, null))
.inputSpec(new CompactionIntervalSpec(INTERVAL_TO_INDEX, null, null))
.tuningConfig(AbstractParallelIndexSupervisorTaskTest.DEFAULT_TUNING_CONFIG_FOR_PARALLEL_INDEXING)
.transformSpec(new CompactionTransformSpec(new SelectorDimFilter("dim", "a", null)))
.build();
Expand Down Expand Up @@ -624,7 +624,7 @@ public void testRunCompactionWithNewMetricsShouldStoreInState() throws Exception
getSegmentCacheManagerFactory()
);
final CompactionTask compactionTask = builder
.inputSpec(new CompactionIntervalSpec(INTERVAL_TO_INDEX, null))
.inputSpec(new CompactionIntervalSpec(INTERVAL_TO_INDEX, null, null))
.tuningConfig(AbstractParallelIndexSupervisorTaskTest.DEFAULT_TUNING_CONFIG_FOR_PARALLEL_INDEXING)
.metricsSpec(new AggregatorFactory[]{
new CountAggregatorFactory("cnt"),
Expand Down Expand Up @@ -679,7 +679,7 @@ public void testCompactHashAndDynamicPartitionedSegments()
getSegmentCacheManagerFactory()
);
final CompactionTask compactionTask = builder
.inputSpec(new CompactionIntervalSpec(INTERVAL_TO_INDEX, null))
.inputSpec(new CompactionIntervalSpec(INTERVAL_TO_INDEX, null, null))
.tuningConfig(AbstractParallelIndexSupervisorTaskTest.DEFAULT_TUNING_CONFIG_FOR_PARALLEL_INDEXING)
.build();

Expand Down Expand Up @@ -728,7 +728,7 @@ public void testCompactRangeAndDynamicPartitionedSegments()
getSegmentCacheManagerFactory()
);
final CompactionTask compactionTask = builder
.inputSpec(new CompactionIntervalSpec(INTERVAL_TO_INDEX, null))
.inputSpec(new CompactionIntervalSpec(INTERVAL_TO_INDEX, null, null))
.tuningConfig(AbstractParallelIndexSupervisorTaskTest.DEFAULT_TUNING_CONFIG_FOR_PARALLEL_INDEXING)
.build();

Expand Down Expand Up @@ -820,7 +820,7 @@ public void testCompactionDropSegmentsOfInputIntervalIfDropFlagIsSet() throws Ex
);
final CompactionTask compactionTask = builder
// Set the dropExisting flag to true in the IOConfig of the compaction task
.inputSpec(new CompactionIntervalSpec(INTERVAL_TO_INDEX, null), true)
.inputSpec(new CompactionIntervalSpec(INTERVAL_TO_INDEX, null, null), true)
.tuningConfig(AbstractParallelIndexSupervisorTaskTest.DEFAULT_TUNING_CONFIG_FOR_PARALLEL_INDEXING)
.granularitySpec(new ClientCompactionTaskGranularitySpec(Granularities.MINUTE, null, null))
.build();
Expand Down Expand Up @@ -865,7 +865,7 @@ public void testCompactionDoesNotDropSegmentsIfDropFlagNotSet() throws Exception
getSegmentCacheManagerFactory()
);
final CompactionTask compactionTask = builder
.inputSpec(new CompactionIntervalSpec(INTERVAL_TO_INDEX, null))
.inputSpec(new CompactionIntervalSpec(INTERVAL_TO_INDEX, null, null))
.tuningConfig(AbstractParallelIndexSupervisorTaskTest.DEFAULT_TUNING_CONFIG_FOR_PARALLEL_INDEXING)
.granularitySpec(new ClientCompactionTaskGranularitySpec(Granularities.MINUTE, null, null))
.build();
Expand Down Expand Up @@ -906,7 +906,7 @@ public void testRunParallelWithProjections()
getSegmentCacheManagerFactory()
);
final CompactionTask compactionTask = builder
.inputSpec(new CompactionIntervalSpec(INTERVAL_TO_INDEX, null))
.inputSpec(new CompactionIntervalSpec(INTERVAL_TO_INDEX, null, null))
.tuningConfig(AbstractParallelIndexSupervisorTaskTest.DEFAULT_TUNING_CONFIG_FOR_PARALLEL_INDEXING)
.build();

Expand Down Expand Up @@ -959,7 +959,7 @@ public void testRunParallelAddProjections()
.aggregators(new LongSumAggregatorFactory("val", "val"))
.build();
final CompactionTask compactionTask = builder
.inputSpec(new CompactionIntervalSpec(INTERVAL_TO_INDEX, null))
.inputSpec(new CompactionIntervalSpec(INTERVAL_TO_INDEX, null, null))
.tuningConfig(AbstractParallelIndexSupervisorTaskTest.DEFAULT_TUNING_CONFIG_FOR_PARALLEL_INDEXING)
.projections(
ImmutableList.of(
Expand Down
Loading
Loading