Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import com.google.common.collect.Lists;
import org.apache.curator.shaded.com.google.common.base.Verify;
import org.apache.druid.client.coordinator.CoordinatorClient;
import org.apache.druid.client.indexing.ClientCompactionTaskGranularitySpec;
import org.apache.druid.client.indexing.ClientCompactionTaskQuery;
import org.apache.druid.common.guava.SettableSupplier;
import org.apache.druid.data.input.InputSource;
Expand Down Expand Up @@ -90,6 +91,7 @@
import org.apache.druid.timeline.partition.PartitionHolder;
import org.joda.time.Interval;

import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import java.io.File;
import java.io.IOException;
Expand Down Expand Up @@ -141,7 +143,7 @@ public class CompactionTask extends AbstractBatchIndexTask
@Nullable
private final Granularity segmentGranularity;
@Nullable
private final GranularitySpec granularitySpec;
private final ClientCompactionTaskGranularitySpec granularitySpec;
@Nullable
private final ParallelIndexTuningConfig tuningConfig;
@JsonIgnore
Expand Down Expand Up @@ -175,7 +177,7 @@ public CompactionTask(
@JsonProperty("dimensionsSpec") @Nullable final DimensionsSpec dimensionsSpec,
@JsonProperty("metricsSpec") @Nullable final AggregatorFactory[] metricsSpec,
@JsonProperty("segmentGranularity") @Deprecated @Nullable final Granularity segmentGranularity,
@JsonProperty("granularitySpec") @Nullable final GranularitySpec granularitySpec,
@JsonProperty("granularitySpec") @Nullable final ClientCompactionTaskGranularitySpec granularitySpec,
@JsonProperty("tuningConfig") @Nullable final TuningConfig tuningConfig,
@JsonProperty("context") @Nullable final Map<String, Object> context,
@JacksonInject SegmentLoaderFactory segmentLoaderFactory,
Expand Down Expand Up @@ -206,12 +208,7 @@ public CompactionTask(
this.metricsSpec = metricsSpec;
this.segmentGranularity = segmentGranularity;
if (granularitySpec == null && segmentGranularity != null) {
this.granularitySpec = new UniformGranularitySpec(
segmentGranularity,
null,
null,
null
);
this.granularitySpec = new ClientCompactionTaskGranularitySpec(segmentGranularity, null);
} else {
this.granularitySpec = granularitySpec;
}
Expand Down Expand Up @@ -306,7 +303,7 @@ public Granularity getSegmentGranularity()

@JsonProperty
@Nullable
public GranularitySpec getGranularitySpec()
public ClientCompactionTaskGranularitySpec getGranularitySpec()
{
return granularitySpec;
}
Expand Down Expand Up @@ -368,7 +365,7 @@ public TaskStatus runTask(TaskToolbox toolbox) throws Exception
partitionConfigurationManager,
dimensionsSpec,
metricsSpec,
getSegmentGranularity(),
granularitySpec,
toolbox.getCoordinatorClient(),
segmentLoaderFactory,
retryPolicyFactory
Expand Down Expand Up @@ -476,7 +473,7 @@ static List<ParallelIndexIngestionSpec> createIngestionSchema(
final PartitionConfigurationManager partitionConfigurationManager,
@Nullable final DimensionsSpec dimensionsSpec,
@Nullable final AggregatorFactory[] metricsSpec,
@Nullable final Granularity segmentGranularity,
@Nullable final ClientCompactionTaskGranularitySpec granularitySpec,
final CoordinatorClient coordinatorClient,
final SegmentLoaderFactory segmentLoaderFactory,
final RetryPolicyFactory retryPolicyFactory
Expand Down Expand Up @@ -504,7 +501,7 @@ static List<ParallelIndexIngestionSpec> createIngestionSchema(

final ParallelIndexTuningConfig compactionTuningConfig = partitionConfigurationManager.computeTuningConfig();

if (segmentGranularity == null) {
if (granularitySpec == null || granularitySpec.getSegmentGranularity() == null) {
// original granularity
final Map<Interval, List<NonnullPair<QueryableIndex, DataSegment>>> intervalToSegments = new TreeMap<>(
Comparators.intervalsByStartThenEnd()
Expand Down Expand Up @@ -539,12 +536,15 @@ static List<ParallelIndexIngestionSpec> createIngestionSchema(
for (NonnullPair<Interval, List<NonnullPair<QueryableIndex, DataSegment>>> entry : intervalToSegmentsUnified) {
final Interval interval = entry.lhs;
final List<NonnullPair<QueryableIndex, DataSegment>> segmentsToCompact = entry.rhs;
// If granularitySpec is not null, then set segmentGranularity. Otherwise,
// creates new granularitySpec and set segmentGranularity
Granularity segmentGranularityToUse = GranularityType.fromPeriod(interval.toPeriod()).getDefaultGranularity();
final DataSchema dataSchema = createDataSchema(
segmentProvider.dataSource,
segmentsToCompact,
dimensionsSpec,
metricsSpec,
GranularityType.fromPeriod(interval.toPeriod()).getDefaultGranularity()
granularitySpec == null ? new ClientCompactionTaskGranularitySpec(segmentGranularityToUse, null) : granularitySpec.withSegmentGranularity(segmentGranularityToUse)
);

specs.add(
Expand All @@ -571,7 +571,7 @@ static List<ParallelIndexIngestionSpec> createIngestionSchema(
queryableIndexAndSegments,
dimensionsSpec,
metricsSpec,
segmentGranularity
granularitySpec
);

return Collections.singletonList(
Expand Down Expand Up @@ -639,7 +639,7 @@ private static DataSchema createDataSchema(
List<NonnullPair<QueryableIndex, DataSegment>> queryableIndexAndSegments,
@Nullable DimensionsSpec dimensionsSpec,
@Nullable AggregatorFactory[] metricsSpec,
Granularity segmentGranularity
@Nonnull ClientCompactionTaskGranularitySpec granularitySpec
)
{
// check index metadata &
Expand All @@ -648,15 +648,22 @@ private static DataSchema createDataSchema(
final SettableSupplier<Granularity> queryGranularity = new SettableSupplier<>();
decideRollupAndQueryGranularityCarryOver(rollup, queryGranularity, queryableIndexAndSegments);

// find granularity spec

final Interval totalInterval = JodaUtils.umbrellaInterval(
queryableIndexAndSegments.stream().map(p -> p.rhs.getInterval()).collect(Collectors.toList())
);

final GranularitySpec granularitySpec = new UniformGranularitySpec(
Preconditions.checkNotNull(segmentGranularity),
queryGranularity.get(),
final Granularity queryGranularityToUse;
if (granularitySpec.getQueryGranularity() == null) {
queryGranularityToUse = queryGranularity.get();
log.info("Generate compaction task spec with segments original query granularity [%s]", queryGranularityToUse);
} else {
queryGranularityToUse = granularitySpec.getQueryGranularity();
log.info("Generate compaction task spec with new query granularity overrided from input [%s]", queryGranularityToUse);
}

final GranularitySpec uniformGranularitySpec = new UniformGranularitySpec(
Preconditions.checkNotNull(granularitySpec.getSegmentGranularity()),
queryGranularityToUse,
rollup.get(),
Collections.singletonList(totalInterval)
);
Expand All @@ -675,7 +682,7 @@ private static DataSchema createDataSchema(
new TimestampSpec(null, null, null),
finalDimensionsSpec,
finalMetricsSpec,
granularitySpec,
uniformGranularitySpec,
null
);
}
Expand Down Expand Up @@ -963,7 +970,7 @@ public static class Builder
@Nullable
private Granularity segmentGranularity;
@Nullable
private GranularitySpec granularitySpec;
private ClientCompactionTaskGranularitySpec granularitySpec;
@Nullable
private TuningConfig tuningConfig;
@Nullable
Expand Down Expand Up @@ -1014,7 +1021,7 @@ public Builder segmentGranularity(Granularity segmentGranularity)
return this;
}

public Builder granularitySpec(GranularitySpec granularitySpec)
public Builder granularitySpec(ClientCompactionTaskGranularitySpec granularitySpec)
{
this.granularitySpec = granularitySpec;
return this;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@
import org.apache.druid.client.coordinator.CoordinatorClient;
import org.apache.druid.client.indexing.ClientCompactionIOConfig;
import org.apache.druid.client.indexing.ClientCompactionIntervalSpec;
import org.apache.druid.client.indexing.ClientCompactionTaskGranularitySpec;
import org.apache.druid.client.indexing.ClientCompactionTaskQuery;
import org.apache.druid.client.indexing.ClientCompactionTaskQueryGranularitySpec;
import org.apache.druid.client.indexing.ClientCompactionTaskQueryTuningConfig;
import org.apache.druid.client.indexing.ClientTaskQuery;
import org.apache.druid.client.indexing.IndexingServiceClient;
Expand All @@ -52,7 +52,6 @@
import org.apache.druid.segment.data.CompressionFactory.LongEncodingStrategy;
import org.apache.druid.segment.data.CompressionStrategy;
import org.apache.druid.segment.incremental.RowIngestionMetersFactory;
import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec;
import org.apache.druid.segment.realtime.appenderator.AppenderatorsManager;
import org.apache.druid.segment.realtime.firehose.ChatHandlerProvider;
import org.apache.druid.segment.realtime.firehose.NoopChatHandlerProvider;
Expand Down Expand Up @@ -116,7 +115,7 @@ public void testClientCompactionTaskQueryToCompactionTask() throws IOException
1000,
100
),
new ClientCompactionTaskQueryGranularitySpec(Granularities.DAY, Granularities.HOUR, true),
new ClientCompactionTaskGranularitySpec(Granularities.DAY, Granularities.HOUR),
ImmutableMap.of("key", "value")
);

Expand Down Expand Up @@ -190,6 +189,10 @@ public void testClientCompactionTaskQueryToCompactionTask() throws IOException
query.getTuningConfig().getTotalNumMergeTasks().intValue(),
task.getTuningConfig().getTotalNumMergeTasks()
);
Assert.assertEquals(
query.getGranularitySpec(),
task.getGranularitySpec()
);
Assert.assertEquals(
query.getGranularitySpec().getQueryGranularity(),
task.getGranularitySpec().getQueryGranularity()
Expand All @@ -198,10 +201,6 @@ public void testClientCompactionTaskQueryToCompactionTask() throws IOException
query.getGranularitySpec().getSegmentGranularity(),
task.getGranularitySpec().getSegmentGranularity()
);
Assert.assertEquals(
query.getGranularitySpec().isRollup(),
task.getGranularitySpec().isRollup()
);
Assert.assertEquals(query.getContext(), task.getContext());
}

Expand Down Expand Up @@ -259,7 +258,7 @@ public void testCompactionTaskToClientCompactionTaskQuery() throws IOException
null
)
)
.granularitySpec(new UniformGranularitySpec(Granularities.DAY, Granularities.HOUR, null))
.granularitySpec(new ClientCompactionTaskGranularitySpec(Granularities.DAY, Granularities.HOUR))
.build();

final ClientCompactionTaskQuery expected = new ClientCompactionTaskQuery(
Expand Down Expand Up @@ -301,7 +300,7 @@ public void testCompactionTaskToClientCompactionTaskQuery() throws IOException
1000,
100
),
new ClientCompactionTaskQueryGranularitySpec(Granularities.DAY, Granularities.HOUR, true),
new ClientCompactionTaskGranularitySpec(Granularities.DAY, Granularities.HOUR),
new HashMap<>()
);

Expand Down
Loading