diff --git a/benchmarks/src/test/java/org/apache/druid/server/coordinator/NewestSegmentFirstPolicyBenchmark.java b/benchmarks/src/test/java/org/apache/druid/server/coordinator/NewestSegmentFirstPolicyBenchmark.java index 531003352123..2a17564f87d6 100644 --- a/benchmarks/src/test/java/org/apache/druid/server/coordinator/NewestSegmentFirstPolicyBenchmark.java +++ b/benchmarks/src/test/java/org/apache/druid/server/coordinator/NewestSegmentFirstPolicyBenchmark.java @@ -32,6 +32,7 @@ import org.apache.druid.timeline.partition.NumberedShardSpec; import org.apache.druid.timeline.partition.ShardSpec; import org.joda.time.DateTime; +import org.joda.time.Duration; import org.joda.time.Interval; import org.openjdk.jmh.annotations.Benchmark; import org.openjdk.jmh.annotations.BenchmarkMode; @@ -45,6 +46,7 @@ import org.openjdk.jmh.annotations.State; import org.openjdk.jmh.infra.Blackhole; +import java.time.Clock; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; @@ -61,7 +63,14 @@ public class NewestSegmentFirstPolicyBenchmark { private static final String DATA_SOURCE_PREFIX = "dataSource_"; - private final CompactionSegmentSearchPolicy policy = new NewestSegmentFirstPolicy(new DefaultObjectMapper()); + private final CompactionSegmentSearchPolicy policy = + new NewestSegmentFirstPolicy( + new DefaultObjectMapper(), + new TestDruidCoordinatorConfig.Builder() + .withCompactionSearchPolicyRefreshPeriod(Duration.standardMinutes(5)) + .build(), + Clock.systemUTC() + ); @Param("100") private int numDataSources; diff --git a/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinatorConfig.java b/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinatorConfig.java index 9495f30f2c87..63152fd50ee5 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinatorConfig.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinatorConfig.java @@ -137,4 +137,7 @@ public boolean 
getCompactionSkipLockedIntervals() return true; } + @Config("druid.coordinator.compaction.searchPolicyRefreshPeriod") + @Default("PT5M") + public abstract Duration getCompactionSearchPolicyRefreshPeriod(); } diff --git a/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java b/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java index 63267c81b8dd..7e91934956f4 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java @@ -33,6 +33,7 @@ import org.apache.druid.indexer.TaskStatusPlus; import org.apache.druid.indexer.partitions.DimensionRangePartitionsSpec; import org.apache.druid.java.util.common.ISE; +import org.apache.druid.java.util.common.Pair; import org.apache.druid.java.util.common.granularity.Granularity; import org.apache.druid.java.util.common.granularity.GranularityType; import org.apache.druid.java.util.common.logger.Logger; @@ -74,9 +75,13 @@ public class CompactSegments implements CoordinatorCustomDuty static final String TOTAL_COUNT_OF_SEGMENTS_COMPACTED = "segmentCountCompacted"; static final String TOTAL_INTERVAL_OF_SEGMENTS_COMPACTED = "segmentIntervalCompacted"; - /** Must be synced with org.apache.druid.indexing.common.task.CompactionTask.TYPE. */ + /** + * Must be synced with org.apache.druid.indexing.common.task.CompactionTask.TYPE. 
+ */ public static final String COMPACTION_TASK_TYPE = "compact"; - /** Must be synced with org.apache.druid.indexing.common.task.Tasks.STORE_COMPACTION_STATE_KEY */ + /** + * Must be synced with org.apache.druid.indexing.common.task.Tasks.STORE_COMPACTION_STATE_KEY + */ public static final String STORE_COMPACTION_STATE_KEY = "storeCompactionState"; private static final Logger LOG = new Logger(CompactSegments.class); @@ -147,17 +152,21 @@ public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params) final ClientCompactionTaskQuery compactionTaskQuery = (ClientCompactionTaskQuery) response.getPayload(); DataSourceCompactionConfig dataSourceCompactionConfig = compactionConfigs.get(status.getDataSource()); if (dataSourceCompactionConfig != null && dataSourceCompactionConfig.getGranularitySpec() != null) { - Granularity configuredSegmentGranularity = dataSourceCompactionConfig.getGranularitySpec().getSegmentGranularity(); + Granularity configuredSegmentGranularity = dataSourceCompactionConfig.getGranularitySpec() + .getSegmentGranularity(); if (configuredSegmentGranularity != null && compactionTaskQuery.getGranularitySpec() != null - && !configuredSegmentGranularity.equals(compactionTaskQuery.getGranularitySpec().getSegmentGranularity())) { + && !configuredSegmentGranularity.equals(compactionTaskQuery.getGranularitySpec() + .getSegmentGranularity())) { // We will cancel active compaction task if segmentGranularity changes and we will need to // re-compact the interval - LOG.info("Canceled task[%s] as task segmentGranularity is [%s] but compaction config " - + "segmentGranularity is [%s]", - status.getId(), - compactionTaskQuery.getGranularitySpec().getSegmentGranularity(), - configuredSegmentGranularity); + LOG.info( + "Canceled task[%s] as task segmentGranularity is [%s] but compaction config " + + "segmentGranularity is [%s]", + status.getId(), + compactionTaskQuery.getGranularitySpec().getSegmentGranularity(), + configuredSegmentGranularity + ); 
indexingServiceClient.cancelTask(status.getId()); continue; } @@ -184,8 +193,8 @@ public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params) .addAll(intervals) ); - final CompactionSegmentIterator iterator = - policy.reset(compactionConfigs, dataSources, intervalsToSkipCompaction); + final Pair iteratorAndReset = + policy.resetIfNeeded(compactionConfigs, dataSources, intervalsToSkipCompaction); int totalCapacity; if (dynamicConfig.isUseAutoScaleSlots()) { @@ -231,11 +240,15 @@ public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params) compactionConfigs, currentRunAutoCompactionSnapshotBuilders, numAvailableCompactionTaskSlots, - iterator + iteratorAndReset.lhs, + iteratorAndReset.rhs ) ); } else { - stats.accumulate(makeStats(currentRunAutoCompactionSnapshotBuilders, 0, iterator)); + stats.addToGlobalStat(COMPACTION_TASK_COUNT, 0); + if (iteratorAndReset.rhs) { + stats.accumulate(makeStats(currentRunAutoCompactionSnapshotBuilders, iteratorAndReset.lhs)); + } } } else { LOG.info("compactionConfig is empty. Skip."); @@ -300,7 +313,8 @@ private Map> getLockedIntervalsToSkip( static int findMaxNumTaskSlotsUsedByOneCompactionTask(@Nullable ClientCompactionTaskQueryTuningConfig tuningConfig) { if (isParallelMode(tuningConfig)) { - @Nullable Integer maxNumConcurrentSubTasks = tuningConfig.getMaxNumConcurrentSubTasks(); + @Nullable + Integer maxNumConcurrentSubTasks = tuningConfig.getMaxNumConcurrentSubTasks(); // Max number of task slots used in parallel mode = maxNumConcurrentSubTasks + 1 (supervisor task) return (maxNumConcurrentSubTasks == null ? 
1 : maxNumConcurrentSubTasks) + 1; } else { @@ -349,7 +363,8 @@ private CoordinatorStats doRun( Map compactionConfigs, Map currentRunAutoCompactionSnapshotBuilders, int numAvailableCompactionTaskSlots, - CompactionSegmentIterator iterator + CompactionSegmentIterator iterator, + boolean calculateStats ) { int numSubmittedTasks = 0; @@ -367,7 +382,10 @@ private CoordinatorStats doRun( k -> new AutoCompactionSnapshot.Builder(k, AutoCompactionSnapshot.AutoCompactionScheduleStatus.RUNNING) ); snapshotBuilder.incrementBytesCompacted(segmentsToCompact.stream().mapToLong(DataSegment::getSize).sum()); - snapshotBuilder.incrementIntervalCountCompacted(segmentsToCompact.stream().map(DataSegment::getInterval).distinct().count()); + snapshotBuilder.incrementIntervalCountCompacted(segmentsToCompact.stream() + .map(DataSegment::getInterval) + .distinct() + .count()); snapshotBuilder.incrementSegmentCountCompacted(segmentsToCompact.size()); final DataSourceCompactionConfig config = compactionConfigs.get(dataSourceName); @@ -390,7 +408,8 @@ private CoordinatorStats doRun( LOG.warn("Cannot determine segmentGranularity from interval [%s]", interval); } } else { - LOG.warn("segmentsToCompact does not have the same interval. Fallback to not setting segmentGranularity for auto compaction task"); + LOG.warn( + "segmentsToCompact does not have the same interval. 
Fallback to not setting segmentGranularity for auto compaction task"); } } else { segmentGranularityToUse = config.getGranularitySpec().getSegmentGranularity(); @@ -450,7 +469,11 @@ private CoordinatorStats doRun( "coordinator-issued", segmentsToCompact, config.getTaskPriority(), - ClientCompactionTaskQueryTuningConfig.from(config.getTuningConfig(), config.getMaxRowsPerSegment(), config.getMetricsSpec() != null), + ClientCompactionTaskQueryTuningConfig.from( + config.getTuningConfig(), + config.getMaxRowsPerSegment(), + config.getMetricsSpec() != null + ), granularitySpec, dimensionsSpec, config.getMetricsSpec(), @@ -473,7 +496,9 @@ private CoordinatorStats doRun( } } - return makeStats(currentRunAutoCompactionSnapshotBuilders, numSubmittedTasks, iterator); + final CoordinatorStats stats = new CoordinatorStats(); + stats.addToGlobalStat(COMPACTION_TASK_COUNT, numSubmittedTasks); + return calculateStats ? stats.accumulate(makeStats(currentRunAutoCompactionSnapshotBuilders, iterator)) : stats; } private Map newAutoCompactionContext(@Nullable Map configuredContext) @@ -487,19 +512,17 @@ private Map newAutoCompactionContext(@Nullable Map currentRunAutoCompactionSnapshotBuilders, - int numCompactionTasks, CompactionSegmentIterator iterator ) { final Map currentAutoCompactionSnapshotPerDataSource = new HashMap<>(); final CoordinatorStats stats = new CoordinatorStats(); - stats.addToGlobalStat(COMPACTION_TASK_COUNT, numCompactionTasks); // Iterate through all the remaining segments in the iterator. 
// As these segments could be compacted but were not compacted due to lack of task slot, we will aggregates // the statistic to the AwaitingCompaction statistics - while (iterator.hasNext()) { - final List segmentsToCompact = iterator.next(); + + iterator.forEachRemainingSegmentsToCompact(segmentsToCompact -> { if (!segmentsToCompact.isEmpty()) { final String dataSourceName = segmentsToCompact.get(0).getDataSource(); AutoCompactionSnapshot.Builder snapshotBuilder = currentRunAutoCompactionSnapshotBuilders.computeIfAbsent( @@ -519,7 +542,7 @@ private CoordinatorStats makeStats( ); snapshotBuilder.incrementSegmentCountAwaitingCompaction(segmentsToCompact.size()); } - } + }); // Statistics of all segments considered compacted after this run Map allCompactedStatistics = iterator.totalCompactedStatistics(); diff --git a/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactionSegmentIterator.java b/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactionSegmentIterator.java index 64f5c16a17ca..fd6b83774c2d 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactionSegmentIterator.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactionSegmentIterator.java @@ -25,6 +25,7 @@ import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.function.Consumer; /** * Segments in the lists which are the elements of this iterator are sorted according to the natural segment order @@ -48,4 +49,9 @@ public interface CompactionSegmentIterator extends Iterator> */ Map totalSkippedStatistics(); + /** + * Performs the given action over the remaining segments to compact in the iterator. Similar to + * {@link #forEachRemaining} however, this iteration does not advance the iterator. 
+ */ + void forEachRemainingSegmentsToCompact(Consumer> action); } diff --git a/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactionSegmentSearchPolicy.java b/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactionSegmentSearchPolicy.java index 2cbaf31d69e1..41fb14c11b38 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactionSegmentSearchPolicy.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactionSegmentSearchPolicy.java @@ -19,6 +19,7 @@ package org.apache.druid.server.coordinator.duty; +import org.apache.druid.java.util.common.Pair; import org.apache.druid.server.coordinator.DataSourceCompactionConfig; import org.apache.druid.timeline.SegmentTimeline; import org.joda.time.Interval; @@ -31,8 +32,22 @@ */ public interface CompactionSegmentSearchPolicy { + /** + * @return An iterator over the intervals that need to be compacted. The iterator may not be reset on every call. + * Callers who want to guarantee that the iterator will be reset, should call {@link #reset} instead. + * + * Implementations SHOULD ensure that calls to this method and {@link #reset} are thread safe. + */ + Pair resetIfNeeded( + Map compactionConfigs, + Map dataSources, + Map> skipIntervals + ); + /** * Reset the current states of this policy. This method should be called whenever iterating starts. + * + * Implementations SHOULD ensure that calls to this method and {@link #resetIfNeeded} are thread safe. 
*/ CompactionSegmentIterator reset( Map compactionConfigs, diff --git a/server/src/main/java/org/apache/druid/server/coordinator/duty/NewestSegmentFirstIterator.java b/server/src/main/java/org/apache/druid/server/coordinator/duty/NewestSegmentFirstIterator.java index d746d91c346a..dbf314f452af 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/duty/NewestSegmentFirstIterator.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/duty/NewestSegmentFirstIterator.java @@ -73,6 +73,7 @@ import java.util.Objects; import java.util.PriorityQueue; import java.util.Set; +import java.util.function.Consumer; import java.util.stream.Collectors; /** @@ -195,6 +196,17 @@ public Map totalSkippedStatistics() return skippedSegments; } + @Override + public void forEachRemainingSegmentsToCompact(Consumer> action) + { + for (CompactibleTimelineObjectHolderCursor compactibleTimelineObjectHolderCursor : timelineIterators.values()) { + for (TimelineObjectHolder holder : compactibleTimelineObjectHolderCursor.holders) { + List candidates = compactibleTimelineObjectHolderCursor.getCandidates(holder); + action.accept(candidates); + } + } + } + @Override public boolean hasNext() { @@ -310,6 +322,11 @@ public List next() throw new NoSuchElementException(); } TimelineObjectHolder timelineObjectHolder = holders.remove(holders.size() - 1); + return getCandidates(timelineObjectHolder); + } + + private List getCandidates(TimelineObjectHolder timelineObjectHolder) + { List candidates = Streams.sequentialStreamFrom(timelineObjectHolder.getObject()) .map(PartitionChunk::getObject) .collect(Collectors.toList()); diff --git a/server/src/main/java/org/apache/druid/server/coordinator/duty/NewestSegmentFirstPolicy.java b/server/src/main/java/org/apache/druid/server/coordinator/duty/NewestSegmentFirstPolicy.java index ce4f0e1066e3..7dbe7b9cfb4d 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/duty/NewestSegmentFirstPolicy.java +++ 
b/server/src/main/java/org/apache/druid/server/coordinator/duty/NewestSegmentFirstPolicy.java @@ -20,34 +20,83 @@ package org.apache.druid.server.coordinator.duty; import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.base.Preconditions; +import com.google.errorprone.annotations.concurrent.GuardedBy; import com.google.inject.Inject; +import org.apache.druid.java.util.common.Pair; import org.apache.druid.server.coordinator.DataSourceCompactionConfig; +import org.apache.druid.server.coordinator.DruidCoordinatorConfig; import org.apache.druid.timeline.SegmentTimeline; import org.joda.time.Interval; +import java.time.Clock; import java.util.List; import java.util.Map; /** * This policy searches segments for compaction from the newest one to oldest one. + * The {@link #resetIfNeeded} functionality is inspired by {@link com.google.common.base.Suppliers.ExpiringMemoizingSupplier}. */ public class NewestSegmentFirstPolicy implements CompactionSegmentSearchPolicy { private final ObjectMapper objectMapper; + private final long durationMillis; + @GuardedBy("this") + private volatile NewestSegmentFirstIterator iterator; + // The special value 0 means "not yet initialized". + @GuardedBy("this") + private volatile long expirationMillis; + private final Clock clock; @Inject - public NewestSegmentFirstPolicy(ObjectMapper objectMapper) + public NewestSegmentFirstPolicy(ObjectMapper objectMapper, DruidCoordinatorConfig config, Clock clock) { this.objectMapper = objectMapper; + this.durationMillis = config.getCompactionSearchPolicyRefreshPeriod().getMillis(); + this.clock = clock; + Preconditions.checkArgument(durationMillis > 0); } @Override - public CompactionSegmentIterator reset( + public Pair resetIfNeeded( Map compactionConfigs, Map dataSources, Map> skipIntervals ) { - return new NewestSegmentFirstIterator(objectMapper, compactionConfigs, dataSources, skipIntervals); + // This implementation was inspired by Suppliers.memoizeWithExpiration. 
+ // resetIfNeeded and reset can be called from different threads. + long millis = expirationMillis; + long now = clock.millis(); + if (millis == 0 || now - millis >= 0) { + synchronized (this) { + if (millis == expirationMillis) { + NewestSegmentFirstIterator t = reset(compactionConfigs, dataSources, skipIntervals); + iterator = t; + // reset can be slow, so use the current time to set the new expiry + expirationMillis = clock.millis() + durationMillis; + return Pair.of(t, true); + } + } + } + return Pair.of(iterator, false); + } + + @Override + public synchronized NewestSegmentFirstIterator reset( + Map compactionConfigs, + Map dataSources, + Map> skipIntervals + ) + { + NewestSegmentFirstIterator t = new NewestSegmentFirstIterator( + objectMapper, + compactionConfigs, + dataSources, + skipIntervals + ); + iterator = t; + expirationMillis = clock.millis() + durationMillis; + return t; + } } diff --git a/server/src/main/java/org/apache/druid/server/http/CompactionResource.java b/server/src/main/java/org/apache/druid/server/http/CompactionResource.java index e88d0cdacfb2..e0da556798e3 100644 --- a/server/src/main/java/org/apache/druid/server/http/CompactionResource.java +++ b/server/src/main/java/org/apache/druid/server/http/CompactionResource.java @@ -52,7 +52,9 @@ public CompactionResource( } /** - * This API is meant to only be used by Druid's integration tests. + * This API schedules an out-of-band compaction task. Its primary purpose was for testing, but can be used in production + * clusters. If you are using this API excessively, you should look at scheduling the CompactSegments duty in a custom + * duty group at a shorter period. 
*/ @POST @Path("/compact") diff --git a/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorTest.java b/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorTest.java index 4076ce9d3a87..cd740d2853cd 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorTest.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorTest.java @@ -76,6 +76,7 @@ import org.junit.Test; import javax.annotation.Nullable; +import java.time.Clock; import java.util.Collections; import java.util.HashSet; import java.util.List; @@ -146,13 +147,13 @@ public void setUp() throws Exception curator.blockUntilConnected(); curator.create().creatingParentsIfNeeded().forPath(LOADPATH); objectMapper = new DefaultObjectMapper(); - newestSegmentFirstPolicy = new NewestSegmentFirstPolicy(objectMapper); druidCoordinatorConfig = new TestDruidCoordinatorConfig.Builder() .withCoordinatorStartDelay(new Duration(COORDINATOR_START_DELAY)) .withCoordinatorPeriod(new Duration(COORDINATOR_PERIOD)) .withCoordinatorKillPeriod(new Duration(COORDINATOR_PERIOD)) .withCoordinatorKillIgnoreDurationToRetain(false) .build(); + newestSegmentFirstPolicy = new NewestSegmentFirstPolicy(objectMapper, druidCoordinatorConfig, Clock.systemUTC()); pathChildrenCache = new PathChildrenCache( curator, LOADPATH, @@ -214,7 +215,7 @@ public void testMoveSegment() EasyMock.replay(segment); loadQueuePeon = EasyMock.createNiceMock(LoadQueuePeon.class); - EasyMock.expect(loadQueuePeon.getLoadQueueSize()).andReturn(new Long(1)); + EasyMock.expect(loadQueuePeon.getLoadQueueSize()).andReturn(1L); loadQueuePeon.markSegmentToDrop(segment); EasyMock.expectLastCall().once(); Capture loadCallbackCapture = Capture.newInstance(); diff --git a/server/src/test/java/org/apache/druid/server/coordinator/TestDruidCoordinatorConfig.java b/server/src/test/java/org/apache/druid/server/coordinator/TestDruidCoordinatorConfig.java index 271bda113143..826cbaac62d6 
100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/TestDruidCoordinatorConfig.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/TestDruidCoordinatorConfig.java @@ -47,6 +47,7 @@ public class TestDruidCoordinatorConfig extends DruidCoordinatorConfig private final int curatorLoadQueuePeonNumCallbackThreads; private final Duration httpLoadQueuePeonHostTimeout; private final int httpLoadQueuePeonBatchSize; + private final Duration compactionSearchPolicyRefreshPeriod; public TestDruidCoordinatorConfig( Duration coordinatorStartDelay, @@ -72,7 +73,8 @@ public TestDruidCoordinatorConfig( Duration httpLoadQueuePeonRepeatDelay, Duration httpLoadQueuePeonHostTimeout, int httpLoadQueuePeonBatchSize, - int curatorLoadQueuePeonNumCallbackThreads + int curatorLoadQueuePeonNumCallbackThreads, + Duration compactionSearchPolicyRefreshPeriod ) { this.coordinatorStartDelay = coordinatorStartDelay; @@ -99,6 +101,7 @@ public TestDruidCoordinatorConfig( this.httpLoadQueuePeonHostTimeout = httpLoadQueuePeonHostTimeout; this.httpLoadQueuePeonBatchSize = httpLoadQueuePeonBatchSize; this.curatorLoadQueuePeonNumCallbackThreads = curatorLoadQueuePeonNumCallbackThreads; + this.compactionSearchPolicyRefreshPeriod = compactionSearchPolicyRefreshPeriod; } @Override @@ -209,6 +212,12 @@ public boolean getCompactionSkipLockedIntervals() return compactionSkipLockedIntervals; } + @Override + public Duration getCompactionSearchPolicyRefreshPeriod() + { + return compactionSearchPolicyRefreshPeriod; + } + @Override public boolean getCoordinatorKillIgnoreDurationToRetain() { @@ -271,6 +280,7 @@ public static class Builder private static final boolean DEFAULT_COMPACTION_SKIP_LOCKED_INTERVALS = true; private static final Duration DEFAULT_COORDINATOR_AUDIT_KILL_PERIOD = new Duration("PT86400s"); private static final Duration DEFAULT_COORDINATOR_AUTIT_KILL_DURATION_TO_RETAIN = new Duration("PT7776000s"); + private static final Duration 
DEFAULT_COMPACTION_SEARCH_POLICY_REFRESH_PERIOD = new Duration("PT300s"); private Duration coordinatorStartDelay; @@ -297,6 +307,7 @@ public static class Builder private Boolean compactionSkippedLockedIntervals; private Duration coordinatorAuditKillPeriod; private Duration coordinatorAuditKillDurationToRetain; + private Duration compactionSearchPolicyRefreshPeriod; public Builder() { @@ -446,6 +457,12 @@ public Builder withCoordinatorAuditKillDurationToRetain(Duration coordinatorAudi return this; } + public Builder withCompactionSearchPolicyRefreshPeriod(Duration compactionSearchPolicyRefreshPeriod) + { + this.compactionSearchPolicyRefreshPeriod = compactionSearchPolicyRefreshPeriod; + return this; + } + public TestDruidCoordinatorConfig build() { return new TestDruidCoordinatorConfig( @@ -473,7 +490,8 @@ public TestDruidCoordinatorConfig build() httpLoadQueuePeonHostTimeout == null ? DEFAULT_HTTP_LOAD_QUEUE_PEON_HOST_TIMEOUT : httpLoadQueuePeonHostTimeout, httpLoadQueuePeonBatchSize == null ? DEFAULT_HTTP_LOAD_QUEUE_PEON_BATCH_SIZE : httpLoadQueuePeonBatchSize, curatorLoadQueuePeonNumCallbackThreads == null ? DEFAULT_CURATOR_LOAD_QUEUE_PEON_NUM_CALLBACK_THREADS - : curatorLoadQueuePeonNumCallbackThreads + : curatorLoadQueuePeonNumCallbackThreads, + compactionSearchPolicyRefreshPeriod == null ? 
DEFAULT_COMPACTION_SEARCH_POLICY_REFRESH_PERIOD : compactionSearchPolicyRefreshPeriod ); } diff --git a/server/src/test/java/org/apache/druid/server/coordinator/duty/CompactSegmentsTest.java b/server/src/test/java/org/apache/druid/server/coordinator/duty/CompactSegmentsTest.java index 510689c9ee6d..20626434b6c5 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/duty/CompactSegmentsTest.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/duty/CompactSegmentsTest.java @@ -79,6 +79,7 @@ import org.apache.druid.server.coordinator.DataSourceCompactionConfig; import org.apache.druid.server.coordinator.DruidCoordinatorConfig; import org.apache.druid.server.coordinator.DruidCoordinatorRuntimeParams; +import org.apache.druid.server.coordinator.TestDruidCoordinatorConfig; import org.apache.druid.server.coordinator.UserCompactionTaskDimensionsConfig; import org.apache.druid.server.coordinator.UserCompactionTaskGranularityConfig; import org.apache.druid.server.coordinator.UserCompactionTaskIOConfig; @@ -116,6 +117,7 @@ import java.io.IOException; import java.net.URL; import java.nio.charset.StandardCharsets; +import java.time.Clock; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -139,7 +141,8 @@ public class CompactSegmentsTest private static final int TOTAL_SEGMENT_PER_DATASOURCE = 44; private static final int TOTAL_INTERVAL_PER_DATASOURCE = 11; private static final int MAXIMUM_CAPACITY_WITH_AUTO_SCALE = 10; - private static final NewestSegmentFirstPolicy SEARCH_POLICY = new NewestSegmentFirstPolicy(JSON_MAPPER); + private static final NewestSegmentFirstPolicy SEARCH_POLICY = + new NewestSegmentFirstPolicy(JSON_MAPPER, new TestDruidCoordinatorConfig.Builder().build(), Clock.systemUTC()); @Parameterized.Parameters(name = "{0}") public static Collection constructorFeeder() diff --git a/server/src/test/java/org/apache/druid/server/coordinator/duty/NewestSegmentFirstPolicyTest.java 
b/server/src/test/java/org/apache/druid/server/coordinator/duty/NewestSegmentFirstPolicyTest.java index 73abcb8d5eac..ad1d87499a0b 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/duty/NewestSegmentFirstPolicyTest.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/duty/NewestSegmentFirstPolicyTest.java @@ -48,6 +48,7 @@ import org.apache.druid.segment.incremental.OnheapIncrementalIndex; import org.apache.druid.segment.transform.TransformSpec; import org.apache.druid.server.coordinator.DataSourceCompactionConfig; +import org.apache.druid.server.coordinator.TestDruidCoordinatorConfig; import org.apache.druid.server.coordinator.UserCompactionTaskDimensionsConfig; import org.apache.druid.server.coordinator.UserCompactionTaskGranularityConfig; import org.apache.druid.server.coordinator.UserCompactionTaskQueryTuningConfig; @@ -65,6 +66,7 @@ import org.junit.Assert; import org.junit.Test; +import java.time.Clock; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -81,7 +83,8 @@ public class NewestSegmentFirstPolicyTest private static final long DEFAULT_SEGMENT_SIZE = 1000; private static final int DEFAULT_NUM_SEGMENTS_PER_SHARD = 4; private ObjectMapper mapper = new DefaultObjectMapper(); - private final NewestSegmentFirstPolicy policy = new NewestSegmentFirstPolicy(mapper); + private final NewestSegmentFirstPolicy policy = + new NewestSegmentFirstPolicy(mapper, new TestDruidCoordinatorConfig.Builder().build(), Clock.systemUTC()); @Test public void testLargeOffsetAndSmallSegmentInterval() diff --git a/server/src/test/java/org/apache/druid/server/coordinator/simulate/CoordinatorSimulationBuilder.java b/server/src/test/java/org/apache/druid/server/coordinator/simulate/CoordinatorSimulationBuilder.java index 054d34422d9e..f13a5b8f3984 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/simulate/CoordinatorSimulationBuilder.java +++ 
b/server/src/test/java/org/apache/druid/server/coordinator/simulate/CoordinatorSimulationBuilder.java @@ -56,6 +56,7 @@ import org.easymock.EasyMock; import org.joda.time.Duration; +import java.time.Clock; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -80,8 +81,6 @@ public class CoordinatorSimulationBuilder DataSegment.PruneSpecsHolder.DEFAULT ) ); - private static final CompactionSegmentSearchPolicy COMPACTION_SEGMENT_SEARCH_POLICY = - new NewestSegmentFirstPolicy(OBJECT_MAPPER); private String balancerStrategy; private CoordinatorDynamicConfig dynamicConfig = CoordinatorDynamicConfig.builder() @@ -193,7 +192,8 @@ public CoordinatorSimulation build() (datasource, rules) -> env.ruleManager.overrideRule(datasource, rules, null) ); - + CompactionSegmentSearchPolicy compactionSegmentSearchPolicy = + new NewestSegmentFirstPolicy(OBJECT_MAPPER, env.coordinatorConfig, Clock.systemUTC()); // Build the coordinator final DruidCoordinator coordinator = new DruidCoordinator( env.coordinatorConfig, @@ -213,7 +213,7 @@ public CoordinatorSimulation build() createBalancerStrategy(env), env.lookupCoordinatorManager, env.leaderSelector, - COMPACTION_SEGMENT_SEARCH_POLICY + compactionSegmentSearchPolicy ); return new SimulationImpl(coordinator, env); diff --git a/services/src/main/java/org/apache/druid/cli/CliCoordinator.java b/services/src/main/java/org/apache/druid/cli/CliCoordinator.java index 5684c3871c39..04ff5d066012 100644 --- a/services/src/main/java/org/apache/druid/cli/CliCoordinator.java +++ b/services/src/main/java/org/apache/druid/cli/CliCoordinator.java @@ -119,6 +119,7 @@ import org.eclipse.jetty.server.Server; import org.joda.time.Duration; +import java.time.Clock; import java.util.ArrayList; import java.util.HashSet; import java.util.List; @@ -315,7 +316,8 @@ public void configure(Binder binder) ); //TODO: make this configurable when there are multiple search policies - 
binder.bind(CompactionSegmentSearchPolicy.class).to(NewestSegmentFirstPolicy.class); + binder.bind(CompactionSegmentSearchPolicy.class).to(NewestSegmentFirstPolicy.class).in(LazySingleton.class); + binder.bind(Clock.class).toInstance(Clock.systemUTC()); bindAnnouncer( binder,